From b26859fc6ac45c31dec8da5b390a0c988d84fd59 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 7 Jul 2023 08:35:22 -0400 Subject: [PATCH] refactor: make discovery connector subscribe to discovery services --- tests/utils.go | 18 ++++---- waku/v2/discovery_connector.go | 54 ++++++++++++++++++---- waku/v2/discv5/discover.go | 32 ++++++++----- waku/v2/node/service.go | 6 --- waku/v2/node/wakunode2.go | 4 +- waku/v2/protocol/peer_exchange/client.go | 6 ++- waku/v2/protocol/peer_exchange/protocol.go | 9 ++-- waku/v2/rendezvous/rendezvous.go | 17 ++++--- waku/v2/rendezvous/rendezvous_test.go | 20 ++++---- 9 files changed, 107 insertions(+), 59 deletions(-) diff --git a/tests/utils.go b/tests/utils.go index eb839668..0f3a623c 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -147,19 +147,17 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer { peerCh: make(chan v2.PeerData, 10), } - go func() { - for p := range result.peerCh { - result.Lock() - result.peerMap[p.AddrInfo.ID] = struct{}{} - result.Unlock() - } - }() - return result } -func (t *TestPeerDiscoverer) PeerChannel() chan<- v2.PeerData { - return t.peerCh +func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan v2.PeerData) { + go func() { + for p := range ch { + t.Lock() + t.peerMap[p.AddrInfo.ID] = struct{}{} + t.Unlock() + } + }() } func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool { diff --git a/waku/v2/discovery_connector.go b/waku/v2/discovery_connector.go index 44b0bcb6..9459dcb7 100644 --- a/waku/v2/discovery_connector.go +++ b/waku/v2/discovery_connector.go @@ -35,11 +35,12 @@ type PeerConnectionStrategy struct { workerCtx context.Context workerCancel context.CancelFunc - wg sync.WaitGroup - minPeers int - dialTimeout time.Duration - peerCh chan PeerData - dialCh chan peer.AddrInfo + wg sync.WaitGroup + minPeers int + dialTimeout time.Duration + peerCh chan PeerData + dialCh chan peer.AddrInfo + subscriptions []<-chan PeerData backoff backoff.BackoffFactory mux sync.Mutex @@ -78,9 +79,33 @@ type PeerData struct { ENR *enode.Node } -// PeerChannel exposes the channel on which discovered peers should be pushed -func (c *PeerConnectionStrategy) PeerChannel() chan<- PeerData { - return c.peerCh +// Subscribe receives channels on which discovered peers should be pushed +func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) { + if c.cancel != nil { + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.consumeSubscription(ctx, ch) + }() + } else { + c.subscriptions = append(c.subscriptions, ch) + } +} + +func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) { + for { + select { + case <-ctx.Done(): + return + case p := <-ch: + select { + case <-ctx.Done(): + return + case c.peerCh <- p: + } + } + } + } // Sets the host to be able to mount or consume a protocol @@ -104,6 +129,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error { go c.workPublisher(ctx) go c.dialPeers(ctx) + c.consumeSubscriptions(ctx) + return nil } @@ -118,6 +145,7 @@ func (c *PeerConnectionStrategy) Stop() { close(c.peerCh) close(c.dialCh) + c.subscriptions = nil c.cancel = nil } @@ -159,6 +187,16 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) { } } +func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) { + for _, subs := range c.subscriptions { + c.wg.Add(1) + go func(s <-chan PeerData) { + defer c.wg.Done() + c.consumeSubscription(ctx, s) + }(subs) + } +} + func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) { select { case c.dialCh <- p: diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 38254570..a825cc43 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -28,14 +28,20 @@ import ( var ErrNoDiscV5Listener = errors.New("no discv5 listener") +type PeerConnector interface { + Subscribe(context.Context, <-chan v2.PeerData) +} + type DiscoveryV5 struct { - params *discV5Parameters - host host.Host - config discover.Config - udpAddr *net.UDPAddr - listener *discover.UDPv5 - localnode *enode.LocalNode + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + peerConnector PeerConnector + peerCh chan v2.PeerData NAT nat.Interface log *zap.Logger @@ -101,10 +107,6 @@ func DefaultOptions() []DiscoveryV5Option { } } -type PeerConnector interface { - PeerChannel() chan<- v2.PeerData -} - func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() @@ -121,8 +123,8 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn } return &DiscoveryV5{ - peerConnector: peerConnector, params: params, + peerConnector: peerConnector, NAT: NAT, wg: &sync.WaitGroup{}, localnode: localnode, @@ -195,6 +197,9 @@ func (d *DiscoveryV5) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) d.cancel = cancel + d.peerCh = make(chan v2.PeerData) + d.peerConnector.Subscribe(ctx, d.peerCh) + err := d.listen(ctx) if err != nil { return err @@ -225,6 +230,7 @@ func (d *DiscoveryV5) Stop() { if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false return } + d.cancel() if d.listener != nil { @@ -234,6 +240,8 @@ func (d *DiscoveryV5) Stop() { } d.wg.Wait() + + close(d.peerCh) } /* @@ -402,7 +410,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { } select { - case d.peerConnector.PeerChannel() <- peer: + case d.peerCh <- peer: case <-ctx.Done(): return nil } diff --git a/waku/v2/node/service.go b/waku/v2/node/service.go index bc0ff8d2..398e22fe 100644 --- a/waku/v2/node/service.go +++ b/waku/v2/node/service.go @@ -4,7 +4,6 @@ import ( "context" "github.com/libp2p/go-libp2p/core/host" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) @@ -19,8 +18,3 @@ type ReceptorService interface { Stop() Start(context.Context, relay.Subscription) error } - -type PeerConnectorService interface { - Service - PeerChannel() chan<- v2.PeerData -} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 09d95417..462c1352 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -80,11 +80,11 @@ type WakuNode struct { log *zap.Logger timesource timesource.Timesource - peerstore peerstore.Peerstore + peerstore peerstore.Peerstore + peerConnector *v2.PeerConnectionStrategy relay Service lightPush Service - peerConnector PeerConnectorService discoveryV5 Service peerExchange Service rendezvous Service diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index e7c9a5da..cdb8f81d 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -103,6 +103,10 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb wakuPX.wg.Add(1) go func() { defer wakuPX.wg.Done() + + peerCh := make(chan v2.PeerData) + defer close(peerCh) + wakuPX.peerConnector.Subscribe(ctx, peerCh) for _, p := range discoveredPeers { peer := v2.PeerData{ Origin: peers.PeerExchange, @@ -112,7 +116,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb select { case <-ctx.Done(): return - case wakuPX.peerConnector.PeerChannel() <- peer: + case peerCh <- peer: } } }() diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 64d718cc..51f157aa 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -31,6 +31,10 @@ var ( ErrInvalidId = errors.New("invalid request id") ) +type PeerConnector interface { + Subscribe(context.Context, <-chan v2.PeerData) +} + type WakuPeerExchange struct { h host.Host disc *discv5.DiscoveryV5 @@ -44,10 +48,6 @@ type WakuPeerExchange struct { enrCache *enrCache } -type PeerConnector interface { - PeerChannel() chan<- v2.PeerData -} - // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) { newEnrCache, err := newEnrCache(MaxCacheSize) @@ -59,6 +59,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.log = log.Named("wakupx") wakuPX.enrCache = newEnrCache wakuPX.peerConnector = peerConnector + return wakuPX, nil } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 0b098f76..f43b8214 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -26,6 +26,10 @@ type rendezvousPoint struct { cookie []byte } +type PeerConnector interface { + Subscribe(context.Context, <-chan v2.PeerData) +} + type Rendezvous struct { host host.Host @@ -41,10 +45,7 @@ type Rendezvous struct { cancel context.CancelFunc } -type PeerConnector interface { - PeerChannel() chan<- v2.PeerData -} - +// NewRendezvous creates an instance of a Rendezvous which might act as rendezvous point for other nodes, or act as a client node func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") @@ -95,7 +96,6 @@ func (r *Rendezvous) getRandomServer() *rendezvousPoint { } func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) { - defer r.wg.Done() for { select { case <-ctx.Done(): @@ -118,17 +118,22 @@ func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) { server.cookie = cookie server.Unlock() + peerCh := make(chan v2.PeerData) + r.peerConnector.Subscribe(context.Background(), peerCh) for _, addr := range addrInfo { peer := v2.PeerData{ Origin: peers.Rendezvous, AddrInfo: addr, } + fmt.Println("PPPPPPPPPPPPPP") select { - case r.peerConnector.PeerChannel() <- peer: + case peerCh <- peer: + fmt.Println("DISCOVERED") case <-ctx.Done(): return } } + close(peerCh) } else { // TODO: improve this by adding an exponential backoff? time.Sleep(5 * time.Second) diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 0864702c..2843a5c4 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -19,17 +19,17 @@ import ( ) type PeerConn struct { - ch chan v2.PeerData + ch <-chan v2.PeerData } -func (p PeerConn) PeerChannel() chan<- v2.PeerData { - return p.ch +func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan v2.PeerData) { + p.ch = ch + } -func NewPeerConn() PeerConn { +func NewPeerConn() *PeerConn { x := PeerConn{} - x.ch = make(chan v2.PeerData, 1000) - return x + return &x } const testTopic = "test" @@ -48,7 +48,7 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) rdb := NewDB(context.Background(), db, utils.Logger()) - rendezvousPoint := NewRendezvous(true, rdb, nil, nil, utils.Logger()) + rendezvousPoint := NewRendezvous(true, rdb, nil, NewPeerConn(), utils.Logger()) rendezvousPoint.SetHost(host1) err = rendezvousPoint.Start(context.Background()) require.NoError(t, err) @@ -69,7 +69,7 @@ func TestRendezvous(t *testing.T) { err = host2.Peerstore().AddProtocols(info.ID, RendezvousID) require.NoError(t, err) - rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, nil, utils.Logger()) + rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, NewPeerConn(), utils.Logger()) rendezvousClient1.SetHost(host2) err = rendezvousClient1.Start(context.Background()) require.NoError(t, err) @@ -87,7 +87,6 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) myPeerConnector := NewPeerConn() - rendezvousClient2 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) rendezvousClient2.SetHost(host3) err = rendezvousClient2.Start(context.Background()) @@ -98,8 +97,9 @@ func TestRendezvous(t *testing.T) { defer cancel() go rendezvousClient2.Discover(timedCtx, testTopic, 5) + time.Sleep(500 * time.Millisecond) - timer := time.After(5 * time.Second) + timer := time.After(3 * time.Second) select { case <-timer: require.Fail(t, "no peer discovered")