mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 04:43:10 +00:00
Adds exponential backoff to re-spawing new streams for supposedly dead peers (#483)
* updates gitignore * implements draft solution * consolidates update and get * extends test * adds cleaner logic * removes a redundant else case * refactors cleanup in a goroutine * adds a jitter to backoff * stretches the sleep for cleanup * reduces jitter time * fixes a test * adds maximum backoff attempts * returns error for closing channel * refactors peer status exceed backoff threshold * converts if-else to switch * nit * consolidates update and maximum backoff check * bug fix * nit * refactors cleanup with a ticker object
This commit is contained in:
parent
0ea9140c95
commit
06b5ba4763
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,3 +1,5 @@
|
||||
cover.out
|
||||
prof.out
|
||||
go-floodsub.test
|
||||
|
||||
.idea/
|
||||
|
||||
107
backoff.go
Normal file
107
backoff.go
Normal file
@ -0,0 +1,107 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
const (
|
||||
MinBackoffDelay = 100 * time.Millisecond
|
||||
MaxBackoffDelay = 10 * time.Second
|
||||
TimeToLive = 10 * time.Minute
|
||||
BackoffCleanupInterval = 1 * time.Minute
|
||||
BackoffMultiplier = 2
|
||||
MaxBackoffJitterCoff = 100
|
||||
MaxBackoffAttempts = 4
|
||||
)
|
||||
|
||||
type backoffHistory struct {
|
||||
duration time.Duration
|
||||
lastTried time.Time
|
||||
attempts int
|
||||
}
|
||||
|
||||
type backoff struct {
|
||||
mu sync.Mutex
|
||||
info map[peer.ID]*backoffHistory
|
||||
ct int // size threshold that kicks off the cleaner
|
||||
ci time.Duration // cleanup intervals
|
||||
maxAttempts int // maximum backoff attempts prior to ejection
|
||||
}
|
||||
|
||||
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
|
||||
b := &backoff{
|
||||
mu: sync.Mutex{},
|
||||
ct: sizeThreshold,
|
||||
ci: cleanupInterval,
|
||||
maxAttempts: maxAttempts,
|
||||
info: make(map[peer.ID]*backoffHistory),
|
||||
}
|
||||
|
||||
rand.Seed(time.Now().UnixNano()) // used for jitter
|
||||
go b.cleanupLoop(ctx)
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
h, ok := b.info[id]
|
||||
switch {
|
||||
case !ok || time.Since(h.lastTried) > TimeToLive:
|
||||
// first request goes immediately.
|
||||
h = &backoffHistory{
|
||||
duration: time.Duration(0),
|
||||
attempts: 0,
|
||||
}
|
||||
case h.attempts >= b.maxAttempts:
|
||||
return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)
|
||||
|
||||
case h.duration < MinBackoffDelay:
|
||||
h.duration = MinBackoffDelay
|
||||
|
||||
case h.duration < MaxBackoffDelay:
|
||||
jitter := rand.Intn(MaxBackoffJitterCoff)
|
||||
h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
|
||||
if h.duration > MaxBackoffDelay || h.duration < 0 {
|
||||
h.duration = MaxBackoffDelay
|
||||
}
|
||||
}
|
||||
|
||||
h.attempts += 1
|
||||
h.lastTried = time.Now()
|
||||
b.info[id] = h
|
||||
return h.duration, nil
|
||||
}
|
||||
|
||||
func (b *backoff) cleanup() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
for id, h := range b.info {
|
||||
if time.Since(h.lastTried) > TimeToLive {
|
||||
delete(b.info, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *backoff) cleanupLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(b.ci)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return // pubsub shutting down
|
||||
case <-ticker.C:
|
||||
b.cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
122
backoff_test.go
Normal file
122
backoff_test.go
Normal file
@ -0,0 +1,122 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
func TestBackoff_Update(t *testing.T) {
|
||||
id1 := peer.ID("peer-1")
|
||||
id2 := peer.ID("peer-2")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
size := 10
|
||||
cleanupInterval := 5 * time.Second
|
||||
maxBackoffAttempts := 10
|
||||
|
||||
b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)
|
||||
|
||||
if len(b.info) > 0 {
|
||||
t.Fatal("non-empty info map for backoff")
|
||||
}
|
||||
|
||||
if d, err := b.updateAndGet(id1); d != time.Duration(0) || err != nil {
|
||||
t.Fatalf("invalid initialization: %v, \t, %s", d, err)
|
||||
}
|
||||
if d, err := b.updateAndGet(id2); d != time.Duration(0) || err != nil {
|
||||
t.Fatalf("invalid initialization: %v, \t, %s", d, err)
|
||||
}
|
||||
|
||||
for i := 0; i < maxBackoffAttempts-1; i++ {
|
||||
got, err := b.updateAndGet(id1)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error post update: %s", err)
|
||||
}
|
||||
|
||||
expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) *
|
||||
float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond))
|
||||
if expected > MaxBackoffDelay {
|
||||
expected = MaxBackoffDelay
|
||||
}
|
||||
|
||||
if expected < got { // considering jitter, expected backoff must always be greater than or equal to actual.
|
||||
t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
|
||||
}
|
||||
}
|
||||
|
||||
// trying once more beyond the threshold, hence expecting exceeding threshold
|
||||
if _, err := b.updateAndGet(id1); err == nil {
|
||||
t.Fatalf("expected an error for going beyond threshold but got nil")
|
||||
}
|
||||
|
||||
got, err := b.updateAndGet(id2)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error post update: %s", err)
|
||||
}
|
||||
if got != MinBackoffDelay {
|
||||
t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
|
||||
}
|
||||
|
||||
// sets last tried of id2 to long ago that it resets back upon next try.
|
||||
// update attempts on id2 are below threshold, hence peer should never go beyond backoff attempt threshold.
|
||||
b.info[id2].lastTried = time.Now().Add(-TimeToLive)
|
||||
got, err = b.updateAndGet(id2)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error post update: %s", err)
|
||||
}
|
||||
if got != time.Duration(0) {
|
||||
t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
|
||||
}
|
||||
|
||||
if len(b.info) != 2 {
|
||||
t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestBackoff_Clean(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
size := 10
|
||||
cleanupInterval := 2 * time.Second
|
||||
maxBackoffAttempts := 100 // setting attempts to a high number hence testing cleanup logic.
|
||||
b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
id := peer.ID(fmt.Sprintf("peer-%d", i))
|
||||
_, err := b.updateAndGet(id)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error post update: %s", err)
|
||||
}
|
||||
b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry
|
||||
}
|
||||
|
||||
if len(b.info) != size {
|
||||
t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info))
|
||||
}
|
||||
|
||||
// waits for a cleanup loop to kick-in
|
||||
time.Sleep(2 * cleanupInterval)
|
||||
|
||||
// next update should trigger cleanup
|
||||
got, err := b.updateAndGet(peer.ID("some-new-peer"))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error post update: %s", err)
|
||||
}
|
||||
if got != time.Duration(0) {
|
||||
t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got)
|
||||
}
|
||||
|
||||
// except "some-new-peer" every other records must be cleaned up
|
||||
if len(b.info) != 1 {
|
||||
t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info))
|
||||
}
|
||||
}
|
||||
10
comm.go
10
comm.go
@ -4,6 +4,7 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -121,6 +122,15 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
p.handleNewPeer(ctx, pid, outgoing)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
|
||||
pid := s.Conn().RemotePeer()
|
||||
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
|
||||
|
||||
11
pubsub.go
11
pubsub.go
@ -112,6 +112,8 @@ type PubSub struct {
|
||||
peerDeadPrioLk sync.RWMutex
|
||||
peerDeadMx sync.Mutex
|
||||
peerDeadPend map[peer.ID]struct{}
|
||||
// backoff for retrying new connections to dead peers
|
||||
deadPeerBackoff *backoff
|
||||
|
||||
// The set of topics we are subscribed to
|
||||
mySubs map[string]map[*Subscription]struct{}
|
||||
@ -255,6 +257,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
newPeerError: make(chan peer.ID),
|
||||
peerDead: make(chan struct{}, 1),
|
||||
peerDeadPend: make(map[peer.ID]struct{}),
|
||||
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
|
||||
cancelCh: make(chan *Subscription),
|
||||
getPeers: make(chan *listPeerReq),
|
||||
addSub: make(chan *addSubReq),
|
||||
@ -694,12 +697,18 @@ func (p *PubSub) handleDeadPeers() {
|
||||
close(ch)
|
||||
|
||||
if p.host.Network().Connectedness(pid) == network.Connected {
|
||||
backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// still connected, must be a duplicate connection being closed.
|
||||
// we respawn the writer as we need to ensure there is a stream active
|
||||
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
|
||||
messages := make(chan *RPC, p.peerOutboundQueueSize)
|
||||
messages <- p.getHelloPacket()
|
||||
go p.handleNewPeer(p.ctx, pid, messages)
|
||||
go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, messages)
|
||||
p.peers[pid] = messages
|
||||
continue
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user