diff --git a/waku/node.go b/waku/node.go index 657ccaf8..70d9fc50 100644 --- a/waku/node.go +++ b/waku/node.go @@ -345,6 +345,40 @@ func Execute(options Options) { sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") sub.Unsubscribe() + + if options.Rendezvous.Enable { + // Register the node in rendezvous point + // TODO: we have to determine how discovery would work with relay subscriptions. + // It might make sense to use pubsub.WithDiscovery option of gossipsub and + // register DiscV5, PeerExchange and Rendezvous. This should be an + // application concern instead of having (i.e. ./build/waku or the status app) + // instead of having the wakunode being the one deciding to advertise which + // topics/shards it supports + wakuNode.Rendezvous().Register(ctx, nodeTopic) + + go func(nodeTopic string) { + desiredOutDegree := wakuNode.Relay().Params().D + t := time.NewTicker(7 * time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic)) + peersToFind := desiredOutDegree - peerCnt + if peersToFind <= 0 { + continue + } + + ctx, cancel := context.WithTimeout(ctx, 7*time.Second) + wakuNode.Rendezvous().Discover(ctx, nodeTopic, peersToFind) + cancel() + } + } + }(nodeTopic) + + } } for _, protectedTopic := range options.Relay.ProtectedTopics { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 98de1e42..4729d431 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -274,7 +274,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { rendezvousPoints = append(rendezvousPoints, peerID) } - w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log) + w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, rendezvousPoints, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) @@ -474,7 +474,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.rendezvous.SetHost(host) - if w.opts.enableRendezvousServer || w.opts.enableRendezvous { + if w.opts.enableRendezvousServer { err := w.rendezvous.Start(ctx) if err != nil { return err @@ -504,7 +504,7 @@ func (w *WakuNode) Stop() { defer w.identificationEventSub.Close() defer w.addressChangesSub.Close() - if w.opts.enableRendezvousServer || w.opts.enableRendezvous { + if w.opts.enableRendezvousServer { w.rendezvous.Stop() } @@ -652,6 +652,14 @@ func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange { return nil } +// Rendezvous is used to access any operation related to Rendezvous +func (w *WakuNode) Rendezvous() *rendezvous.Rendezvous { + if result, ok := w.rendezvous.(*rendezvous.Rendezvous); ok { + return result + } + return nil +} + // Broadcaster is used to access the message broadcaster that is used to push // messages to different protocols func (w *WakuNode) Broadcaster() relay.Broadcaster { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 7271a68e..47652ba3 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -82,11 +82,10 @@ type WakuNodeParameters struct { resumeNodes []multiaddr.Multiaddr messageProvider store.MessageProvider - enableRendezvous bool - rendezvousNodes []multiaddr.Multiaddr - + rendezvousNodes []multiaddr.Multiaddr enableRendezvousServer bool - rendezvousDB *rendezvous.DB + + rendezvousDB *rendezvous.DB discoveryMinPeers int @@ -487,7 +486,6 @@ func WithWebsockets(address string, port int) WakuNodeOption { // WithRendezvous is a WakuOption used to enable rendezvous as a discovery func WithRendezvous(rendezvousPoints []multiaddr.Multiaddr) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.enableRendezvous = true params.rendezvousNodes = rendezvousPoints return nil } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 5b6b0d9a..0b098f76 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -2,6 +2,7 @@ package rendezvous import ( "context" + "fmt" "math" "math/rand" "sync" @@ -12,7 +13,7 @@ import ( rvs "github.com/waku-org/go-libp2p-rendezvous" v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/peers" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" ) @@ -32,7 +33,6 @@ type Rendezvous struct { db *DB rendezvousSvc *rvs.RendezvousService - discoverPeers bool rendezvousPoints []*rendezvousPoint peerConnector PeerConnector @@ -45,7 +45,7 @@ type PeerConnector interface { PeerChannel() chan<- v2.PeerData } -func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { +func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") var rendevousPoints []*rendezvousPoint @@ -58,7 +58,6 @@ func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoin return &Rendezvous{ enableServer: enableServer, db: db, - discoverPeers: discoverPeers, rendezvousPoints: rendevousPoints, peerConnector: peerConnector, log: logger, @@ -84,14 +83,6 @@ func (r *Rendezvous) Start(ctx context.Context) error { r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) } - r.wg.Add(1) - go r.register(ctx) - - if r.discoverPeers { - r.wg.Add(1) - go r.discover(ctx) - } - r.log.Info("rendezvous protocol started") return nil } @@ -103,7 +94,7 @@ func (r *Rendezvous) getRandomServer() *rendezvousPoint { return r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec } -func (r *Rendezvous) discover(ctx context.Context) { +func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) { defer r.wg.Done() for { select { @@ -114,7 +105,7 @@ func (r *Rendezvous) discover(ctx context.Context) { rendezvousClient := rvs.NewRendezvousClient(r.host, server.id) - addrInfo, cookie, err := rendezvousClient.Discover(ctx, relay.DefaultWakuTopic, 5, server.cookie) + addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, server.cookie) if err != nil { r.log.Error("could not discover new peers", zap.Error(err)) cookie = nil @@ -146,8 +137,13 @@ func (r *Rendezvous) discover(ctx context.Context) { } } -func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { - ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use +func (r *Rendezvous) DiscoverShard(ctx context.Context, cluster uint16, shard uint16, numPeers int) { + namespace := ShardToNamespace(cluster, shard) + r.Discover(ctx, namespace, numPeers) +} + +func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, topic string, retries int) (<-chan time.Time, int) { + ttl, err := rendezvousClient.Register(ctx, topic, rvs.DefaultTTL) var t <-chan time.Time if err != nil { r.log.Error("registering rendezvous client", zap.Error(err)) @@ -161,9 +157,7 @@ func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.Rend return t, retries } -func (r *Rendezvous) register(ctx context.Context) { - defer r.wg.Done() - +func (r *Rendezvous) Register(ctx context.Context, topic string) { for _, m := range r.rendezvousPoints { r.wg.Add(1) go func(m *rendezvousPoint) { @@ -173,13 +167,13 @@ func (r *Rendezvous) register(ctx context.Context) { retries := 0 var t <-chan time.Time - t, retries = r.callRegister(ctx, rendezvousClient, retries) + t, retries = r.callRegister(ctx, rendezvousClient, topic, retries) for { select { case <-ctx.Done(): return case <-t: - t, retries = r.callRegister(ctx, rendezvousClient, retries) + t, retries = r.callRegister(ctx, rendezvousClient, topic, retries) if retries >= registerMaxRetries { return } @@ -189,9 +183,24 @@ func (r *Rendezvous) register(ctx context.Context) { } } +func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16) { + namespace := ShardToNamespace(cluster, shard) + r.Register(ctx, namespace) +} + +func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards) { + for _, idx := range rs.Indices { + go r.RegisterShard(ctx, rs.Cluster, idx) + } +} + func (r *Rendezvous) Stop() { r.cancel() r.wg.Wait() r.host.RemoveStreamHandler(rvs.RendezvousProto) r.rendezvousSvc = nil } + +func ShardToNamespace(cluster uint16, shard uint16) string { + return fmt.Sprintf("rs/%d/%d", cluster, shard) +} diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 87b04c64..0864702c 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -32,6 +32,8 @@ func NewPeerConn() PeerConn { return x } +const testTopic = "test" + func TestRendezvous(t *testing.T) { port1, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -46,7 +48,7 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) rdb := NewDB(context.Background(), db, utils.Logger()) - rendezvousPoint := NewRendezvous(true, rdb, false, nil, nil, utils.Logger()) + rendezvousPoint := NewRendezvous(true, rdb, nil, nil, utils.Logger()) rendezvousPoint.SetHost(host1) err = rendezvousPoint.Start(context.Background()) require.NoError(t, err) @@ -67,12 +69,14 @@ func TestRendezvous(t *testing.T) { err = host2.Peerstore().AddProtocols(info.ID, RendezvousID) require.NoError(t, err) - rendezvousClient1 := NewRendezvous(false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger()) + rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, nil, utils.Logger()) rendezvousClient1.SetHost(host2) err = rendezvousClient1.Start(context.Background()) require.NoError(t, err) defer rendezvousClient1.Stop() + rendezvousClient1.Register(context.Background(), testTopic) + port3, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) host3, err := tests.MakeHost(context.Background(), port3, rand.Reader) @@ -84,12 +88,17 @@ func TestRendezvous(t *testing.T) { myPeerConnector := NewPeerConn() - rendezvousClient2 := NewRendezvous(false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) + rendezvousClient2 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) rendezvousClient2.SetHost(host3) err = rendezvousClient2.Start(context.Background()) require.NoError(t, err) defer rendezvousClient2.Stop() + timedCtx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + defer cancel() + + go rendezvousClient2.Discover(timedCtx, testTopic, 5) + timer := time.After(5 * time.Second) select { case <-timer: