Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e85be9f42e | |||
| cfd7730194 | |||
| 1937ada0cd | |||
| d64cd6415d | |||
| c1851d9453 | |||
| 8f73453681 | |||
| 165db3d31c | |||
| 1bd1516fd1 | |||
| 81ba35a987 | |||
| 53d63cf8e9 | |||
| 4213a43b53 |
@@ -81,8 +81,8 @@ func newMsgQueue(ctx context.Context, l Listener, le ListenerWithErr) *msgQueue
|
||||
}
|
||||
|
||||
func (q *msgQueue) run() {
|
||||
var batch [maxDrainBatch]msgOrErr
|
||||
for {
|
||||
// wait until there is something on the queue or we are closed
|
||||
q.cond.L.Lock()
|
||||
for q.size == 0 && !q.closed {
|
||||
q.cond.Wait()
|
||||
@@ -91,28 +91,32 @@ func (q *msgQueue) run() {
|
||||
q.cond.L.Unlock()
|
||||
return
|
||||
}
|
||||
item := q.q[q.front]
|
||||
q.front = (q.front + 1) % BufferSize
|
||||
q.size--
|
||||
// Drain up to maxDrainBatch items while holding the lock.
|
||||
n := min(q.size, maxDrainBatch)
|
||||
for i := range n {
|
||||
batch[i] = q.q[q.front]
|
||||
q.front = (q.front + 1) % BufferSize
|
||||
}
|
||||
q.size -= n
|
||||
q.cond.L.Unlock()
|
||||
|
||||
// process item without holding lock
|
||||
if item.err == nil {
|
||||
// real message
|
||||
if q.l != nil {
|
||||
q.l(q.ctx, item.msg)
|
||||
// Dispatch each message individually without holding the lock.
|
||||
for i := range n {
|
||||
item := batch[i]
|
||||
if item.err == nil {
|
||||
if q.l != nil {
|
||||
q.l(q.ctx, item.msg)
|
||||
continue
|
||||
}
|
||||
if q.le != nil {
|
||||
q.le(q.ctx, item.msg, nil)
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
if q.le != nil {
|
||||
q.le(q.ctx, item.msg, nil)
|
||||
continue
|
||||
q.le(q.ctx, nil, item.err)
|
||||
}
|
||||
// unhittable
|
||||
continue
|
||||
}
|
||||
// if the listener wants errors, send it.
|
||||
if q.le != nil {
|
||||
q.le(q.ctx, nil, item.err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,6 +237,12 @@ type PGPubsub struct {
|
||||
// for a subscriber before dropping messages.
|
||||
const BufferSize = 2048
|
||||
|
||||
// maxDrainBatch is the maximum number of messages to drain from the ring
|
||||
// buffer per iteration. Batching amortizes the cost of mutex
|
||||
// acquire/release and cond.Wait across many messages, improving drain
|
||||
// throughput during bursts.
|
||||
const maxDrainBatch = 256
|
||||
|
||||
// Subscribe calls the listener when an event matching the name is received.
|
||||
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
|
||||
return p.subscribeQueue(event, newMsgQueue(context.Background(), listener, nil))
|
||||
|
||||
+37
-19
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"golang.org/x/xerrors"
|
||||
"tailscale.com/derp"
|
||||
"tailscale.com/tailcfg"
|
||||
@@ -389,6 +390,7 @@ type MultiAgentController struct {
|
||||
// connections to the destination
|
||||
tickets map[uuid.UUID]map[uuid.UUID]struct{}
|
||||
coordination *tailnet.BasicCoordination
|
||||
sendGroup singleflight.Group
|
||||
|
||||
cancel context.CancelFunc
|
||||
expireOldAgentsDone chan struct{}
|
||||
@@ -418,28 +420,44 @@ func (m *MultiAgentController) New(client tailnet.CoordinatorClient) tailnet.Clo
|
||||
|
||||
func (m *MultiAgentController) ensureAgent(agentID uuid.UUID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
_, ok := m.connectionTimes[agentID]
|
||||
// If we don't have the agent, subscribe.
|
||||
if !ok {
|
||||
m.logger.Debug(context.Background(),
|
||||
"subscribing to agent", slog.F("agent_id", agentID))
|
||||
if m.coordination != nil {
|
||||
err := m.coordination.Client.Send(&proto.CoordinateRequest{
|
||||
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
|
||||
})
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("subscribe agent: %w", err)
|
||||
m.coordination.SendErr(err)
|
||||
_ = m.coordination.Client.Close()
|
||||
m.coordination = nil
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.tickets[agentID] = map[uuid.UUID]struct{}{}
|
||||
if ok {
|
||||
m.connectionTimes[agentID] = time.Now()
|
||||
m.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
m.logger.Debug(context.Background(),
|
||||
"subscribing to agent", slog.F("agent_id", agentID))
|
||||
|
||||
_, err, _ := m.sendGroup.Do(agentID.String(), func() (interface{}, error) {
|
||||
m.mu.Lock()
|
||||
coord := m.coordination
|
||||
m.mu.Unlock()
|
||||
if coord == nil {
|
||||
return nil, xerrors.New("no active coordination")
|
||||
}
|
||||
err := coord.Client.Send(&proto.CoordinateRequest{
|
||||
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.tickets[agentID] = map[uuid.UUID]struct{}{}
|
||||
m.mu.Unlock()
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
m.logger.Error(context.Background(), "ensureAgent send failed",
|
||||
slog.F("agent_id", agentID), slog.Error(err))
|
||||
return xerrors.Errorf("send AddTunnel: %w", err)
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.connectionTimes[agentID] = time.Now()
|
||||
m.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -730,7 +730,10 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log := s.Logger.With(slog.F("agent_id", appToken.AgentID))
|
||||
log := s.Logger.With(
|
||||
slog.F("agent_id", appToken.AgentID),
|
||||
slog.F("workspace_id", appToken.WorkspaceID),
|
||||
)
|
||||
log.Debug(ctx, "resolved PTY request")
|
||||
|
||||
values := r.URL.Query()
|
||||
@@ -765,19 +768,21 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
return
|
||||
}
|
||||
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)
|
||||
|
||||
ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
|
||||
defer wsNetConn.Close() // Also closes conn.
|
||||
|
||||
go httpapi.HeartbeatClose(ctx, log, cancel, conn)
|
||||
|
||||
dialStart := time.Now()
|
||||
|
||||
agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
|
||||
if err != nil {
|
||||
log.Debug(ctx, "dial workspace agent", slog.Error(err))
|
||||
log.Debug(ctx, "dial workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
|
||||
return
|
||||
}
|
||||
defer release()
|
||||
log.Debug(ctx, "dialed workspace agent")
|
||||
log.Debug(ctx, "dialed workspace agent", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
// #nosec G115 - Safe conversion for terminal height/width which are expected to be within uint16 range (0-65535)
|
||||
ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"), func(arp *workspacesdk.AgentReconnectingPTYInit) {
|
||||
arp.Container = container
|
||||
@@ -785,12 +790,12 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
arp.BackendType = backendType
|
||||
})
|
||||
if err != nil {
|
||||
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
|
||||
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
|
||||
return
|
||||
}
|
||||
defer ptNetConn.Close()
|
||||
log.Debug(ctx, "obtained PTY")
|
||||
log.Debug(ctx, "obtained PTY", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
|
||||
report := newStatsReportFromSignedToken(*appToken)
|
||||
s.collectStats(report)
|
||||
@@ -800,7 +805,7 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
}()
|
||||
|
||||
agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
|
||||
log.Debug(ctx, "pty Bicopy finished")
|
||||
log.Debug(ctx, "pty Bicopy finished", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
}
|
||||
|
||||
func (s *Server) collectStats(stats StatsReport) {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"golang.org/x/xerrors"
|
||||
gProto "google.golang.org/protobuf/proto"
|
||||
|
||||
@@ -33,9 +34,9 @@ const (
|
||||
eventReadyForHandshake = "tailnet_ready_for_handshake"
|
||||
HeartbeatPeriod = time.Second * 2
|
||||
MissedHeartbeats = 3
|
||||
numQuerierWorkers = 10
|
||||
numQuerierWorkers = 40
|
||||
numBinderWorkers = 10
|
||||
numTunnelerWorkers = 10
|
||||
numTunnelerWorkers = 20
|
||||
numHandshakerWorkers = 5
|
||||
dbMaxBackoff = 10 * time.Second
|
||||
cleanupPeriod = time.Hour
|
||||
@@ -770,6 +771,9 @@ func (m *mapper) bestToUpdate(best map[uuid.UUID]mapping) *proto.CoordinateRespo
|
||||
|
||||
for k := range m.sent {
|
||||
if _, ok := best[k]; !ok {
|
||||
m.logger.Debug(m.ctx, "peer no longer in best mappings, sending DISCONNECTED",
|
||||
slog.F("peer_id", k),
|
||||
)
|
||||
resp.PeerUpdates = append(resp.PeerUpdates, &proto.CoordinateResponse_PeerUpdate{
|
||||
Id: agpl.UUIDToByteSlice(k),
|
||||
Kind: proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
|
||||
@@ -820,6 +824,8 @@ type querier struct {
|
||||
mu sync.Mutex
|
||||
mappers map[mKey]*mapper
|
||||
healthy bool
|
||||
|
||||
resyncGroup singleflight.Group
|
||||
}
|
||||
|
||||
func newQuerier(ctx context.Context,
|
||||
@@ -958,7 +964,7 @@ func (q *querier) cleanupConn(c *connIO) {
|
||||
|
||||
// maxBatchSize is the maximum number of keys to process in a single batch
|
||||
// query.
|
||||
const maxBatchSize = 50
|
||||
const maxBatchSize = 200
|
||||
|
||||
func (q *querier) peerUpdateWorker() {
|
||||
defer q.wg.Done()
|
||||
@@ -1207,8 +1213,13 @@ func (q *querier) subscribe() {
|
||||
func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
|
||||
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
|
||||
q.logger.Warn(q.ctx, "pubsub may have dropped peer updates")
|
||||
// we need to schedule a full resync of peer mappings
|
||||
q.resyncPeerMappings()
|
||||
// Schedule a full resync asynchronously so we don't block the
|
||||
// pubsub drain goroutine. Singleflight coalesces concurrent
|
||||
// resync requests.
|
||||
go q.resyncGroup.Do("resync", func() (any, error) {
|
||||
q.resyncPeerMappings()
|
||||
return nil, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
@@ -1234,8 +1245,13 @@ func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
|
||||
func (q *querier) listenTunnel(_ context.Context, msg []byte, err error) {
|
||||
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
|
||||
q.logger.Warn(q.ctx, "pubsub may have dropped tunnel updates")
|
||||
// we need to schedule a full resync of peer mappings
|
||||
q.resyncPeerMappings()
|
||||
// Schedule a full resync asynchronously so we don't block the
|
||||
// pubsub drain goroutine. Singleflight coalesces concurrent
|
||||
// resync requests.
|
||||
go q.resyncGroup.Do("resync", func() (any, error) {
|
||||
q.resyncPeerMappings()
|
||||
return nil, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
@@ -1601,6 +1617,10 @@ func (h *heartbeats) filter(mappings []mapping) []mapping {
|
||||
// the only mapping available for it. Newer mappings will take
|
||||
// precedence.
|
||||
m.kind = proto.CoordinateResponse_PeerUpdate_LOST
|
||||
h.logger.Debug(h.ctx, "mapping rewritten to LOST due to missed heartbeats",
|
||||
slog.F("peer_id", m.peer),
|
||||
slog.F("coordinator_id", m.coordinator),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+10
-1
@@ -732,7 +732,16 @@ func (l *peerLifecycle) setLostTimer(c *configMaps) {
|
||||
if l.lostTimer != nil {
|
||||
l.lostTimer.Stop()
|
||||
}
|
||||
ttl := lostTimeout - c.clock.Since(l.lastHandshake)
|
||||
var ttl time.Duration
|
||||
if l.lastHandshake.IsZero() {
|
||||
// Peer has never completed a handshake. Give it the full
|
||||
// lostTimeout to establish one rather than deleting it
|
||||
// immediately. A zero lastHandshake just means WireGuard
|
||||
// hasn't connected yet, not that the peer is gone.
|
||||
ttl = lostTimeout
|
||||
} else {
|
||||
ttl = lostTimeout - c.clock.Since(l.lastHandshake)
|
||||
}
|
||||
if ttl <= 0 {
|
||||
ttl = time.Nanosecond
|
||||
}
|
||||
|
||||
@@ -641,6 +641,97 @@ func TestConfigMaps_updatePeers_lost(t *testing.T) {
|
||||
_ = testutil.TryReceive(ctx, t, done)
|
||||
}
|
||||
|
||||
func TestConfigMaps_updatePeers_lost_zero_handshake(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
logger := testutil.Logger(t)
|
||||
fEng := newFakeEngineConfigurable()
|
||||
nodePrivateKey := key.NewNode()
|
||||
nodeID := tailcfg.NodeID(5)
|
||||
discoKey := key.NewDisco()
|
||||
uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), CoderDNSSuffixFQDN)
|
||||
defer uut.close()
|
||||
mClock := quartz.NewMock(t)
|
||||
uut.clock = mClock
|
||||
|
||||
p1ID := uuid.UUID{1}
|
||||
p1Node := newTestNode(1)
|
||||
p1n, err := NodeToProto(p1Node)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Respond to the status request from updatePeers(NODE) with no
|
||||
// handshake information, so lastHandshake stays zero.
|
||||
expectNoStatus := func() <-chan struct{} {
|
||||
called := make(chan struct{})
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Error("timeout waiting for status")
|
||||
return
|
||||
case b := <-fEng.status:
|
||||
_ = b // don't add any peer
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Error("timeout sending done")
|
||||
case fEng.statusDone <- struct{}{}:
|
||||
close(called)
|
||||
}
|
||||
}()
|
||||
return called
|
||||
}
|
||||
|
||||
// Add the peer via NODE update — no handshake in status.
|
||||
s1 := expectNoStatus()
|
||||
updates := []*proto.CoordinateResponse_PeerUpdate{
|
||||
{
|
||||
Id: p1ID[:],
|
||||
Kind: proto.CoordinateResponse_PeerUpdate_NODE,
|
||||
Node: p1n,
|
||||
},
|
||||
}
|
||||
uut.updatePeers(updates)
|
||||
nm := testutil.TryReceive(ctx, t, fEng.setNetworkMap)
|
||||
r := testutil.TryReceive(ctx, t, fEng.reconfig)
|
||||
require.Len(t, nm.Peers, 1)
|
||||
require.Len(t, r.wg.Peers, 1)
|
||||
_ = testutil.TryReceive(ctx, t, s1)
|
||||
|
||||
// Mark the peer as LOST, still with no handshake.
|
||||
s2 := expectNoStatus()
|
||||
updates[0].Kind = proto.CoordinateResponse_PeerUpdate_LOST
|
||||
updates[0].Node = nil
|
||||
uut.updatePeers(updates)
|
||||
_ = testutil.TryReceive(ctx, t, s2)
|
||||
|
||||
// Peer should NOT be removed immediately.
|
||||
select {
|
||||
case <-fEng.setNetworkMap:
|
||||
t.Fatal("should not reprogram")
|
||||
default:
|
||||
// OK!
|
||||
}
|
||||
|
||||
// Prepare a status response for when the lost timer fires after
|
||||
// lostTimeout. Return empty status (no handshake ever happened).
|
||||
s3 := expectNoStatus()
|
||||
mClock.Advance(lostTimeout).MustWait(ctx)
|
||||
_ = testutil.TryReceive(ctx, t, s3)
|
||||
|
||||
// Now the peer should be removed.
|
||||
nm = testutil.TryReceive(ctx, t, fEng.setNetworkMap)
|
||||
r = testutil.TryReceive(ctx, t, fEng.reconfig)
|
||||
require.Len(t, nm.Peers, 0)
|
||||
require.Len(t, r.wg.Peers, 0)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
uut.close()
|
||||
}()
|
||||
_ = testutil.TryReceive(ctx, t, done)
|
||||
}
|
||||
|
||||
func TestConfigMaps_updatePeers_lost_and_found(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
|
||||
+50
-42
@@ -12,6 +12,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
"github.com/tailscale/wireguard-go/tun"
|
||||
@@ -463,6 +465,8 @@ type Conn struct {
|
||||
|
||||
trafficStats *connstats.Statistics
|
||||
lastNetInfo *tailcfg.NetInfo
|
||||
|
||||
awaitReachableGroup singleflight.Group
|
||||
}
|
||||
|
||||
func (c *Conn) GetNetInfo() *tailcfg.NetInfo {
|
||||
@@ -599,56 +603,60 @@ func (c *Conn) DERPMap() *tailcfg.DERPMap {
|
||||
// address is reachable. It's the callers responsibility to provide
|
||||
// a timeout, otherwise this function will block forever.
|
||||
func (c *Conn) AwaitReachable(ctx context.Context, ip netip.Addr) bool {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel() // Cancel all pending pings on exit.
|
||||
result, _, _ := c.awaitReachableGroup.Do(ip.String(), func() (interface{}, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel() // Cancel all pending pings on exit.
|
||||
|
||||
completedCtx, completed := context.WithCancel(context.Background())
|
||||
defer completed()
|
||||
completedCtx, completed := context.WithCancel(context.Background())
|
||||
defer completed()
|
||||
|
||||
run := func() {
|
||||
// Safety timeout, initially we'll have around 10-20 goroutines
|
||||
// running in parallel. The exponential backoff will converge
|
||||
// around ~1 ping / 30s, this means we'll have around 10-20
|
||||
// goroutines pending towards the end as well.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
run := func() {
|
||||
// Safety timeout, initially we'll have around 10-20 goroutines
|
||||
// running in parallel. The exponential backoff will converge
|
||||
// around ~1 ping / 30s, this means we'll have around 10-20
|
||||
// goroutines pending towards the end as well.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// For reachability, we use TSMP ping, which pings at the IP layer, and
|
||||
// therefore requires that wireguard and the netstack are up. If we
|
||||
// don't wait for wireguard to be up, we could miss a handshake, and it
|
||||
// might take 5 seconds for the handshake to be retried. A 5s initial
|
||||
// round trip can set us up for poor TCP performance, since the initial
|
||||
// round-trip-time sets the initial retransmit timeout.
|
||||
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
|
||||
if err == nil {
|
||||
completed()
|
||||
// For reachability, we use TSMP ping, which pings at the IP layer,
|
||||
// and therefore requires that wireguard and the netstack are up.
|
||||
// If we don't wait for wireguard to be up, we could miss a
|
||||
// handshake, and it might take 5 seconds for the handshake to be
|
||||
// retried. A 5s initial round trip can set us up for poor TCP
|
||||
// performance, since the initial round-trip-time sets the initial
|
||||
// retransmit timeout.
|
||||
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
|
||||
if err == nil {
|
||||
completed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eb := backoff.NewExponentialBackOff()
|
||||
eb.MaxElapsedTime = 0
|
||||
eb.InitialInterval = 50 * time.Millisecond
|
||||
eb.MaxInterval = 30 * time.Second
|
||||
// Consume the first interval since
|
||||
// we'll fire off a ping immediately.
|
||||
_ = eb.NextBackOff()
|
||||
eb := backoff.NewExponentialBackOff()
|
||||
eb.MaxElapsedTime = 0
|
||||
eb.InitialInterval = 50 * time.Millisecond
|
||||
eb.MaxInterval = 5 * time.Second
|
||||
// Consume the first interval since
|
||||
// we'll fire off a ping immediately.
|
||||
_ = eb.NextBackOff()
|
||||
|
||||
t := backoff.NewTicker(eb)
|
||||
defer t.Stop()
|
||||
t := backoff.NewTicker(eb)
|
||||
defer t.Stop()
|
||||
|
||||
go run()
|
||||
for {
|
||||
select {
|
||||
case <-completedCtx.Done():
|
||||
return true
|
||||
case <-t.C:
|
||||
// Pings can take a while, so we can run multiple
|
||||
// in parallel to return ASAP.
|
||||
go run()
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
go run()
|
||||
for {
|
||||
select {
|
||||
case <-completedCtx.Done():
|
||||
return true, nil
|
||||
case <-t.C:
|
||||
// Pings can take a while, so we can run multiple
|
||||
// in parallel to return ASAP.
|
||||
go run()
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
return result.(bool)
|
||||
}
|
||||
|
||||
// Closed is a channel that ends when the connection has
|
||||
|
||||
Reference in New Issue
Block a user