fix: avoid instantiating a logger if provided /dev/null (#24027)
- Adds some additional context to workspace traffic logging - Fails traffic tests if 0 bytes read from connection
This commit is contained in:
@@ -104,7 +104,7 @@ func (b *Builder) Build(inv *serpent.Invocation) (log slog.Logger, closeLog func
|
|||||||
|
|
||||||
addSinkIfProvided := func(sinkFn func(io.Writer) slog.Sink, loc string) error {
|
addSinkIfProvided := func(sinkFn func(io.Writer) slog.Sink, loc string) error {
|
||||||
switch loc {
|
switch loc {
|
||||||
case "":
|
case "", "/dev/null":
|
||||||
case "/dev/stdout":
|
case "/dev/stdout":
|
||||||
sinks = append(sinks, sinkFn(inv.Stdout))
|
sinks = append(sinks, sinkFn(inv.Stdout))
|
||||||
|
|
||||||
|
|||||||
@@ -1401,6 +1401,9 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *serpent.Command {
|
|||||||
// Setup our workspace agent connection.
|
// Setup our workspace agent connection.
|
||||||
config := workspacetraffic.Config{
|
config := workspacetraffic.Config{
|
||||||
AgentID: agent.ID,
|
AgentID: agent.ID,
|
||||||
|
WorkspaceID: ws.ID,
|
||||||
|
WorkspaceName: ws.Name,
|
||||||
|
AgentName: agent.Name,
|
||||||
BytesPerTick: bytesPerTick,
|
BytesPerTick: bytesPerTick,
|
||||||
Duration: strategy.timeout,
|
Duration: strategy.timeout,
|
||||||
TickInterval: tickInterval,
|
TickInterval: tickInterval,
|
||||||
|
|||||||
@@ -12,6 +12,12 @@ import (
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
// AgentID is the workspace agent ID to which to connect.
|
// AgentID is the workspace agent ID to which to connect.
|
||||||
AgentID uuid.UUID `json:"agent_id"`
|
AgentID uuid.UUID `json:"agent_id"`
|
||||||
|
// WorkspaceID is the workspace ID, used for logging.
|
||||||
|
WorkspaceID uuid.UUID `json:"workspace_id"`
|
||||||
|
// WorkspaceName is the workspace name, used for logging.
|
||||||
|
WorkspaceName string `json:"workspace_name"`
|
||||||
|
// AgentName is the agent name, used for logging.
|
||||||
|
AgentName string `json:"agent_name"`
|
||||||
|
|
||||||
// BytesPerTick is the number of bytes to send to the agent per tick.
|
// BytesPerTick is the number of bytes to send to the agent per tick.
|
||||||
BytesPerTick int64 `json:"bytes_per_tick"`
|
BytesPerTick int64 `json:"bytes_per_tick"`
|
||||||
|
|||||||
@@ -76,7 +76,12 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
|
|||||||
echo = r.cfg.Echo
|
echo = r.cfg.Echo
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logger.With(slog.F("agent_id", agentID))
|
logger = logger.With(
|
||||||
|
slog.F("agent_id", agentID),
|
||||||
|
slog.F("workspace_id", r.cfg.WorkspaceID),
|
||||||
|
slog.F("workspace_name", r.cfg.WorkspaceName),
|
||||||
|
slog.F("agent_name", r.cfg.AgentName),
|
||||||
|
)
|
||||||
|
|
||||||
logger.Debug(ctx, "config",
|
logger.Debug(ctx, "config",
|
||||||
slog.F("reconnecting_pty_id", reconnect),
|
slog.F("reconnecting_pty_id", reconnect),
|
||||||
@@ -153,6 +158,14 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
|
|||||||
conn.readMetrics = r.cfg.ReadMetrics
|
conn.readMetrics = r.cfg.ReadMetrics
|
||||||
conn.writeMetrics = r.cfg.WriteMetrics
|
conn.writeMetrics = r.cfg.WriteMetrics
|
||||||
|
|
||||||
|
logTrafficSummary := func() {
|
||||||
|
//nolint:gocritic
|
||||||
|
logger.Info(ctx, "traffic summary",
|
||||||
|
slog.F("actual_bytes_read", r.cfg.ReadMetrics.GetTotalBytes()),
|
||||||
|
slog.F("actual_bytes_written", r.cfg.WriteMetrics.GetTotalBytes()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// Create a ticker for sending data to the conn.
|
// Create a ticker for sending data to the conn.
|
||||||
tick := time.NewTicker(tickInterval)
|
tick := time.NewTicker(tickInterval)
|
||||||
defer tick.Stop()
|
defer tick.Stop()
|
||||||
@@ -179,10 +192,18 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
|
|||||||
|
|
||||||
var waitCloseTimeoutCh <-chan struct{}
|
var waitCloseTimeoutCh <-chan struct{}
|
||||||
deadlineCtxCh := deadlineCtx.Done()
|
deadlineCtxCh := deadlineCtx.Done()
|
||||||
|
deadlineReached := false
|
||||||
wchRef, rchRef := wch, rch
|
wchRef, rchRef := wch, rch
|
||||||
for {
|
for {
|
||||||
if wchRef == nil && rchRef == nil {
|
if wchRef == nil && rchRef == nil {
|
||||||
logger.Info(ctx, "reading and writing to agent complete! Closing connection")
|
logTrafficSummary()
|
||||||
|
if !deadlineReached {
|
||||||
|
return xerrors.Errorf("test did not complete: context canceled after %s of %s",
|
||||||
|
time.Since(start).Truncate(time.Second), r.cfg.Duration)
|
||||||
|
}
|
||||||
|
if r.cfg.ReadMetrics.GetTotalBytes() == 0 {
|
||||||
|
return xerrors.Errorf("zero bytes read from agent")
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -192,23 +213,27 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
|
|||||||
slog.F("write_done", wchRef == nil),
|
slog.F("write_done", wchRef == nil),
|
||||||
slog.F("read_done", rchRef == nil),
|
slog.F("read_done", rchRef == nil),
|
||||||
)
|
)
|
||||||
|
logTrafficSummary()
|
||||||
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
|
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
|
||||||
case <-deadlineCtxCh:
|
case <-deadlineCtxCh:
|
||||||
go func() {
|
go func() {
|
||||||
_ = closeConn()
|
_ = closeConn()
|
||||||
}()
|
}()
|
||||||
deadlineCtxCh = nil // Only trigger once.
|
deadlineCtxCh = nil // Only trigger once.
|
||||||
|
deadlineReached = true
|
||||||
// Wait at most closeTimeout for the connection to close cleanly.
|
// Wait at most closeTimeout for the connection to close cleanly.
|
||||||
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
|
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
|
||||||
defer cancel() //nolint:revive // Only called once.
|
defer cancel() //nolint:revive // Only called once.
|
||||||
waitCloseTimeoutCh = waitCtx.Done()
|
waitCloseTimeoutCh = waitCtx.Done()
|
||||||
case err = <-wchRef:
|
case err = <-wchRef:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logTrafficSummary()
|
||||||
return xerrors.Errorf("write to agent: %w", err)
|
return xerrors.Errorf("write to agent: %w", err)
|
||||||
}
|
}
|
||||||
wchRef = nil
|
wchRef = nil
|
||||||
case err = <-rchRef:
|
case err = <-rchRef:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logTrafficSummary()
|
||||||
return xerrors.Errorf("read from agent: %w", err)
|
return xerrors.Errorf("read from agent: %w", err)
|
||||||
}
|
}
|
||||||
rchRef = nil
|
rchRef = nil
|
||||||
|
|||||||
Reference in New Issue
Block a user