108 lines
2.3 KiB
Go
Raw Normal View History

package pubsub
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
2022-11-04 09:57:20 -04:00
"github.com/libp2p/go-libp2p/core/peer"
)
// Tunables for the per-peer backoff bookkeeping in this file.
const (
	// MinBackoffDelay is the floor a peer's delay is raised to when its
	// recorded delay is found below it.
	MinBackoffDelay = 100 * time.Millisecond
	// MaxBackoffDelay is the ceiling a peer's delay is clamped to as it grows.
	MaxBackoffDelay = 10 * time.Second

	// BackoffMultiplier is the factor applied to the previous delay on each
	// growth step.
	BackoffMultiplier = 2
	// MaxBackoffJitterCoff is the exclusive upper bound, in milliseconds, of
	// the random jitter added on each growth step.
	MaxBackoffJitterCoff = 100

	// TimeToLive is how long after its last recorded attempt a peer's history
	// is still honoured; older entries are reset on access and removed by
	// cleanup.
	TimeToLive = 10 * time.Minute
	// BackoffCleanupInterval is not referenced in this file; presumably it is
	// the cleanup period callers pass to newBackoff (verify against callers).
	BackoffCleanupInterval = time.Minute
	// MaxBackoffAttempts is not referenced in this file; presumably it is the
	// attempt limit callers pass to newBackoff (verify against callers).
	MaxBackoffAttempts = 4
)
// backoffHistory is the per-peer record stored in backoff.info.
type backoffHistory struct {
// duration is the delay most recently computed for the peer by updateAndGet.
duration time.Duration
// lastTried is the time updateAndGet last recorded an attempt for the peer;
// it is compared against TimeToLive to decide whether the record is stale.
lastTried time.Time
// attempts counts the attempts recorded since this record was created.
attempts int
}
// backoff keeps a backoffHistory per peer and hands out increasing delays
// through updateAndGet. A background loop started by newBackoff periodically
// removes stale records.
type backoff struct {
mu sync.Mutex // held by updateAndGet and cleanup while they touch info
info map[peer.ID]*backoffHistory // per-peer history, keyed by peer ID
ct int // size threshold that kicks off the cleaner; NOTE(review): stored by newBackoff but not read anywhere in this file
ci time.Duration // cleanup interval: period of the ticker in cleanupLoop
maxAttempts int // maximum backoff attempts prior to ejection (updateAndGet returns an error once reached)
}
// newBackoff creates a backoff tracker and launches its cleanup goroutine.
//
// sizeThreshold, cleanupInterval and maxAttempts are stored on the returned
// value as ct, ci and maxAttempts respectively. The cleanup goroutine runs
// cleanupLoop and exits when ctx is done.
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
	// Reseed the shared math/rand source; updateAndGet draws its jitter from it.
	rand.Seed(time.Now().UnixNano())

	// The zero value of the embedded mutex is ready to use, so only the
	// remaining fields need explicit initialisation.
	tracker := new(backoff)
	tracker.info = make(map[peer.ID]*backoffHistory)
	tracker.ct = sizeThreshold
	tracker.ci = cleanupInterval
	tracker.maxAttempts = maxAttempts

	go tracker.cleanupLoop(ctx)
	return tracker
}
// updateAndGet records a new attempt for peer id and returns the delay that
// applies to it.
//
// A peer with no record, or whose record is older than TimeToLive, gets a
// fresh record and a zero delay. Otherwise the delay is first raised to
// MinBackoffDelay and on later calls multiplied by BackoffMultiplier plus a
// random jitter of less than MaxBackoffJitterCoff milliseconds, never
// exceeding MaxBackoffDelay. Once the record holds b.maxAttempts attempts an
// error is returned and the record is left untouched.
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	entry, known := b.info[id]
	// Unknown and expired peers are treated alike: both start over.
	startOver := !known || time.Since(entry.lastTried) > TimeToLive

	if !startOver && entry.attempts >= b.maxAttempts {
		return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)
	}

	switch {
	case startOver:
		// first request goes immediately.
		entry = &backoffHistory{}
	case entry.duration < MinBackoffDelay:
		entry.duration = MinBackoffDelay
	case entry.duration < MaxBackoffDelay:
		grown := entry.duration*BackoffMultiplier +
			time.Duration(rand.Intn(MaxBackoffJitterCoff))*time.Millisecond
		// Clamp to the ceiling; the negative check covers integer wrap-around.
		if grown < 0 || grown > MaxBackoffDelay {
			grown = MaxBackoffDelay
		}
		entry.duration = grown
	}

	entry.attempts++
	entry.lastTried = time.Now()
	b.info[id] = entry

	return entry.duration, nil
}
// cleanup removes every record whose last attempt is older than TimeToLive.
// It holds b.mu for the duration of the sweep.
func (b *backoff) cleanup() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for peerID, record := range b.info {
		if time.Since(record.lastTried) <= TimeToLive {
			continue // still within its time to live; keep it
		}
		// Deleting the current key while ranging over a map is allowed in Go.
		delete(b.info, peerID)
	}
}
// cleanupLoop calls cleanup once every b.ci until ctx is done. It blocks, so
// it is meant to run on its own goroutine. The ticker is stopped on return.
func (b *backoff) cleanupLoop(ctx context.Context) {
	tick := time.NewTicker(b.ci)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			b.cleanup()
		case <-done:
			// pubsub shutting down
			return
		}
	}
}