diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go
index f43b8214..7c2b5f29 100644
--- a/waku/v2/rendezvous/rendezvous.go
+++ b/waku/v2/rendezvous/rendezvous.go
@@ -5,11 +5,13 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"sort"
 	"sync"
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
 	rvs "github.com/waku-org/go-libp2p-rendezvous"
 	v2 "github.com/waku-org/go-waku/waku/v2"
 	"github.com/waku-org/go-waku/waku/v2/peers"
@@ -24,6 +26,9 @@ type rendezvousPoint struct {
 
 	id     peer.ID
 	cookie []byte
+
+	bkf     backoff.BackoffStrategy
+	nextTry time.Time
 }
 
 type PeerConnector interface {
@@ -49,10 +54,17 @@ type Rendezvous struct {
 func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
 	logger := log.Named("rendezvous")
 
+	rngSrc := rand.NewSource(rand.Int63())
+	minBackoff, maxBackoff := time.Second*30, time.Hour
+	bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
+
 	var rendevousPoints []*rendezvousPoint
+	now := time.Now()
 	for _, rp := range rendezvousPoints {
 		rendevousPoints = append(rendevousPoints, &rendezvousPoint{
-			id: rp,
+			id:      rp,
+			nextTry: now,
+			bkf:     bkf(),
 		})
 	}
 
@@ -91,8 +103,48 @@ func (r *Rendezvous) Start(ctx context.Context) error {
 const registerBackoff = 200 * time.Millisecond
 const registerMaxRetries = 7
 
-func (r *Rendezvous) getRandomServer() *rendezvousPoint {
-	return r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec
+// getRandomRendezvousPoint selects the rendezvous point to query next and
+// delivers it on the returned channel, which is always closed before return.
+// A point whose nextTry has passed is picked at random; if none is ready,
+// the call blocks until the earliest nextTry or until ctx is done. The
+// channel is closed empty when ctx is done first or when no rendezvous
+// points are configured.
+func (r *Rendezvous) getRandomRendezvousPoint(ctx context.Context) <-chan *rendezvousPoint {
+	var dialableRP []*rendezvousPoint
+	now := time.Now()
+	for _, rp := range r.rendezvousPoints {
+		if now.After(rp.NextTry()) {
+			dialableRP = append(dialableRP, rp)
+		}
+	}
+
+	result := make(chan *rendezvousPoint, 1)
+
+	if len(dialableRP) > 0 {
+		// Choose only among the points whose nextTry has already passed.
+		result <- dialableRP[rand.Intn(len(dialableRP))] // nolint: gosec
+	} else {
+		if len(r.rendezvousPoints) > 0 {
+			// No point is ready yet: order by nextTry and wait for the earliest one.
+			sort.Slice(r.rendezvousPoints, func(i, j int) bool {
+				return r.rendezvousPoints[i].NextTry().Before(r.rendezvousPoints[j].NextTry())
+			})
+
+			tryIn := r.rendezvousPoints[0].NextTry().Sub(now)
+			timer := time.NewTimer(tryIn)
+			defer timer.Stop()
+
+			select {
+			case <-ctx.Done():
+				break
+			case <-timer.C:
+				result <- r.rendezvousPoints[0]
+			}
+		}
+	}
+
+	close(result)
+	return result
 }
 
 func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
@@ -100,23 +152,22 @@ func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
 		select {
 		case <-ctx.Done():
 			return
-		default:
-			server := r.getRandomServer()
+		case server, ok := <-r.getRandomRendezvousPoint(ctx):
+			if !ok {
+				return
+			}
 
 			rendezvousClient := rvs.NewRendezvousClient(r.host, server.id)
 
 			addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, server.cookie)
 			if err != nil {
 				r.log.Error("could not discover new peers", zap.Error(err))
-				cookie = nil
-				// TODO: add backoff strategy
-				// continue
+				server.Delay()
+				continue
 			}
 
 			if len(addrInfo) != 0 {
-				server.Lock()
-				server.cookie = cookie
-				server.Unlock()
+				server.SetSuccess(cookie)
 
 				peerCh := make(chan v2.PeerData)
 				r.peerConnector.Subscribe(context.Background(), peerCh)
@@ -135,8 +186,7 @@ func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
 				}
 				close(peerCh)
 			} else {
-				// TODO: improve this by adding an exponential backoff?
-				time.Sleep(5 * time.Second)
+				server.Delay()
 			}
 		}
 	}
@@ -209,3 +259,29 @@ func (r *Rendezvous) Stop() {
 func ShardToNamespace(cluster uint16, shard uint16) string {
 	return fmt.Sprintf("rs/%d/%d", cluster, shard)
 }
+
+// Delay sets nextTry to the current time plus the next delay produced by the
+// point's backoff strategy.
+func (rp *rendezvousPoint) Delay() {
+	rp.Lock()
+	defer rp.Unlock()
+
+	rp.nextTry = time.Now().Add(rp.bkf.Delay())
+}
+
+// SetSuccess resets the point's backoff strategy and stores cookie on the
+// point.
+func (rp *rendezvousPoint) SetSuccess(cookie []byte) {
+	rp.Lock()
+	defer rp.Unlock()
+
+	rp.bkf.Reset()
+	rp.cookie = cookie
+}
+
+// NextTry returns the point's nextTry value, read under the read lock.
+func (rp *rendezvousPoint) NextTry() time.Time {
+	rp.RLock()
+	defer rp.RUnlock()
+	return rp.nextTry
+}