perf: improve memory use and cpu usage for OpenAI requests handled by bridge (#21838)
Apply optimizations: * https://github.com/openai/openai-go/pull/602 * https://github.com/coder/aibridge/pull/160 These reduce CPU time and allocation count for OpenAI `chat/completions` and `responses` APIs, making the use of OpenAI chat models through AI Bridge more performant. In order to test these changes, we add scaletesting support for the responses API.
This commit is contained in:
@@ -49,6 +49,9 @@ Examples:
|
||||
# Test OpenAI API through bridge
|
||||
coder scaletest bridge --mode bridge --provider openai --concurrent-users 10 --request-count 5 --num-messages 10
|
||||
|
||||
# Test OpenAI Responses API through bridge
|
||||
coder scaletest bridge --mode bridge --provider responses --concurrent-users 10 --request-count 5 --num-messages 10
|
||||
|
||||
# Test Anthropic API through bridge
|
||||
coder scaletest bridge --mode bridge --provider anthropic --concurrent-users 10 --request-count 5 --num-messages 10
|
||||
|
||||
@@ -219,9 +222,9 @@ Examples:
|
||||
{
|
||||
Flag: "provider",
|
||||
Env: "CODER_SCALETEST_BRIDGE_PROVIDER",
|
||||
Default: "openai",
|
||||
Required: true,
|
||||
Description: "API provider to use.",
|
||||
Value: serpent.EnumOf(&provider, "openai", "anthropic"),
|
||||
Value: serpent.EnumOf(&provider, "completions", "messages", "responses"),
|
||||
},
|
||||
{
|
||||
Flag: "request-count",
|
||||
|
||||
@@ -62,6 +62,7 @@ func (*RootCmd) scaletestLLMMock() *serpent.Command {
|
||||
|
||||
_, _ = fmt.Fprintf(inv.Stdout, "Mock LLM API server started on %s\n", srv.APIAddress())
|
||||
_, _ = fmt.Fprintf(inv.Stdout, " OpenAI endpoint: %s/v1/chat/completions\n", srv.APIAddress())
|
||||
_, _ = fmt.Fprintf(inv.Stdout, " OpenAI responses endpoint: %s/v1/responses\n", srv.APIAddress())
|
||||
_, _ = fmt.Fprintf(inv.Stdout, " Anthropic endpoint: %s/v1/messages\n", srv.APIAddress())
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
@@ -473,7 +473,7 @@ require (
|
||||
github.com/anthropics/anthropic-sdk-go v1.19.0
|
||||
github.com/brianvoe/gofakeit/v7 v7.14.0
|
||||
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
|
||||
github.com/coder/aibridge v1.0.0
|
||||
github.com/coder/aibridge v1.0.1-0.20260202135542-9e2857aaac8f
|
||||
github.com/coder/aisdk-go v0.0.9
|
||||
github.com/coder/boundary v0.6.1
|
||||
github.com/coder/preview v1.0.4
|
||||
@@ -591,4 +591,9 @@ tool (
|
||||
storj.io/drpc/cmd/protoc-gen-go-drpc
|
||||
)
|
||||
|
||||
// Replace sdks with our own optimized forks until relevant upstream PRs are merged.
|
||||
// https://github.com/anthropics/anthropic-sdk-go/pull/262
|
||||
replace github.com/anthropics/anthropic-sdk-go v1.19.0 => github.com/dannykopping/anthropic-sdk-go v0.0.0-20251230111224-88a4315810bd
|
||||
|
||||
// https://github.com/openai/openai-go/pull/602
|
||||
replace github.com/openai/openai-go/v3 => github.com/SasSwart/openai-go/v3 v3.0.0-20260202093810-72af3b857f95
|
||||
|
||||
@@ -693,6 +693,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw=
|
||||
github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
|
||||
github.com/SasSwart/openai-go/v3 v3.0.0-20260202093810-72af3b857f95 h1:HVJp3FanNaeFAlwg0/lkdkSnwFemHnwwjXBM8KRj540=
|
||||
github.com/SasSwart/openai-go/v3 v3.0.0-20260202093810-72af3b857f95/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo=
|
||||
github.com/SherClockHolmes/webpush-go v1.4.0 h1:ocnzNKWN23T9nvHi6IfyrQjkIc0oJWv1B1pULsf9i3s=
|
||||
github.com/SherClockHolmes/webpush-go v1.4.0/go.mod h1:XSq8pKX11vNV8MJEMwjrlTkxhAj1zKfxmyhdV7Pd6UA=
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
|
||||
@@ -925,8 +927,8 @@ github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f h1:Y8xYupdHxryycyPlc9Y
|
||||
github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4=
|
||||
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8=
|
||||
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4=
|
||||
github.com/coder/aibridge v1.0.0 h1:gQYJ8Q83tnfWKqObu9DR4mNFTh4uTwq1rkRtuqO3x20=
|
||||
github.com/coder/aibridge v1.0.0/go.mod h1:NzO3dgxVXosV3a405QdhfGjisGW44tffhYDVTiV9KJ8=
|
||||
github.com/coder/aibridge v1.0.1-0.20260202135542-9e2857aaac8f h1:DaSKB6p/CgDN3RXHESvFbvsT2P9XRZm0th2+MiFImwE=
|
||||
github.com/coder/aibridge v1.0.1-0.20260202135542-9e2857aaac8f/go.mod h1:M1aoiK6qmybTjD2nzcTCRPXzA/I0Ned+MAxUmz4Ju+k=
|
||||
github.com/coder/aisdk-go v0.0.9 h1:Vzo/k2qwVGLTR10ESDeP2Ecek1SdPfZlEjtTfMveiVo=
|
||||
github.com/coder/aisdk-go v0.0.9/go.mod h1:KF6/Vkono0FJJOtWtveh5j7yfNrSctVTpwgweYWSp5M=
|
||||
github.com/coder/boundary v0.6.1 h1:hLnrincIFA8Wak5SrH/xQDIIhkKQpnHVotLwC585z7g=
|
||||
@@ -1685,8 +1687,6 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisti
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.120.1/go.mod h1:Z/S1brD5gU2Ntht/bHxBVnGxXKTvZDr0dNv/riUzPmY=
|
||||
github.com/openai/openai-go v1.12.0 h1:NBQCnXzqOTv5wsgNC36PrFEiskGfO5wccfCWDo9S1U0=
|
||||
github.com/openai/openai-go v1.12.0/go.mod h1:g461MYGXEXBVdV5SaR/5tNzNbSfwTBBefwc+LlDCK0Y=
|
||||
github.com/openai/openai-go/v3 v3.15.0 h1:hk99rM7YPz+M99/5B/zOQcVwFRLLMdprVGx1vaZ8XMo=
|
||||
github.com/openai/openai-go/v3 v3.15.0/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
|
||||
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
|
||||
|
||||
@@ -32,7 +32,7 @@ type Config struct {
|
||||
// Only used in direct mode.
|
||||
UpstreamURL string `json:"upstream_url"`
|
||||
|
||||
// Provider is the API provider to use: "openai" or "anthropic".
|
||||
// Provider is the API provider to use: "completions", "messages", or "responses".
|
||||
Provider string `json:"provider"`
|
||||
|
||||
// RequestCount is the number of requests to make per runner.
|
||||
@@ -77,8 +77,8 @@ func (c Config) Validate() error {
|
||||
}
|
||||
|
||||
// Validate provider
|
||||
if c.Provider != "openai" && c.Provider != "anthropic" {
|
||||
return xerrors.New("provider must be either 'openai' or 'anthropic'")
|
||||
if c.Provider != "completions" && c.Provider != "messages" && c.Provider != "responses" {
|
||||
return xerrors.New("provider must be 'completions', 'messages', or 'responses'")
|
||||
}
|
||||
|
||||
if c.Mode == RequestModeDirect {
|
||||
|
||||
@@ -19,20 +19,52 @@ type message struct {
|
||||
|
||||
func NewProviderStrategy(provider string) ProviderStrategy {
|
||||
switch provider {
|
||||
case "anthropic":
|
||||
return &anthropicProvider{}
|
||||
case "messages":
|
||||
return &messagesProvider{}
|
||||
case "completions":
|
||||
return &chatCompletionsProvider{}
|
||||
case "responses":
|
||||
return &responsesProvider{}
|
||||
default:
|
||||
return &openAIProvider{}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type openAIProvider struct{}
|
||||
var _ ProviderStrategy = &responsesProvider{}
|
||||
|
||||
func (*openAIProvider) DefaultModel() string {
|
||||
type responsesProvider struct{}
|
||||
|
||||
type chatCompletionsProvider struct{}
|
||||
|
||||
func (*responsesProvider) DefaultModel() string {
|
||||
return "gpt-5"
|
||||
}
|
||||
|
||||
func (*responsesProvider) formatMessages(messages []message) []any {
|
||||
formatted := make([]any, 0, len(messages))
|
||||
for _, msg := range messages {
|
||||
formatted = append(formatted, map[string]any{
|
||||
"type": "message",
|
||||
"role": msg.Role,
|
||||
"content": msg.Content,
|
||||
})
|
||||
}
|
||||
return formatted
|
||||
}
|
||||
|
||||
func (*responsesProvider) buildRequestBody(model string, messages []any, stream bool) map[string]any {
|
||||
return map[string]any{
|
||||
"model": model,
|
||||
"input": messages,
|
||||
"stream": stream,
|
||||
}
|
||||
}
|
||||
|
||||
func (*chatCompletionsProvider) DefaultModel() string {
|
||||
return "gpt-4"
|
||||
}
|
||||
|
||||
func (*openAIProvider) formatMessages(messages []message) []any {
|
||||
func (*chatCompletionsProvider) formatMessages(messages []message) []any {
|
||||
formatted := make([]any, 0, len(messages))
|
||||
for _, msg := range messages {
|
||||
formatted = append(formatted, map[string]string{
|
||||
@@ -43,7 +75,7 @@ func (*openAIProvider) formatMessages(messages []message) []any {
|
||||
return formatted
|
||||
}
|
||||
|
||||
func (*openAIProvider) buildRequestBody(model string, messages []any, stream bool) map[string]any {
|
||||
func (*chatCompletionsProvider) buildRequestBody(model string, messages []any, stream bool) map[string]any {
|
||||
return map[string]any{
|
||||
"model": model,
|
||||
"messages": messages,
|
||||
@@ -51,13 +83,13 @@ func (*openAIProvider) buildRequestBody(model string, messages []any, stream boo
|
||||
}
|
||||
}
|
||||
|
||||
type anthropicProvider struct{}
|
||||
type messagesProvider struct{}
|
||||
|
||||
func (*anthropicProvider) DefaultModel() string {
|
||||
func (*messagesProvider) DefaultModel() string {
|
||||
return "claude-3-opus-20240229"
|
||||
}
|
||||
|
||||
func (*anthropicProvider) formatMessages(messages []message) []any {
|
||||
func (*messagesProvider) formatMessages(messages []message) []any {
|
||||
formatted := make([]any, 0, len(messages))
|
||||
for _, msg := range messages {
|
||||
formatted = append(formatted, map[string]any{
|
||||
@@ -73,7 +105,7 @@ func (*anthropicProvider) formatMessages(messages []message) []any {
|
||||
return formatted
|
||||
}
|
||||
|
||||
func (*anthropicProvider) buildRequestBody(model string, messages []any, stream bool) map[string]any {
|
||||
func (*messagesProvider) buildRequestBody(model string, messages []any, stream bool) map[string]any {
|
||||
return map[string]any{
|
||||
"model": model,
|
||||
"messages": messages,
|
||||
|
||||
+65
-5
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
@@ -248,13 +249,19 @@ func (r *Runner) makeRequest(ctx context.Context, logger slog.Logger, url, token
|
||||
}
|
||||
|
||||
func (r *Runner) handleNonStreamingResponse(ctx context.Context, logger slog.Logger, resp *http.Response) error {
|
||||
if r.cfg.Provider == "anthropic" {
|
||||
return r.handleAnthropicResponse(ctx, logger, resp)
|
||||
switch r.cfg.Provider {
|
||||
case "messages":
|
||||
return r.handleMessagesResponse(ctx, logger, resp)
|
||||
case "responses":
|
||||
return r.handleResponsesResponse(ctx, logger, resp)
|
||||
case "completions":
|
||||
return r.handleCompletionsResponse(ctx, logger, resp)
|
||||
default:
|
||||
return xerrors.Errorf("unsupported provider: %s", r.cfg.Provider)
|
||||
}
|
||||
return r.handleOpenAIResponse(ctx, logger, resp)
|
||||
}
|
||||
|
||||
func (r *Runner) handleOpenAIResponse(ctx context.Context, logger slog.Logger, resp *http.Response) error {
|
||||
func (r *Runner) handleCompletionsResponse(ctx context.Context, logger slog.Logger, resp *http.Response) error {
|
||||
var response struct {
|
||||
ID string `json:"id"`
|
||||
Model string `json:"model"`
|
||||
@@ -291,7 +298,60 @@ func (r *Runner) handleOpenAIResponse(ctx context.Context, logger slog.Logger, r
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) handleAnthropicResponse(ctx context.Context, logger slog.Logger, resp *http.Response) error {
|
||||
func (r *Runner) handleResponsesResponse(ctx context.Context, logger slog.Logger, resp *http.Response) error {
|
||||
var response struct {
|
||||
ID string `json:"id"`
|
||||
Model string `json:"model"`
|
||||
Output []struct {
|
||||
Type string `json:"type"`
|
||||
Role string `json:"role"`
|
||||
Content []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
} `json:"content"`
|
||||
} `json:"output"`
|
||||
Usage struct {
|
||||
InputTokens int `json:"input_tokens"`
|
||||
OutputTokens int `json:"output_tokens"`
|
||||
TotalTokens int `json:"total_tokens"`
|
||||
} `json:"usage"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return xerrors.Errorf("decode response: %w", err)
|
||||
}
|
||||
|
||||
var assistantContent string
|
||||
var contentBuilder strings.Builder
|
||||
for _, item := range response.Output {
|
||||
if item.Role != "assistant" {
|
||||
continue
|
||||
}
|
||||
for _, content := range item.Content {
|
||||
if content.Type != "output_text" {
|
||||
continue
|
||||
}
|
||||
_, _ = contentBuilder.WriteString(content.Text)
|
||||
}
|
||||
}
|
||||
assistantContent = contentBuilder.String()
|
||||
if assistantContent != "" {
|
||||
logger.Debug(ctx, "received response",
|
||||
slog.F("response_id", response.ID),
|
||||
slog.F("content_length", len(assistantContent)),
|
||||
)
|
||||
}
|
||||
|
||||
if response.Usage.TotalTokens > 0 {
|
||||
r.totalTokens += int64(response.Usage.TotalTokens)
|
||||
r.cfg.Metrics.AddTokens("input", int64(response.Usage.InputTokens))
|
||||
r.cfg.Metrics.AddTokens("output", int64(response.Usage.OutputTokens))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) handleMessagesResponse(ctx context.Context, logger slog.Logger, resp *http.Response) error {
|
||||
var response struct {
|
||||
ID string `json:"id"`
|
||||
Model string `json:"model"`
|
||||
|
||||
@@ -64,9 +64,12 @@ func (s *bridgeStrategy) Setup(ctx context.Context, id string, logs io.Writer) (
|
||||
slog.F("user_id", newUser.ID.String()),
|
||||
)
|
||||
|
||||
if s.provider == "anthropic" {
|
||||
switch s.provider {
|
||||
case "messages":
|
||||
requestURL = fmt.Sprintf("%s/api/v2/aibridge/anthropic/v1/messages", s.client.URL)
|
||||
} else {
|
||||
case "responses":
|
||||
requestURL = fmt.Sprintf("%s/api/v2/aibridge/openai/v1/responses", s.client.URL)
|
||||
case "completions":
|
||||
requestURL = fmt.Sprintf("%s/api/v2/aibridge/openai/v1/chat/completions", s.client.URL)
|
||||
}
|
||||
logger.Info(ctx, "bridge runner in bridge mode",
|
||||
|
||||
@@ -77,6 +77,27 @@ type openAIResponse struct {
|
||||
} `json:"usage"`
|
||||
}
|
||||
|
||||
type responsesResponse struct {
|
||||
ID string `json:"id"`
|
||||
Object string `json:"object"`
|
||||
Created int64 `json:"created"`
|
||||
Model string `json:"model"`
|
||||
Output []struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Type string `json:"type"`
|
||||
Role string `json:"role"`
|
||||
Content []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
} `json:"content"`
|
||||
} `json:"output"`
|
||||
Usage struct {
|
||||
InputTokens int `json:"input_tokens"`
|
||||
OutputTokens int `json:"output_tokens"`
|
||||
TotalTokens int `json:"total_tokens"`
|
||||
} `json:"usage"`
|
||||
}
|
||||
|
||||
type anthropicResponse struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
@@ -152,6 +173,7 @@ func (s *Server) startAPIServer(ctx context.Context) error {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.HandleFunc("POST /v1/chat/completions", s.handleOpenAI)
|
||||
mux.HandleFunc("POST /v1/responses", s.handleResponses)
|
||||
mux.HandleFunc("POST /v1/messages", s.handleAnthropic)
|
||||
|
||||
var handler http.Handler = mux
|
||||
@@ -262,6 +284,93 @@ func (s *Server) handleAnthropic(w http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) handleResponses(w http.ResponseWriter, r *http.Request) {
|
||||
pproflabel.Do(r.Context(), pproflabel.Service("llm-mock"), func(ctx context.Context) {
|
||||
s.handleResponsesWithLabels(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) handleResponsesWithLabels(w http.ResponseWriter, r *http.Request) {
|
||||
s.logger.Debug(r.Context(), "handling OpenAI responses request")
|
||||
defer s.logger.Debug(r.Context(), "handled OpenAI responses request")
|
||||
|
||||
ctx := r.Context()
|
||||
requestID := uuid.New()
|
||||
now := time.Now()
|
||||
|
||||
var req llmRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
s.logger.Error(ctx, "failed to parse OpenAI responses request", slog.Error(err))
|
||||
http.Error(w, "invalid request body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if s.artificialLatency > 0 {
|
||||
time.Sleep(s.artificialLatency)
|
||||
}
|
||||
|
||||
var resp responsesResponse
|
||||
resp.ID = fmt.Sprintf("resp_%s", requestID.String()[:8])
|
||||
resp.Object = "response"
|
||||
resp.Created = now.Unix()
|
||||
resp.Model = req.Model
|
||||
|
||||
var responseContent string
|
||||
if s.responsePayloadSize > 0 {
|
||||
pattern := "x"
|
||||
repeated := strings.Repeat(pattern, s.responsePayloadSize)
|
||||
responseContent = repeated[:s.responsePayloadSize]
|
||||
} else {
|
||||
responseContent = "This is a mock response from OpenAI Responses."
|
||||
}
|
||||
|
||||
resp.Output = []struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Type string `json:"type"`
|
||||
Role string `json:"role"`
|
||||
Content []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
} `json:"content"`
|
||||
}{
|
||||
{
|
||||
ID: fmt.Sprintf("msg_%s", requestID.String()[:8]),
|
||||
Type: "message",
|
||||
Role: "assistant",
|
||||
Content: []struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text"`
|
||||
}{
|
||||
{
|
||||
Type: "output_text",
|
||||
Text: responseContent,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp.Usage.InputTokens = 10
|
||||
resp.Usage.OutputTokens = 5
|
||||
resp.Usage.TotalTokens = 15
|
||||
|
||||
responseBody, _ := json.Marshal(resp)
|
||||
|
||||
if req.Stream {
|
||||
s.sendResponsesStream(ctx, w, resp)
|
||||
} else {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if _, err := w.Write(responseBody); err != nil {
|
||||
s.logger.Error(ctx, "failed to write OpenAI responses response",
|
||||
slog.F("request_id", requestID),
|
||||
slog.Error(err),
|
||||
slog.F("error_type", "write_error"),
|
||||
slog.F("likely_cause", "network_error"),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleAnthropicWithLabels(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
requestID := uuid.New()
|
||||
@@ -396,6 +505,69 @@ func (s *Server) sendOpenAIStream(ctx context.Context, w http.ResponseWriter, re
|
||||
writeChunk("data: [DONE]\n\n")
|
||||
}
|
||||
|
||||
func (s *Server) sendResponsesStream(ctx context.Context, w http.ResponseWriter, resp responsesResponse) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
s.logger.Error(ctx, "responseWriter does not support flushing",
|
||||
slog.F("response_id", resp.ID),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
writeChunk := func(data string) bool {
|
||||
if _, err := fmt.Fprintf(w, "%s", data); err != nil {
|
||||
s.logger.Error(ctx, "failed to write OpenAI responses stream chunk",
|
||||
slog.F("response_id", resp.ID),
|
||||
slog.Error(err),
|
||||
slog.F("error_type", "write_error"),
|
||||
slog.F("likely_cause", "network_error"),
|
||||
)
|
||||
return false
|
||||
}
|
||||
flusher.Flush()
|
||||
return true
|
||||
}
|
||||
|
||||
deltaChunk := map[string]interface{}{
|
||||
"id": resp.ID,
|
||||
"object": "response.output_text.delta",
|
||||
"created": resp.Created,
|
||||
"model": resp.Model,
|
||||
"output_index": 0,
|
||||
"content_index": 0,
|
||||
"delta": resp.Output[0].Content[0].Text,
|
||||
}
|
||||
deltaBytes, _ := json.Marshal(deltaChunk)
|
||||
if !writeChunk(fmt.Sprintf("data: %s\n\n", deltaBytes)) {
|
||||
return
|
||||
}
|
||||
|
||||
finalChunk := map[string]interface{}{
|
||||
"id": resp.ID,
|
||||
"object": "response.completed",
|
||||
"created": resp.Created,
|
||||
"model": resp.Model,
|
||||
"response": map[string]interface{}{
|
||||
"id": resp.ID,
|
||||
"object": resp.Object,
|
||||
"created": resp.Created,
|
||||
"model": resp.Model,
|
||||
"output": resp.Output,
|
||||
"usage": resp.Usage,
|
||||
},
|
||||
}
|
||||
finalBytes, _ := json.Marshal(finalChunk)
|
||||
if !writeChunk(fmt.Sprintf("data: %s\n\n", finalBytes)) {
|
||||
return
|
||||
}
|
||||
writeChunk("data: [DONE]\n\n")
|
||||
}
|
||||
|
||||
func (s *Server) sendAnthropicStream(ctx context.Context, w http.ResponseWriter, resp anthropicResponse) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
Reference in New Issue
Block a user