From a5f9ee5ad8ae825a6ccdec5f26373cec81fa2938 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Mon, 18 Sep 2023 16:41:40 +0700 Subject: [PATCH] feat(CommonService): add channel and use commonService in discv5 (#735) * feat(CommonService): add channel and use commonService in discv5 * fix: add mutex to PushToChan * fix: remove generic functionality --- waku/v2/discv5/discover.go | 134 +++++------------- .../peermanager/common_discovery_service.go | 81 +++++++++++ waku/v2/peermanager/peer_connector.go | 9 -- waku/v2/peerstore/waku_peer_store.go | 1 + waku/v2/rendezvous/rendezvous.go | 32 ++--- waku/v2/rendezvous/rendezvous_test.go | 3 + 6 files changed, 133 insertions(+), 127 deletions(-) create mode 100644 waku/v2/peermanager/common_discovery_service.go diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 84ad5d19..67bd4a2d 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -6,8 +6,6 @@ import ( "errors" "fmt" "net" - "sync" - "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/host" @@ -34,23 +32,20 @@ type PeerConnector interface { } type DiscoveryV5 struct { - params *discV5Parameters - host host.Host - config discover.Config - udpAddr *net.UDPAddr - listener *discover.UDPv5 - localnode *enode.LocalNode - metrics Metrics - peerChannel *peerChannel + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + metrics Metrics peerConnector PeerConnector NAT nat.Interface log *zap.Logger - started atomic.Bool - cancel context.CancelFunc - wg *sync.WaitGroup + *peermanager.CommonDiscoveryService } type discV5Parameters struct { @@ -132,13 +127,12 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn } return &DiscoveryV5{ - params: params, - peerConnector: peerConnector, - NAT: NAT, - wg: &sync.WaitGroup{}, - peerChannel: &peerChannel{}, - localnode: localnode, - metrics: newMetrics(reg), + params: params, + peerConnector: peerConnector, + NAT: NAT, + CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), + localnode: localnode, + metrics: newMetrics(reg), config: discover.Config{ PrivateKey: priv, Bootnodes: params.bootnodes, @@ -167,9 +161,9 @@ func (d *DiscoveryV5) listen(ctx context.Context) error { d.udpAddr = conn.LocalAddr().(*net.UDPAddr) if d.NAT != nil && !d.udpAddr.IP.IsLoopback() { - d.wg.Add(1) + d.WaitGroup().Add(1) go func() { - defer d.wg.Done() + defer d.WaitGroup().Done() nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") }() @@ -197,74 +191,24 @@ func (d *DiscoveryV5) SetHost(h host.Host) { d.host = h } -type peerChannel struct { - mutex sync.Mutex - channel chan peermanager.PeerData - started bool - ctx context.Context -} - -func (p *peerChannel) Start(ctx context.Context) { - p.mutex.Lock() - defer p.mutex.Unlock() - p.started = true - p.ctx = ctx - p.channel = make(chan peermanager.PeerData) -} - -func (p *peerChannel) Stop() { - p.mutex.Lock() - defer p.mutex.Unlock() - if !p.started { - return - } - p.started = false - close(p.channel) -} - -func (p *peerChannel) Subscribe() chan peermanager.PeerData { - return p.channel -} - -func (p *peerChannel) Publish(peer peermanager.PeerData) bool { - p.mutex.Lock() - defer p.mutex.Unlock() - if !p.started { - return false - } - select { - case p.channel <- peer: - case <-p.ctx.Done(): - return false - - } - return true -} - // only works if the discovery v5 hasn't been started yet. func (d *DiscoveryV5) Start(ctx context.Context) error { - // compare and swap sets the discovery v5 to `started` state - // and prevents multiple calls to the start method by being atomic. - if !d.started.CompareAndSwap(false, true) { - return nil - } + return d.CommonDiscoveryService.Start(ctx, d.start) +} - ctx, cancel := context.WithCancel(ctx) - d.cancel = cancel +func (d *DiscoveryV5) start() error { + d.peerConnector.Subscribe(d.Context(), d.GetListeningChan()) - d.peerChannel.Start(ctx) - d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe()) - - err := d.listen(ctx) + err := d.listen(d.Context()) if err != nil { return err } if d.params.autoFindPeers { - d.wg.Add(1) + d.WaitGroup().Add(1) go func() { - defer d.wg.Done() - d.runDiscoveryV5Loop(ctx) + defer d.WaitGroup().Done() + d.runDiscoveryV5Loop(d.Context()) }() } @@ -284,27 +228,18 @@ func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error { // only works if the discovery v5 is in running state // so we can assume that cancel method is set func (d *DiscoveryV5) Stop() { - if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false - return - } - - d.cancel() - - if d.listener != nil { - d.listener.Close() - d.listener = nil - d.log.Info("stopped Discovery V5") - } - - d.wg.Wait() - defer func() { if r := recover(); r != nil { d.log.Info("recovering from panic and quitting") } }() - - d.peerChannel.Stop() + d.CommonDiscoveryService.Stop(func() { + if d.listener != nil { + d.listener.Close() + d.listener = nil + d.log.Info("stopped Discovery V5") + } + }) } /* @@ -495,7 +430,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { ENR: n, } - if d.peerChannel.Publish(peer) { + if d.PushToChan(peer) { d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID)) } else { d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID)) @@ -527,8 +462,3 @@ restartLoop: } d.log.Warn("Discv5 loop stopped") } - -// IsStarted determines whether discoveryV5 started or not -func (d *DiscoveryV5) IsStarted() bool { - return d.started.Load() -} diff --git a/waku/v2/peermanager/common_discovery_service.go b/waku/v2/peermanager/common_discovery_service.go new file mode 100644 index 00000000..0fae5fb5 --- /dev/null +++ b/waku/v2/peermanager/common_discovery_service.go @@ -0,0 +1,81 @@ +package peermanager + +import ( + "context" + "sync" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p/core/peer" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +// PeerData contains information about a peer useful in establishing connections with it. +type PeerData struct { + Origin wps.Origin + AddrInfo peer.AddrInfo + ENR *enode.Node + PubSubTopics []string +} + +type CommonDiscoveryService struct { + commonService *protocol.CommonService + channel chan PeerData +} + +func NewCommonDiscoveryService() *CommonDiscoveryService { + return &CommonDiscoveryService{ + commonService: protocol.NewCommonService(), + } +} + +func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error { + return sp.commonService.Start(ctx, func() error { + // currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them + // mutex protection for this operation + sp.channel = make(chan PeerData) + return fn() + }) +} + +func (sp *CommonDiscoveryService) Stop(stopFn func()) { + sp.commonService.Stop(func() { + stopFn() + sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service. + // there is a wait in the CommonService too + close(sp.channel) + }) +} +func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData { + return sp.channel +} +func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool { + sp.RLock() + defer sp.RUnlock() + if err := sp.ErrOnNotRunning(); err != nil { + return false + } + select { + case sp.channel <- data: + return true + case <-sp.Context().Done(): + return false + } +} + +func (sp *CommonDiscoveryService) RLock() { + sp.commonService.RLock() +} +func (sp *CommonDiscoveryService) RUnlock() { + sp.commonService.RUnlock() +} + +func (sp *CommonDiscoveryService) Context() context.Context { + return sp.commonService.Context() +} +func (sp *CommonDiscoveryService) ErrOnNotRunning() error { + return sp.commonService.ErrOnNotRunning() +} +func (sp *CommonDiscoveryService) WaitGroup() *sync.WaitGroup { + return sp.commonService.WaitGroup() +} diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index e8c0c31e..cf7414b9 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -25,14 +24,6 @@ import ( lru "github.com/hashicorp/golang-lru" ) -// PeerData contains information about a peer useful in establishing connections with it. -type PeerData struct { - Origin wps.Origin - AddrInfo peer.AddrInfo - PubSubTopics []string - ENR *enode.Node -} - // PeerConnectionStrategy is a utility to connect to peers, // but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 0d856001..f8390fbd 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -20,6 +20,7 @@ const ( PeerExchange DNSDiscovery Rendezvous + PeerManager ) const peerOrigin = "origin" diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 5b21ffaf..e28d7b15 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -31,7 +31,7 @@ type Rendezvous struct { peerConnector PeerConnector log *zap.Logger - *protocol.CommonService + *peermanager.CommonDiscoveryService } // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol @@ -43,10 +43,10 @@ type PeerConnector interface { func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") return &Rendezvous{ - db: db, - peerConnector: peerConnector, - log: logger, - CommonService: protocol.NewCommonService(), + db: db, + peerConnector: peerConnector, + log: logger, + CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), } } @@ -56,14 +56,19 @@ func (r *Rendezvous) SetHost(h host.Host) { } func (r *Rendezvous) Start(ctx context.Context) error { - return r.CommonService.Start(ctx, r.start) + return r.CommonDiscoveryService.Start(ctx, r.start) } func (r *Rendezvous) start() error { - err := r.db.Start(r.Context()) - if err != nil { - return err + if r.db != nil { + if err := r.db.Start(r.Context()); err != nil { + return err + } } + if r.peerConnector != nil { + r.peerConnector.Subscribe(r.Context(), r.GetListeningChan()) + } + r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) r.log.Info("rendezvous protocol started") @@ -98,19 +103,14 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string if len(addrInfo) != 0 { rp.SetSuccess(cookie) - peerCh := make(chan peermanager.PeerData) - defer close(peerCh) - r.peerConnector.Subscribe(ctx, peerCh) for _, p := range addrInfo { peer := peermanager.PeerData{ Origin: peerstore.Rendezvous, AddrInfo: p, PubSubTopics: []string{namespace}, } - select { - case <-ctx.Done(): + if !r.PushToChan(peer) { return - case peerCh <- peer: } } } else { @@ -180,7 +180,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string } func (r *Rendezvous) Stop() { - r.CommonService.Stop(func() { + r.CommonDiscoveryService.Stop(func() { r.host.RemoveStreamHandler(rvs.RendezvousProto) r.rendezvousSvc = nil }) diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index ccf2ac78..a2b1d881 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -92,6 +92,8 @@ func TestRendezvous(t *testing.T) { rendezvousClient2 := NewRendezvous(nil, myPeerConnector, utils.Logger()) rendezvousClient2.SetHost(host3) + err = rendezvousClient2.Start(ctx) + require.NoError(t, err) timedCtx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() @@ -108,4 +110,5 @@ func TestRendezvous(t *testing.T) { case p := <-myPeerConnector.ch: require.Equal(t, p.AddrInfo.ID.Pretty(), host2.ID().Pretty()) } + rendezvousClient2.Stop() }