Compare commits

...

1 Commits

Author SHA1 Message Date
Garrett Delfosse 7ed3aa7877 exp: implement boundary logging telemetry
Add telemetry tracking for boundary feature usage:

- Add boundary_active_users and boundary_active_workspaces database tables
- Create BoundaryTelemetryCollector to collect usage in memory and flush to DB
- Record user/workspace/template IDs when boundary logs are received
- Include boundary active users/workspaces in telemetry snapshots
- Add cleanup of old boundary telemetry data (>24h) via dbpurge
2026-01-20 10:09:17 -05:00
18 changed files with 480 additions and 8 deletions
+11 -2
View File
@@ -865,6 +865,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
if err != nil {
return xerrors.Errorf("remove secrets from deployment values: %w", err)
}
// Create boundary telemetry collector for tracking boundary feature usage.
boundaryTelemetryCollector := telemetry.NewBoundaryTelemetryCollector(
options.Database,
logger.Named("boundary_telemetry"),
)
options.BoundaryTelemetryCollector = boundaryTelemetryCollector
telemetryReporter, err := telemetry.New(telemetry.Options{
Disabled: !vals.Telemetry.Enable.Value(),
BuiltinPostgres: builtinPostgres,
@@ -873,8 +881,9 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
Experiments: coderd.ReadExperiments(options.Logger, options.DeploymentValues.Experiments.Value()),
Logger: logger.Named("telemetry"),
URL: vals.Telemetry.URL.Value(),
Tunnel: tunnel != nil,
DeploymentConfig: deploymentConfigWithoutSecrets,
Tunnel: tunnel != nil,
DeploymentConfig: deploymentConfigWithoutSecrets,
BoundaryTelemetryCollector: boundaryTelemetryCollector,
ParseLicenseJWT: func(lic *telemetry.License) error {
// This will be nil when running in AGPL-only mode.
if options.ParseLicenseClaims == nil {
+8 -2
View File
@@ -25,6 +25,7 @@ import (
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/coderd/wspubsub"
@@ -68,6 +69,7 @@ type Options struct {
AgentID uuid.UUID
OwnerID uuid.UUID
WorkspaceID uuid.UUID
TemplateID uuid.UUID
OrganizationID uuid.UUID
AuthenticatedCtx context.Context
@@ -84,6 +86,7 @@ type Options struct {
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
BoundaryTelemetryCollector *telemetry.BoundaryTelemetryCollector
AccessURL *url.URL
AppHostname string
@@ -221,8 +224,11 @@ func New(opts Options, workspace database.Workspace) *API {
}
api.BoundaryLogsAPI = &BoundaryLogsAPI{
Log: opts.Log,
WorkspaceID: opts.WorkspaceID,
Log: opts.Log,
OwnerID: opts.OwnerID,
WorkspaceID: opts.WorkspaceID,
TemplateID: opts.TemplateID,
BoundaryTelemetryCollector: opts.BoundaryTelemetryCollector,
}
// Start background cache refresh loop to handle workspace changes
+11 -2
View File
@@ -8,14 +8,23 @@ import (
"cdr.dev/slog/v3"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/telemetry"
)
type BoundaryLogsAPI struct {
Log slog.Logger
WorkspaceID uuid.UUID
Log slog.Logger
OwnerID uuid.UUID
WorkspaceID uuid.UUID
TemplateID uuid.UUID
BoundaryTelemetryCollector *telemetry.BoundaryTelemetryCollector
}
func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) {
// Record boundary usage for telemetry if we have any logs.
if len(req.Logs) > 0 && a.BoundaryTelemetryCollector != nil {
a.BoundaryTelemetryCollector.RecordBoundaryUsage(a.OwnerID, a.WorkspaceID, a.TemplateID)
}
for _, l := range req.Logs {
var logTime time.Time
if l.Time != nil {
+1
View File
@@ -197,6 +197,7 @@ type Options struct {
DERPMapUpdateFrequency time.Duration
NetworkTelemetryBatchFrequency time.Duration
NetworkTelemetryBatchMaxSize int
BoundaryTelemetryCollector *telemetry.BoundaryTelemetryCollector
SwaggerEndpoint bool
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
+42
View File
@@ -1663,6 +1663,20 @@ func (q *querier) DeleteApplicationConnectAPIKeysByUserID(ctx context.Context, u
return q.db.DeleteApplicationConnectAPIKeysByUserID(ctx, userID)
}
// DeleteBoundaryActiveUsersBefore purges boundary active-user telemetry rows
// older than recordedAt. Deleting these rows is a system-level operation.
func (q *querier) DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error {
	err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem)
	if err != nil {
		return err
	}
	return q.db.DeleteBoundaryActiveUsersBefore(ctx, recordedAt)
}
// DeleteBoundaryActiveWorkspacesBefore purges boundary active-workspace
// telemetry rows older than recordedAt. Requires system delete permission.
func (q *querier) DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error {
	err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem)
	if err != nil {
		return err
	}
	return q.db.DeleteBoundaryActiveWorkspacesBefore(ctx, recordedAt)
}
func (q *querier) DeleteCoordinator(ctx context.Context, id uuid.UUID) error {
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return err
@@ -2251,6 +2265,20 @@ func (q *querier) GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUI
return q.db.GetAuthorizationUserRoles(ctx, userID)
}
// GetBoundaryActiveUsersSince returns the distinct boundary-active user IDs
// recorded after the given time. Requires system read permission.
func (q *querier) GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error) {
	err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem)
	if err != nil {
		return nil, err
	}
	return q.db.GetBoundaryActiveUsersSince(ctx, recordedAt)
}
// GetBoundaryActiveWorkspacesSince returns the distinct boundary-active
// workspace/template pairs recorded after the given time. Requires system
// read permission.
func (q *querier) GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]database.GetBoundaryActiveWorkspacesSinceRow, error) {
	err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem)
	if err != nil {
		return nil, err
	}
	return q.db.GetBoundaryActiveWorkspacesSince(ctx, recordedAt)
}
func (q *querier) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
// Just like with the audit logs query, shortcut if the user is an owner.
err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceConnectionLog)
@@ -4153,6 +4181,20 @@ func (q *querier) InsertAuditLog(ctx context.Context, arg database.InsertAuditLo
return insert(q.log, q.auth, rbac.ResourceAuditLog, q.db.InsertAuditLog)(ctx, arg)
}
// InsertBoundaryActiveUser records a boundary-active user for telemetry.
// Requires system create permission.
func (q *querier) InsertBoundaryActiveUser(ctx context.Context, arg database.InsertBoundaryActiveUserParams) error {
	err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem)
	if err != nil {
		return err
	}
	return q.db.InsertBoundaryActiveUser(ctx, arg)
}
// InsertBoundaryActiveWorkspace records a boundary-active workspace for
// telemetry. Requires system create permission.
func (q *querier) InsertBoundaryActiveWorkspace(ctx context.Context, arg database.InsertBoundaryActiveWorkspaceParams) error {
	err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem)
	if err != nil {
		return err
	}
	return q.db.InsertBoundaryActiveWorkspace(ctx, arg)
}
func (q *querier) InsertCryptoKey(ctx context.Context, arg database.InsertCryptoKeyParams) (database.CryptoKey, error) {
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceCryptoKey); err != nil {
return database.CryptoKey{}, err
+48
View File
@@ -336,6 +336,22 @@ func (m queryMetricsStore) DeleteApplicationConnectAPIKeysByUserID(ctx context.C
return r0
}
// DeleteBoundaryActiveUsersBefore wraps the store call, recording latency and
// per-route query counts.
func (m queryMetricsStore) DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error {
	begin := time.Now()
	err := m.s.DeleteBoundaryActiveUsersBefore(ctx, recordedAt)
	m.queryLatencies.WithLabelValues("DeleteBoundaryActiveUsersBefore").Observe(time.Since(begin).Seconds())
	m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "DeleteBoundaryActiveUsersBefore").Inc()
	return err
}
// DeleteBoundaryActiveWorkspacesBefore wraps the store call, recording latency
// and per-route query counts.
func (m queryMetricsStore) DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error {
	begin := time.Now()
	err := m.s.DeleteBoundaryActiveWorkspacesBefore(ctx, recordedAt)
	m.queryLatencies.WithLabelValues("DeleteBoundaryActiveWorkspacesBefore").Observe(time.Since(begin).Seconds())
	m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "DeleteBoundaryActiveWorkspacesBefore").Inc()
	return err
}
func (m queryMetricsStore) DeleteCoordinator(ctx context.Context, id uuid.UUID) error {
start := time.Now()
r0 := m.s.DeleteCoordinator(ctx, id)
@@ -927,6 +943,22 @@ func (m queryMetricsStore) GetAuthorizationUserRoles(ctx context.Context, userID
return r0, r1
}
// GetBoundaryActiveUsersSince wraps the store call, recording latency and
// per-route query counts.
func (m queryMetricsStore) GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error) {
	begin := time.Now()
	users, err := m.s.GetBoundaryActiveUsersSince(ctx, recordedAt)
	m.queryLatencies.WithLabelValues("GetBoundaryActiveUsersSince").Observe(time.Since(begin).Seconds())
	m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetBoundaryActiveUsersSince").Inc()
	return users, err
}
// GetBoundaryActiveWorkspacesSince wraps the store call, recording latency and
// per-route query counts.
func (m queryMetricsStore) GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]database.GetBoundaryActiveWorkspacesSinceRow, error) {
	begin := time.Now()
	workspaces, err := m.s.GetBoundaryActiveWorkspacesSince(ctx, recordedAt)
	m.queryLatencies.WithLabelValues("GetBoundaryActiveWorkspacesSince").Observe(time.Since(begin).Seconds())
	m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetBoundaryActiveWorkspacesSince").Inc()
	return workspaces, err
}
func (m queryMetricsStore) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
start := time.Now()
r0, r1 := m.s.GetConnectionLogsOffset(ctx, arg)
@@ -2727,6 +2759,22 @@ func (m queryMetricsStore) InsertAuditLog(ctx context.Context, arg database.Inse
return r0, r1
}
// InsertBoundaryActiveUser wraps the store call, recording latency and
// per-route query counts.
func (m queryMetricsStore) InsertBoundaryActiveUser(ctx context.Context, arg database.InsertBoundaryActiveUserParams) error {
	begin := time.Now()
	err := m.s.InsertBoundaryActiveUser(ctx, arg)
	m.queryLatencies.WithLabelValues("InsertBoundaryActiveUser").Observe(time.Since(begin).Seconds())
	m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "InsertBoundaryActiveUser").Inc()
	return err
}
// InsertBoundaryActiveWorkspace wraps the store call, recording latency and
// per-route query counts.
func (m queryMetricsStore) InsertBoundaryActiveWorkspace(ctx context.Context, arg database.InsertBoundaryActiveWorkspaceParams) error {
	begin := time.Now()
	err := m.s.InsertBoundaryActiveWorkspace(ctx, arg)
	m.queryLatencies.WithLabelValues("InsertBoundaryActiveWorkspace").Observe(time.Since(begin).Seconds())
	m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "InsertBoundaryActiveWorkspace").Inc()
	return err
}
func (m queryMetricsStore) InsertCryptoKey(ctx context.Context, arg database.InsertCryptoKeyParams) (database.CryptoKey, error) {
start := time.Now()
r0, r1 := m.s.InsertCryptoKey(ctx, arg)
+11
View File
@@ -34,6 +34,9 @@ const (
// long enough to cover the maximum interval of a heartbeat event (currently
// 1 hour) plus some buffer.
maxTelemetryHeartbeatAge = 24 * time.Hour
// Boundary telemetry data only needs to persist for 24 hours since snapshots
// are taken every 30 minutes.
maxBoundaryTelemetryAge = 24 * time.Hour
)
// New creates a new periodically purging database instance.
@@ -127,6 +130,14 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
return xerrors.Errorf("failed to delete old telemetry locks: %w", err)
}
deleteOldBoundaryTelemetryBefore := start.Add(-maxBoundaryTelemetryAge)
if err := tx.DeleteBoundaryActiveUsersBefore(ctx, deleteOldBoundaryTelemetryBefore); err != nil {
return xerrors.Errorf("failed to delete old boundary active users: %w", err)
}
if err := tx.DeleteBoundaryActiveWorkspacesBefore(ctx, deleteOldBoundaryTelemetryBefore); err != nil {
return xerrors.Errorf("failed to delete old boundary active workspaces: %w", err)
}
deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge)
if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{
BeforeTime: deleteOldAuditLogConnectionEventsBefore,
+27
View File
@@ -1182,6 +1182,19 @@ CREATE TABLE audit_logs (
resource_icon text NOT NULL
);
CREATE TABLE boundary_active_users (
id uuid DEFAULT gen_random_uuid() NOT NULL,
user_id uuid NOT NULL,
recorded_at timestamp with time zone DEFAULT now() NOT NULL
);
CREATE TABLE boundary_active_workspaces (
id uuid DEFAULT gen_random_uuid() NOT NULL,
workspace_id uuid NOT NULL,
template_id uuid NOT NULL,
recorded_at timestamp with time zone DEFAULT now() NOT NULL
);
CREATE TABLE connection_logs (
id uuid NOT NULL,
connect_time timestamp with time zone NOT NULL,
@@ -3019,6 +3032,12 @@ ALTER TABLE ONLY api_keys
ALTER TABLE ONLY audit_logs
ADD CONSTRAINT audit_logs_pkey PRIMARY KEY (id);
ALTER TABLE ONLY boundary_active_users
ADD CONSTRAINT boundary_active_users_pkey PRIMARY KEY (id);
ALTER TABLE ONLY boundary_active_workspaces
ADD CONSTRAINT boundary_active_workspaces_pkey PRIMARY KEY (id);
ALTER TABLE ONLY connection_logs
ADD CONSTRAINT connection_logs_pkey PRIMARY KEY (id);
@@ -3350,6 +3369,14 @@ CREATE INDEX idx_audit_log_user_id ON audit_logs USING btree (user_id);
CREATE INDEX idx_audit_logs_time_desc ON audit_logs USING btree ("time" DESC);
CREATE INDEX idx_boundary_active_users_recorded_at ON boundary_active_users USING btree (recorded_at);
CREATE INDEX idx_boundary_active_users_user_id ON boundary_active_users USING btree (user_id);
CREATE INDEX idx_boundary_active_workspaces_recorded_at ON boundary_active_workspaces USING btree (recorded_at);
CREATE INDEX idx_boundary_active_workspaces_workspace_id ON boundary_active_workspaces USING btree (workspace_id);
CREATE INDEX idx_connection_logs_connect_time_desc ON connection_logs USING btree (connect_time DESC);
CREATE UNIQUE INDEX idx_connection_logs_connection_id_workspace_id_agent_name ON connection_logs USING btree (connection_id, workspace_id, agent_name);
@@ -0,0 +1,2 @@
-- Remove the boundary telemetry tracking tables added by the corresponding
-- "up" migration. IF EXISTS keeps the migration idempotent.
DROP TABLE IF EXISTS boundary_active_workspaces;
DROP TABLE IF EXISTS boundary_active_users;
@@ -0,0 +1,21 @@
-- Tables to track boundary feature usage for telemetry reporting.
-- Data is collected from boundary log streams and inserted periodically by each replica.

-- One row per flush per active user; readers deduplicate with DISTINCT.
CREATE TABLE boundary_active_users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- recorded_at is indexed to support the time-window reads and age-based purges.
CREATE INDEX idx_boundary_active_users_user_id ON boundary_active_users(user_id);
CREATE INDEX idx_boundary_active_users_recorded_at ON boundary_active_users(recorded_at);

-- One row per flush per active workspace, together with its template.
CREATE TABLE boundary_active_workspaces (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id UUID NOT NULL,
    template_id UUID NOT NULL,
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_boundary_active_workspaces_workspace_id ON boundary_active_workspaces(workspace_id);
CREATE INDEX idx_boundary_active_workspaces_recorded_at ON boundary_active_workspaces(recorded_at);
+13
View File
@@ -3693,6 +3693,19 @@ type AuditLog struct {
ResourceIcon string `db:"resource_icon" json:"resource_icon"`
}
// BoundaryActiveUser is a row of the boundary_active_users telemetry table:
// a user who used the boundary feature, with the time the usage was flushed.
type BoundaryActiveUser struct {
	ID         uuid.UUID `db:"id" json:"id"`
	UserID     uuid.UUID `db:"user_id" json:"user_id"`
	RecordedAt time.Time `db:"recorded_at" json:"recorded_at"`
}

// BoundaryActiveWorkspace is a row of the boundary_active_workspaces telemetry
// table: a workspace that used the boundary feature, together with its template.
type BoundaryActiveWorkspace struct {
	ID          uuid.UUID `db:"id" json:"id"`
	WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
	TemplateID  uuid.UUID `db:"template_id" json:"template_id"`
	RecordedAt  time.Time `db:"recorded_at" json:"recorded_at"`
}
type ConnectionLog struct {
ID uuid.UUID `db:"id" json:"id"`
ConnectTime time.Time `db:"connect_time" json:"connect_time"`
+6
View File
@@ -88,6 +88,8 @@ type sqlcQuerier interface {
// be recreated.
DeleteAllWebpushSubscriptions(ctx context.Context) error
DeleteApplicationConnectAPIKeysByUserID(ctx context.Context, userID uuid.UUID) error
DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error
DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error
DeleteCoordinator(ctx context.Context, id uuid.UUID) error
DeleteCryptoKey(ctx context.Context, arg DeleteCryptoKeyParams) (CryptoKey, error)
DeleteCustomRole(ctx context.Context, arg DeleteCustomRoleParams) error
@@ -193,6 +195,8 @@ type sqlcQuerier interface {
// This function returns roles for authorization purposes. Implied member roles
// are included.
GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUID) (GetAuthorizationUserRolesRow, error)
GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error)
GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]GetBoundaryActiveWorkspacesSinceRow, error)
GetConnectionLogsOffset(ctx context.Context, arg GetConnectionLogsOffsetParams) ([]GetConnectionLogsOffsetRow, error)
GetCoordinatorResumeTokenSigningKey(ctx context.Context) (string, error)
GetCryptoKeyByFeatureAndSequence(ctx context.Context, arg GetCryptoKeyByFeatureAndSequenceParams) (CryptoKey, error)
@@ -545,6 +549,8 @@ type sqlcQuerier interface {
// every member of the org.
InsertAllUsersGroup(ctx context.Context, organizationID uuid.UUID) (Group, error)
InsertAuditLog(ctx context.Context, arg InsertAuditLogParams) (AuditLog, error)
InsertBoundaryActiveUser(ctx context.Context, arg InsertBoundaryActiveUserParams) error
InsertBoundaryActiveWorkspace(ctx context.Context, arg InsertBoundaryActiveWorkspaceParams) error
InsertCryptoKey(ctx context.Context, arg InsertCryptoKeyParams) (CryptoKey, error)
InsertCustomRole(ctx context.Context, arg InsertCustomRoleParams) (CustomRole, error)
InsertDBCryptKey(ctx context.Context, arg InsertDBCryptKeyParams) error
+110
View File
@@ -1980,6 +1980,116 @@ func (q *sqlQuerier) InsertAuditLog(ctx context.Context, arg InsertAuditLogParam
return i, err
}
const deleteBoundaryActiveUsersBefore = `-- name: DeleteBoundaryActiveUsersBefore :exec
DELETE FROM boundary_active_users WHERE recorded_at < $1
`

// DeleteBoundaryActiveUsersBefore deletes boundary active-user rows recorded
// before the given cutoff.
func (q *sqlQuerier) DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error {
	if _, err := q.db.ExecContext(ctx, deleteBoundaryActiveUsersBefore, recordedAt); err != nil {
		return err
	}
	return nil
}
const deleteBoundaryActiveWorkspacesBefore = `-- name: DeleteBoundaryActiveWorkspacesBefore :exec
DELETE FROM boundary_active_workspaces WHERE recorded_at < $1
`

// DeleteBoundaryActiveWorkspacesBefore deletes boundary active-workspace rows
// recorded before the given cutoff.
func (q *sqlQuerier) DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error {
	if _, err := q.db.ExecContext(ctx, deleteBoundaryActiveWorkspacesBefore, recordedAt); err != nil {
		return err
	}
	return nil
}
const getBoundaryActiveUsersSince = `-- name: GetBoundaryActiveUsersSince :many
SELECT DISTINCT user_id FROM boundary_active_users
WHERE recorded_at > $1
`

// GetBoundaryActiveUsersSince returns the distinct user IDs with boundary
// activity recorded after the given time.
func (q *sqlQuerier) GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error) {
	rows, err := q.db.QueryContext(ctx, getBoundaryActiveUsersSince, recordedAt)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var userIDs []uuid.UUID
	for rows.Next() {
		var id uuid.UUID
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		userIDs = append(userIDs, id)
	}
	// Surface both close and iteration errors, matching the generated style.
	if err := rows.Close(); err != nil {
		return nil, err
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return userIDs, nil
}
const getBoundaryActiveWorkspacesSince = `-- name: GetBoundaryActiveWorkspacesSince :many
SELECT DISTINCT workspace_id, template_id FROM boundary_active_workspaces
WHERE recorded_at > $1
`

type GetBoundaryActiveWorkspacesSinceRow struct {
	WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
	TemplateID  uuid.UUID `db:"template_id" json:"template_id"`
}

// GetBoundaryActiveWorkspacesSince returns the distinct workspace/template
// pairs with boundary activity recorded after the given time.
func (q *sqlQuerier) GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]GetBoundaryActiveWorkspacesSinceRow, error) {
	rows, err := q.db.QueryContext(ctx, getBoundaryActiveWorkspacesSince, recordedAt)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var results []GetBoundaryActiveWorkspacesSinceRow
	for rows.Next() {
		var row GetBoundaryActiveWorkspacesSinceRow
		if err := rows.Scan(&row.WorkspaceID, &row.TemplateID); err != nil {
			return nil, err
		}
		results = append(results, row)
	}
	// Surface both close and iteration errors, matching the generated style.
	if err := rows.Close(); err != nil {
		return nil, err
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return results, nil
}
const insertBoundaryActiveUser = `-- name: InsertBoundaryActiveUser :exec
INSERT INTO boundary_active_users (user_id, recorded_at)
VALUES ($1, $2)
`

type InsertBoundaryActiveUserParams struct {
	UserID     uuid.UUID `db:"user_id" json:"user_id"`
	RecordedAt time.Time `db:"recorded_at" json:"recorded_at"`
}

// InsertBoundaryActiveUser records a boundary-active user row.
func (q *sqlQuerier) InsertBoundaryActiveUser(ctx context.Context, arg InsertBoundaryActiveUserParams) error {
	if _, err := q.db.ExecContext(ctx, insertBoundaryActiveUser, arg.UserID, arg.RecordedAt); err != nil {
		return err
	}
	return nil
}
const insertBoundaryActiveWorkspace = `-- name: InsertBoundaryActiveWorkspace :exec
INSERT INTO boundary_active_workspaces (workspace_id, template_id, recorded_at)
VALUES ($1, $2, $3)
`

type InsertBoundaryActiveWorkspaceParams struct {
	WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
	TemplateID  uuid.UUID `db:"template_id" json:"template_id"`
	RecordedAt  time.Time `db:"recorded_at" json:"recorded_at"`
}

// InsertBoundaryActiveWorkspace records a boundary-active workspace row.
func (q *sqlQuerier) InsertBoundaryActiveWorkspace(ctx context.Context, arg InsertBoundaryActiveWorkspaceParams) error {
	if _, err := q.db.ExecContext(ctx, insertBoundaryActiveWorkspace, arg.WorkspaceID, arg.TemplateID, arg.RecordedAt); err != nil {
		return err
	}
	return nil
}
const countConnectionLogs = `-- name: CountConnectionLogs :one
SELECT
COUNT(*) AS count
@@ -0,0 +1,21 @@
-- Boundary telemetry queries. Inserts happen on every collector flush, so the
-- read queries use DISTINCT to collapse repeat activity inside the window.

-- name: InsertBoundaryActiveUser :exec
INSERT INTO boundary_active_users (user_id, recorded_at)
VALUES ($1, $2);

-- name: InsertBoundaryActiveWorkspace :exec
INSERT INTO boundary_active_workspaces (workspace_id, template_id, recorded_at)
VALUES ($1, $2, $3);

-- name: GetBoundaryActiveUsersSince :many
SELECT DISTINCT user_id FROM boundary_active_users
WHERE recorded_at > $1;

-- name: GetBoundaryActiveWorkspacesSince :many
SELECT DISTINCT workspace_id, template_id FROM boundary_active_workspaces
WHERE recorded_at > $1;

-- Age-based purges used by dbpurge.

-- name: DeleteBoundaryActiveUsersBefore :exec
DELETE FROM boundary_active_users WHERE recorded_at < $1;

-- name: DeleteBoundaryActiveWorkspacesBefore :exec
DELETE FROM boundary_active_workspaces WHERE recorded_at < $1;
+2
View File
@@ -13,6 +13,8 @@ const (
UniqueAibridgeUserPromptsPkey UniqueConstraint = "aibridge_user_prompts_pkey" // ALTER TABLE ONLY aibridge_user_prompts ADD CONSTRAINT aibridge_user_prompts_pkey PRIMARY KEY (id);
UniqueAPIKeysPkey UniqueConstraint = "api_keys_pkey" // ALTER TABLE ONLY api_keys ADD CONSTRAINT api_keys_pkey PRIMARY KEY (id);
UniqueAuditLogsPkey UniqueConstraint = "audit_logs_pkey" // ALTER TABLE ONLY audit_logs ADD CONSTRAINT audit_logs_pkey PRIMARY KEY (id);
UniqueBoundaryActiveUsersPkey UniqueConstraint = "boundary_active_users_pkey" // ALTER TABLE ONLY boundary_active_users ADD CONSTRAINT boundary_active_users_pkey PRIMARY KEY (id);
UniqueBoundaryActiveWorkspacesPkey UniqueConstraint = "boundary_active_workspaces_pkey" // ALTER TABLE ONLY boundary_active_workspaces ADD CONSTRAINT boundary_active_workspaces_pkey PRIMARY KEY (id);
UniqueConnectionLogsPkey UniqueConstraint = "connection_logs_pkey" // ALTER TABLE ONLY connection_logs ADD CONSTRAINT connection_logs_pkey PRIMARY KEY (id);
UniqueCryptoKeysPkey UniqueConstraint = "crypto_keys_pkey" // ALTER TABLE ONLY crypto_keys ADD CONSTRAINT crypto_keys_pkey PRIMARY KEY (feature, sequence);
UniqueCustomRolesUniqueKey UniqueConstraint = "custom_roles_unique_key" // ALTER TABLE ONLY custom_roles ADD CONSTRAINT custom_roles_unique_key UNIQUE (name, organization_id);
+93
View File
@@ -0,0 +1,93 @@
package telemetry
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
)
// BoundaryTelemetryCollector collects boundary feature usage data in memory
// and periodically flushes it to the database for telemetry reporting.
type BoundaryTelemetryCollector struct {
	db     database.Store
	logger slog.Logger

	// mu guards activeUsers and activeWorkspaces.
	mu sync.Mutex
	// activeUsers is the set of user IDs seen since the last Flush.
	activeUsers map[uuid.UUID]struct{}
	// activeWorkspaces maps workspaces seen since the last Flush to their
	// template.
	activeWorkspaces map[uuid.UUID]uuid.UUID // workspaceID -> templateID
}
// NewBoundaryTelemetryCollector creates a new collector for boundary telemetry.
func NewBoundaryTelemetryCollector(db database.Store, logger slog.Logger) *BoundaryTelemetryCollector {
	collector := &BoundaryTelemetryCollector{
		db:     db,
		logger: logger,
	}
	collector.activeUsers = make(map[uuid.UUID]struct{})
	collector.activeWorkspaces = make(map[uuid.UUID]uuid.UUID)
	return collector
}
// RecordBoundaryUsage records that a user/workspace used the boundary feature.
// It only mutates in-memory state; Flush persists it later.
func (c *BoundaryTelemetryCollector) RecordBoundaryUsage(userID, workspaceID, templateID uuid.UUID) {
	c.mu.Lock()
	c.activeUsers[userID] = struct{}{}
	c.activeWorkspaces[workspaceID] = templateID
	c.mu.Unlock()
}
// Flush writes the collected data to the database and clears the in-memory
// state. This should be called on the same interval as the telemetry snapshot.
//
// Each failed insert is logged and the last failure is returned so the caller
// can surface it; failed rows are dropped rather than retried, since boundary
// telemetry is best-effort. (Previously this always returned nil, making the
// caller's error handling dead code.)
func (c *BoundaryTelemetryCollector) Flush(ctx context.Context) error {
	// Swap the maps out under the lock so recording can continue concurrently
	// while we perform the database writes.
	c.mu.Lock()
	users := c.activeUsers
	workspaces := c.activeWorkspaces
	c.activeUsers = make(map[uuid.UUID]struct{})
	c.activeWorkspaces = make(map[uuid.UUID]uuid.UUID)
	c.mu.Unlock()

	now := dbtime.Now()
	var flushErr error
	for userID := range users {
		err := c.db.InsertBoundaryActiveUser(ctx, database.InsertBoundaryActiveUserParams{
			UserID:     userID,
			RecordedAt: now,
		})
		if err != nil {
			c.logger.Error(ctx, "failed to insert boundary active user",
				slog.F("user_id", userID),
				slog.Error(err))
			flushErr = err
		}
	}
	for workspaceID, templateID := range workspaces {
		err := c.db.InsertBoundaryActiveWorkspace(ctx, database.InsertBoundaryActiveWorkspaceParams{
			WorkspaceID: workspaceID,
			TemplateID:  templateID,
			RecordedAt:  now,
		})
		if err != nil {
			c.logger.Error(ctx, "failed to insert boundary active workspace",
				slog.F("workspace_id", workspaceID),
				slog.F("template_id", templateID),
				slog.Error(err))
			flushErr = err
		}
	}
	return flushErr
}
// Cleanup removes boundary telemetry rows recorded before the given time from
// both tables, returning the first error encountered.
func (c *BoundaryTelemetryCollector) Cleanup(ctx context.Context, before time.Time) error {
	if err := c.db.DeleteBoundaryActiveUsersBefore(ctx, before); err != nil {
		return err
	}
	return c.db.DeleteBoundaryActiveWorkspacesBefore(ctx, before)
}
+51 -2
View File
@@ -58,8 +58,9 @@ type Options struct {
BuiltinPostgres bool
Tunnel bool
SnapshotFrequency time.Duration
ParseLicenseJWT func(lic *License) error
SnapshotFrequency time.Duration
ParseLicenseJWT func(lic *License) error
BoundaryTelemetryCollector *BoundaryTelemetryCollector
}
// New constructs a reporter for telemetry data.
@@ -291,6 +292,14 @@ func (r *remoteReporter) reportWithDeployment() {
r.options.Logger.Debug(r.ctx, "update deployment", slog.Error(err))
return
}
// Flush boundary telemetry data to the database before creating the snapshot.
if r.options.BoundaryTelemetryCollector != nil {
if err := r.options.BoundaryTelemetryCollector.Flush(r.ctx); err != nil {
r.options.Logger.Error(r.ctx, "failed to flush boundary telemetry", slog.Error(err))
}
}
snapshot, err := r.createSnapshot()
if errors.Is(err, context.Canceled) {
return
@@ -759,6 +768,33 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
snapshot.AIBridgeInterceptionsSummaries = summaries
return nil
})
eg.Go(func() error {
// Query boundary active users from the last snapshot interval.
since := dbtime.Time(r.options.Clock.Now().Add(-r.options.SnapshotFrequency))
users, err := r.options.Database.GetBoundaryActiveUsersSince(ctx, since)
if err != nil {
return xerrors.Errorf("get boundary active users: %w", err)
}
snapshot.BoundaryActiveUsers = make([]BoundaryActiveUser, 0, len(users))
for _, userID := range users {
snapshot.BoundaryActiveUsers = append(snapshot.BoundaryActiveUsers, BoundaryActiveUser{
ID: userID,
})
}
workspaces, err := r.options.Database.GetBoundaryActiveWorkspacesSince(ctx, since)
if err != nil {
return xerrors.Errorf("get boundary active workspaces: %w", err)
}
snapshot.BoundaryActiveWorkspaces = make([]BoundaryActiveWorkspace, 0, len(workspaces))
for _, ws := range workspaces {
snapshot.BoundaryActiveWorkspaces = append(snapshot.BoundaryActiveWorkspaces, BoundaryActiveWorkspace{
ID: ws.WorkspaceID,
TemplateID: ws.TemplateID,
})
}
return nil
})
err := eg.Wait()
if err != nil {
@@ -1309,6 +1345,8 @@ type Snapshot struct {
UserTailnetConnections []UserTailnetConnection `json:"user_tailnet_connections"`
PrebuiltWorkspaces []PrebuiltWorkspace `json:"prebuilt_workspaces"`
AIBridgeInterceptionsSummaries []AIBridgeInterceptionsSummary `json:"aibridge_interceptions_summaries"`
BoundaryActiveUsers []BoundaryActiveUser `json:"boundary_active_users"`
BoundaryActiveWorkspaces []BoundaryActiveWorkspace `json:"boundary_active_workspaces"`
}
// Deployment contains information about the host running Coder.
@@ -1995,6 +2033,17 @@ type AIBridgeInterceptionsSummary struct {
InjectedToolCallErrorCount int64 `json:"injected_tool_call_error_count"`
}
// BoundaryActiveUser represents a user who has used the boundary feature.
// It is included in telemetry snapshots.
type BoundaryActiveUser struct {
	// ID is the user's ID.
	ID uuid.UUID `json:"id"`
}

// BoundaryActiveWorkspace represents a workspace that has used the boundary feature.
// It is included in telemetry snapshots.
type BoundaryActiveWorkspace struct {
	// ID is the workspace's ID.
	ID uuid.UUID `json:"id"`
	// TemplateID is the workspace's template ID.
	TemplateID uuid.UUID `json:"template_id"`
}
func ConvertAIBridgeInterceptionsSummary(endTime time.Time, provider, model, client string, summary database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) AIBridgeInterceptionsSummary {
return AIBridgeInterceptionsSummary{
ID: uuid.New(),
+2
View File
@@ -130,6 +130,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
AgentID: workspaceAgent.ID,
OwnerID: workspace.OwnerID,
WorkspaceID: workspace.ID,
TemplateID: workspace.TemplateID,
OrganizationID: workspace.OrganizationID,
AuthenticatedCtx: ctx,
@@ -146,6 +147,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
BoundaryTelemetryCollector: api.BoundaryTelemetryCollector,
AccessURL: api.AccessURL,
AppHostname: api.AppHostname,