diff --git a/cmd/waku/node.go b/cmd/waku/node.go index e156bfbd..7be2616b 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -326,15 +326,7 @@ func Execute(options Options) { if len(options.Rendezvous.Nodes) != 0 { // Register the node in rendezvous point - var rp []peer.ID - for _, n := range options.Rendezvous.Nodes { - peerID, err := utils.GetPeerID(n) - if err != nil { - failOnErr(err, "registering rendezvous nodes") - } - rp = append(rp, peerID) - } - iter := rendezvous.NewRendezvousPointIterator(rp) + iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes) wg.Add(1) go func(nodeTopic string) { @@ -348,7 +340,7 @@ func Execute(options Options) { return case <-t.C: // Register in rendezvous points periodically - wakuNode.Rendezvous().RegisterWithTopic(ctx, nodeTopic, iter.RendezvousPoints()) + wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints()) } } }(nodeTopic) @@ -375,7 +367,7 @@ func Execute(options Options) { continue } ctx, cancel := context.WithTimeout(ctx, 7*time.Second) - wakuNode.Rendezvous().DiscoverWithTopic(ctx, nodeTopic, rp, peersToFind) + wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind) cancel() } } diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 79faf8c7..f03fa8d3 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -29,6 +29,7 @@ import ( var ErrNoDiscV5Listener = errors.New("no discv5 listener") +// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { Subscribe(context.Context, <-chan v2.PeerData) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 41e7742e..e415d902 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -457,7 +457,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.rendezvous.SetHost(host) - if w.opts.enableRendezvous { + if w.opts.enableRendezvousPoint { err := w.rendezvous.Start(ctx) if err != nil { return err diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 950500e6..2be021d1 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -80,8 +80,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider store.MessageProvider - enableRendezvous bool - rendezvousDB *rendezvous.DB + enableRendezvousPoint bool + rendezvousDB *rendezvous.DB discoveryMinPeers int @@ -472,7 +472,7 @@ func WithWebsockets(address string, port int) WakuNodeOption { // point, using an specific storage for the peer information func WithRendezvous(db *rendezvous.DB) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.enableRendezvous = true + params.enableRendezvousPoint = true params.rendezvousDB = db return nil } diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index abb92602..bf8f011d 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -31,6 +31,7 @@ var ( ErrInvalidId = errors.New("invalid request id") ) +// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { Subscribe(context.Context, <-chan v2.PeerData) } diff --git a/waku/v2/rendezvous/iterator.go b/waku/v2/rendezvous/iterator.go index cfa75340..a5c2cd00 100644 --- a/waku/v2/rendezvous/iterator.go +++ b/waku/v2/rendezvous/iterator.go @@ -6,17 +6,22 @@ import ( "sort" "time" - "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/utils" ) type RendezvousPointIterator struct { rendezvousPoints []*RendezvousPoint } -func NewRendezvousPointIterator(rendezvousPoints []peer.ID) *RendezvousPointIterator { +// NewRendezvousPointIterator creates an iterator with a backoff mechanism to use random rendezvous points taking into account successful/unsuccesful connection attempts +func NewRendezvousPointIterator(rendezvousPoints []multiaddr.Multiaddr) *RendezvousPointIterator { var rendevousPoints []*RendezvousPoint for _, rp := range rendezvousPoints { - rendevousPoints = append(rendevousPoints, NewRendezvousPoint(rp)) + peerID, err := utils.GetPeerID(rp) + if err == nil { + rendevousPoints = append(rendevousPoints, NewRendezvousPoint(peerID)) + } } return &RendezvousPointIterator{ @@ -24,10 +29,12 @@ func NewRendezvousPointIterator(rendezvousPoints []peer.ID) *RendezvousPointIter } } +// RendezvousPoints returns the list of rendezvous points registered in this iterator func (r *RendezvousPointIterator) RendezvousPoints() []*RendezvousPoint { return r.rendezvousPoints } +// Next will return a channel that will be triggered as soon as the next rendevous point is available to be used (depending on backoff time) func (r *RendezvousPointIterator) Next(ctx context.Context) <-chan *RendezvousPoint { var dialableRP []*RendezvousPoint now := time.Now() diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index e9183568..1c2a0726 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -16,9 +16,14 @@ import ( "go.uber.org/zap" ) +// RendezvousID is the current protocol ID used for Rendezvous const RendezvousID = rvs.RendezvousProto + +// RegisterDefaultTTL indicates the TTL used by default when registering a node in a rendezvous point +// TODO: Register* functions should allow setting up a custom TTL const RegisterDefaultTTL = rvs.DefaultTTL * time.Second +// Rendezvous is the implementation containing the logic to registering a node and discovering new peers using rendezvous protocol type Rendezvous struct { host host.Host @@ -32,10 +37,12 @@ type Rendezvous struct { cancel context.CancelFunc } +// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { Subscribe(context.Context, <-chan v2.PeerData) } +// NewRendezvous creates an instance of Rendezvous struct func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") return &Rendezvous{ @@ -73,19 +80,22 @@ func (r *Rendezvous) Start(ctx context.Context) error { const registerBackoff = 200 * time.Millisecond const registerMaxRetries = 7 +// Discover is used to find a number of peers that use the default pubsub topic func (r *Rendezvous) Discover(ctx context.Context, rp *RendezvousPoint, numPeers int) { - r.DiscoverWithTopic(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers) + r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers) } +// DiscoverShard is used to find a number of peers that support an specific cluster and shard index func (r *Rendezvous) DiscoverShard(ctx context.Context, rp *RendezvousPoint, cluster uint16, shard uint16, numPeers int) { namespace := ShardToNamespace(cluster, shard) - r.DiscoverWithTopic(ctx, namespace, rp, numPeers) + r.DiscoverWithNamespace(ctx, namespace, rp, numPeers) } -func (r *Rendezvous) DiscoverWithTopic(ctx context.Context, topic string, rp *RendezvousPoint, numPeers int) { +// DiscoverWithNamespace is uded to find a number of peers using a custom namespace (usually a pubsub topic) +func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string, rp *RendezvousPoint, numPeers int) { rendezvousClient := rvs.NewRendezvousClient(r.host, rp.id) - addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, rp.cookie) + addrInfo, cookie, err := rendezvousClient.Discover(ctx, namespace, numPeers, rp.cookie) if err != nil { r.log.Error("could not discover new peers", zap.Error(err)) rp.Delay() @@ -115,8 +125,8 @@ func (r *Rendezvous) DiscoverWithTopic(ctx context.Context, topic string, rp *Re } -func (r *Rendezvous) callRegister(ctx context.Context, topic string, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { - ttl, err := rendezvousClient.Register(ctx, topic, rvs.DefaultTTL) +func (r *Rendezvous) callRegister(ctx context.Context, namespace string, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { + ttl, err := rendezvousClient.Register(ctx, namespace, rvs.DefaultTTL) var t <-chan time.Time if err != nil { r.log.Error("registering rendezvous client", zap.Error(err)) @@ -130,22 +140,26 @@ func (r *Rendezvous) callRegister(ctx context.Context, topic string, rendezvousC return t, retries } +// Register registers the node in the rendezvous points using the default pubsub topic as namespace func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*RendezvousPoint) { - r.RegisterWithTopic(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints) + r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints) } +// RegisterShard registers the node in the rendezvous points using a shard as namespace func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) { namespace := ShardToNamespace(cluster, shard) - r.RegisterWithTopic(ctx, namespace, rendezvousPoints) + r.RegisterWithNamespace(ctx, namespace, rendezvousPoints) } +// RegisterRelayShards registers the node in the rendezvous point by specifying a RelayShards struct (more than one shard index can be registered) func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards, rendezvousPoints []*RendezvousPoint) { for _, idx := range rs.Indices { go r.RegisterShard(ctx, rs.Cluster, idx, rendezvousPoints) } } -func (r *Rendezvous) RegisterWithTopic(ctx context.Context, topic string, rendezvousPoints []*RendezvousPoint) { +// RegisterWithNamespace registers the node in the rendezvous point by using an specific namespace (usually a pubsub topic) +func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string, rendezvousPoints []*RendezvousPoint) { for _, m := range rendezvousPoints { r.wg.Add(1) go func(m *RendezvousPoint) { @@ -155,13 +169,13 @@ func (r *Rendezvous) RegisterWithTopic(ctx context.Context, topic string, rendez retries := 0 var t <-chan time.Time - t, retries = r.callRegister(ctx, topic, rendezvousClient, retries) + t, retries = r.callRegister(ctx, namespace, rendezvousClient, retries) for { select { case <-ctx.Done(): return case <-t: - t, retries = r.callRegister(ctx, topic, rendezvousClient, retries) + t, retries = r.callRegister(ctx, namespace, rendezvousClient, retries) if retries >= registerMaxRetries { return } @@ -182,6 +196,7 @@ func (r *Rendezvous) Stop() { r.rendezvousSvc = nil } +// ShardToNamespace translates a cluster and shard index into a rendezvous namespace func ShardToNamespace(cluster uint16, shard uint16) string { return fmt.Sprintf("rs/%d/%d", cluster, shard) } diff --git a/waku/v2/rendezvous/rendezvous_point.go b/waku/v2/rendezvous/rendezvous_point.go index 7654587e..196638cb 100644 --- a/waku/v2/rendezvous/rendezvous_point.go +++ b/waku/v2/rendezvous/rendezvous_point.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" ) +// RendezvousPoint is a structure that represent a node that can be used to discover new peers type RendezvousPoint struct { sync.RWMutex @@ -19,6 +20,7 @@ type RendezvousPoint struct { nextTry time.Time } +// NewRendezvousPoint is used to create a RendezvousPoint func NewRendezvousPoint(peerID peer.ID) *RendezvousPoint { rngSrc := rand.NewSource(rand.Int63()) minBackoff, maxBackoff := time.Second*30, time.Hour @@ -35,6 +37,7 @@ func NewRendezvousPoint(peerID peer.ID) *RendezvousPoint { return rp } +// Delay is used to indicate that the connection to a rendezvous point failed func (rp *RendezvousPoint) Delay() { rp.Lock() defer rp.Unlock() @@ -42,14 +45,17 @@ func (rp *RendezvousPoint) Delay() { rp.nextTry = time.Now().Add(rp.bkf.Delay()) } +// SetSuccess is used to indicate that a connection to a rendezvous point was succesful func (rp *RendezvousPoint) SetSuccess(cookie []byte) { rp.Lock() defer rp.Unlock() rp.bkf.Reset() + rp.nextTry = time.Now() rp.cookie = cookie } +// NextTry returns when can a rendezvous point be used again func (rp *RendezvousPoint) NextTry() time.Time { rp.RLock() defer rp.RUnlock() diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 5ead840c..bdf5cf31 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -76,7 +76,7 @@ func TestRendezvous(t *testing.T) { rendezvousClient1 := NewRendezvous(nil, nil, utils.Logger()) rendezvousClient1.SetHost(host2) - rendezvousClient1.RegisterWithTopic(context.Background(), testTopic, []*RendezvousPoint{host1RP}) + rendezvousClient1.RegisterWithNamespace(context.Background(), testTopic, []*RendezvousPoint{host1RP}) port3, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -95,7 +95,7 @@ func TestRendezvous(t *testing.T) { timedCtx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - go rendezvousClient2.DiscoverWithTopic(timedCtx, testTopic, host1RP, 1) + go rendezvousClient2.DiscoverWithNamespace(timedCtx, testTopic, host1RP, 1) time.Sleep(500 * time.Millisecond) timer := time.After(3 * time.Second)