Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7ed3aa7877 |
+11
-2
@@ -865,6 +865,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
|
||||
if err != nil {
|
||||
return xerrors.Errorf("remove secrets from deployment values: %w", err)
|
||||
}
|
||||
|
||||
// Create boundary telemetry collector for tracking boundary feature usage.
|
||||
boundaryTelemetryCollector := telemetry.NewBoundaryTelemetryCollector(
|
||||
options.Database,
|
||||
logger.Named("boundary_telemetry"),
|
||||
)
|
||||
options.BoundaryTelemetryCollector = boundaryTelemetryCollector
|
||||
|
||||
telemetryReporter, err := telemetry.New(telemetry.Options{
|
||||
Disabled: !vals.Telemetry.Enable.Value(),
|
||||
BuiltinPostgres: builtinPostgres,
|
||||
@@ -873,8 +881,9 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
|
||||
Experiments: coderd.ReadExperiments(options.Logger, options.DeploymentValues.Experiments.Value()),
|
||||
Logger: logger.Named("telemetry"),
|
||||
URL: vals.Telemetry.URL.Value(),
|
||||
Tunnel: tunnel != nil,
|
||||
DeploymentConfig: deploymentConfigWithoutSecrets,
|
||||
Tunnel: tunnel != nil,
|
||||
DeploymentConfig: deploymentConfigWithoutSecrets,
|
||||
BoundaryTelemetryCollector: boundaryTelemetryCollector,
|
||||
ParseLicenseJWT: func(lic *telemetry.License) error {
|
||||
// This will be nil when running in AGPL-only mode.
|
||||
if options.ParseLicenseClaims == nil {
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/externalauth"
|
||||
"github.com/coder/coder/v2/coderd/notifications"
|
||||
"github.com/coder/coder/v2/coderd/prometheusmetrics"
|
||||
"github.com/coder/coder/v2/coderd/telemetry"
|
||||
"github.com/coder/coder/v2/coderd/tracing"
|
||||
"github.com/coder/coder/v2/coderd/workspacestats"
|
||||
"github.com/coder/coder/v2/coderd/wspubsub"
|
||||
@@ -68,6 +69,7 @@ type Options struct {
|
||||
AgentID uuid.UUID
|
||||
OwnerID uuid.UUID
|
||||
WorkspaceID uuid.UUID
|
||||
TemplateID uuid.UUID
|
||||
OrganizationID uuid.UUID
|
||||
|
||||
AuthenticatedCtx context.Context
|
||||
@@ -84,6 +86,7 @@ type Options struct {
|
||||
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
|
||||
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
|
||||
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
|
||||
BoundaryTelemetryCollector *telemetry.BoundaryTelemetryCollector
|
||||
|
||||
AccessURL *url.URL
|
||||
AppHostname string
|
||||
@@ -221,8 +224,11 @@ func New(opts Options, workspace database.Workspace) *API {
|
||||
}
|
||||
|
||||
api.BoundaryLogsAPI = &BoundaryLogsAPI{
|
||||
Log: opts.Log,
|
||||
WorkspaceID: opts.WorkspaceID,
|
||||
Log: opts.Log,
|
||||
OwnerID: opts.OwnerID,
|
||||
WorkspaceID: opts.WorkspaceID,
|
||||
TemplateID: opts.TemplateID,
|
||||
BoundaryTelemetryCollector: opts.BoundaryTelemetryCollector,
|
||||
}
|
||||
|
||||
// Start background cache refresh loop to handle workspace changes
|
||||
|
||||
@@ -8,14 +8,23 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
agentproto "github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/coderd/telemetry"
|
||||
)
|
||||
|
||||
type BoundaryLogsAPI struct {
|
||||
Log slog.Logger
|
||||
WorkspaceID uuid.UUID
|
||||
Log slog.Logger
|
||||
OwnerID uuid.UUID
|
||||
WorkspaceID uuid.UUID
|
||||
TemplateID uuid.UUID
|
||||
BoundaryTelemetryCollector *telemetry.BoundaryTelemetryCollector
|
||||
}
|
||||
|
||||
func (a *BoundaryLogsAPI) ReportBoundaryLogs(ctx context.Context, req *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) {
|
||||
// Record boundary usage for telemetry if we have any logs.
|
||||
if len(req.Logs) > 0 && a.BoundaryTelemetryCollector != nil {
|
||||
a.BoundaryTelemetryCollector.RecordBoundaryUsage(a.OwnerID, a.WorkspaceID, a.TemplateID)
|
||||
}
|
||||
|
||||
for _, l := range req.Logs {
|
||||
var logTime time.Time
|
||||
if l.Time != nil {
|
||||
|
||||
@@ -197,6 +197,7 @@ type Options struct {
|
||||
DERPMapUpdateFrequency time.Duration
|
||||
NetworkTelemetryBatchFrequency time.Duration
|
||||
NetworkTelemetryBatchMaxSize int
|
||||
BoundaryTelemetryCollector *telemetry.BoundaryTelemetryCollector
|
||||
SwaggerEndpoint bool
|
||||
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
|
||||
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
|
||||
|
||||
@@ -1663,6 +1663,20 @@ func (q *querier) DeleteApplicationConnectAPIKeysByUserID(ctx context.Context, u
|
||||
return q.db.DeleteApplicationConnectAPIKeysByUserID(ctx, userID)
|
||||
}
|
||||
|
||||
func (q *querier) DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error {
|
||||
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
|
||||
return err
|
||||
}
|
||||
return q.db.DeleteBoundaryActiveUsersBefore(ctx, recordedAt)
|
||||
}
|
||||
|
||||
func (q *querier) DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error {
|
||||
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceSystem); err != nil {
|
||||
return err
|
||||
}
|
||||
return q.db.DeleteBoundaryActiveWorkspacesBefore(ctx, recordedAt)
|
||||
}
|
||||
|
||||
func (q *querier) DeleteCoordinator(ctx context.Context, id uuid.UUID) error {
|
||||
if err := q.authorizeContext(ctx, policy.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
|
||||
return err
|
||||
@@ -2251,6 +2265,20 @@ func (q *querier) GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUI
|
||||
return q.db.GetAuthorizationUserRoles(ctx, userID)
|
||||
}
|
||||
|
||||
func (q *querier) GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error) {
|
||||
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return q.db.GetBoundaryActiveUsersSince(ctx, recordedAt)
|
||||
}
|
||||
|
||||
func (q *querier) GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]database.GetBoundaryActiveWorkspacesSinceRow, error) {
|
||||
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return q.db.GetBoundaryActiveWorkspacesSince(ctx, recordedAt)
|
||||
}
|
||||
|
||||
func (q *querier) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
|
||||
// Just like with the audit logs query, shortcut if the user is an owner.
|
||||
err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceConnectionLog)
|
||||
@@ -4153,6 +4181,20 @@ func (q *querier) InsertAuditLog(ctx context.Context, arg database.InsertAuditLo
|
||||
return insert(q.log, q.auth, rbac.ResourceAuditLog, q.db.InsertAuditLog)(ctx, arg)
|
||||
}
|
||||
|
||||
func (q *querier) InsertBoundaryActiveUser(ctx context.Context, arg database.InsertBoundaryActiveUserParams) error {
|
||||
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
|
||||
return err
|
||||
}
|
||||
return q.db.InsertBoundaryActiveUser(ctx, arg)
|
||||
}
|
||||
|
||||
func (q *querier) InsertBoundaryActiveWorkspace(ctx context.Context, arg database.InsertBoundaryActiveWorkspaceParams) error {
|
||||
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
|
||||
return err
|
||||
}
|
||||
return q.db.InsertBoundaryActiveWorkspace(ctx, arg)
|
||||
}
|
||||
|
||||
func (q *querier) InsertCryptoKey(ctx context.Context, arg database.InsertCryptoKeyParams) (database.CryptoKey, error) {
|
||||
if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceCryptoKey); err != nil {
|
||||
return database.CryptoKey{}, err
|
||||
|
||||
@@ -336,6 +336,22 @@ func (m queryMetricsStore) DeleteApplicationConnectAPIKeysByUserID(ctx context.C
|
||||
return r0
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.DeleteBoundaryActiveUsersBefore(ctx, recordedAt)
|
||||
m.queryLatencies.WithLabelValues("DeleteBoundaryActiveUsersBefore").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "DeleteBoundaryActiveUsersBefore").Inc()
|
||||
return r0
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.DeleteBoundaryActiveWorkspacesBefore(ctx, recordedAt)
|
||||
m.queryLatencies.WithLabelValues("DeleteBoundaryActiveWorkspacesBefore").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "DeleteBoundaryActiveWorkspacesBefore").Inc()
|
||||
return r0
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) DeleteCoordinator(ctx context.Context, id uuid.UUID) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.DeleteCoordinator(ctx, id)
|
||||
@@ -927,6 +943,22 @@ func (m queryMetricsStore) GetAuthorizationUserRoles(ctx context.Context, userID
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.GetBoundaryActiveUsersSince(ctx, recordedAt)
|
||||
m.queryLatencies.WithLabelValues("GetBoundaryActiveUsersSince").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetBoundaryActiveUsersSince").Inc()
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]database.GetBoundaryActiveWorkspacesSinceRow, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.GetBoundaryActiveWorkspacesSince(ctx, recordedAt)
|
||||
m.queryLatencies.WithLabelValues("GetBoundaryActiveWorkspacesSince").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetBoundaryActiveWorkspacesSince").Inc()
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) GetConnectionLogsOffset(ctx context.Context, arg database.GetConnectionLogsOffsetParams) ([]database.GetConnectionLogsOffsetRow, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.GetConnectionLogsOffset(ctx, arg)
|
||||
@@ -2727,6 +2759,22 @@ func (m queryMetricsStore) InsertAuditLog(ctx context.Context, arg database.Inse
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) InsertBoundaryActiveUser(ctx context.Context, arg database.InsertBoundaryActiveUserParams) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.InsertBoundaryActiveUser(ctx, arg)
|
||||
m.queryLatencies.WithLabelValues("InsertBoundaryActiveUser").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "InsertBoundaryActiveUser").Inc()
|
||||
return r0
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) InsertBoundaryActiveWorkspace(ctx context.Context, arg database.InsertBoundaryActiveWorkspaceParams) error {
|
||||
start := time.Now()
|
||||
r0 := m.s.InsertBoundaryActiveWorkspace(ctx, arg)
|
||||
m.queryLatencies.WithLabelValues("InsertBoundaryActiveWorkspace").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "InsertBoundaryActiveWorkspace").Inc()
|
||||
return r0
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) InsertCryptoKey(ctx context.Context, arg database.InsertCryptoKeyParams) (database.CryptoKey, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.InsertCryptoKey(ctx, arg)
|
||||
|
||||
@@ -34,6 +34,9 @@ const (
|
||||
// long enough to cover the maximum interval of a heartbeat event (currently
|
||||
// 1 hour) plus some buffer.
|
||||
maxTelemetryHeartbeatAge = 24 * time.Hour
|
||||
// Boundary telemetry data only needs to persist for 24 hours since snapshots
|
||||
// are taken every 30 minutes.
|
||||
maxBoundaryTelemetryAge = 24 * time.Hour
|
||||
)
|
||||
|
||||
// New creates a new periodically purging database instance.
|
||||
@@ -127,6 +130,14 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, vals *coder
|
||||
return xerrors.Errorf("failed to delete old telemetry locks: %w", err)
|
||||
}
|
||||
|
||||
deleteOldBoundaryTelemetryBefore := start.Add(-maxBoundaryTelemetryAge)
|
||||
if err := tx.DeleteBoundaryActiveUsersBefore(ctx, deleteOldBoundaryTelemetryBefore); err != nil {
|
||||
return xerrors.Errorf("failed to delete old boundary active users: %w", err)
|
||||
}
|
||||
if err := tx.DeleteBoundaryActiveWorkspacesBefore(ctx, deleteOldBoundaryTelemetryBefore); err != nil {
|
||||
return xerrors.Errorf("failed to delete old boundary active workspaces: %w", err)
|
||||
}
|
||||
|
||||
deleteOldAuditLogConnectionEventsBefore := start.Add(-maxAuditLogConnectionEventAge)
|
||||
if err := tx.DeleteOldAuditLogConnectionEvents(ctx, database.DeleteOldAuditLogConnectionEventsParams{
|
||||
BeforeTime: deleteOldAuditLogConnectionEventsBefore,
|
||||
|
||||
Generated
+27
@@ -1182,6 +1182,19 @@ CREATE TABLE audit_logs (
|
||||
resource_icon text NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE boundary_active_users (
|
||||
id uuid DEFAULT gen_random_uuid() NOT NULL,
|
||||
user_id uuid NOT NULL,
|
||||
recorded_at timestamp with time zone DEFAULT now() NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE boundary_active_workspaces (
|
||||
id uuid DEFAULT gen_random_uuid() NOT NULL,
|
||||
workspace_id uuid NOT NULL,
|
||||
template_id uuid NOT NULL,
|
||||
recorded_at timestamp with time zone DEFAULT now() NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE connection_logs (
|
||||
id uuid NOT NULL,
|
||||
connect_time timestamp with time zone NOT NULL,
|
||||
@@ -3019,6 +3032,12 @@ ALTER TABLE ONLY api_keys
|
||||
ALTER TABLE ONLY audit_logs
|
||||
ADD CONSTRAINT audit_logs_pkey PRIMARY KEY (id);
|
||||
|
||||
ALTER TABLE ONLY boundary_active_users
|
||||
ADD CONSTRAINT boundary_active_users_pkey PRIMARY KEY (id);
|
||||
|
||||
ALTER TABLE ONLY boundary_active_workspaces
|
||||
ADD CONSTRAINT boundary_active_workspaces_pkey PRIMARY KEY (id);
|
||||
|
||||
ALTER TABLE ONLY connection_logs
|
||||
ADD CONSTRAINT connection_logs_pkey PRIMARY KEY (id);
|
||||
|
||||
@@ -3350,6 +3369,14 @@ CREATE INDEX idx_audit_log_user_id ON audit_logs USING btree (user_id);
|
||||
|
||||
CREATE INDEX idx_audit_logs_time_desc ON audit_logs USING btree ("time" DESC);
|
||||
|
||||
CREATE INDEX idx_boundary_active_users_recorded_at ON boundary_active_users USING btree (recorded_at);
|
||||
|
||||
CREATE INDEX idx_boundary_active_users_user_id ON boundary_active_users USING btree (user_id);
|
||||
|
||||
CREATE INDEX idx_boundary_active_workspaces_recorded_at ON boundary_active_workspaces USING btree (recorded_at);
|
||||
|
||||
CREATE INDEX idx_boundary_active_workspaces_workspace_id ON boundary_active_workspaces USING btree (workspace_id);
|
||||
|
||||
CREATE INDEX idx_connection_logs_connect_time_desc ON connection_logs USING btree (connect_time DESC);
|
||||
|
||||
CREATE UNIQUE INDEX idx_connection_logs_connection_id_workspace_id_agent_name ON connection_logs USING btree (connection_id, workspace_id, agent_name);
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
DROP TABLE IF EXISTS boundary_active_workspaces;
|
||||
DROP TABLE IF EXISTS boundary_active_users;
|
||||
@@ -0,0 +1,21 @@
|
||||
-- Tables to track boundary feature usage for telemetry reporting.
|
||||
-- Data is collected from boundary log streams and inserted periodically by each replica.
|
||||
|
||||
CREATE TABLE boundary_active_users (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
user_id UUID NOT NULL,
|
||||
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_boundary_active_users_user_id ON boundary_active_users(user_id);
|
||||
CREATE INDEX idx_boundary_active_users_recorded_at ON boundary_active_users(recorded_at);
|
||||
|
||||
CREATE TABLE boundary_active_workspaces (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
workspace_id UUID NOT NULL,
|
||||
template_id UUID NOT NULL,
|
||||
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_boundary_active_workspaces_workspace_id ON boundary_active_workspaces(workspace_id);
|
||||
CREATE INDEX idx_boundary_active_workspaces_recorded_at ON boundary_active_workspaces(recorded_at);
|
||||
@@ -3693,6 +3693,19 @@ type AuditLog struct {
|
||||
ResourceIcon string `db:"resource_icon" json:"resource_icon"`
|
||||
}
|
||||
|
||||
type BoundaryActiveUser struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
RecordedAt time.Time `db:"recorded_at" json:"recorded_at"`
|
||||
}
|
||||
|
||||
type BoundaryActiveWorkspace struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
RecordedAt time.Time `db:"recorded_at" json:"recorded_at"`
|
||||
}
|
||||
|
||||
type ConnectionLog struct {
|
||||
ID uuid.UUID `db:"id" json:"id"`
|
||||
ConnectTime time.Time `db:"connect_time" json:"connect_time"`
|
||||
|
||||
@@ -88,6 +88,8 @@ type sqlcQuerier interface {
|
||||
// be recreated.
|
||||
DeleteAllWebpushSubscriptions(ctx context.Context) error
|
||||
DeleteApplicationConnectAPIKeysByUserID(ctx context.Context, userID uuid.UUID) error
|
||||
DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error
|
||||
DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error
|
||||
DeleteCoordinator(ctx context.Context, id uuid.UUID) error
|
||||
DeleteCryptoKey(ctx context.Context, arg DeleteCryptoKeyParams) (CryptoKey, error)
|
||||
DeleteCustomRole(ctx context.Context, arg DeleteCustomRoleParams) error
|
||||
@@ -193,6 +195,8 @@ type sqlcQuerier interface {
|
||||
// This function returns roles for authorization purposes. Implied member roles
|
||||
// are included.
|
||||
GetAuthorizationUserRoles(ctx context.Context, userID uuid.UUID) (GetAuthorizationUserRolesRow, error)
|
||||
GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error)
|
||||
GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]GetBoundaryActiveWorkspacesSinceRow, error)
|
||||
GetConnectionLogsOffset(ctx context.Context, arg GetConnectionLogsOffsetParams) ([]GetConnectionLogsOffsetRow, error)
|
||||
GetCoordinatorResumeTokenSigningKey(ctx context.Context) (string, error)
|
||||
GetCryptoKeyByFeatureAndSequence(ctx context.Context, arg GetCryptoKeyByFeatureAndSequenceParams) (CryptoKey, error)
|
||||
@@ -545,6 +549,8 @@ type sqlcQuerier interface {
|
||||
// every member of the org.
|
||||
InsertAllUsersGroup(ctx context.Context, organizationID uuid.UUID) (Group, error)
|
||||
InsertAuditLog(ctx context.Context, arg InsertAuditLogParams) (AuditLog, error)
|
||||
InsertBoundaryActiveUser(ctx context.Context, arg InsertBoundaryActiveUserParams) error
|
||||
InsertBoundaryActiveWorkspace(ctx context.Context, arg InsertBoundaryActiveWorkspaceParams) error
|
||||
InsertCryptoKey(ctx context.Context, arg InsertCryptoKeyParams) (CryptoKey, error)
|
||||
InsertCustomRole(ctx context.Context, arg InsertCustomRoleParams) (CustomRole, error)
|
||||
InsertDBCryptKey(ctx context.Context, arg InsertDBCryptKeyParams) error
|
||||
|
||||
@@ -1980,6 +1980,116 @@ func (q *sqlQuerier) InsertAuditLog(ctx context.Context, arg InsertAuditLogParam
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteBoundaryActiveUsersBefore = `-- name: DeleteBoundaryActiveUsersBefore :exec
|
||||
DELETE FROM boundary_active_users WHERE recorded_at < $1
|
||||
`
|
||||
|
||||
func (q *sqlQuerier) DeleteBoundaryActiveUsersBefore(ctx context.Context, recordedAt time.Time) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteBoundaryActiveUsersBefore, recordedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteBoundaryActiveWorkspacesBefore = `-- name: DeleteBoundaryActiveWorkspacesBefore :exec
|
||||
DELETE FROM boundary_active_workspaces WHERE recorded_at < $1
|
||||
`
|
||||
|
||||
func (q *sqlQuerier) DeleteBoundaryActiveWorkspacesBefore(ctx context.Context, recordedAt time.Time) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteBoundaryActiveWorkspacesBefore, recordedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const getBoundaryActiveUsersSince = `-- name: GetBoundaryActiveUsersSince :many
|
||||
SELECT DISTINCT user_id FROM boundary_active_users
|
||||
WHERE recorded_at > $1
|
||||
`
|
||||
|
||||
func (q *sqlQuerier) GetBoundaryActiveUsersSince(ctx context.Context, recordedAt time.Time) ([]uuid.UUID, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getBoundaryActiveUsersSince, recordedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []uuid.UUID
|
||||
for rows.Next() {
|
||||
var user_id uuid.UUID
|
||||
if err := rows.Scan(&user_id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, user_id)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getBoundaryActiveWorkspacesSince = `-- name: GetBoundaryActiveWorkspacesSince :many
|
||||
SELECT DISTINCT workspace_id, template_id FROM boundary_active_workspaces
|
||||
WHERE recorded_at > $1
|
||||
`
|
||||
|
||||
type GetBoundaryActiveWorkspacesSinceRow struct {
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) GetBoundaryActiveWorkspacesSince(ctx context.Context, recordedAt time.Time) ([]GetBoundaryActiveWorkspacesSinceRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getBoundaryActiveWorkspacesSince, recordedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetBoundaryActiveWorkspacesSinceRow
|
||||
for rows.Next() {
|
||||
var i GetBoundaryActiveWorkspacesSinceRow
|
||||
if err := rows.Scan(&i.WorkspaceID, &i.TemplateID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const insertBoundaryActiveUser = `-- name: InsertBoundaryActiveUser :exec
|
||||
INSERT INTO boundary_active_users (user_id, recorded_at)
|
||||
VALUES ($1, $2)
|
||||
`
|
||||
|
||||
type InsertBoundaryActiveUserParams struct {
|
||||
UserID uuid.UUID `db:"user_id" json:"user_id"`
|
||||
RecordedAt time.Time `db:"recorded_at" json:"recorded_at"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) InsertBoundaryActiveUser(ctx context.Context, arg InsertBoundaryActiveUserParams) error {
|
||||
_, err := q.db.ExecContext(ctx, insertBoundaryActiveUser, arg.UserID, arg.RecordedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const insertBoundaryActiveWorkspace = `-- name: InsertBoundaryActiveWorkspace :exec
|
||||
INSERT INTO boundary_active_workspaces (workspace_id, template_id, recorded_at)
|
||||
VALUES ($1, $2, $3)
|
||||
`
|
||||
|
||||
type InsertBoundaryActiveWorkspaceParams struct {
|
||||
WorkspaceID uuid.UUID `db:"workspace_id" json:"workspace_id"`
|
||||
TemplateID uuid.UUID `db:"template_id" json:"template_id"`
|
||||
RecordedAt time.Time `db:"recorded_at" json:"recorded_at"`
|
||||
}
|
||||
|
||||
func (q *sqlQuerier) InsertBoundaryActiveWorkspace(ctx context.Context, arg InsertBoundaryActiveWorkspaceParams) error {
|
||||
_, err := q.db.ExecContext(ctx, insertBoundaryActiveWorkspace, arg.WorkspaceID, arg.TemplateID, arg.RecordedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const countConnectionLogs = `-- name: CountConnectionLogs :one
|
||||
SELECT
|
||||
COUNT(*) AS count
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
-- name: InsertBoundaryActiveUser :exec
|
||||
INSERT INTO boundary_active_users (user_id, recorded_at)
|
||||
VALUES ($1, $2);
|
||||
|
||||
-- name: InsertBoundaryActiveWorkspace :exec
|
||||
INSERT INTO boundary_active_workspaces (workspace_id, template_id, recorded_at)
|
||||
VALUES ($1, $2, $3);
|
||||
|
||||
-- name: GetBoundaryActiveUsersSince :many
|
||||
SELECT DISTINCT user_id FROM boundary_active_users
|
||||
WHERE recorded_at > $1;
|
||||
|
||||
-- name: GetBoundaryActiveWorkspacesSince :many
|
||||
SELECT DISTINCT workspace_id, template_id FROM boundary_active_workspaces
|
||||
WHERE recorded_at > $1;
|
||||
|
||||
-- name: DeleteBoundaryActiveUsersBefore :exec
|
||||
DELETE FROM boundary_active_users WHERE recorded_at < $1;
|
||||
|
||||
-- name: DeleteBoundaryActiveWorkspacesBefore :exec
|
||||
DELETE FROM boundary_active_workspaces WHERE recorded_at < $1;
|
||||
@@ -13,6 +13,8 @@ const (
|
||||
UniqueAibridgeUserPromptsPkey UniqueConstraint = "aibridge_user_prompts_pkey" // ALTER TABLE ONLY aibridge_user_prompts ADD CONSTRAINT aibridge_user_prompts_pkey PRIMARY KEY (id);
|
||||
UniqueAPIKeysPkey UniqueConstraint = "api_keys_pkey" // ALTER TABLE ONLY api_keys ADD CONSTRAINT api_keys_pkey PRIMARY KEY (id);
|
||||
UniqueAuditLogsPkey UniqueConstraint = "audit_logs_pkey" // ALTER TABLE ONLY audit_logs ADD CONSTRAINT audit_logs_pkey PRIMARY KEY (id);
|
||||
UniqueBoundaryActiveUsersPkey UniqueConstraint = "boundary_active_users_pkey" // ALTER TABLE ONLY boundary_active_users ADD CONSTRAINT boundary_active_users_pkey PRIMARY KEY (id);
|
||||
UniqueBoundaryActiveWorkspacesPkey UniqueConstraint = "boundary_active_workspaces_pkey" // ALTER TABLE ONLY boundary_active_workspaces ADD CONSTRAINT boundary_active_workspaces_pkey PRIMARY KEY (id);
|
||||
UniqueConnectionLogsPkey UniqueConstraint = "connection_logs_pkey" // ALTER TABLE ONLY connection_logs ADD CONSTRAINT connection_logs_pkey PRIMARY KEY (id);
|
||||
UniqueCryptoKeysPkey UniqueConstraint = "crypto_keys_pkey" // ALTER TABLE ONLY crypto_keys ADD CONSTRAINT crypto_keys_pkey PRIMARY KEY (feature, sequence);
|
||||
UniqueCustomRolesUniqueKey UniqueConstraint = "custom_roles_unique_key" // ALTER TABLE ONLY custom_roles ADD CONSTRAINT custom_roles_unique_key UNIQUE (name, organization_id);
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
)
|
||||
|
||||
// BoundaryTelemetryCollector collects boundary feature usage data in memory
|
||||
// and periodically flushes it to the database for telemetry reporting.
|
||||
type BoundaryTelemetryCollector struct {
|
||||
db database.Store
|
||||
logger slog.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
activeUsers map[uuid.UUID]struct{}
|
||||
activeWorkspaces map[uuid.UUID]uuid.UUID // workspaceID -> templateID
|
||||
}
|
||||
|
||||
// NewBoundaryTelemetryCollector creates a new collector for boundary telemetry.
|
||||
func NewBoundaryTelemetryCollector(db database.Store, logger slog.Logger) *BoundaryTelemetryCollector {
|
||||
return &BoundaryTelemetryCollector{
|
||||
db: db,
|
||||
logger: logger,
|
||||
activeUsers: make(map[uuid.UUID]struct{}),
|
||||
activeWorkspaces: make(map[uuid.UUID]uuid.UUID),
|
||||
}
|
||||
}
|
||||
|
||||
// RecordBoundaryUsage records that a user/workspace used the boundary feature.
|
||||
func (c *BoundaryTelemetryCollector) RecordBoundaryUsage(userID, workspaceID, templateID uuid.UUID) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.activeUsers[userID] = struct{}{}
|
||||
c.activeWorkspaces[workspaceID] = templateID
|
||||
}
|
||||
|
||||
// Flush writes the collected data to the database and clears the in-memory state.
|
||||
// This should be called on the same interval as the telemetry snapshot.
|
||||
func (c *BoundaryTelemetryCollector) Flush(ctx context.Context) error {
|
||||
c.mu.Lock()
|
||||
users := c.activeUsers
|
||||
workspaces := c.activeWorkspaces
|
||||
c.activeUsers = make(map[uuid.UUID]struct{})
|
||||
c.activeWorkspaces = make(map[uuid.UUID]uuid.UUID)
|
||||
c.mu.Unlock()
|
||||
|
||||
now := dbtime.Now()
|
||||
|
||||
for userID := range users {
|
||||
err := c.db.InsertBoundaryActiveUser(ctx, database.InsertBoundaryActiveUserParams{
|
||||
UserID: userID,
|
||||
RecordedAt: now,
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Error(ctx, "failed to insert boundary active user",
|
||||
slog.F("user_id", userID),
|
||||
slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
for workspaceID, templateID := range workspaces {
|
||||
err := c.db.InsertBoundaryActiveWorkspace(ctx, database.InsertBoundaryActiveWorkspaceParams{
|
||||
WorkspaceID: workspaceID,
|
||||
TemplateID: templateID,
|
||||
RecordedAt: now,
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Error(ctx, "failed to insert boundary active workspace",
|
||||
slog.F("workspace_id", workspaceID),
|
||||
slog.F("template_id", templateID),
|
||||
slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cleanup removes old boundary telemetry data from the database.
|
||||
func (c *BoundaryTelemetryCollector) Cleanup(ctx context.Context, before time.Time) error {
|
||||
err := c.db.DeleteBoundaryActiveUsersBefore(ctx, before)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.db.DeleteBoundaryActiveWorkspacesBefore(ctx, before)
|
||||
}
|
||||
@@ -58,8 +58,9 @@ type Options struct {
|
||||
BuiltinPostgres bool
|
||||
Tunnel bool
|
||||
|
||||
SnapshotFrequency time.Duration
|
||||
ParseLicenseJWT func(lic *License) error
|
||||
SnapshotFrequency time.Duration
|
||||
ParseLicenseJWT func(lic *License) error
|
||||
BoundaryTelemetryCollector *BoundaryTelemetryCollector
|
||||
}
|
||||
|
||||
// New constructs a reporter for telemetry data.
|
||||
@@ -291,6 +292,14 @@ func (r *remoteReporter) reportWithDeployment() {
|
||||
r.options.Logger.Debug(r.ctx, "update deployment", slog.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// Flush boundary telemetry data to the database before creating the snapshot.
|
||||
if r.options.BoundaryTelemetryCollector != nil {
|
||||
if err := r.options.BoundaryTelemetryCollector.Flush(r.ctx); err != nil {
|
||||
r.options.Logger.Error(r.ctx, "failed to flush boundary telemetry", slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
snapshot, err := r.createSnapshot()
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
@@ -759,6 +768,33 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
|
||||
snapshot.AIBridgeInterceptionsSummaries = summaries
|
||||
return nil
|
||||
})
|
||||
eg.Go(func() error {
|
||||
// Query boundary active users from the last snapshot interval.
|
||||
since := dbtime.Time(r.options.Clock.Now().Add(-r.options.SnapshotFrequency))
|
||||
users, err := r.options.Database.GetBoundaryActiveUsersSince(ctx, since)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get boundary active users: %w", err)
|
||||
}
|
||||
snapshot.BoundaryActiveUsers = make([]BoundaryActiveUser, 0, len(users))
|
||||
for _, userID := range users {
|
||||
snapshot.BoundaryActiveUsers = append(snapshot.BoundaryActiveUsers, BoundaryActiveUser{
|
||||
ID: userID,
|
||||
})
|
||||
}
|
||||
|
||||
workspaces, err := r.options.Database.GetBoundaryActiveWorkspacesSince(ctx, since)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get boundary active workspaces: %w", err)
|
||||
}
|
||||
snapshot.BoundaryActiveWorkspaces = make([]BoundaryActiveWorkspace, 0, len(workspaces))
|
||||
for _, ws := range workspaces {
|
||||
snapshot.BoundaryActiveWorkspaces = append(snapshot.BoundaryActiveWorkspaces, BoundaryActiveWorkspace{
|
||||
ID: ws.WorkspaceID,
|
||||
TemplateID: ws.TemplateID,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
@@ -1309,6 +1345,8 @@ type Snapshot struct {
|
||||
UserTailnetConnections []UserTailnetConnection `json:"user_tailnet_connections"`
|
||||
PrebuiltWorkspaces []PrebuiltWorkspace `json:"prebuilt_workspaces"`
|
||||
AIBridgeInterceptionsSummaries []AIBridgeInterceptionsSummary `json:"aibridge_interceptions_summaries"`
|
||||
BoundaryActiveUsers []BoundaryActiveUser `json:"boundary_active_users"`
|
||||
BoundaryActiveWorkspaces []BoundaryActiveWorkspace `json:"boundary_active_workspaces"`
|
||||
}
|
||||
|
||||
// Deployment contains information about the host running Coder.
|
||||
@@ -1995,6 +2033,17 @@ type AIBridgeInterceptionsSummary struct {
|
||||
InjectedToolCallErrorCount int64 `json:"injected_tool_call_error_count"`
|
||||
}
|
||||
|
||||
// BoundaryActiveUser represents a user who has used the boundary feature.
|
||||
type BoundaryActiveUser struct {
|
||||
ID uuid.UUID `json:"id"`
|
||||
}
|
||||
|
||||
// BoundaryActiveWorkspace represents a workspace that has used the boundary feature.
|
||||
type BoundaryActiveWorkspace struct {
|
||||
ID uuid.UUID `json:"id"`
|
||||
TemplateID uuid.UUID `json:"template_id"`
|
||||
}
|
||||
|
||||
func ConvertAIBridgeInterceptionsSummary(endTime time.Time, provider, model, client string, summary database.CalculateAIBridgeInterceptionsTelemetrySummaryRow) AIBridgeInterceptionsSummary {
|
||||
return AIBridgeInterceptionsSummary{
|
||||
ID: uuid.New(),
|
||||
|
||||
@@ -130,6 +130,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
|
||||
AgentID: workspaceAgent.ID,
|
||||
OwnerID: workspace.OwnerID,
|
||||
WorkspaceID: workspace.ID,
|
||||
TemplateID: workspace.TemplateID,
|
||||
OrganizationID: workspace.OrganizationID,
|
||||
|
||||
AuthenticatedCtx: ctx,
|
||||
@@ -146,6 +147,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
|
||||
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
||||
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,
|
||||
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
|
||||
BoundaryTelemetryCollector: api.BoundaryTelemetryCollector,
|
||||
|
||||
AccessURL: api.AccessURL,
|
||||
AppHostname: api.AppHostname,
|
||||
|
||||
Reference in New Issue
Block a user