Compare commits

...

3 Commits

Author SHA1 Message Date
Danny Kopping 38bb5f3fab feat: refactor chatd to use objstore
Signed-off-by: Danny Kopping <danny@coder.com>
2026-04-10 14:56:56 +00:00
Danny Kopping 129a1d1e98 chore: wire into server
Signed-off-by: Danny Kopping <danny@coder.com>
2026-04-10 14:10:05 +00:00
Danny Kopping 0f6dbfdc44 feat: objstore pkg
Signed-off-by: Danny Kopping <danny@coder.com>
2026-04-10 13:24:52 +00:00
35 changed files with 1510 additions and 74 deletions
+9 -1
View File
@@ -79,6 +79,7 @@ import (
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/notifications/reports"
"github.com/coder/coder/v2/coderd/oauthpki"
"github.com/coder/coder/v2/coderd/objstore"
"github.com/coder/coder/v2/coderd/pproflabel"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/prometheusmetrics/insights"
@@ -638,12 +639,19 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
vals.WorkspaceHostnameSuffix.String())
}
objStore, err := objstore.FromConfig(ctx, vals.ObjectStore, r.globalConfig)
if err != nil {
return xerrors.Errorf("initialize object store: %w", err)
}
defer objStore.Close()
options := &coderd.Options{
AccessURL: vals.AccessURL.Value(),
AppHostname: appHostname,
AppHostnameRegex: appHostnameRegex,
Logger: logger.Named("coderd"),
Database: nil,
ObjectStore: objStore,
BaseDERPMap: derpMap,
Pubsub: nil,
CacheDir: cacheDir,
@@ -1075,7 +1083,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
defer shutdownConns()
// Ensures that old database entries are cleaned up over time!
purger := dbpurge.New(ctx, logger.Named("dbpurge"), options.Database, options.DeploymentValues, quartz.NewReal(), options.PrometheusRegistry)
purger := dbpurge.New(ctx, logger.Named("dbpurge"), options.Database, options.DeploymentValues, quartz.NewReal(), options.PrometheusRegistry, objStore)
defer purger.Close()
// Updates workspace usage
+35
View File
@@ -773,6 +773,41 @@ OIDC OPTIONS:
requirement, and can lead to an insecure OIDC configuration. It is not
recommended to use this flag.
OBJECT STORE OPTIONS:
Configure the object storage backend for binary data (chat files, transcripts,
etc.). Defaults to local filesystem storage.
--objectstore-backend string, $CODER_OBJECTSTORE_BACKEND (default: local)
The storage backend for binary data such as chat files. Valid values:
local, s3, gcs.
--objectstore-gcs-bucket string, $CODER_OBJECTSTORE_GCS_BUCKET
GCS bucket name. Required when the backend is "gcs".
--objectstore-gcs-credentials-file string, $CODER_OBJECTSTORE_GCS_CREDENTIALS_FILE
Path to a GCS service account key file. If empty, Application Default
Credentials are used.
--objectstore-gcs-prefix string, $CODER_OBJECTSTORE_GCS_PREFIX
Optional key prefix within the GCS bucket.
--objectstore-local-dir string, $CODER_OBJECTSTORE_LOCAL_DIR
Root directory for the local filesystem object store backend. Only
used when the backend is "local".
--objectstore-s3-bucket string, $CODER_OBJECTSTORE_S3_BUCKET
S3 bucket name. Required when the backend is "s3".
--objectstore-s3-endpoint string, $CODER_OBJECTSTORE_S3_ENDPOINT
Custom S3-compatible endpoint URL (e.g. for MinIO, R2, Cloudflare).
Leave empty for standard AWS S3.
--objectstore-s3-prefix string, $CODER_OBJECTSTORE_S3_PREFIX
Optional key prefix within the S3 bucket.
--objectstore-s3-region string, $CODER_OBJECTSTORE_S3_REGION
AWS region for the S3 bucket.
PROVISIONING OPTIONS:
Tune the behavior of the provisioner, which is responsible for creating,
updating, and deleting workspace resources.
+34
View File
@@ -908,3 +908,37 @@ retention:
# build are always retained. Set to 0 to disable automatic deletion.
# (default: 7d, type: duration)
workspace_agent_logs: 168h0m0s
# Configure the object storage backend for binary data (chat files, transcripts,
# etc.). Defaults to local filesystem storage.
objectStore:
# The storage backend for binary data such as chat files. Valid values: local, s3,
# gcs.
# (default: local, type: string)
backend: local
# Root directory for the local filesystem object store backend. Only used when the
# backend is "local".
# (default: <unset>, type: string)
local_dir: ""
# S3 bucket name. Required when the backend is "s3".
# (default: <unset>, type: string)
s3_bucket: ""
# AWS region for the S3 bucket.
# (default: <unset>, type: string)
s3_region: ""
# Optional key prefix within the S3 bucket.
# (default: <unset>, type: string)
s3_prefix: ""
# Custom S3-compatible endpoint URL (e.g. for MinIO, R2, Cloudflare). Leave empty
# for standard AWS S3.
# (default: <unset>, type: string)
s3_endpoint: ""
# GCS bucket name. Required when the backend is "gcs".
# (default: <unset>, type: string)
gcs_bucket: ""
# Optional key prefix within the GCS bucket.
# (default: <unset>, type: string)
gcs_prefix: ""
# Path to a GCS service account key file. If empty, Application Default
# Credentials are used.
# (default: <unset>, type: string)
gcs_credentials_file: ""
+44
View File
@@ -15925,6 +15925,9 @@ const docTemplate = `{
"oauth2": {
"$ref": "#/definitions/codersdk.OAuth2Config"
},
"object_store": {
"$ref": "#/definitions/codersdk.ObjectStoreConfig"
},
"oidc": {
"$ref": "#/definitions/codersdk.OIDCConfig"
},
@@ -17941,6 +17944,47 @@ const docTemplate = `{
}
}
},
"codersdk.ObjectStoreConfig": {
"type": "object",
"properties": {
"backend": {
"description": "Backend selects the storage backend: \"local\" (default), \"s3\", or \"gcs\".",
"type": "string"
},
"gcs_bucket": {
"description": "GCSBucket is the GCS bucket name. Required when Backend is \"gcs\".",
"type": "string"
},
"gcs_credentials_file": {
"description": "GCSCredentialsFile is an optional path to a GCS service account\nkey file. If empty, Application Default Credentials are used.",
"type": "string"
},
"gcs_prefix": {
"description": "GCSPrefix is an optional key prefix within the GCS bucket.",
"type": "string"
},
"local_dir": {
"description": "LocalDir is the root directory for the local filesystem backend.\nOnly used when Backend is \"local\". Defaults to \u003cconfig-dir\u003e/objectstore/.",
"type": "string"
},
"s3_bucket": {
"description": "S3Bucket is the S3 bucket name. Required when Backend is \"s3\".",
"type": "string"
},
"s3_endpoint": {
"description": "S3Endpoint is a custom S3-compatible endpoint URL (for MinIO, R2, etc.).",
"type": "string"
},
"s3_prefix": {
"description": "S3Prefix is an optional key prefix within the S3 bucket.",
"type": "string"
},
"s3_region": {
"description": "S3Region is the AWS region for the S3 bucket.",
"type": "string"
}
}
},
"codersdk.OptionType": {
"type": "string",
"enum": [
+44
View File
@@ -14392,6 +14392,9 @@
"oauth2": {
"$ref": "#/definitions/codersdk.OAuth2Config"
},
"object_store": {
"$ref": "#/definitions/codersdk.ObjectStoreConfig"
},
"oidc": {
"$ref": "#/definitions/codersdk.OIDCConfig"
},
@@ -16338,6 +16341,47 @@
}
}
},
"codersdk.ObjectStoreConfig": {
"type": "object",
"properties": {
"backend": {
"description": "Backend selects the storage backend: \"local\" (default), \"s3\", or \"gcs\".",
"type": "string"
},
"gcs_bucket": {
"description": "GCSBucket is the GCS bucket name. Required when Backend is \"gcs\".",
"type": "string"
},
"gcs_credentials_file": {
"description": "GCSCredentialsFile is an optional path to a GCS service account\nkey file. If empty, Application Default Credentials are used.",
"type": "string"
},
"gcs_prefix": {
"description": "GCSPrefix is an optional key prefix within the GCS bucket.",
"type": "string"
},
"local_dir": {
"description": "LocalDir is the root directory for the local filesystem backend.\nOnly used when Backend is \"local\". Defaults to \u003cconfig-dir\u003e/objectstore/.",
"type": "string"
},
"s3_bucket": {
"description": "S3Bucket is the S3 bucket name. Required when Backend is \"s3\".",
"type": "string"
},
"s3_endpoint": {
"description": "S3Endpoint is a custom S3-compatible endpoint URL (for MinIO, R2, etc.).",
"type": "string"
},
"s3_prefix": {
"description": "S3Prefix is an optional key prefix within the S3 bucket.",
"type": "string"
},
"s3_region": {
"description": "S3Region is the AWS region for the S3 bucket.",
"type": "string"
}
}
},
"codersdk.OptionType": {
"type": "string",
"enum": ["string", "number", "bool", "list(string)"],
+3
View File
@@ -71,6 +71,7 @@ import (
"github.com/coder/coder/v2/coderd/metricscache"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/oauth2provider"
"github.com/coder/coder/v2/coderd/objstore"
"github.com/coder/coder/v2/coderd/portsharing"
"github.com/coder/coder/v2/coderd/pproflabel"
"github.com/coder/coder/v2/coderd/prebuilds"
@@ -158,6 +159,7 @@ type Options struct {
AppHostnameRegex *regexp.Regexp
Logger slog.Logger
Database database.Store
ObjectStore objstore.Store
Pubsub pubsub.Pubsub
RuntimeConfig *runtimeconfig.Manager
@@ -792,6 +794,7 @@ func New(options *Options) *API {
Pubsub: options.Pubsub,
WebpushDispatcher: options.WebPushDispatcher,
UsageTracker: options.WorkspaceUsageTracker,
ObjectStore: options.ObjectStore,
})
gitSyncLogger := options.Logger.Named("gitsync")
refresher := gitsync.NewRefresher(
+2 -2
View File
@@ -2042,9 +2042,9 @@ func (q *querier) DeleteOldAuditLogs(ctx context.Context, arg database.DeleteOld
return q.db.DeleteOldAuditLogs(ctx, arg)
}
func (q *querier) DeleteOldChatFiles(ctx context.Context, arg database.DeleteOldChatFilesParams) (int64, error) {
func (q *querier) DeleteOldChatFiles(ctx context.Context, arg database.DeleteOldChatFilesParams) ([]database.DeleteOldChatFilesRow, error) {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
return 0, err
return nil, err
}
return q.db.DeleteOldChatFiles(ctx, arg)
}
+1 -1
View File
@@ -600,7 +600,7 @@ func (m queryMetricsStore) DeleteOldAuditLogs(ctx context.Context, arg database.
return r0, r1
}
func (m queryMetricsStore) DeleteOldChatFiles(ctx context.Context, arg database.DeleteOldChatFilesParams) (int64, error) {
func (m queryMetricsStore) DeleteOldChatFiles(ctx context.Context, arg database.DeleteOldChatFilesParams) ([]database.DeleteOldChatFilesRow, error) {
start := time.Now()
r0, r1 := m.s.DeleteOldChatFiles(ctx, arg)
m.queryLatencies.WithLabelValues("DeleteOldChatFiles").Observe(time.Since(start).Seconds())
+2 -2
View File
@@ -999,10 +999,10 @@ func (mr *MockStoreMockRecorder) DeleteOldAuditLogs(ctx, arg any) *gomock.Call {
}
// DeleteOldChatFiles mocks base method.
func (m *MockStore) DeleteOldChatFiles(ctx context.Context, arg database.DeleteOldChatFilesParams) (int64, error) {
func (m *MockStore) DeleteOldChatFiles(ctx context.Context, arg database.DeleteOldChatFilesParams) ([]database.DeleteOldChatFilesRow, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteOldChatFiles", ctx, arg)
ret0, _ := ret[0].(int64)
ret0, _ := ret[0].([]database.DeleteOldChatFilesRow)
ret1, _ := ret[1].(error)
return ret0, ret1
}
+100 -2
View File
@@ -3,6 +3,7 @@ package dbpurge
import (
"context"
"io"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -12,6 +13,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/objstore"
"github.com/coder/coder/v2/coderd/pproflabel"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/quartz"
@@ -41,11 +43,15 @@ const (
chatFilesBatchSize = 1000
)
// chatFilesNamespace is the object store namespace under which chat
// files are stored.
const chatFilesNamespace = "chatfiles"
// New creates a new periodically purging database instance.
// It is the caller's responsibility to call Close on the returned instance.
//
// This is for cleaning up old, unused resources from the database that take up space.
func New(ctx context.Context, logger slog.Logger, db database.Store, vals *codersdk.DeploymentValues, clk quartz.Clock, reg prometheus.Registerer) io.Closer {
func New(ctx context.Context, logger slog.Logger, db database.Store, vals *codersdk.DeploymentValues, clk quartz.Clock, reg prometheus.Registerer, objStore objstore.Store) io.Closer {
closed := make(chan struct{})
ctx, cancelFunc := context.WithCancel(ctx)
@@ -69,6 +75,22 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
}, []string{"record_type"})
reg.MustRegister(recordsPurged)
objStoreInflight := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "dbpurge",
Name: "objstore_delete_inflight",
Help: "Number of object store files currently enqueued for deletion.",
})
reg.MustRegister(objStoreInflight)
objStoreDeleted := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "dbpurge",
Name: "objstore_files_deleted_total",
Help: "Total number of object store files successfully deleted.",
})
reg.MustRegister(objStoreDeleted)
inst := &instance{
cancel: cancelFunc,
closed: closed,
@@ -77,6 +99,9 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
clk: clk,
iterationDuration: iterationDuration,
recordsPurged: recordsPurged,
objStore: objStore,
objStoreInflight: objStoreInflight,
objStoreDeleted: objStoreDeleted,
}
// Start the ticker with the initial delay.
@@ -250,13 +275,20 @@ func (i *instance) purgeTick(ctx context.Context, db database.Store, start time.
return xerrors.Errorf("failed to delete old chats: %w", err)
}
purgedChatFiles, err = tx.DeleteOldChatFiles(ctx, database.DeleteOldChatFilesParams{
deletedFiles, err := tx.DeleteOldChatFiles(ctx, database.DeleteOldChatFilesParams{
BeforeTime: deleteChatsBefore,
LimitCount: chatFilesBatchSize,
})
if err != nil {
return xerrors.Errorf("failed to delete old chat files: %w", err)
}
purgedChatFiles = int64(len(deletedFiles))
// Collect object store keys from the deleted rows
// and delete them in a background goroutine so
// slow object store I/O does not hold the
// advisory lock or block the next tick.
i.deleteObjStoreKeys(ctx, deletedFiles)
}
i.logger.Debug(ctx, "purged old database entries",
slog.F("workspace_agent_logs", purgedWorkspaceAgentLogs),
@@ -295,6 +327,13 @@ type instance struct {
clk quartz.Clock
iterationDuration *prometheus.HistogramVec
recordsPurged *prometheus.CounterVec
objStore objstore.Store
objStoreInflight prometheus.Gauge
objStoreDeleted prometheus.Counter
// objDeleteMu serializes background object store delete batches
// so at most one goroutine is deleting at a time.
objDeleteMu sync.Mutex
}
func (i *instance) Close() error {
@@ -302,3 +341,62 @@ func (i *instance) Close() error {
<-i.closed
return nil
}
// deleteObjStoreKeys removes object store entries for the given
// deleted chat file rows. The work runs in a background goroutine
// guarded by a mutex so that slow object store I/O never blocks
// the purge transaction or the next tick. At most one delete batch
// runs at a time; if a batch is already in flight the new keys are
// silently dropped (they will be orphan-collected on a future tick
// if needed).
func (i *instance) deleteObjStoreKeys(ctx context.Context, rows []database.DeleteOldChatFilesRow) {
// Collect non-empty object store keys.
var keys []string
for _, r := range rows {
if r.ObjectStoreKey.Valid && r.ObjectStoreKey.String != "" {
keys = append(keys, r.ObjectStoreKey.String)
}
}
if len(keys) == 0 {
return
}
// Try to acquire the mutex without blocking. If another
// delete batch is already running, skip this one.
if !i.objDeleteMu.TryLock() {
i.logger.Debug(ctx, "object store delete already in progress, skipping batch",
slog.F("skipped_keys", len(keys)))
return
}
i.objStoreInflight.Add(float64(len(keys)))
go func() {
defer i.objDeleteMu.Unlock()
var deleted int
for _, key := range keys {
if ctx.Err() != nil {
remaining := len(keys) - deleted
i.objStoreInflight.Sub(float64(remaining))
i.logger.Debug(ctx, "context canceled during object store cleanup",
slog.F("deleted", deleted),
slog.F("remaining", remaining))
return
}
if err := i.objStore.Delete(ctx, chatFilesNamespace, key); err != nil {
i.logger.Warn(ctx, "failed to delete chat file from object store",
slog.F("key", key),
slog.Error(err))
} else {
deleted++
}
i.objStoreInflight.Dec()
}
i.objStoreDeleted.Add(float64(deleted))
i.logger.Debug(ctx, "deleted chat files from object store",
slog.F("deleted", deleted),
slog.F("failed", len(keys)-deleted))
}()
}
+19 -19
View File
@@ -56,7 +56,7 @@ func TestPurge(t *testing.T) {
mDB := dbmock.NewMockStore(gomock.NewController(t))
mDB.EXPECT().GetChatRetentionDays(gomock.Any()).Return(int32(0), nil).AnyTimes()
mDB.EXPECT().InTx(gomock.Any(), database.DefaultTXOptions().WithID("db_purge")).Return(nil).Times(2)
purger := dbpurge.New(context.Background(), testutil.Logger(t), mDB, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
purger := dbpurge.New(context.Background(), testutil.Logger(t), mDB, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
<-done // wait for doTick() to run.
require.NoError(t, purger.Close())
}
@@ -90,7 +90,7 @@ func TestMetrics(t *testing.T) {
Retention: codersdk.RetentionConfig{
APIKeys: serpent.Duration(7 * 24 * time.Hour), // 7 days retention
},
}, clk, reg)
}, clk, reg, nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -158,7 +158,7 @@ func TestMetrics(t *testing.T) {
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, mDB, &codersdk.DeploymentValues{}, clk, reg)
closer := dbpurge.New(ctx, logger, mDB, &codersdk.DeploymentValues{}, clk, reg, nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -248,7 +248,7 @@ func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
})
// when
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
// then
@@ -273,7 +273,7 @@ func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
// Start a new purger to immediately trigger delete after rollup.
_ = closer.Close()
closer = dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer = dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
// then
@@ -368,7 +368,7 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) {
Retention: codersdk.RetentionConfig{
WorkspaceAgentLogs: serpent.Duration(7 * 24 * time.Hour),
},
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
<-done // doTick() has now run.
@@ -583,7 +583,7 @@ func TestDeleteOldWorkspaceAgentLogsRetention(t *testing.T) {
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
Retention: tc.retentionConfig,
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -674,7 +674,7 @@ func TestDeleteOldProvisionerDaemons(t *testing.T) {
require.NoError(t, err)
// when
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
// then
@@ -778,7 +778,7 @@ func TestDeleteOldAuditLogConnectionEvents(t *testing.T) {
// Run the purge
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
// Wait for tick
testutil.TryReceive(ctx, t, done)
@@ -941,7 +941,7 @@ func TestDeleteOldTelemetryHeartbeats(t *testing.T) {
require.NoError(t, err)
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
<-done // doTick() has now run.
@@ -1060,7 +1060,7 @@ func TestDeleteOldConnectionLogs(t *testing.T) {
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
Retention: tc.retentionConfig,
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1316,7 +1316,7 @@ func TestDeleteOldAIBridgeRecords(t *testing.T) {
Retention: serpent.Duration(tc.retention),
},
},
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1403,7 +1403,7 @@ func TestDeleteOldAuditLogs(t *testing.T) {
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
Retention: tc.retentionConfig,
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1493,7 +1493,7 @@ func TestDeleteOldAuditLogs(t *testing.T) {
Retention: codersdk.RetentionConfig{
AuditLogs: serpent.Duration(retentionPeriod),
},
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1613,7 +1613,7 @@ func TestDeleteExpiredAPIKeys(t *testing.T) {
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{
Retention: tc.retentionConfig,
}, clk, prometheus.NewRegistry())
}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1740,7 +1740,7 @@ func TestDeleteOldChatFiles(t *testing.T) {
oldFileID := createChatFile(ctx, t, db, rawDB, deps.user.ID, deps.org.ID, now.Add(-31*24*time.Hour))
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1797,7 +1797,7 @@ func TestDeleteOldChatFiles(t *testing.T) {
activeChat := createChat(ctx, t, db, rawDB, deps.user.ID, deps.modelConfig.ID, false, now)
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1854,7 +1854,7 @@ func TestDeleteOldChatFiles(t *testing.T) {
fileBoundary := createChatFile(ctx, t, db, rawDB, deps.user.ID, deps.org.ID, now.Add(-30*24*time.Hour).Add(time.Hour))
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
@@ -1934,7 +1934,7 @@ func TestDeleteOldChatFiles(t *testing.T) {
require.NoError(t, err)
done := awaitDoTick(ctx, t, clk)
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry())
closer := dbpurge.New(ctx, logger, db, &codersdk.DeploymentValues{}, clk, prometheus.NewRegistry(), nil)
defer closer.Close()
testutil.TryReceive(ctx, t, done)
+2 -1
View File
@@ -1293,7 +1293,8 @@ CREATE TABLE chat_files (
created_at timestamp with time zone DEFAULT now() NOT NULL,
name text DEFAULT ''::text NOT NULL,
mimetype text NOT NULL,
data bytea NOT NULL
data bytea,
object_store_key text
);
CREATE TABLE chat_messages (
@@ -0,0 +1,7 @@
-- Backfill any NULL data values before restoring NOT NULL would require
-- reading from the object store, which is not possible in a migration.
-- Instead, delete rows that only exist in the object store.
DELETE FROM chat_files WHERE data IS NULL;
ALTER TABLE chat_files ALTER COLUMN data SET NOT NULL;
ALTER TABLE chat_files DROP COLUMN object_store_key;
@@ -0,0 +1,8 @@
-- Add object_store_key to track files stored in external object storage.
-- When non-NULL, the file data lives in the object store under this key
-- and the data column may be NULL.
ALTER TABLE chat_files ADD COLUMN object_store_key TEXT;
-- Make data nullable so new writes can skip the BYTEA column when
-- storing in the object store.
ALTER TABLE chat_files ALTER COLUMN data DROP NOT NULL;
+8 -7
View File
@@ -4275,13 +4275,14 @@ type ChatDiffStatus struct {
}
type ChatFile struct {
ID uuid.UUID `db:"id" json:"id"`
OwnerID uuid.UUID `db:"owner_id" json:"owner_id"`
OrganizationID uuid.UUID `db:"organization_id" json:"organization_id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
Name string `db:"name" json:"name"`
Mimetype string `db:"mimetype" json:"mimetype"`
Data []byte `db:"data" json:"data"`
ID uuid.UUID `db:"id" json:"id"`
OwnerID uuid.UUID `db:"owner_id" json:"owner_id"`
OrganizationID uuid.UUID `db:"organization_id" json:"organization_id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
Name string `db:"name" json:"name"`
Mimetype string `db:"mimetype" json:"mimetype"`
Data []byte `db:"data" json:"data"`
ObjectStoreKey sql.NullString `db:"object_store_key" json:"object_store_key"`
}
type ChatFileLink struct {
+3 -1
View File
@@ -138,7 +138,9 @@ type sqlcQuerier interface {
// 1. Orphaned files not linked to any chat.
// 2. Files whose every referencing chat has been archived for longer
// than the retention period.
DeleteOldChatFiles(ctx context.Context, arg DeleteOldChatFilesParams) (int64, error)
// Returns the deleted rows so callers can clean up associated object
// store entries.
DeleteOldChatFiles(ctx context.Context, arg DeleteOldChatFilesParams) ([]DeleteOldChatFilesRow, error)
// Deletes chats that have been archived for longer than the given
// threshold. Active (non-archived) chats are never deleted.
// Related chat_messages, chat_diff_statuses, and
+37 -9
View File
@@ -2900,7 +2900,7 @@ func (q *sqlQuerier) UpsertBoundaryUsageStats(ctx context.Context, arg UpsertBou
return new_period, err
}
const deleteOldChatFiles = `-- name: DeleteOldChatFiles :execrows
const deleteOldChatFiles = `-- name: DeleteOldChatFiles :many
WITH kept_file_ids AS (
-- NOTE: This uses updated_at as a proxy for archive time
-- because there is no archived_at column. Correctness
@@ -2924,6 +2924,7 @@ deletable AS (
DELETE FROM chat_files
USING deletable
WHERE chat_files.id = deletable.id
RETURNING chat_files.id, chat_files.object_store_key
`
type DeleteOldChatFilesParams struct {
@@ -2931,6 +2932,11 @@ type DeleteOldChatFilesParams struct {
LimitCount int32 `db:"limit_count" json:"limit_count"`
}
type DeleteOldChatFilesRow struct {
ID uuid.UUID `db:"id" json:"id"`
ObjectStoreKey sql.NullString `db:"object_store_key" json:"object_store_key"`
}
// TODO(cian): Add indexes on chats(archived, updated_at) and
// chat_files(created_at) for purge query performance.
// See: https://github.com/coder/internal/issues/1438
@@ -2940,16 +2946,34 @@ type DeleteOldChatFilesParams struct {
// 1. Orphaned files not linked to any chat.
// 2. Files whose every referencing chat has been archived for longer
// than the retention period.
func (q *sqlQuerier) DeleteOldChatFiles(ctx context.Context, arg DeleteOldChatFilesParams) (int64, error) {
result, err := q.db.ExecContext(ctx, deleteOldChatFiles, arg.BeforeTime, arg.LimitCount)
//
// Returns the deleted rows so callers can clean up associated object
// store entries.
func (q *sqlQuerier) DeleteOldChatFiles(ctx context.Context, arg DeleteOldChatFilesParams) ([]DeleteOldChatFilesRow, error) {
rows, err := q.db.QueryContext(ctx, deleteOldChatFiles, arg.BeforeTime, arg.LimitCount)
if err != nil {
return 0, err
return nil, err
}
return result.RowsAffected()
defer rows.Close()
var items []DeleteOldChatFilesRow
for rows.Next() {
var i DeleteOldChatFilesRow
if err := rows.Scan(&i.ID, &i.ObjectStoreKey); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getChatFileByID = `-- name: GetChatFileByID :one
SELECT id, owner_id, organization_id, created_at, name, mimetype, data FROM chat_files WHERE id = $1::uuid
SELECT id, owner_id, organization_id, created_at, name, mimetype, data, object_store_key FROM chat_files WHERE id = $1::uuid
`
func (q *sqlQuerier) GetChatFileByID(ctx context.Context, id uuid.UUID) (ChatFile, error) {
@@ -2963,6 +2987,7 @@ func (q *sqlQuerier) GetChatFileByID(ctx context.Context, id uuid.UUID) (ChatFil
&i.Name,
&i.Mimetype,
&i.Data,
&i.ObjectStoreKey,
)
return i, err
}
@@ -3018,7 +3043,7 @@ func (q *sqlQuerier) GetChatFileMetadataByChatID(ctx context.Context, chatID uui
}
const getChatFilesByIDs = `-- name: GetChatFilesByIDs :many
SELECT id, owner_id, organization_id, created_at, name, mimetype, data FROM chat_files WHERE id = ANY($1::uuid[])
SELECT id, owner_id, organization_id, created_at, name, mimetype, data, object_store_key FROM chat_files WHERE id = ANY($1::uuid[])
`
func (q *sqlQuerier) GetChatFilesByIDs(ctx context.Context, ids []uuid.UUID) ([]ChatFile, error) {
@@ -3038,6 +3063,7 @@ func (q *sqlQuerier) GetChatFilesByIDs(ctx context.Context, ids []uuid.UUID) ([]
&i.Name,
&i.Mimetype,
&i.Data,
&i.ObjectStoreKey,
); err != nil {
return nil, err
}
@@ -3053,8 +3079,8 @@ func (q *sqlQuerier) GetChatFilesByIDs(ctx context.Context, ids []uuid.UUID) ([]
}
const insertChatFile = `-- name: InsertChatFile :one
INSERT INTO chat_files (owner_id, organization_id, name, mimetype, data)
VALUES ($1::uuid, $2::uuid, $3::text, $4::text, $5::bytea)
INSERT INTO chat_files (owner_id, organization_id, name, mimetype, data, object_store_key)
VALUES ($1::uuid, $2::uuid, $3::text, $4::text, $5::bytea, $6::text)
RETURNING id, owner_id, organization_id, created_at, name, mimetype
`
@@ -3064,6 +3090,7 @@ type InsertChatFileParams struct {
Name string `db:"name" json:"name"`
Mimetype string `db:"mimetype" json:"mimetype"`
Data []byte `db:"data" json:"data"`
ObjectStoreKey string `db:"object_store_key" json:"object_store_key"`
}
type InsertChatFileRow struct {
@@ -3082,6 +3109,7 @@ func (q *sqlQuerier) InsertChatFile(ctx context.Context, arg InsertChatFileParam
arg.Name,
arg.Mimetype,
arg.Data,
arg.ObjectStoreKey,
)
var i InsertChatFileRow
err := row.Scan(
+7 -4
View File
@@ -1,6 +1,6 @@
-- name: InsertChatFile :one
INSERT INTO chat_files (owner_id, organization_id, name, mimetype, data)
VALUES (@owner_id::uuid, @organization_id::uuid, @name::text, @mimetype::text, @data::bytea)
INSERT INTO chat_files (owner_id, organization_id, name, mimetype, data, object_store_key)
VALUES (@owner_id::uuid, @organization_id::uuid, @name::text, @mimetype::text, @data::bytea, @object_store_key::text)
RETURNING id, owner_id, organization_id, created_at, name, mimetype;
-- name: GetChatFileByID :one
@@ -22,13 +22,15 @@ ORDER BY cf.created_at ASC;
-- TODO(cian): Add indexes on chats(archived, updated_at) and
-- chat_files(created_at) for purge query performance.
-- See: https://github.com/coder/internal/issues/1438
-- name: DeleteOldChatFiles :execrows
-- name: DeleteOldChatFiles :many
-- Deletes chat files that are older than the given threshold and are
-- not referenced by any chat that is still active or was archived
-- within the same threshold window. This covers two cases:
-- 1. Orphaned files not linked to any chat.
-- 2. Files whose every referencing chat has been archived for longer
-- than the retention period.
-- Returns the deleted rows so callers can clean up associated object
-- store entries.
WITH kept_file_ids AS (
-- NOTE: This uses updated_at as a proxy for archive time
-- because there is no archived_at column. Correctness
@@ -51,4 +53,5 @@ deletable AS (
)
DELETE FROM chat_files
USING deletable
WHERE chat_files.id = deletable.id;
WHERE chat_files.id = deletable.id
RETURNING chat_files.id, chat_files.object_store_key;
+33 -1
View File
@@ -2988,6 +2988,8 @@ const (
maxChatFileSize = 10 << 20
// maxChatFileName is the maximum length of an uploaded file name.
maxChatFileName = 255
// chatFilesNamespace is the object store namespace for chat files.
chatFilesNamespace = "chatfiles"
)
// allowedChatFileMIMETypes lists the content types accepted for chat
@@ -3784,12 +3786,21 @@ func (api *API) postChatFile(rw http.ResponseWriter, r *http.Request) {
}
}
key := uuid.New().String()
if err := api.ObjectStore.Write(ctx, chatFilesNamespace, key, data); err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to save chat file.",
Detail: err.Error(),
})
return
}
chatFile, err := api.Database.InsertChatFile(ctx, database.InsertChatFileParams{
OwnerID: apiKey.UserID,
OrganizationID: orgID,
Name: filename,
Mimetype: detected,
Data: data,
ObjectStoreKey: key,
})
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
@@ -3836,6 +3847,27 @@ func (api *API) chatFileByID(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Disposition", "inline")
}
rw.Header().Set("Cache-Control", "private, max-age=31536000, immutable")
// Serve from object store, falling back to the database BYTEA
// column for files that predate the migration.
if chatFile.ObjectStoreKey.Valid && chatFile.ObjectStoreKey.String != "" {
rc, info, err := api.ObjectStore.Read(ctx, chatFilesNamespace, chatFile.ObjectStoreKey.String)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to read chat file from storage.",
Detail: err.Error(),
})
return
}
defer rc.Close()
rw.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
rw.WriteHeader(http.StatusOK)
if _, err := io.Copy(rw, rc); err != nil {
api.Logger.Debug(ctx, "failed to stream chat file response", slog.Error(err))
}
return
}
rw.Header().Set("Content-Length", strconv.Itoa(len(chatFile.Data)))
rw.WriteHeader(http.StatusOK)
if _, err := rw.Write(chatFile.Data); err != nil {
+197
View File
@@ -0,0 +1,197 @@
package objstore
import (
"context"
"os"
"path/filepath"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"
"gocloud.dev/blob/gcsblob"
"gocloud.dev/blob/s3blob"
"gocloud.dev/gcp"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/codersdk"
)
// Backend enumerates the supported storage backends.
type Backend string
const (
BackendLocal Backend = "local"
BackendS3 Backend = "s3"
BackendGCS Backend = "gcs"
)
// LocalConfig configures the local filesystem backend.
type LocalConfig struct {
// Dir is the root directory for stored objects. The directory
// is created if it does not exist.
Dir string
}
// S3Config configures an S3-compatible backend.
type S3Config struct {
Bucket string
Region string
// Prefix is an optional key prefix within the bucket.
Prefix string
// Endpoint is a custom S3-compatible endpoint (e.g. MinIO, R2).
// Leave empty for standard AWS S3.
Endpoint string
}
// GCSConfig configures a Google Cloud Storage backend.
type GCSConfig struct {
Bucket string
// Prefix is an optional key prefix within the bucket.
Prefix string
// CredentialsFile is an optional path to a service account key
// file. If empty, Application Default Credentials are used.
CredentialsFile string
}
// NewLocal creates a Store backed by the local filesystem.
func NewLocal(cfg LocalConfig) (Store, error) {
if cfg.Dir == "" {
return nil, xerrors.New("local object store directory is required")
}
if err := os.MkdirAll(cfg.Dir, 0o700); err != nil {
return nil, xerrors.Errorf("create object store directory %q: %w", cfg.Dir, err)
}
bucket, err := fileblob.OpenBucket(cfg.Dir, &fileblob.Options{
// Place temp files next to the target files instead of
// os.TempDir. This avoids EXDEV (cross-device link) errors
// when the storage directory is on a different filesystem.
NoTempDir: true,
// We handle metadata in the database, not in sidecar files.
Metadata: fileblob.MetadataDontWrite,
})
if err != nil {
return nil, xerrors.Errorf("open local bucket at %q: %w", cfg.Dir, err)
}
return newPrefixed(bucket, ""), nil
}
// NewS3 creates a Store backed by an S3-compatible service.
func NewS3(ctx context.Context, cfg S3Config) (Store, error) {
if cfg.Bucket == "" {
return nil, xerrors.New("S3 bucket name is required")
}
opts := []func(*awsconfig.LoadOptions) error{}
if cfg.Region != "" {
opts = append(opts, awsconfig.WithRegion(cfg.Region))
}
awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, xerrors.Errorf("load AWS config: %w", err)
}
s3Opts := []func(*s3.Options){}
if cfg.Endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = aws.String(cfg.Endpoint)
o.UsePathStyle = true
})
}
client := s3.NewFromConfig(awsCfg, s3Opts...)
bucket, err := s3blob.OpenBucket(ctx, client, cfg.Bucket, nil)
if err != nil {
return nil, xerrors.Errorf("open S3 bucket %q: %w", cfg.Bucket, err)
}
return newPrefixed(bucket, cfg.Prefix), nil
}
// NewGCS creates a Store backed by Google Cloud Storage.
func NewGCS(ctx context.Context, cfg GCSConfig) (Store, error) {
if cfg.Bucket == "" {
return nil, xerrors.New("GCS bucket name is required")
}
var creds *google.Credentials
var err error
if cfg.CredentialsFile != "" {
jsonData, err := os.ReadFile(cfg.CredentialsFile)
if err != nil {
return nil, xerrors.Errorf("read GCS credentials file %q: %w", cfg.CredentialsFile, err)
}
//nolint:staticcheck // CredentialsFromJSON is the standard way to load service account keys.
creds, err = google.CredentialsFromJSON(ctx, jsonData, "https://www.googleapis.com/auth/cloud-platform")
if err != nil {
return nil, xerrors.Errorf("parse GCS credentials: %w", err)
}
} else {
creds, err = gcp.DefaultCredentials(ctx)
if err != nil {
return nil, xerrors.Errorf("obtain GCP default credentials: %w", err)
}
}
gcpClient, err := gcp.NewHTTPClient(gcp.DefaultTransport(), gcp.CredentialsTokenSource(creds))
if err != nil {
return nil, xerrors.Errorf("create GCP HTTP client: %w", err)
}
bucket, err := gcsblob.OpenBucket(ctx, gcpClient, cfg.Bucket, nil)
if err != nil {
return nil, xerrors.Errorf("open GCS bucket %q: %w", cfg.Bucket, err)
}
return newPrefixed(bucket, cfg.Prefix), nil
}
// newPrefixed wraps a bucket with an optional key prefix and returns
// a Store.
func newPrefixed(bucket *blob.Bucket, prefix string) Store {
if prefix != "" {
bucket = blob.PrefixedBucket(bucket, prefix+"/")
}
return New(bucket)
}
// FromConfig creates a Store from deployment configuration. The
// configDir is the Coder config directory (e.g. ~/.config/coderv2)
// and is used as the default root when the local backend is selected
// without an explicit directory.
func FromConfig(ctx context.Context, cfg codersdk.ObjectStoreConfig, configDir string) (Store, error) {
switch Backend(cfg.Backend.String()) {
case BackendLocal, "":
dir := cfg.LocalDir.String()
if dir == "" {
dir = filepath.Join(configDir, "objectstore")
}
return NewLocal(LocalConfig{Dir: dir})
case BackendS3:
return NewS3(ctx, S3Config{
Bucket: cfg.S3Bucket.String(),
Region: cfg.S3Region.String(),
Prefix: cfg.S3Prefix.String(),
Endpoint: cfg.S3Endpoint.String(),
})
case BackendGCS:
return NewGCS(ctx, GCSConfig{
Bucket: cfg.GCSBucket.String(),
Prefix: cfg.GCSPrefix.String(),
CredentialsFile: cfg.GCSCredentialsFile.String(),
})
default:
return nil, xerrors.Errorf("unknown object store backend: %q", cfg.Backend.String())
}
}
+63
View File
@@ -0,0 +1,63 @@
package objstore
import (
"context"
"io"
"iter"
"time"
"golang.org/x/xerrors"
)
// Sentinel errors.
var (
// ErrNotFound is returned when a Read or Delete targets a key
// that does not exist.
ErrNotFound = xerrors.New("object not found")
// ErrClosed is returned when an operation is attempted on a
// closed store.
ErrClosed = xerrors.New("object store closed")
)
// ObjectInfo describes a stored object.
type ObjectInfo struct {
// Key is the object's key within its namespace.
Key string
// Size is the object's size in bytes. May be -1 if unknown.
Size int64
// LastModified is the time the object was last written.
LastModified time.Time
}
// Store provides namespace-scoped CRUD operations on opaque binary
// objects. Namespaces are implicit string prefixes created on first
// write; they require no registration.
//
// Implementations must be safe for concurrent use.
type Store interface {
// Read returns a reader for the object at namespace/key. The
// caller MUST close the returned ReadCloser when done. Returns
// ErrNotFound if the object does not exist.
Read(ctx context.Context, namespace, key string) (io.ReadCloser, ObjectInfo, error)
// Write stores data at namespace/key. Semantics are
// unconditional put: last writer wins.
Write(ctx context.Context, namespace, key string, data []byte) error
// List returns an iterator over objects in the given namespace
// whose keys start with prefix. Pass "" for prefix to list all
// objects in the namespace.
//
// The iterator is lazy and fetches pages on demand. Context
// cancellation is respected between page fetches.
List(ctx context.Context, namespace, prefix string) iter.Seq2[ObjectInfo, error]
// Delete removes the object at namespace/key. Returns
// ErrNotFound if the object does not exist.
Delete(ctx context.Context, namespace, key string) error
// Close releases any resources held by the store.
// Operations after Close return ErrClosed.
io.Closer
}
@@ -0,0 +1,146 @@
// Package objstoretest provides an in-memory Store implementation
// for use in tests.
package objstoretest
import (
"bytes"
"context"
"io"
"iter"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/coder/coder/v2/coderd/objstore"
)
type memObject struct {
data []byte
modified time.Time
}
// MemoryStore is an in-memory implementation of objstore.Store.
// It is safe for concurrent use.
type MemoryStore struct {
mu sync.RWMutex
objects map[string]memObject // full key = namespace/key
closed atomic.Bool
}
// NewMemory returns a Store backed entirely by memory. Useful for
// unit tests that need object storage but don't care about backend
// specifics.
func NewMemory() *MemoryStore {
return &MemoryStore{
objects: make(map[string]memObject),
}
}
func (m *MemoryStore) Read(_ context.Context, namespace, key string) (io.ReadCloser, objstore.ObjectInfo, error) {
if m.closed.Load() {
return nil, objstore.ObjectInfo{}, objstore.ErrClosed
}
full := fullKey(namespace, key)
m.mu.RLock()
obj, ok := m.objects[full]
m.mu.RUnlock()
if !ok {
return nil, objstore.ObjectInfo{}, objstore.ErrNotFound
}
info := objstore.ObjectInfo{
Key: key,
Size: int64(len(obj.data)),
LastModified: obj.modified,
}
return io.NopCloser(bytes.NewReader(obj.data)), info, nil
}
func (m *MemoryStore) Write(_ context.Context, namespace, key string, data []byte) error {
if m.closed.Load() {
return objstore.ErrClosed
}
full := fullKey(namespace, key)
// Copy to avoid retaining caller's slice.
cp := make([]byte, len(data))
copy(cp, data)
m.mu.Lock()
m.objects[full] = memObject{
data: cp,
modified: time.Now(),
}
m.mu.Unlock()
return nil
}
func (m *MemoryStore) List(_ context.Context, namespace, prefix string) iter.Seq2[objstore.ObjectInfo, error] {
return func(yield func(objstore.ObjectInfo, error) bool) {
if m.closed.Load() {
yield(objstore.ObjectInfo{}, objstore.ErrClosed)
return
}
fullPrefix := namespace + "/"
if prefix != "" {
fullPrefix += prefix
}
m.mu.RLock()
defer m.mu.RUnlock()
for k, obj := range m.objects {
if !strings.HasPrefix(k, fullPrefix) {
continue
}
relKey := k[len(namespace)+1:]
info := objstore.ObjectInfo{
Key: relKey,
Size: int64(len(obj.data)),
LastModified: obj.modified,
}
if !yield(info, nil) {
return
}
}
}
}
func (m *MemoryStore) Delete(_ context.Context, namespace, key string) error {
if m.closed.Load() {
return objstore.ErrClosed
}
full := fullKey(namespace, key)
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.objects[full]; !ok {
return objstore.ErrNotFound
}
delete(m.objects, full)
return nil
}
func (m *MemoryStore) Close() error {
if m.closed.Swap(true) {
return nil
}
return nil
}
func fullKey(namespace, key string) string {
return namespace + "/" + key
}
// Compile-time check.
var _ objstore.Store = (*MemoryStore)(nil)
+150
View File
@@ -0,0 +1,150 @@
package objstore
import (
"context"
"io"
"iter"
"path"
"sync/atomic"
"gocloud.dev/blob"
"gocloud.dev/gcerrors"
"golang.org/x/xerrors"
)
// bucketStore implements Store on top of a gocloud.dev/blob.Bucket.
// Namespaces are mapped to key prefixes separated by "/".
type bucketStore struct {
bucket *blob.Bucket
closed atomic.Bool
}
// New wraps a gocloud.dev/blob.Bucket as a Store. The caller
// retains no ownership of the bucket after this call; Close on
// the returned Store will close the underlying bucket.
func New(bucket *blob.Bucket) Store {
return &bucketStore{bucket: bucket}
}
func (s *bucketStore) Read(ctx context.Context, namespace, key string) (io.ReadCloser, ObjectInfo, error) {
if s.closed.Load() {
return nil, ObjectInfo{}, ErrClosed
}
objKey := objectKey(namespace, key)
// Fetch attributes first so we can populate ObjectInfo
// before handing back the reader.
attrs, err := s.bucket.Attributes(ctx, objKey)
if err != nil {
return nil, ObjectInfo{}, mapError(err, namespace, key)
}
reader, err := s.bucket.NewReader(ctx, objKey, nil)
if err != nil {
return nil, ObjectInfo{}, mapError(err, namespace, key)
}
info := ObjectInfo{
Key: key,
Size: attrs.Size,
LastModified: attrs.ModTime,
}
return reader, info, nil
}
func (s *bucketStore) Write(ctx context.Context, namespace, key string, data []byte) error {
if s.closed.Load() {
return ErrClosed
}
return mapError(
s.bucket.WriteAll(ctx, objectKey(namespace, key), data, nil),
namespace, key,
)
}
func (s *bucketStore) List(ctx context.Context, namespace, prefix string) iter.Seq2[ObjectInfo, error] {
return func(yield func(ObjectInfo, error) bool) {
if s.closed.Load() {
yield(ObjectInfo{}, ErrClosed)
return
}
fullPrefix := namespace + "/"
if prefix != "" {
fullPrefix += prefix
}
it := s.bucket.List(&blob.ListOptions{
Prefix: fullPrefix,
})
for {
obj, err := it.Next(ctx)
if err != nil {
if err == io.EOF {
return
}
if !yield(ObjectInfo{}, xerrors.Errorf("list %q/%q: %w", namespace, prefix, err)) {
return
}
return
}
if obj.IsDir {
continue
}
// Strip namespace prefix from key to return
// namespace-relative keys.
relKey := obj.Key[len(namespace)+1:]
info := ObjectInfo{
Key: relKey,
Size: obj.Size,
LastModified: obj.ModTime,
}
if !yield(info, nil) {
return
}
}
}
}
func (s *bucketStore) Delete(ctx context.Context, namespace, key string) error {
if s.closed.Load() {
return ErrClosed
}
return mapError(
s.bucket.Delete(ctx, objectKey(namespace, key)),
namespace, key,
)
}
func (s *bucketStore) Close() error {
if s.closed.Swap(true) {
return nil
}
return s.bucket.Close()
}
// objectKey builds the full key from namespace and key.
func objectKey(namespace, key string) string {
return path.Join(namespace, key)
}
// mapError translates gocloud error codes into our sentinel
// errors.
func mapError(err error, namespace, key string) error {
if err == nil {
return nil
}
if gcerrors.Code(err) == gcerrors.NotFound {
return xerrors.Errorf("%s/%s: %w", namespace, key, ErrNotFound)
}
return err
}
// Compile-time check.
var _ Store = (*bucketStore)(nil)
+198
View File
@@ -0,0 +1,198 @@
package objstore_test
import (
"context"
"errors"
"io"
"slices"
"testing"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/coderd/objstore"
)
func TestLocalFS(t *testing.T) {
t.Parallel()
newStore := func(t *testing.T) objstore.Store {
t.Helper()
store, err := objstore.NewLocal(objstore.LocalConfig{Dir: t.TempDir()})
require.NoError(t, err)
t.Cleanup(func() { store.Close() })
return store
}
t.Run("WriteAndRead", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
data := []byte("hello, object store")
err := store.Write(ctx, "ns", "key1", data)
require.NoError(t, err)
rc, info, err := store.Read(ctx, "ns", "key1")
require.NoError(t, err)
defer rc.Close()
require.Equal(t, "key1", info.Key)
require.Equal(t, int64(len(data)), info.Size)
require.False(t, info.LastModified.IsZero())
got, err := io.ReadAll(rc)
require.NoError(t, err)
require.Equal(t, data, got)
})
t.Run("ReadNotFound", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
_, _, err := store.Read(ctx, "ns", "nonexistent")
require.Error(t, err)
require.True(t, errors.Is(err, objstore.ErrNotFound), "expected ErrNotFound, got: %v", err)
})
t.Run("Overwrite", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
err := store.Write(ctx, "ns", "key1", []byte("v1"))
require.NoError(t, err)
err = store.Write(ctx, "ns", "key1", []byte("v2"))
require.NoError(t, err)
rc, _, err := store.Read(ctx, "ns", "key1")
require.NoError(t, err)
defer rc.Close()
got, err := io.ReadAll(rc)
require.NoError(t, err)
require.Equal(t, []byte("v2"), got)
})
t.Run("Delete", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
err := store.Write(ctx, "ns", "key1", []byte("data"))
require.NoError(t, err)
err = store.Delete(ctx, "ns", "key1")
require.NoError(t, err)
_, _, err = store.Read(ctx, "ns", "key1")
require.True(t, errors.Is(err, objstore.ErrNotFound))
})
t.Run("DeleteNotFound", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
err := store.Delete(ctx, "ns", "nonexistent")
require.True(t, errors.Is(err, objstore.ErrNotFound), "expected ErrNotFound, got: %v", err)
})
t.Run("ListAll", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
err := store.Write(ctx, "ns", "a", []byte("1"))
require.NoError(t, err)
err = store.Write(ctx, "ns", "b", []byte("2"))
require.NoError(t, err)
err = store.Write(ctx, "ns", "c", []byte("3"))
require.NoError(t, err)
var keys []string
for info, err := range store.List(ctx, "ns", "") {
require.NoError(t, err)
keys = append(keys, info.Key)
}
slices.Sort(keys)
require.Equal(t, []string{"a", "b", "c"}, keys)
})
t.Run("ListWithPrefix", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
err := store.Write(ctx, "ns", "logs/a", []byte("1"))
require.NoError(t, err)
err = store.Write(ctx, "ns", "logs/b", []byte("2"))
require.NoError(t, err)
err = store.Write(ctx, "ns", "other/c", []byte("3"))
require.NoError(t, err)
var keys []string
for info, err := range store.List(ctx, "ns", "logs/") {
require.NoError(t, err)
keys = append(keys, info.Key)
}
slices.Sort(keys)
require.Equal(t, []string{"logs/a", "logs/b"}, keys)
})
t.Run("ListEmptyNamespace", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
var count int
for _, err := range store.List(ctx, "empty", "") {
require.NoError(t, err)
count++
}
require.Zero(t, count)
})
t.Run("NamespaceIsolation", func(t *testing.T) {
t.Parallel()
store := newStore(t)
ctx := context.Background()
err := store.Write(ctx, "ns1", "key", []byte("ns1-data"))
require.NoError(t, err)
err = store.Write(ctx, "ns2", "key", []byte("ns2-data"))
require.NoError(t, err)
rc1, _, err := store.Read(ctx, "ns1", "key")
require.NoError(t, err)
got1, _ := io.ReadAll(rc1)
rc1.Close()
rc2, _, err := store.Read(ctx, "ns2", "key")
require.NoError(t, err)
got2, _ := io.ReadAll(rc2)
rc2.Close()
require.Equal(t, []byte("ns1-data"), got1)
require.Equal(t, []byte("ns2-data"), got2)
})
t.Run("CloseThenOps", func(t *testing.T) {
t.Parallel()
store, err := objstore.NewLocal(objstore.LocalConfig{Dir: t.TempDir()})
require.NoError(t, err)
err = store.Close()
require.NoError(t, err)
err = store.Write(context.Background(), "ns", "key", []byte("data"))
require.True(t, errors.Is(err, objstore.ErrClosed), "expected ErrClosed, got: %v", err)
_, _, err = store.Read(context.Background(), "ns", "key")
require.True(t, errors.Is(err, objstore.ErrClosed), "expected ErrClosed, got: %v", err)
})
}
+33 -2
View File
@@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"maps"
"net/http"
"slices"
@@ -28,6 +29,7 @@ import (
"github.com/coder/coder/v2/coderd/database/db2sdk"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/objstore"
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/coderd/util/xjson"
@@ -145,6 +147,7 @@ type Server struct {
usageTracker *workspacestats.UsageTracker
clock quartz.Clock
recordingSem chan struct{}
objStore objstore.Store
// Configuration
pendingChatAcquireInterval time.Duration
@@ -2691,6 +2694,9 @@ type Config struct {
WebpushDispatcher webpush.Dispatcher
UsageTracker *workspacestats.UsageTracker
Clock quartz.Clock
// ObjectStore is optional. When set, chat file data is stored
// in the object store instead of the database BYTEA column.
ObjectStore objstore.Store
}
// New creates a new chat processor. The processor polls for pending
@@ -2756,6 +2762,7 @@ func New(cfg Config) *Server {
usageTracker: cfg.UsageTracker,
clock: clk,
recordingSem: make(chan struct{}, maxConcurrentRecordingUploads),
objStore: cfg.ObjectStore,
wakeCh: make(chan struct{}, 1),
heartbeatRegistry: make(map[uuid.UUID]*heartbeatEntry),
}
@@ -3908,7 +3915,9 @@ func (p *Server) subscribeChatControl(
}
// chatFileResolver returns a FileResolver that fetches chat file
// content from the database by ID.
// content. When an object store is configured and the file has an
// object_store_key, data is read from the store. Otherwise it falls
// back to the database BYTEA column.
func (p *Server) chatFileResolver() chatprompt.FileResolver {
return func(ctx context.Context, ids []uuid.UUID) (map[uuid.UUID]chatprompt.FileData, error) {
files, err := p.db.GetChatFilesByIDs(ctx, ids)
@@ -3917,9 +3926,13 @@ func (p *Server) chatFileResolver() chatprompt.FileResolver {
}
result := make(map[uuid.UUID]chatprompt.FileData, len(files))
for _, f := range files {
data, err := p.readChatFileData(ctx, f)
if err != nil {
return nil, xerrors.Errorf("read chat file %s: %w", f.ID, err)
}
result[f.ID] = chatprompt.FileData{
Name: f.Name,
Data: f.Data,
Data: data,
MediaType: f.Mimetype,
}
}
@@ -3927,6 +3940,24 @@ func (p *Server) chatFileResolver() chatprompt.FileResolver {
}
}
// chatFilesNamespace is the object store namespace for chat files.
const chatFilesNamespace = "chatfiles"
// readChatFileData returns the binary content for a chat file. It
// reads from the object store, falling back to the database BYTEA
// column for files that predate the migration.
func (p *Server) readChatFileData(ctx context.Context, f database.ChatFile) ([]byte, error) {
if f.ObjectStoreKey.Valid && f.ObjectStoreKey.String != "" {
rc, _, err := p.objStore.Read(ctx, chatFilesNamespace, f.ObjectStoreKey.String)
if err != nil {
return nil, err
}
defer rc.Close()
return io.ReadAll(rc)
}
return f.Data, nil
}
// tryAutoPromoteQueuedMessage pops the next queued message and converts it
// into a pending user message inside the caller's transaction. Queued
// messages were already admitted through SendMessage, so this preserves FIFO
+34 -19
View File
@@ -163,34 +163,24 @@ func (p *Server) stopAndStoreRecording(
}
}
// Second pass: store the collected data in the database.
// Second pass: store the collected data.
if videoData != nil {
//nolint:gocritic // AsChatd is required to insert chat files from the recording pipeline.
row, err := p.db.InsertChatFile(dbauthz.AsChatd(ctx), database.InsertChatFileParams{
OwnerID: ownerID,
OrganizationID: ws.OrganizationID,
Name: fmt.Sprintf("recording-%s.mp4", p.clock.Now().UTC().Format("2006-01-02T15-04-05Z")),
Mimetype: "video/mp4",
Data: videoData,
})
row, err := p.storeChatFile(ctx, ownerID, ws.OrganizationID,
fmt.Sprintf("recording-%s.mp4", p.clock.Now().UTC().Format("2006-01-02T15-04-05Z")),
"video/mp4", videoData)
if err != nil {
p.logger.Warn(ctx, "failed to store recording in database",
p.logger.Warn(ctx, "failed to store recording",
slog.Error(err))
} else {
result.recordingFileID = row.ID.String()
}
}
if thumbnailData != nil && result.recordingFileID != "" {
//nolint:gocritic // AsChatd is required to insert chat files from the recording pipeline.
row, err := p.db.InsertChatFile(dbauthz.AsChatd(ctx), database.InsertChatFileParams{
OwnerID: ownerID,
OrganizationID: ws.OrganizationID,
Name: fmt.Sprintf("thumbnail-%s.jpg", p.clock.Now().UTC().Format("2006-01-02T15-04-05Z")),
Mimetype: "image/jpeg",
Data: thumbnailData,
})
row, err := p.storeChatFile(ctx, ownerID, ws.OrganizationID,
fmt.Sprintf("thumbnail-%s.jpg", p.clock.Now().UTC().Format("2006-01-02T15-04-05Z")),
"image/jpeg", thumbnailData)
if err != nil {
p.logger.Warn(ctx, "failed to store thumbnail in database",
p.logger.Warn(ctx, "failed to store thumbnail",
slog.Error(err))
} else {
result.thumbnailFileID = row.ID.String()
@@ -199,3 +189,28 @@ func (p *Server) stopAndStoreRecording(
return result
}
// storeChatFile writes file data to the object store (when
// configured) and inserts a metadata row into chat_files. Falls back
// to storing the data directly in the database BYTEA column when no
// object store is available.
func (p *Server) storeChatFile(
ctx context.Context,
ownerID, orgID uuid.UUID,
name, mimetype string,
data []byte,
) (database.InsertChatFileRow, error) {
key := uuid.New().String()
if err := p.objStore.Write(ctx, chatFilesNamespace, key, data); err != nil {
return database.InsertChatFileRow{}, fmt.Errorf("write to object store: %w", err)
}
//nolint:gocritic // AsChatd is required to insert chat files from the recording pipeline.
return p.db.InsertChatFile(dbauthz.AsChatd(ctx), database.InsertChatFileParams{
OwnerID: ownerID,
OrganizationID: orgID,
Name: name,
Mimetype: mimetype,
ObjectStoreKey: key,
})
}
+122
View File
@@ -645,6 +645,7 @@ type DeploymentValues struct {
HideAITasks serpent.Bool `json:"hide_ai_tasks,omitempty" typescript:",notnull"`
AI AIConfig `json:"ai,omitempty"`
StatsCollection StatsCollectionConfig `json:"stats_collection,omitempty" typescript:",notnull"`
ObjectStore ObjectStoreConfig `json:"object_store,omitempty" typescript:",notnull"`
Config serpent.YAMLConfigPath `json:"config,omitempty" typescript:",notnull"`
WriteConfig serpent.Bool `json:"write_config,omitempty" typescript:",notnull"`
@@ -1046,6 +1047,31 @@ type HealthcheckConfig struct {
}
// RetentionConfig contains configuration for data retention policies.
// ObjectStoreConfig configures the object storage backend used for
// binary data such as chat files and transcripts.
type ObjectStoreConfig struct {
// Backend selects the storage backend: "local" (default), "s3", or "gcs".
Backend serpent.String `json:"backend" typescript:",notnull"`
// LocalDir is the root directory for the local filesystem backend.
// Only used when Backend is "local". Defaults to <config-dir>/objectstore/.
LocalDir serpent.String `json:"local_dir" typescript:",notnull"`
// S3Bucket is the S3 bucket name. Required when Backend is "s3".
S3Bucket serpent.String `json:"s3_bucket" typescript:",notnull"`
// S3Region is the AWS region for the S3 bucket.
S3Region serpent.String `json:"s3_region" typescript:",notnull"`
// S3Prefix is an optional key prefix within the S3 bucket.
S3Prefix serpent.String `json:"s3_prefix" typescript:",notnull"`
// S3Endpoint is a custom S3-compatible endpoint URL (for MinIO, R2, etc.).
S3Endpoint serpent.String `json:"s3_endpoint" typescript:",notnull"`
// GCSBucket is the GCS bucket name. Required when Backend is "gcs".
GCSBucket serpent.String `json:"gcs_bucket" typescript:",notnull"`
// GCSPrefix is an optional key prefix within the GCS bucket.
GCSPrefix serpent.String `json:"gcs_prefix" typescript:",notnull"`
// GCSCredentialsFile is an optional path to a GCS service account
// key file. If empty, Application Default Credentials are used.
GCSCredentialsFile serpent.String `json:"gcs_credentials_file" typescript:",notnull"`
}
// These settings control how long various types of data are retained in the database
// before being automatically purged. Setting a value to 0 disables retention for that
// data type (data is kept indefinitely).
@@ -1462,6 +1488,11 @@ func (c *DeploymentValues) Options() serpent.OptionSet {
Description: "Configure data retention policies for various database tables. Retention policies automatically purge old data to reduce database size and improve performance. Setting a retention duration to 0 disables automatic purging for that data type.",
YAML: "retention",
}
deploymentGroupObjectStore = serpent.Group{
Name: "Object Store",
Description: "Configure the object storage backend for binary data (chat files, transcripts, etc.). Defaults to local filesystem storage.",
YAML: "objectStore",
}
)
httpAddress := serpent.Option{
@@ -4018,6 +4049,97 @@ Write out the current server config as YAML to stdout.`,
YAML: "workspace_agent_logs",
Annotations: serpent.Annotations{}.Mark(annotationFormatDuration, "true"),
},
// Object Store options
{
Name: "Object Store Backend",
Description: "The storage backend for binary data such as chat files. Valid values: local, s3, gcs.",
Flag: "objectstore-backend",
Env: "CODER_OBJECTSTORE_BACKEND",
Value: &c.ObjectStore.Backend,
Default: "local",
Group: &deploymentGroupObjectStore,
YAML: "backend",
},
{
Name: "Object Store Local Directory",
Description: "Root directory for the local filesystem object store backend. Only used when the backend is \"local\".",
Flag: "objectstore-local-dir",
Env: "CODER_OBJECTSTORE_LOCAL_DIR",
Value: &c.ObjectStore.LocalDir,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "local_dir",
},
{
Name: "Object Store S3 Bucket",
Description: "S3 bucket name. Required when the backend is \"s3\".",
Flag: "objectstore-s3-bucket",
Env: "CODER_OBJECTSTORE_S3_BUCKET",
Value: &c.ObjectStore.S3Bucket,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "s3_bucket",
},
{
Name: "Object Store S3 Region",
Description: "AWS region for the S3 bucket.",
Flag: "objectstore-s3-region",
Env: "CODER_OBJECTSTORE_S3_REGION",
Value: &c.ObjectStore.S3Region,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "s3_region",
},
{
Name: "Object Store S3 Prefix",
Description: "Optional key prefix within the S3 bucket.",
Flag: "objectstore-s3-prefix",
Env: "CODER_OBJECTSTORE_S3_PREFIX",
Value: &c.ObjectStore.S3Prefix,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "s3_prefix",
},
{
Name: "Object Store S3 Endpoint",
Description: "Custom S3-compatible endpoint URL (e.g. for MinIO, R2, Cloudflare). Leave empty for standard AWS S3.",
Flag: "objectstore-s3-endpoint",
Env: "CODER_OBJECTSTORE_S3_ENDPOINT",
Value: &c.ObjectStore.S3Endpoint,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "s3_endpoint",
},
{
Name: "Object Store GCS Bucket",
Description: "GCS bucket name. Required when the backend is \"gcs\".",
Flag: "objectstore-gcs-bucket",
Env: "CODER_OBJECTSTORE_GCS_BUCKET",
Value: &c.ObjectStore.GCSBucket,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "gcs_bucket",
},
{
Name: "Object Store GCS Prefix",
Description: "Optional key prefix within the GCS bucket.",
Flag: "objectstore-gcs-prefix",
Env: "CODER_OBJECTSTORE_GCS_PREFIX",
Value: &c.ObjectStore.GCSPrefix,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "gcs_prefix",
},
{
Name: "Object Store GCS Credentials File",
Description: "Path to a GCS service account key file. If empty, Application Default Credentials are used.",
Flag: "objectstore-gcs-credentials-file",
Env: "CODER_OBJECTSTORE_GCS_CREDENTIALS_FILE",
Value: &c.ObjectStore.GCSCredentialsFile,
Default: "",
Group: &deploymentGroupObjectStore,
YAML: "gcs_credentials_file",
},
{
Name: "Enable Authorization Recordings",
Description: "All api requests will have a header including all authorization calls made during the request. " +
+2
View File
@@ -201,6 +201,8 @@ deployment. They will always be available from the agent.
| `coderd_db_tx_duration_seconds` | histogram | Duration of transactions in seconds. | `success` `tx_id` |
| `coderd_db_tx_executions_count` | counter | Total count of transactions executed. 'retries' is expected to be 0 for a successful transaction. | `retries` `success` `tx_id` |
| `coderd_dbpurge_iteration_duration_seconds` | histogram | Duration of each dbpurge iteration in seconds. | `success` |
| `coderd_dbpurge_objstore_delete_inflight` | gauge | Number of object store files currently enqueued for deletion. | |
| `coderd_dbpurge_objstore_files_deleted_total` | counter | Total number of object store files successfully deleted. | |
| `coderd_dbpurge_records_purged_total` | counter | Total number of records purged by type. | `record_type` |
| `coderd_experiments` | gauge | Indicates whether each experiment is enabled (1) or not (0) | `experiment` |
| `coderd_insights_applications_usage_seconds` | gauge | The application usage per template. | `application_name` `organization_name` `slug` `template_name` |
+11
View File
@@ -404,6 +404,17 @@ curl -X GET http://coder-server:8080/api/v2/deployment/config \
"enterprise_base_url": "string"
}
},
"object_store": {
"backend": "string",
"gcs_bucket": "string",
"gcs_credentials_file": "string",
"gcs_prefix": "string",
"local_dir": "string",
"s3_bucket": "string",
"s3_endpoint": "string",
"s3_prefix": "string",
"s3_region": "string"
},
"oidc": {
"allow_signups": true,
"auth_url_params": {},
+53
View File
@@ -3456,6 +3456,17 @@ CreateWorkspaceRequest provides options for creating a new workspace. Only one o
"enterprise_base_url": "string"
}
},
"object_store": {
"backend": "string",
"gcs_bucket": "string",
"gcs_credentials_file": "string",
"gcs_prefix": "string",
"local_dir": "string",
"s3_bucket": "string",
"s3_endpoint": "string",
"s3_prefix": "string",
"s3_region": "string"
},
"oidc": {
"allow_signups": true,
"auth_url_params": {},
@@ -4034,6 +4045,17 @@ CreateWorkspaceRequest provides options for creating a new workspace. Only one o
"enterprise_base_url": "string"
}
},
"object_store": {
"backend": "string",
"gcs_bucket": "string",
"gcs_credentials_file": "string",
"gcs_prefix": "string",
"local_dir": "string",
"s3_bucket": "string",
"s3_endpoint": "string",
"s3_prefix": "string",
"s3_region": "string"
},
"oidc": {
"allow_signups": true,
"auth_url_params": {},
@@ -4291,6 +4313,7 @@ CreateWorkspaceRequest provides options for creating a new workspace. Only one o
| `metrics_cache_refresh_interval` | integer | false | | |
| `notifications` | [codersdk.NotificationsConfig](#codersdknotificationsconfig) | false | | |
| `oauth2` | [codersdk.OAuth2Config](#codersdkoauth2config) | false | | |
| `object_store` | [codersdk.ObjectStoreConfig](#codersdkobjectstoreconfig) | false | | |
| `oidc` | [codersdk.OIDCConfig](#codersdkoidcconfig) | false | | |
| `pg_auth` | string | false | | |
| `pg_conn_max_idle` | string | false | | |
@@ -6444,6 +6467,36 @@ Only certain features set these fields: - FeatureManagedAgentLimit|
| `user_roles_default` | array of string | false | | |
| `username_field` | string | false | | |
## codersdk.ObjectStoreConfig
```json
{
"backend": "string",
"gcs_bucket": "string",
"gcs_credentials_file": "string",
"gcs_prefix": "string",
"local_dir": "string",
"s3_bucket": "string",
"s3_endpoint": "string",
"s3_prefix": "string",
"s3_region": "string"
}
```
### Properties
| Name | Type | Required | Restrictions | Description |
|------------------------|--------|----------|--------------|---------------------------------------------------------------------------------------------------------------------------------------------|
| `backend` | string | false | | Backend selects the storage backend: "local" (default), "s3", or "gcs". |
| `gcs_bucket` | string | false | | Gcs bucket is the GCS bucket name. Required when Backend is "gcs". |
| `gcs_credentials_file` | string | false | | Gcs credentials file is an optional path to a GCS service account key file. If empty, Application Default Credentials are used. |
| `gcs_prefix` | string | false | | Gcs prefix is an optional key prefix within the GCS bucket. |
| `local_dir` | string | false | | Local dir is the root directory for the local filesystem backend. Only used when Backend is "local". Defaults to <config-dir>/objectstore/. |
| `s3_bucket` | string | false | | S3 bucket is the S3 bucket name. Required when Backend is "s3". |
| `s3_endpoint` | string | false | | S3 endpoint is a custom S3-compatible endpoint URL (for MinIO, R2, etc.). |
| `s3_prefix` | string | false | | S3 prefix is an optional key prefix within the S3 bucket. |
| `s3_region` | string | false | | S3 region is the AWS region for the S3 bucket. |
## codersdk.OptionType
```json
+35
View File
@@ -774,6 +774,41 @@ OIDC OPTIONS:
requirement, and can lead to an insecure OIDC configuration. It is not
recommended to use this flag.
OBJECT STORE OPTIONS:
Configure the object storage backend for binary data (chat files, transcripts,
etc.). Defaults to local filesystem storage.
--objectstore-backend string, $CODER_OBJECTSTORE_BACKEND (default: local)
The storage backend for binary data such as chat files. Valid values:
local, s3, gcs.
--objectstore-gcs-bucket string, $CODER_OBJECTSTORE_GCS_BUCKET
GCS bucket name. Required when the backend is "gcs".
--objectstore-gcs-credentials-file string, $CODER_OBJECTSTORE_GCS_CREDENTIALS_FILE
Path to a GCS service account key file. If empty, Application Default
Credentials are used.
--objectstore-gcs-prefix string, $CODER_OBJECTSTORE_GCS_PREFIX
Optional key prefix within the GCS bucket.
--objectstore-local-dir string, $CODER_OBJECTSTORE_LOCAL_DIR
Root directory for the local filesystem object store backend. Only
used when the backend is "local".
--objectstore-s3-bucket string, $CODER_OBJECTSTORE_S3_BUCKET
S3 bucket name. Required when the backend is "s3".
--objectstore-s3-endpoint string, $CODER_OBJECTSTORE_S3_ENDPOINT
Custom S3-compatible endpoint URL (e.g. for MinIO, R2, Cloudflare).
Leave empty for standard AWS S3.
--objectstore-s3-prefix string, $CODER_OBJECTSTORE_S3_PREFIX
Optional key prefix within the S3 bucket.
--objectstore-s3-region string, $CODER_OBJECTSTORE_S3_REGION
AWS region for the S3 bucket.
PROVISIONING OPTIONS:
Tune the behavior of the provisioner, which is responsible for creating,
updating, and deleting workspace resources.
+4 -2
View File
@@ -494,6 +494,7 @@ require (
require (
charm.land/fantasy v0.8.1
github.com/anthropics/anthropic-sdk-go v1.19.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3
github.com/brianvoe/gofakeit/v7 v7.14.0
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
github.com/coder/aibridge v1.1.1-0.20260408143328-f72a795f1e77
@@ -508,6 +509,7 @@ require (
github.com/invopop/jsonschema v0.13.0
github.com/mark3labs/mcp-go v0.38.0
github.com/shopspring/decimal v1.4.0
gocloud.dev v0.45.0
gonum.org/v1/gonum v0.17.0
)
@@ -536,9 +538,9 @@ require (
github.com/aquasecurity/trivy v0.61.1-0.20250407075540-f1329c7ea1aa // indirect
github.com/aquasecurity/trivy-checks v1.12.2-0.20251219190323-79d27547baf5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.12 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect
@@ -571,10 +573,10 @@ require (
github.com/go-openapi/swag/stringutils v0.25.4 // indirect
github.com/go-openapi/swag/typeutils v0.25.4 // indirect
github.com/go-openapi/swag/yamlutils v0.25.4 // indirect
github.com/go-sql-driver/mysql v1.9.3 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/goccy/go-yaml v1.19.2 // indirect
github.com/google/go-containerregistry v0.20.7 // indirect
github.com/google/wire v0.7.0 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/hashicorp/aws-sdk-go-base/v2 v2.0.0-beta.70 // indirect
github.com/hashicorp/go-getter v1.8.4 // indirect
+10
View File
@@ -171,6 +171,8 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 h1:zOgq3uezl5nznfoK3ODuqb
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20/go.mod h1:z/MVwUARehy6GAg/yQ1GO2IMl0k++cu1ohP9zo887wE=
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.14 h1:gKXU53GYsPuYgkdTdMHh6vNdcbIgoxFQLQGjg+iRG+k=
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.14/go.mod h1:jyoemRAktfCyZR9bTb5gT3kn/Vj2KwYDm0Pev5TsmEQ=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.12 h1:Zy6Tme1AA13kX8x3CnkHx5cqdGWGaj/anwOiWGnA0Xo=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.12/go.mod h1:ql4uXYKoTM9WUAUSmthY4AtPVrlTBZOvnBJTiCUdPxI=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 h1:Rgg6wvjjtX8bNHcvi9OnXWwcE0a2vGpbwmtICOsvcf4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21/go.mod h1:A/kJFst/nm//cyqonihbdpQZwiUhhzpqTsdbhDdRF9c=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgqSE5hE/o47Ij9qk/SEZFbUOe9A=
@@ -659,6 +661,10 @@ github.com/google/go-github/v61 v61.0.0 h1:VwQCBwhyE9JclCI+22/7mLB1PuU9eowCXKY5p
github.com/google/go-github/v61 v61.0.0/go.mod h1:0WR+KmsWX75G2EbpyGsGmradjo3IiciuI4BmdVCobQY=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/go-replayers/grpcreplay v1.3.0 h1:1Keyy0m1sIpqstQmgz307zhiJ1pV4uIlFds5weTmxbo=
github.com/google/go-replayers/grpcreplay v1.3.0/go.mod h1:v6NgKtkijC0d3e3RW8il6Sy5sqRVUwoQa4mHOGEy8DI=
github.com/google/go-replayers/httpreplay v1.2.0 h1:VM1wEyyjaoU53BwrOnaf9VhAyQQEEioJvFYxYcLRKzk=
github.com/google/go-replayers/httpreplay v1.2.0/go.mod h1:WahEFFZZ7a1P4VM1qEeHy+tME4bwyqPcwWbNlUI1Mcg=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
@@ -675,6 +681,8 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.7.0 h1:JxUKI6+CVBgCO2WToKy/nQk0sS+amI9z9EjVmdaocj4=
github.com/google/wire v0.7.0/go.mod h1:n6YbUQD9cPKTnHXEBN2DXlOp/mVADhVErcMFb0v3J18=
github.com/googleapis/enterprise-certificate-proxy v0.3.14 h1:yh8ncqsbUY4shRD5dA6RlzjJaT4hi3kII+zYw8wmLb8=
github.com/googleapis/enterprise-certificate-proxy v0.3.14/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg=
github.com/googleapis/gax-go/v2 v2.19.0 h1:fYQaUOiGwll0cGj7jmHT/0nPlcrZDFPrZRhTsoCr8hE=
@@ -1359,6 +1367,8 @@ go4.org/mem v0.0.0-20220726221520-4f986261bf13 h1:CbZeCBZ0aZj8EfVgnqQcYZgf0lpZ3H
go4.org/mem v0.0.0-20220726221520-4f986261bf13/go.mod h1:reUoABIJ9ikfM5sgtSF3Wushcza7+WeD01VB9Lirh3g=
go4.org/netipx v0.0.0-20230728180743-ad4cb58a6516 h1:X66ZEoMN2SuaoI/dfZVYobB6E5zjZyyHUMWlCA7MgGE=
go4.org/netipx v0.0.0-20230728180743-ad4cb58a6516/go.mod h1:TQvodOM+hJTioNQJilmLXu08JNb8i+ccq418+KWu1/Y=
gocloud.dev v0.45.0 h1:WknIK8IbRdmynDvara3Q7G6wQhmEiOGwpgJufbM39sY=
gocloud.dev v0.45.0/go.mod h1:0kXKmkCLG6d31N7NyLZWzt7jDSQura9zD/mWgiB6THI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+6
View File
@@ -235,6 +235,12 @@ coderd_db_tx_executions_count{success="",retries="",tx_id=""} 0
# HELP coderd_dbpurge_iteration_duration_seconds Duration of each dbpurge iteration in seconds.
# TYPE coderd_dbpurge_iteration_duration_seconds histogram
coderd_dbpurge_iteration_duration_seconds{success=""} 0
# HELP coderd_dbpurge_objstore_delete_inflight Number of object store files currently enqueued for deletion.
# TYPE coderd_dbpurge_objstore_delete_inflight gauge
coderd_dbpurge_objstore_delete_inflight 0
# HELP coderd_dbpurge_objstore_files_deleted_total Total number of object store files successfully deleted.
# TYPE coderd_dbpurge_objstore_files_deleted_total counter
coderd_dbpurge_objstore_files_deleted_total 0
# HELP coderd_dbpurge_records_purged_total Total number of records purged by type.
# TYPE coderd_dbpurge_records_purged_total counter
coderd_dbpurge_records_purged_total{record_type=""} 0
+48 -1
View File
@@ -3256,6 +3256,7 @@ export interface DeploymentValues {
readonly hide_ai_tasks?: boolean;
readonly ai?: AIConfig;
readonly stats_collection?: StatsCollectionConfig;
readonly object_store?: ObjectStoreConfig;
readonly config?: string;
readonly write_config?: boolean;
/**
@@ -4983,6 +4984,53 @@ export interface OIDCConfig {
readonly redirect_url: string;
}
// From codersdk/deployment.go
/**
* RetentionConfig contains configuration for data retention policies.
* ObjectStoreConfig configures the object storage backend used for
* binary data such as chat files and transcripts.
*/
export interface ObjectStoreConfig {
/**
* Backend selects the storage backend: "local" (default), "s3", or "gcs".
*/
readonly backend: string;
/**
* LocalDir is the root directory for the local filesystem backend.
* Only used when Backend is "local". Defaults to <config-dir>/objectstore/.
*/
readonly local_dir: string;
/**
* S3Bucket is the S3 bucket name. Required when Backend is "s3".
*/
readonly s3_bucket: string;
/**
* S3Region is the AWS region for the S3 bucket.
*/
readonly s3_region: string;
/**
* S3Prefix is an optional key prefix within the S3 bucket.
*/
readonly s3_prefix: string;
/**
* S3Endpoint is a custom S3-compatible endpoint URL (for MinIO, R2, etc.).
*/
readonly s3_endpoint: string;
/**
* GCSBucket is the GCS bucket name. Required when Backend is "gcs".
*/
readonly gcs_bucket: string;
/**
* GCSPrefix is an optional key prefix within the GCS bucket.
*/
readonly gcs_prefix: string;
/**
* GCSCredentialsFile is an optional path to a GCS service account
* key file. If empty, Application Default Credentials are used.
*/
readonly gcs_credentials_file: string;
}
// From codersdk/parameters.go
export type OptionType = "bool" | "list(string)" | "number" | "string";
@@ -6096,7 +6144,6 @@ export interface ResumeTaskResponse {
// From codersdk/deployment.go
/**
* RetentionConfig contains configuration for data retention policies.
* These settings control how long various types of data are retained in the database
* before being automatically purged. Setting a value to 0 disables retention for that
* data type (data is kept indefinitely).