diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index f4430590..3c94cd47 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -395,8 +395,8 @@ var ( }) RendezvousServer = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "rendezvous-server", - Usage: "Enable rendezvous protocol server so other peers can use this node for discovery", - Destination: &options.Rendezvous.Server, + Usage: "Enable rendezvous protocol so other peers can use this node for discovery", + Destination: &options.Rendezvous.Enable, EnvVars: []string{"WAKUNODE2_RENDEZVOUS_SERVER"}, }) PeerExchange = altsrc.NewBoolFlag(&cli.BoolFlag{ diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 2043382c..e156bfbd 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -65,7 +65,7 @@ func failOnErr(err error, msg string) { } func requiresDB(options Options) bool { - return options.Store.Enable || options.Rendezvous.Server + return options.Store.Enable || options.Rendezvous.Enable } func scalePerc(value float64) float64 { @@ -282,12 +282,8 @@ func Execute(options Options) { } if options.Rendezvous.Enable { - nodeOpts = append(nodeOpts, node.WithRendezvous(options.Rendezvous.Nodes)) - } - - if options.Rendezvous.Server { rdb := rendezvous.NewDB(ctx, db, logger) - nodeOpts = append(nodeOpts, node.WithRendezvousServer(rdb)) + nodeOpts = append(nodeOpts, node.WithRendezvous(rdb)) } checkForRLN(logger, options, &nodeOpts) @@ -319,6 +315,8 @@ func Execute(options Options) { options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic) } + var wg sync.WaitGroup + if options.Relay.Enable { for _, nodeTopic := range options.Relay.Topics.Value() { nodeTopic := nodeTopic @@ -326,17 +324,38 @@ func Execute(options Options) { failOnErr(err, "Error subscring to topic") sub.Unsubscribe() - if options.Rendezvous.Enable { + if len(options.Rendezvous.Nodes) != 0 { // Register the node in rendezvous point - // TODO: we have to determine how discovery would work with relay subscriptions. - // It might make sense to use pubsub.WithDiscovery option of gossipsub and - // register DiscV5, PeerExchange and Rendezvous. This should be an - // application concern instead of having (i.e. ./build/waku or the status app) - // instead of having the wakunode being the one deciding to advertise which - // topics/shards it supports - wakuNode.Rendezvous().Register(ctx, nodeTopic) + var rp []peer.ID + for _, n := range options.Rendezvous.Nodes { + peerID, err := utils.GetPeerID(n) + if err != nil { + failOnErr(err, "registering rendezvous nodes") + } + rp = append(rp, peerID) + } + iter := rendezvous.NewRendezvousPointIterator(rp) + wg.Add(1) go func(nodeTopic string) { + t := time.NewTicker(rendezvous.RegisterDefaultTTL) + defer t.Stop() + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + // Register in rendezvous points periodically + wakuNode.Rendezvous().RegisterWithTopic(ctx, nodeTopic, iter.RendezvousPoints()) + } + } + }(nodeTopic) + + wg.Add(1) + go func(nodeTopic string) { + defer wg.Done() desiredOutDegree := wakuNode.Relay().Params().D t := time.NewTicker(7 * time.Second) defer t.Stop() @@ -351,8 +370,12 @@ func Execute(options Options) { continue } + rp := <-iter.Next(ctx) + if rp == nil { + continue + } ctx, cancel := context.WithTimeout(ctx, 7*time.Second) - wakuNode.Rendezvous().Discover(ctx, nodeTopic, peersToFind) + wakuNode.Rendezvous().DiscoverWithTopic(ctx, nodeTopic, rp, peersToFind) cancel() } } @@ -413,8 +436,6 @@ func Execute(options Options) { } } - var wg sync.WaitGroup - if options.Store.Enable && len(options.Store.ResumeNodes) != 0 { // TODO: extract this to a function and run it when you go offline // TODO: determine if a store is listening to a topic diff --git a/cmd/waku/options.go b/cmd/waku/options.go index bec9492d..998297ba 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -140,7 +140,6 @@ type PeerExchangeOptions struct { // RendezvousOptions are settings used with the rendezvous protocol type RendezvousOptions struct { Enable bool - Server bool Nodes []multiaddr.Multiaddr } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 108a7a3a..41e7742e 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -264,16 +264,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } - var rendezvousPoints []peer.ID - for _, p := range w.opts.rendezvousNodes { - peerID, err := utils.GetPeerID(p) - if err != nil { - return nil, err - } - rendezvousPoints = append(rendezvousPoints, peerID) - } - - w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, rendezvousPoints, w.peerConnector, w.log) + w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) @@ -466,7 +457,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.rendezvous.SetHost(host) - if w.opts.enableRendezvousServer { + if w.opts.enableRendezvous { err := w.rendezvous.Start(ctx) if err != nil { return err @@ -496,10 +487,6 @@ func (w *WakuNode) Stop() { defer w.identificationEventSub.Close() defer w.addressChangesSub.Close() - if w.opts.enableRendezvousServer { - w.rendezvous.Stop() - } - w.relay.Stop() w.lightPush.Stop() w.store.Stop() @@ -510,6 +497,7 @@ func (w *WakuNode) Stop() { w.discoveryV5.Stop() } w.peerExchange.Stop() + w.rendezvous.Stop() w.peerConnector.Stop() diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 5b39af51..950500e6 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -80,10 +80,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider store.MessageProvider - rendezvousNodes []multiaddr.Multiaddr - enableRendezvousServer bool - - rendezvousDB *rendezvous.DB + enableRendezvous bool + rendezvousDB *rendezvous.DB discoveryMinPeers int @@ -470,19 +468,11 @@ func WithWebsockets(address string, port int) WakuNodeOption { } } -// WithRendezvous is a WakuOption used to enable rendezvous as a discovery -func WithRendezvous(rendezvousPoints []multiaddr.Multiaddr) WakuNodeOption { - return func(params *WakuNodeParameters) error { - params.rendezvousNodes = rendezvousPoints - return nil - } -} - -// WithRendezvousServer is a WakuOption used to set the node as a rendezvous +// WithRendezvous is a WakuOption used to set the node as a rendezvous // point, using an specific storage for the peer information -func WithRendezvousServer(db *rendezvous.DB) WakuNodeOption { +func WithRendezvous(db *rendezvous.DB) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.enableRendezvousServer = true + params.enableRendezvous = true params.rendezvousDB = db return nil } diff --git a/waku/v2/rendezvous/iterator.go b/waku/v2/rendezvous/iterator.go new file mode 100644 index 00000000..cfa75340 --- /dev/null +++ b/waku/v2/rendezvous/iterator.go @@ -0,0 +1,65 @@ +package rendezvous + +import ( + "context" + "math/rand" + "sort" + "time" + + "github.com/libp2p/go-libp2p/core/peer" +) + +type RendezvousPointIterator struct { + rendezvousPoints []*RendezvousPoint +} + +func NewRendezvousPointIterator(rendezvousPoints []peer.ID) *RendezvousPointIterator { + var rendevousPoints []*RendezvousPoint + for _, rp := range rendezvousPoints { + rendevousPoints = append(rendevousPoints, NewRendezvousPoint(rp)) + } + + return &RendezvousPointIterator{ + rendezvousPoints: rendevousPoints, + } +} + +func (r *RendezvousPointIterator) RendezvousPoints() []*RendezvousPoint { + return r.rendezvousPoints +} + +func (r *RendezvousPointIterator) Next(ctx context.Context) <-chan *RendezvousPoint { + var dialableRP []*RendezvousPoint + now := time.Now() + for _, rp := range r.rendezvousPoints { + if now.After(rp.NextTry()) { + dialableRP = append(dialableRP, rp) + } + } + + result := make(chan *RendezvousPoint, 1) + + if len(dialableRP) > 0 { + result <- r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec + } else { + if len(r.rendezvousPoints) > 0 { + sort.Slice(r.rendezvousPoints, func(i, j int) bool { + return r.rendezvousPoints[i].nextTry.Before(r.rendezvousPoints[j].nextTry) + }) + + tryIn := r.rendezvousPoints[0].NextTry().Sub(now) + timer := time.NewTimer(tryIn) + defer timer.Stop() + + select { + case <-ctx.Done(): + break + case <-timer.C: + result <- r.rendezvousPoints[0] + } + } + } + + close(result) + return result +} diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 7c2b5f29..e9183568 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -2,16 +2,13 @@ package rendezvous import ( "context" + "errors" "fmt" "math" - "math/rand" - "sort" "sync" "time" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/discovery/backoff" rvs "github.com/waku-org/go-libp2p-rendezvous" v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/peers" @@ -20,60 +17,31 @@ import ( ) const RendezvousID = rvs.RendezvousProto - -type rendezvousPoint struct { - sync.RWMutex - - id peer.ID - cookie []byte - - bkf backoff.BackoffStrategy - nextTry time.Time -} - -type PeerConnector interface { - Subscribe(context.Context, <-chan v2.PeerData) -} +const RegisterDefaultTTL = rvs.DefaultTTL * time.Second type Rendezvous struct { host host.Host - enableServer bool db *DB rendezvousSvc *rvs.RendezvousService - rendezvousPoints []*rendezvousPoint - peerConnector PeerConnector + peerConnector PeerConnector log *zap.Logger wg sync.WaitGroup cancel context.CancelFunc } -// NewRendezvous creates an instance of a Rendezvous which might act as rendezvous point for other nodes, or act as a client node -func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { +type PeerConnector interface { + Subscribe(context.Context, <-chan v2.PeerData) +} + +func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") - - rngSrc := rand.NewSource(rand.Int63()) - minBackoff, maxBackoff := time.Second*30, time.Hour - bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) - - var rendevousPoints []*rendezvousPoint - now := time.Now() - for _, rp := range rendezvousPoints { - rendevousPoints = append(rendevousPoints, &rendezvousPoint{ - id: rp, - nextTry: now, - bkf: bkf(), - }) - } - return &Rendezvous{ - enableServer: enableServer, - db: db, - rendezvousPoints: rendevousPoints, - peerConnector: peerConnector, - log: logger, + db: db, + peerConnector: peerConnector, + log: logger, } } @@ -83,19 +51,21 @@ func (r *Rendezvous) SetHost(h host.Host) { } func (r *Rendezvous) Start(ctx context.Context) error { + if r.cancel != nil { + return errors.New("already started") + } + ctx, cancel := context.WithCancel(ctx) r.cancel = cancel - if r.enableServer { - err := r.db.Start(ctx) - if err != nil { - cancel() - return err - } - - r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) + err := r.db.Start(ctx) + if err != nil { + cancel() + return err } + r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) + r.log.Info("rendezvous protocol started") return nil } @@ -103,93 +73,49 @@ func (r *Rendezvous) Start(ctx context.Context) error { const registerBackoff = 200 * time.Millisecond const registerMaxRetries = 7 -func (r *Rendezvous) getRandomRendezvousPoint(ctx context.Context) <-chan *rendezvousPoint { - var dialableRP []*rendezvousPoint - now := time.Now() - for _, rp := range r.rendezvousPoints { - if now.After(rp.NextTry()) { - dialableRP = append(dialableRP, rp) - } +func (r *Rendezvous) Discover(ctx context.Context, rp *RendezvousPoint, numPeers int) { + r.DiscoverWithTopic(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers) +} + +func (r *Rendezvous) DiscoverShard(ctx context.Context, rp *RendezvousPoint, cluster uint16, shard uint16, numPeers int) { + namespace := ShardToNamespace(cluster, shard) + r.DiscoverWithTopic(ctx, namespace, rp, numPeers) +} + +func (r *Rendezvous) DiscoverWithTopic(ctx context.Context, topic string, rp *RendezvousPoint, numPeers int) { + rendezvousClient := rvs.NewRendezvousClient(r.host, rp.id) + + addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, rp.cookie) + if err != nil { + r.log.Error("could not discover new peers", zap.Error(err)) + rp.Delay() + return } - result := make(chan *rendezvousPoint, 1) - - if len(dialableRP) > 0 { - result <- r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec - } else { - if len(r.rendezvousPoints) > 0 { - sort.Slice(r.rendezvousPoints, func(i, j int) bool { - return r.rendezvousPoints[i].nextTry.Before(r.rendezvousPoints[j].nextTry) - }) - - tryIn := r.rendezvousPoints[0].NextTry().Sub(now) - timer := time.NewTimer(tryIn) - defer timer.Stop() + if len(addrInfo) != 0 { + rp.SetSuccess(cookie) + peerCh := make(chan v2.PeerData) + defer close(peerCh) + r.peerConnector.Subscribe(ctx, peerCh) + for _, p := range addrInfo { + peer := v2.PeerData{ + Origin: peers.Rendezvous, + AddrInfo: p, + } select { case <-ctx.Done(): - break - case <-timer.C: - result <- r.rendezvousPoints[0] - } - } - } - - close(result) - return result -} - -func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) { - for { - select { - case <-ctx.Done(): - return - case server, ok := <-r.getRandomRendezvousPoint(ctx): - if !ok { return - } - - rendezvousClient := rvs.NewRendezvousClient(r.host, server.id) - - addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, server.cookie) - if err != nil { - r.log.Error("could not discover new peers", zap.Error(err)) - server.Delay() - continue - } - - if len(addrInfo) != 0 { - server.SetSuccess(cookie) - - peerCh := make(chan v2.PeerData) - r.peerConnector.Subscribe(context.Background(), peerCh) - for _, addr := range addrInfo { - peer := v2.PeerData{ - Origin: peers.Rendezvous, - AddrInfo: addr, - } - fmt.Println("PPPPPPPPPPPPPP") - select { - case peerCh <- peer: - fmt.Println("DISCOVERED") - case <-ctx.Done(): - return - } - } - close(peerCh) - } else { - server.Delay() + case peerCh <- peer: } } + } else { + rp.Delay() } + } -func (r *Rendezvous) DiscoverShard(ctx context.Context, cluster uint16, shard uint16, numPeers int) { - namespace := ShardToNamespace(cluster, shard) - r.Discover(ctx, namespace, numPeers) -} - -func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, topic string, retries int) (<-chan time.Time, int) { +func (r *Rendezvous) callRegister(ctx context.Context, topic string, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { ttl, err := rendezvousClient.Register(ctx, topic, rvs.DefaultTTL) var t <-chan time.Time if err != nil { @@ -204,23 +130,38 @@ func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.Rend return t, retries } -func (r *Rendezvous) Register(ctx context.Context, topic string) { - for _, m := range r.rendezvousPoints { +func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*RendezvousPoint) { + r.RegisterWithTopic(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints) +} + +func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) { + namespace := ShardToNamespace(cluster, shard) + r.RegisterWithTopic(ctx, namespace, rendezvousPoints) +} + +func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards, rendezvousPoints []*RendezvousPoint) { + for _, idx := range rs.Indices { + go r.RegisterShard(ctx, rs.Cluster, idx, rendezvousPoints) + } +} + +func (r *Rendezvous) RegisterWithTopic(ctx context.Context, topic string, rendezvousPoints []*RendezvousPoint) { + for _, m := range rendezvousPoints { r.wg.Add(1) - go func(m *rendezvousPoint) { + go func(m *RendezvousPoint) { r.wg.Done() rendezvousClient := rvs.NewRendezvousClient(r.host, m.id) retries := 0 var t <-chan time.Time - t, retries = r.callRegister(ctx, rendezvousClient, topic, retries) + t, retries = r.callRegister(ctx, topic, rendezvousClient, retries) for { select { case <-ctx.Done(): return case <-t: - t, retries = r.callRegister(ctx, rendezvousClient, topic, retries) + t, retries = r.callRegister(ctx, topic, rendezvousClient, retries) if retries >= registerMaxRetries { return } @@ -230,18 +171,11 @@ func (r *Rendezvous) Register(ctx context.Context, topic string) { } } -func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16) { - namespace := ShardToNamespace(cluster, shard) - r.Register(ctx, namespace) -} - -func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards) { - for _, idx := range rs.Indices { - go r.RegisterShard(ctx, rs.Cluster, idx) - } -} - func (r *Rendezvous) Stop() { + if r.cancel == nil { + return + } + r.cancel() r.wg.Wait() r.host.RemoveStreamHandler(rvs.RendezvousProto) @@ -251,24 +185,3 @@ func (r *Rendezvous) Stop() { func ShardToNamespace(cluster uint16, shard uint16) string { return fmt.Sprintf("rs/%d/%d", cluster, shard) } - -func (rp *rendezvousPoint) Delay() { - rp.Lock() - defer rp.Unlock() - - rp.nextTry = time.Now().Add(rp.bkf.Delay()) -} - -func (rp *rendezvousPoint) SetSuccess(cookie []byte) { - rp.Lock() - defer rp.Unlock() - - rp.bkf.Reset() - rp.cookie = cookie -} - -func (rp *rendezvousPoint) NextTry() time.Time { - rp.RLock() - defer rp.RUnlock() - return rp.nextTry -} diff --git a/waku/v2/rendezvous/rendezvous_point.go b/waku/v2/rendezvous/rendezvous_point.go new file mode 100644 index 00000000..7654587e --- /dev/null +++ b/waku/v2/rendezvous/rendezvous_point.go @@ -0,0 +1,57 @@ +package rendezvous + +import ( + "math/rand" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/discovery/backoff" +) + +type RendezvousPoint struct { + sync.RWMutex + + id peer.ID + cookie []byte + + bkf backoff.BackoffStrategy + nextTry time.Time +} + +func NewRendezvousPoint(peerID peer.ID) *RendezvousPoint { + rngSrc := rand.NewSource(rand.Int63()) + minBackoff, maxBackoff := time.Second*30, time.Hour + bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) + + now := time.Now() + + rp := &RendezvousPoint{ + id: peerID, + nextTry: now, + bkf: bkf(), + } + + return rp +} + +func (rp *RendezvousPoint) Delay() { + rp.Lock() + defer rp.Unlock() + + rp.nextTry = time.Now().Add(rp.bkf.Delay()) +} + +func (rp *RendezvousPoint) SetSuccess(cookie []byte) { + rp.Lock() + defer rp.Unlock() + + rp.bkf.Reset() + rp.cookie = cookie +} + +func (rp *RendezvousPoint) NextTry() time.Time { + rp.RLock() + defer rp.RUnlock() + return rp.nextTry +} diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 2843a5c4..5ead840c 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -35,9 +35,12 @@ func NewPeerConn() *PeerConn { const testTopic = "test" func TestRendezvous(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + port1, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) - host1, err := tests.MakeHost(context.Background(), port1, rand.Reader) + host1, err := tests.MakeHost(ctx, port1, rand.Reader) require.NoError(t, err) var db *sql.DB @@ -47,19 +50,20 @@ func TestRendezvous(t *testing.T) { err = migration(db) require.NoError(t, err) - rdb := NewDB(context.Background(), db, utils.Logger()) - rendezvousPoint := NewRendezvous(true, rdb, nil, NewPeerConn(), utils.Logger()) + rdb := NewDB(ctx, db, utils.Logger()) + rendezvousPoint := NewRendezvous(rdb, nil, utils.Logger()) rendezvousPoint.SetHost(host1) - err = rendezvousPoint.Start(context.Background()) + err = rendezvousPoint.Start(ctx) require.NoError(t, err) defer rendezvousPoint.Stop() + host1RP := NewRendezvousPoint(host1.ID()) hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host1.ID().Pretty())) host1Addr := host1.Addrs()[0].Encapsulate(hostInfo) port2, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) - host2, err := tests.MakeHost(context.Background(), port2, rand.Reader) + host2, err := tests.MakeHost(ctx, port2, rand.Reader) require.NoError(t, err) info, err := peer.AddrInfoFromP2pAddr(host1Addr) @@ -69,13 +73,10 @@ func TestRendezvous(t *testing.T) { err = host2.Peerstore().AddProtocols(info.ID, RendezvousID) require.NoError(t, err) - rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, NewPeerConn(), utils.Logger()) + rendezvousClient1 := NewRendezvous(nil, nil, utils.Logger()) rendezvousClient1.SetHost(host2) - err = rendezvousClient1.Start(context.Background()) - require.NoError(t, err) - defer rendezvousClient1.Stop() - rendezvousClient1.Register(context.Background(), testTopic) + rendezvousClient1.RegisterWithTopic(context.Background(), testTopic, []*RendezvousPoint{host1RP}) port3, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -87,16 +88,14 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) myPeerConnector := NewPeerConn() - rendezvousClient2 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) - rendezvousClient2.SetHost(host3) - err = rendezvousClient2.Start(context.Background()) - require.NoError(t, err) - defer rendezvousClient2.Stop() - timedCtx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + rendezvousClient2 := NewRendezvous(nil, myPeerConnector, utils.Logger()) + rendezvousClient2.SetHost(host3) + + timedCtx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - go rendezvousClient2.Discover(timedCtx, testTopic, 5) + go rendezvousClient2.DiscoverWithTopic(timedCtx, testTopic, host1RP, 1) time.Sleep(500 * time.Millisecond) timer := time.After(3 * time.Second)