feat: add initial API for boundary log forwarding to coderd (#21293)

Add the AgentAPI changes to support the feature that transmits boundary
logs from workspaces to coderd via the agent API for eventual re-emission to
stderr. The API handlers are stubs for now because I'm trying to land
this feature from multiple smaller PRs.

High level architecture:
- Boundary records resource access in batches and sends proto message to
agent
- Agent proxies messages to coderd **(captured by the API changes in
this PR)**
- coderd re-emits logs to stderr

RFC:
https://www.notion.so/coderhq/Agent-Boundary-Logs-2afd579be59280f29629fc9823ac41ba
This commit is contained in:
Zach
2025-12-19 10:41:39 -07:00
committed by GitHub
parent ea00e72063
commit 9d1493a13a
15 changed files with 761 additions and 301 deletions
+21 -20
View File
@@ -105,8 +105,8 @@ type Options struct {
}
type Client interface {
ConnectRPC26(ctx context.Context) (
proto.DRPCAgentClient26, tailnetproto.DRPCTailnetClient26, error,
ConnectRPC27(ctx context.Context) (
proto.DRPCAgentClient27, tailnetproto.DRPCTailnetClient27, error,
)
tailnet.DERPMapRewriter
agentsdk.RefreshableSessionTokenProvider
@@ -506,7 +506,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
fn()
}
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
tickerDone := make(chan struct{})
collectDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
@@ -721,7 +721,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient26
// reportLifecycle reports the current lifecycle state once. All state
// changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
for {
select {
case <-a.lifecycleUpdate:
@@ -801,7 +801,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
}
// reportConnectionsLoop reports connections to the agent for auditing.
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
for {
select {
case <-a.reportConnectionsUpdate:
@@ -932,7 +932,7 @@ func (a *agent) reportConnection(id uuid.UUID, connectionType proto.Connection_T
// fetchServiceBannerLoop fetches the service banner on an interval. It will
// not be fetched immediately; the expectation is that it is primed elsewhere
// (and must be done before the session actually starts).
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
defer ticker.Stop()
for {
@@ -967,7 +967,7 @@ func (a *agent) run() (retErr error) {
}
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
aAPI, tAPI, err := a.client.ConnectRPC26(a.hardCtx)
aAPI, tAPI, err := a.client.ConnectRPC27(a.hardCtx)
if err != nil {
return err
}
@@ -984,7 +984,7 @@ func (a *agent) run() (retErr error) {
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
if err != nil {
return xerrors.Errorf("fetch service banner: %w", err)
@@ -1001,7 +1001,7 @@ func (a *agent) run() (retErr error) {
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
// shutdown scripts.
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
err := a.logSender.SendLoop(ctx, aAPI)
if xerrors.Is(err, agentsdk.ErrLogLimitExceeded) {
// we don't want this error to tear down the API connection and propagate to the
@@ -1020,7 +1020,7 @@ func (a *agent) run() (retErr error) {
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)
// resources monitor can cease as soon as we start gracefully shutting down.
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
logger := a.logger.Named("resources_monitor")
clk := quartz.NewReal()
config, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{})
@@ -1067,7 +1067,7 @@ func (a *agent) run() (retErr error) {
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err)
}
@@ -1100,7 +1100,7 @@ func (a *agent) run() (retErr error) {
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
@@ -1115,8 +1115,8 @@ func (a *agent) run() (retErr error) {
}
// handleManifest returns a function that fetches and processes the manifest
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient26) error {
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient27) error {
var (
sentResult = false
err error
@@ -1279,7 +1279,7 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
func (a *agent) createDevcontainer(
ctx context.Context,
aAPI proto.DRPCAgentClient26,
aAPI proto.DRPCAgentClient27,
dc codersdk.WorkspaceAgentDevcontainer,
script codersdk.WorkspaceAgentScript,
) (err error) {
@@ -1311,8 +1311,8 @@ func (a *agent) createDevcontainer(
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
// the tailnet using the information in the manifest
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient26) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient26) (retErr error) {
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient27) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient27) (retErr error) {
if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err)
}
@@ -1401,6 +1401,7 @@ func (a *agent) updateCommandEnv(current []string) (updated []string, err error)
"CODER_WORKSPACE_NAME": manifest.WorkspaceName,
"CODER_WORKSPACE_AGENT_NAME": manifest.AgentName,
"CODER_WORKSPACE_OWNER_NAME": manifest.OwnerName,
"CODER_WORKSPACE_ID": manifest.WorkspaceID.String(),
// Specific Coder subcommands require the agent token exposed!
"CODER_AGENT_TOKEN": a.client.GetSessionToken(),
@@ -2098,7 +2099,7 @@ const (
type apiConnRoutineManager struct {
logger slog.Logger
aAPI proto.DRPCAgentClient26
aAPI proto.DRPCAgentClient27
tAPI tailnetproto.DRPCTailnetClient24
eg *errgroup.Group
stopCtx context.Context
@@ -2107,7 +2108,7 @@ type apiConnRoutineManager struct {
func newAPIConnRoutineManager(
gracefulCtx, hardCtx context.Context, logger slog.Logger,
aAPI proto.DRPCAgentClient26, tAPI tailnetproto.DRPCTailnetClient24,
aAPI proto.DRPCAgentClient27, tAPI tailnetproto.DRPCTailnetClient24,
) *apiConnRoutineManager {
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
// exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -2140,7 +2141,7 @@ func newAPIConnRoutineManager(
// but for Tailnet.
func (a *apiConnRoutineManager) startAgentAPI(
name string, behavior gracefulShutdownBehavior,
f func(context.Context, proto.DRPCAgentClient26) error,
f func(context.Context, proto.DRPCAgentClient27) error,
) {
logger := a.logger.With(slog.F("name", name))
var ctx context.Context
+2 -2
View File
@@ -147,12 +147,12 @@ type SubAgentClient interface {
// agent API client.
type subAgentAPIClient struct {
logger slog.Logger
api agentproto.DRPCAgentClient26
api agentproto.DRPCAgentClient27
}
var _ SubAgentClient = (*subAgentAPIClient)(nil)
func NewSubAgentClientFromAPI(logger slog.Logger, agentAPI agentproto.DRPCAgentClient26) SubAgentClient {
func NewSubAgentClientFromAPI(logger slog.Logger, agentAPI agentproto.DRPCAgentClient27) SubAgentClient {
if agentAPI == nil {
panic("developer error: agentAPI cannot be nil")
}
+2 -2
View File
@@ -81,7 +81,7 @@ func TestSubAgentClient_CreateWithDisplayApps(t *testing.T) {
agentAPI := agenttest.NewClient(t, logger, uuid.New(), agentsdk.Manifest{}, statsCh, tailnet.NewCoordinator(logger))
agentClient, _, err := agentAPI.ConnectRPC26(ctx)
agentClient, _, err := agentAPI.ConnectRPC27(ctx)
require.NoError(t, err)
subAgentClient := agentcontainers.NewSubAgentClientFromAPI(logger, agentClient)
@@ -245,7 +245,7 @@ func TestSubAgentClient_CreateWithDisplayApps(t *testing.T) {
agentAPI := agenttest.NewClient(t, logger, uuid.New(), agentsdk.Manifest{}, statsCh, tailnet.NewCoordinator(logger))
agentClient, _, err := agentAPI.ConnectRPC26(ctx)
agentClient, _, err := agentAPI.ConnectRPC27(ctx)
require.NoError(t, err)
subAgentClient := agentcontainers.NewSubAgentClientFromAPI(logger, agentClient)
+6 -2
View File
@@ -124,8 +124,8 @@ func (c *Client) Close() {
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
}
func (c *Client) ConnectRPC26(ctx context.Context) (
agentproto.DRPCAgentClient26, proto.DRPCTailnetClient26, error,
func (c *Client) ConnectRPC27(ctx context.Context) (
agentproto.DRPCAgentClient27, proto.DRPCTailnetClient27, error,
) {
conn, lis := drpcsdk.MemTransportPipe()
c.LastWorkspaceAgent = func() {
@@ -405,6 +405,10 @@ func (f *FakeAgentAPI) ReportConnection(_ context.Context, req *agentproto.Repor
return &emptypb.Empty{}, nil
}
func (*FakeAgentAPI) ReportBoundaryLogs(_ context.Context, _ *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) {
return &agentproto.ReportBoundaryLogsResponse{}, nil
}
func (f *FakeAgentAPI) GetConnectionReports() []*agentproto.ReportConnectionRequest {
f.Lock()
defer f.Unlock()
+7
View File
@@ -0,0 +1,7 @@
// Package boundarylogproxy provides a Unix socket server that receives boundary
// audit logs and forwards them to coderd via the agent API.
package boundarylogproxy
// Server a placeholder for the server that will listen on a Unix socket for
// boundary logs to be forwarded.
type Server struct{}
+602 -271
View File
File diff suppressed because it is too large Load Diff
+31
View File
@@ -460,6 +460,36 @@ message ListSubAgentsResponse {
repeated SubAgent agents = 1;
}
// BoundaryLog represents a log for a single resource access processed
// by boundary.
message BoundaryLog {
message HttpRequest {
string method = 1;
string url = 2;
// The rule that resulted in this HTTP request not being allowed.
// Only populated when allowed = false.
string matched_rule = 3;
}
// Whether boundary allowed this resource access.
bool allowed = 1;
// The timestamp when boundary processed this resource access.
google.protobuf.Timestamp time = 2;
// The resource being accessed by boundary.
oneof resource {
HttpRequest http_request = 3;
}
}
// ReportBoundaryLogsRequest is a request to re-emit the given BoundaryLogs.
message ReportBoundaryLogsRequest {
repeated BoundaryLog logs = 1;
}
message ReportBoundaryLogsResponse {}
service Agent {
rpc GetManifest(GetManifestRequest) returns (Manifest);
rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner);
@@ -477,4 +507,5 @@ service Agent {
rpc CreateSubAgent(CreateSubAgentRequest) returns (CreateSubAgentResponse);
rpc DeleteSubAgent(DeleteSubAgentRequest) returns (DeleteSubAgentResponse);
rpc ListSubAgents(ListSubAgentsRequest) returns (ListSubAgentsResponse);
rpc ReportBoundaryLogs(ReportBoundaryLogsRequest) returns (ReportBoundaryLogsResponse);
}
+41 -1
View File
@@ -55,6 +55,7 @@ type DRPCAgentClient interface {
CreateSubAgent(ctx context.Context, in *CreateSubAgentRequest) (*CreateSubAgentResponse, error)
DeleteSubAgent(ctx context.Context, in *DeleteSubAgentRequest) (*DeleteSubAgentResponse, error)
ListSubAgents(ctx context.Context, in *ListSubAgentsRequest) (*ListSubAgentsResponse, error)
ReportBoundaryLogs(ctx context.Context, in *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
}
type drpcAgentClient struct {
@@ -211,6 +212,15 @@ func (c *drpcAgentClient) ListSubAgents(ctx context.Context, in *ListSubAgentsRe
return out, nil
}
func (c *drpcAgentClient) ReportBoundaryLogs(ctx context.Context, in *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error) {
out := new(ReportBoundaryLogsResponse)
err := c.cc.Invoke(ctx, "/coder.agent.v2.Agent/ReportBoundaryLogs", drpcEncoding_File_agent_proto_agent_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCAgentServer interface {
GetManifest(context.Context, *GetManifestRequest) (*Manifest, error)
GetServiceBanner(context.Context, *GetServiceBannerRequest) (*ServiceBanner, error)
@@ -228,6 +238,7 @@ type DRPCAgentServer interface {
CreateSubAgent(context.Context, *CreateSubAgentRequest) (*CreateSubAgentResponse, error)
DeleteSubAgent(context.Context, *DeleteSubAgentRequest) (*DeleteSubAgentResponse, error)
ListSubAgents(context.Context, *ListSubAgentsRequest) (*ListSubAgentsResponse, error)
ReportBoundaryLogs(context.Context, *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
}
type DRPCAgentUnimplementedServer struct{}
@@ -296,9 +307,13 @@ func (s *DRPCAgentUnimplementedServer) ListSubAgents(context.Context, *ListSubAg
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCAgentUnimplementedServer) ReportBoundaryLogs(context.Context, *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCAgentDescription struct{}
func (DRPCAgentDescription) NumMethods() int { return 16 }
func (DRPCAgentDescription) NumMethods() int { return 17 }
func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
@@ -446,6 +461,15 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver,
in1.(*ListSubAgentsRequest),
)
}, DRPCAgentServer.ListSubAgents, true
case 16:
return "/coder.agent.v2.Agent/ReportBoundaryLogs", drpcEncoding_File_agent_proto_agent_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCAgentServer).
ReportBoundaryLogs(
ctx,
in1.(*ReportBoundaryLogsRequest),
)
}, DRPCAgentServer.ReportBoundaryLogs, true
default:
return "", nil, nil, nil, false
}
@@ -710,3 +734,19 @@ func (x *drpcAgent_ListSubAgentsStream) SendAndClose(m *ListSubAgentsResponse) e
}
return x.CloseSend()
}
type DRPCAgent_ReportBoundaryLogsStream interface {
drpc.Stream
SendAndClose(*ReportBoundaryLogsResponse) error
}
type drpcAgent_ReportBoundaryLogsStream struct {
drpc.Stream
}
func (x *drpcAgent_ReportBoundaryLogsStream) SendAndClose(m *ReportBoundaryLogsResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_agent_proto_agent_proto{}); err != nil {
return err
}
return x.CloseSend()
}
+7
View File
@@ -65,3 +65,10 @@ type DRPCAgentClient26 interface {
DeleteSubAgent(ctx context.Context, in *DeleteSubAgentRequest) (*DeleteSubAgentResponse, error)
ListSubAgents(ctx context.Context, in *ListSubAgentsRequest) (*ListSubAgentsResponse, error)
}
// DRPCAgentClient27 is the Agent API at v2.7. It adds the ReportBoundaryLogs
// RPC for forwarding boundary audit logs to coderd. Compatible with Coder v2.30+
type DRPCAgentClient27 interface {
DRPCAgentClient26
ReportBoundaryLogs(ctx context.Context, in *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
}
+3
View File
@@ -54,6 +54,7 @@ type API struct {
*ScriptsAPI
*ConnLogAPI
*SubAgentAPI
*BoundaryLogsAPI
*tailnet.DRPCService
cachedWorkspaceFields *CachedWorkspaceFields
@@ -219,6 +220,8 @@ func New(opts Options, workspace database.Workspace) *API {
Database: opts.Database,
}
api.BoundaryLogsAPI = &BoundaryLogsAPI{}
// Start background cache refresh loop to handle workspace changes
// like prebuild claims where owner_id and other fields may be modified in the DB.
go api.startCacheRefreshLoop(opts.AuthenticatedCtx)
+15
View File
@@ -0,0 +1,15 @@
package agentapi
import (
"context"
"golang.org/x/xerrors"
agentproto "github.com/coder/coder/v2/agent/proto"
)
type BoundaryLogsAPI struct{}
func (*BoundaryLogsAPI) ReportBoundaryLogs(context.Context, *agentproto.ReportBoundaryLogsRequest) (*agentproto.ReportBoundaryLogsResponse, error) {
return nil, xerrors.New("not implemented")
}
+1 -1
View File
@@ -2679,7 +2679,7 @@ func requireGetManifest(ctx context.Context, t testing.TB, aAPI agentproto.DRPCA
}
func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup *agentproto.Startup) error {
aAPI, _, err := client.ConnectRPC26(ctx)
aAPI, _, err := client.ConnectRPC27(ctx)
require.NoError(t, err)
defer func() {
cErr := aAPI.DRPCConn().Close()
+13 -1
View File
@@ -244,7 +244,7 @@ func (c *Client) ConnectRPC25(ctx context.Context) (
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
}
// ConnectRPC25 returns a dRPC client to the Agent API v2.5. It is useful when you want to be
// ConnectRPC26 returns a dRPC client to the Agent API v2.6. It is useful when you want to be
// maximally compatible with Coderd Release Versions from 2.24+
func (c *Client) ConnectRPC26(ctx context.Context) (
proto.DRPCAgentClient26, tailnetproto.DRPCTailnetClient26, error,
@@ -256,6 +256,18 @@ func (c *Client) ConnectRPC26(ctx context.Context) (
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
}
// ConnectRPC27 returns a dRPC client to the Agent API v2.7. It is useful when you want to be
// maximally compatible with Coderd Release Versions from 2.30+
func (c *Client) ConnectRPC27(ctx context.Context) (
proto.DRPCAgentClient27, tailnetproto.DRPCTailnetClient27, error,
) {
conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 7))
if err != nil {
return nil, nil, err
}
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
}
// ConnectRPC connects to the workspace agent API and tailnet API
func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
return c.connectRPCVersion(ctx, proto.CurrentVersion)
+5
View File
@@ -50,3 +50,8 @@ type DRPCTailnetClient25 interface {
type DRPCTailnetClient26 interface {
DRPCTailnetClient25
}
// DRPCTailnetClient26 is the Tailnet API at v2.7.
type DRPCTailnetClient27 interface {
DRPCTailnetClient26
}
+5 -1
View File
@@ -56,9 +56,13 @@ import (
// - Added support for DeleteSubAgent RPC on the Agent API.
// - Added support for ListSubAgents RPC on the Agent API.
// - Add ORGANIZATION SharingLevel
//
// API v2.7:
// - Added support for ReportBoundaryLogs RPC on the Agent API for forwarding
// boundary audit logs to coderd.
const (
CurrentMajor = 2
CurrentMinor = 6
CurrentMinor = 7
)
var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor)