feat(CommonService): add channel and use commonService in discv5 (#735)

* feat(CommonService): add channel and use commonService in discv5

* fix: add mutex to PushToChan

* fix: remove generic functionality
This commit is contained in:
harsh jain 2023-09-18 16:41:40 +07:00 committed by GitHub
parent fb2df14cb7
commit a5f9ee5ad8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 133 additions and 127 deletions

View File

@ -6,8 +6,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"sync/atomic"
"time" "time"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
@ -34,23 +32,20 @@ type PeerConnector interface {
} }
type DiscoveryV5 struct { type DiscoveryV5 struct {
params *discV5Parameters params *discV5Parameters
host host.Host host host.Host
config discover.Config config discover.Config
udpAddr *net.UDPAddr udpAddr *net.UDPAddr
listener *discover.UDPv5 listener *discover.UDPv5
localnode *enode.LocalNode localnode *enode.LocalNode
metrics Metrics metrics Metrics
peerChannel *peerChannel
peerConnector PeerConnector peerConnector PeerConnector
NAT nat.Interface NAT nat.Interface
log *zap.Logger log *zap.Logger
started atomic.Bool *peermanager.CommonDiscoveryService
cancel context.CancelFunc
wg *sync.WaitGroup
} }
type discV5Parameters struct { type discV5Parameters struct {
@ -132,13 +127,12 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
} }
return &DiscoveryV5{ return &DiscoveryV5{
params: params, params: params,
peerConnector: peerConnector, peerConnector: peerConnector,
NAT: NAT, NAT: NAT,
wg: &sync.WaitGroup{}, CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
peerChannel: &peerChannel{}, localnode: localnode,
localnode: localnode, metrics: newMetrics(reg),
metrics: newMetrics(reg),
config: discover.Config{ config: discover.Config{
PrivateKey: priv, PrivateKey: priv,
Bootnodes: params.bootnodes, Bootnodes: params.bootnodes,
@ -167,9 +161,9 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
d.udpAddr = conn.LocalAddr().(*net.UDPAddr) d.udpAddr = conn.LocalAddr().(*net.UDPAddr)
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() { if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.wg.Add(1) d.WaitGroup().Add(1)
go func() { go func() {
defer d.wg.Done() defer d.WaitGroup().Done()
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}() }()
@ -197,74 +191,24 @@ func (d *DiscoveryV5) SetHost(h host.Host) {
d.host = h d.host = h
} }
type peerChannel struct {
mutex sync.Mutex
channel chan peermanager.PeerData
started bool
ctx context.Context
}
func (p *peerChannel) Start(ctx context.Context) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.started = true
p.ctx = ctx
p.channel = make(chan peermanager.PeerData)
}
func (p *peerChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return
}
p.started = false
close(p.channel)
}
func (p *peerChannel) Subscribe() chan peermanager.PeerData {
return p.channel
}
func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return false
}
select {
case p.channel <- peer:
case <-p.ctx.Done():
return false
}
return true
}
// only works if the discovery v5 hasn't been started yet. // only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error { func (d *DiscoveryV5) Start(ctx context.Context) error {
// compare and swap sets the discovery v5 to `started` state return d.CommonDiscoveryService.Start(ctx, d.start)
// and prevents multiple calls to the start method by being atomic. }
if !d.started.CompareAndSwap(false, true) {
return nil
}
ctx, cancel := context.WithCancel(ctx) func (d *DiscoveryV5) start() error {
d.cancel = cancel d.peerConnector.Subscribe(d.Context(), d.GetListeningChan())
d.peerChannel.Start(ctx) err := d.listen(d.Context())
d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())
err := d.listen(ctx)
if err != nil { if err != nil {
return err return err
} }
if d.params.autoFindPeers { if d.params.autoFindPeers {
d.wg.Add(1) d.WaitGroup().Add(1)
go func() { go func() {
defer d.wg.Done() defer d.WaitGroup().Done()
d.runDiscoveryV5Loop(ctx) d.runDiscoveryV5Loop(d.Context())
}() }()
} }
@ -284,27 +228,18 @@ func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
// only works if the discovery v5 is in running state // only works if the discovery v5 is in running state
// so we can assume that cancel method is set // so we can assume that cancel method is set
func (d *DiscoveryV5) Stop() { func (d *DiscoveryV5) Stop() {
if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false
return
}
d.cancel()
if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}
d.wg.Wait()
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
d.log.Info("recovering from panic and quitting") d.log.Info("recovering from panic and quitting")
} }
}() }()
d.CommonDiscoveryService.Stop(func() {
d.peerChannel.Stop() if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}
})
} }
/* /*
@ -495,7 +430,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
ENR: n, ENR: n,
} }
if d.peerChannel.Publish(peer) { if d.PushToChan(peer) {
d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID)) d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
} else { } else {
d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID)) d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
@ -527,8 +462,3 @@ restartLoop:
} }
d.log.Warn("Discv5 loop stopped") d.log.Warn("Discv5 loop stopped")
} }
// IsStarted determines whether discoveryV5 started or not
func (d *DiscoveryV5) IsStarted() bool {
return d.started.Load()
}

View File

@ -0,0 +1,81 @@
package peermanager
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
ENR *enode.Node
PubSubTopics []string
}
type CommonDiscoveryService struct {
commonService *protocol.CommonService
channel chan PeerData
}
func NewCommonDiscoveryService() *CommonDiscoveryService {
return &CommonDiscoveryService{
commonService: protocol.NewCommonService(),
}
}
func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error {
return sp.commonService.Start(ctx, func() error {
// currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them
// mutex protection for this operation
sp.channel = make(chan PeerData)
return fn()
})
}
func (sp *CommonDiscoveryService) Stop(stopFn func()) {
sp.commonService.Stop(func() {
stopFn()
sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service.
// there is a wait in the CommonService too
close(sp.channel)
})
}
func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData {
return sp.channel
}
func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
sp.RLock()
defer sp.RUnlock()
if err := sp.ErrOnNotRunning(); err != nil {
return false
}
select {
case sp.channel <- data:
return true
case <-sp.Context().Done():
return false
}
}
func (sp *CommonDiscoveryService) RLock() {
sp.commonService.RLock()
}
func (sp *CommonDiscoveryService) RUnlock() {
sp.commonService.RUnlock()
}
func (sp *CommonDiscoveryService) Context() context.Context {
return sp.commonService.Context()
}
func (sp *CommonDiscoveryService) ErrOnNotRunning() error {
return sp.commonService.ErrOnNotRunning()
}
func (sp *CommonDiscoveryService) WaitGroup() *sync.WaitGroup {
return sp.commonService.WaitGroup()
}

View File

@ -9,7 +9,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
@ -25,14 +24,6 @@ import (
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
) )
// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
PubSubTopics []string
ENR *enode.Node
}
// PeerConnectionStrategy is a utility to connect to peers, // PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already // but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct { type PeerConnectionStrategy struct {

View File

@ -20,6 +20,7 @@ const (
PeerExchange PeerExchange
DNSDiscovery DNSDiscovery
Rendezvous Rendezvous
PeerManager
) )
const peerOrigin = "origin" const peerOrigin = "origin"

View File

@ -31,7 +31,7 @@ type Rendezvous struct {
peerConnector PeerConnector peerConnector PeerConnector
log *zap.Logger log *zap.Logger
*protocol.CommonService *peermanager.CommonDiscoveryService
} }
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
@ -43,10 +43,10 @@ type PeerConnector interface {
func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous") logger := log.Named("rendezvous")
return &Rendezvous{ return &Rendezvous{
db: db, db: db,
peerConnector: peerConnector, peerConnector: peerConnector,
log: logger, log: logger,
CommonService: protocol.NewCommonService(), CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
} }
} }
@ -56,14 +56,19 @@ func (r *Rendezvous) SetHost(h host.Host) {
} }
func (r *Rendezvous) Start(ctx context.Context) error { func (r *Rendezvous) Start(ctx context.Context) error {
return r.CommonService.Start(ctx, r.start) return r.CommonDiscoveryService.Start(ctx, r.start)
} }
func (r *Rendezvous) start() error { func (r *Rendezvous) start() error {
err := r.db.Start(r.Context()) if r.db != nil {
if err != nil { if err := r.db.Start(r.Context()); err != nil {
return err return err
}
} }
if r.peerConnector != nil {
r.peerConnector.Subscribe(r.Context(), r.GetListeningChan())
}
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
r.log.Info("rendezvous protocol started") r.log.Info("rendezvous protocol started")
@ -98,19 +103,14 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string
if len(addrInfo) != 0 { if len(addrInfo) != 0 {
rp.SetSuccess(cookie) rp.SetSuccess(cookie)
peerCh := make(chan peermanager.PeerData)
defer close(peerCh)
r.peerConnector.Subscribe(ctx, peerCh)
for _, p := range addrInfo { for _, p := range addrInfo {
peer := peermanager.PeerData{ peer := peermanager.PeerData{
Origin: peerstore.Rendezvous, Origin: peerstore.Rendezvous,
AddrInfo: p, AddrInfo: p,
PubSubTopics: []string{namespace}, PubSubTopics: []string{namespace},
} }
select { if !r.PushToChan(peer) {
case <-ctx.Done():
return return
case peerCh <- peer:
} }
} }
} else { } else {
@ -180,7 +180,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string
} }
func (r *Rendezvous) Stop() { func (r *Rendezvous) Stop() {
r.CommonService.Stop(func() { r.CommonDiscoveryService.Stop(func() {
r.host.RemoveStreamHandler(rvs.RendezvousProto) r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil r.rendezvousSvc = nil
}) })

View File

@ -92,6 +92,8 @@ func TestRendezvous(t *testing.T) {
rendezvousClient2 := NewRendezvous(nil, myPeerConnector, utils.Logger()) rendezvousClient2 := NewRendezvous(nil, myPeerConnector, utils.Logger())
rendezvousClient2.SetHost(host3) rendezvousClient2.SetHost(host3)
err = rendezvousClient2.Start(ctx)
require.NoError(t, err)
timedCtx, cancel := context.WithTimeout(ctx, 4*time.Second) timedCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel() defer cancel()
@ -108,4 +110,5 @@ func TestRendezvous(t *testing.T) {
case p := <-myPeerConnector.ch: case p := <-myPeerConnector.ch:
require.Equal(t, p.AddrInfo.ID.Pretty(), host2.ID().Pretty()) require.Equal(t, p.AddrInfo.ID.Pretty(), host2.ID().Pretty())
} }
rendezvousClient2.Stop()
} }