refactor: make discovery connector subscribe to discovery services

This commit is contained in:
Richard Ramos 2023-07-07 08:35:22 -04:00 committed by richΛrd
parent 5ca26ef897
commit b26859fc6a
9 changed files with 107 additions and 59 deletions

View File

@ -147,19 +147,17 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
peerCh: make(chan v2.PeerData, 10),
}
go func() {
for p := range result.peerCh {
result.Lock()
result.peerMap[p.AddrInfo.ID] = struct{}{}
result.Unlock()
}
}()
return result
}
func (t *TestPeerDiscoverer) PeerChannel() chan<- v2.PeerData {
return t.peerCh
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan v2.PeerData) {
go func() {
for p := range ch {
t.Lock()
t.peerMap[p.AddrInfo.ID] = struct{}{}
t.Unlock()
}
}()
}
func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool {

View File

@ -40,6 +40,7 @@ type PeerConnectionStrategy struct {
dialTimeout time.Duration
peerCh chan PeerData
dialCh chan peer.AddrInfo
subscriptions []<-chan PeerData
backoff backoff.BackoffFactory
mux sync.Mutex
@ -78,9 +79,33 @@ type PeerData struct {
ENR *enode.Node
}
// PeerChannel exposes the channel on which discovered peers should be pushed
func (c *PeerConnectionStrategy) PeerChannel() chan<- PeerData {
return c.peerCh
// Subscribe receives channels on which discovered peers should be pushed
func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) {
if c.cancel != nil {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.consumeSubscription(ctx, ch)
}()
} else {
c.subscriptions = append(c.subscriptions, ch)
}
}
func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) {
for {
select {
case <-ctx.Done():
return
case p := <-ch:
select {
case <-ctx.Done():
return
case c.peerCh <- p:
}
}
}
}
// Sets the host to be able to mount or consume a protocol
@ -104,6 +129,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
go c.workPublisher(ctx)
go c.dialPeers(ctx)
c.consumeSubscriptions(ctx)
return nil
}
@ -118,6 +145,7 @@ func (c *PeerConnectionStrategy) Stop() {
close(c.peerCh)
close(c.dialCh)
c.subscriptions = nil
c.cancel = nil
}
@ -159,6 +187,16 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
}
}
func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
for _, subs := range c.subscriptions {
c.wg.Add(1)
go func(s <-chan PeerData) {
defer c.wg.Done()
c.consumeSubscription(ctx, s)
}(subs)
}
}
func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
select {
case c.dialCh <- p:

View File

@ -28,6 +28,10 @@ import (
var ErrNoDiscV5Listener = errors.New("no discv5 listener")
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
}
type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
@ -35,7 +39,9 @@ type DiscoveryV5 struct {
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
peerConnector PeerConnector
peerCh chan v2.PeerData
NAT nat.Interface
log *zap.Logger
@ -101,10 +107,6 @@ func DefaultOptions() []DiscoveryV5Option {
}
}
type PeerConnector interface {
PeerChannel() chan<- v2.PeerData
}
func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
@ -121,8 +123,8 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
}
return &DiscoveryV5{
peerConnector: peerConnector,
params: params,
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
localnode: localnode,
@ -195,6 +197,9 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
d.peerCh = make(chan v2.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)
err := d.listen(ctx)
if err != nil {
return err
@ -225,6 +230,7 @@ func (d *DiscoveryV5) Stop() {
if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false
return
}
d.cancel()
if d.listener != nil {
@ -234,6 +240,8 @@ func (d *DiscoveryV5) Stop() {
}
d.wg.Wait()
close(d.peerCh)
}
/*
@ -402,7 +410,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
}
select {
case d.peerConnector.PeerChannel() <- peer:
case d.peerCh <- peer:
case <-ctx.Done():
return nil
}

View File

@ -4,7 +4,6 @@ import (
"context"
"github.com/libp2p/go-libp2p/core/host"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
@ -19,8 +18,3 @@ type ReceptorService interface {
Stop()
Start(context.Context, relay.Subscription) error
}
type PeerConnectorService interface {
Service
PeerChannel() chan<- v2.PeerData
}

View File

@ -81,10 +81,10 @@ type WakuNode struct {
timesource timesource.Timesource
peerstore peerstore.Peerstore
peerConnector *v2.PeerConnectionStrategy
relay Service
lightPush Service
peerConnector PeerConnectorService
discoveryV5 Service
peerExchange Service
rendezvous Service

View File

@ -103,6 +103,10 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
wakuPX.wg.Add(1)
go func() {
defer wakuPX.wg.Done()
peerCh := make(chan v2.PeerData)
defer close(peerCh)
wakuPX.peerConnector.Subscribe(ctx, peerCh)
for _, p := range discoveredPeers {
peer := v2.PeerData{
Origin: peers.PeerExchange,
@ -112,7 +116,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
select {
case <-ctx.Done():
return
case wakuPX.peerConnector.PeerChannel() <- peer:
case peerCh <- peer:
}
}
}()

View File

@ -31,6 +31,10 @@ var (
ErrInvalidId = errors.New("invalid request id")
)
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
}
type WakuPeerExchange struct {
h host.Host
disc *discv5.DiscoveryV5
@ -44,10 +48,6 @@ type WakuPeerExchange struct {
enrCache *enrCache
}
type PeerConnector interface {
PeerChannel() chan<- v2.PeerData
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
newEnrCache, err := newEnrCache(MaxCacheSize)
@ -59,6 +59,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = newEnrCache
wakuPX.peerConnector = peerConnector
return wakuPX, nil
}

View File

@ -26,6 +26,10 @@ type rendezvousPoint struct {
cookie []byte
}
type PeerConnector interface {
Subscribe(context.Context, <-chan v2.PeerData)
}
type Rendezvous struct {
host host.Host
@ -41,10 +45,7 @@ type Rendezvous struct {
cancel context.CancelFunc
}
type PeerConnector interface {
PeerChannel() chan<- v2.PeerData
}
// NewRendezvous creates an instance of a Rendezvous which might act as rendezvous point for other nodes, or act as a client node
func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
@ -95,7 +96,6 @@ func (r *Rendezvous) getRandomServer() *rendezvousPoint {
}
func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
defer r.wg.Done()
for {
select {
case <-ctx.Done():
@ -118,17 +118,22 @@ func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
server.cookie = cookie
server.Unlock()
peerCh := make(chan v2.PeerData)
r.peerConnector.Subscribe(context.Background(), peerCh)
for _, addr := range addrInfo {
peer := v2.PeerData{
Origin: peers.Rendezvous,
AddrInfo: addr,
}
fmt.Println("PPPPPPPPPPPPPP")
select {
case r.peerConnector.PeerChannel() <- peer:
case peerCh <- peer:
fmt.Println("DISCOVERED")
case <-ctx.Done():
return
}
}
close(peerCh)
} else {
// TODO: improve this by adding an exponential backoff?
time.Sleep(5 * time.Second)

View File

@ -19,17 +19,17 @@ import (
)
type PeerConn struct {
ch chan v2.PeerData
ch <-chan v2.PeerData
}
func (p PeerConn) PeerChannel() chan<- v2.PeerData {
return p.ch
func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan v2.PeerData) {
p.ch = ch
}
func NewPeerConn() PeerConn {
func NewPeerConn() *PeerConn {
x := PeerConn{}
x.ch = make(chan v2.PeerData, 1000)
return x
return &x
}
const testTopic = "test"
@ -48,7 +48,7 @@ func TestRendezvous(t *testing.T) {
require.NoError(t, err)
rdb := NewDB(context.Background(), db, utils.Logger())
rendezvousPoint := NewRendezvous(true, rdb, nil, nil, utils.Logger())
rendezvousPoint := NewRendezvous(true, rdb, nil, NewPeerConn(), utils.Logger())
rendezvousPoint.SetHost(host1)
err = rendezvousPoint.Start(context.Background())
require.NoError(t, err)
@ -69,7 +69,7 @@ func TestRendezvous(t *testing.T) {
err = host2.Peerstore().AddProtocols(info.ID, RendezvousID)
require.NoError(t, err)
rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, nil, utils.Logger())
rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, NewPeerConn(), utils.Logger())
rendezvousClient1.SetHost(host2)
err = rendezvousClient1.Start(context.Background())
require.NoError(t, err)
@ -87,7 +87,6 @@ func TestRendezvous(t *testing.T) {
require.NoError(t, err)
myPeerConnector := NewPeerConn()
rendezvousClient2 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger())
rendezvousClient2.SetHost(host3)
err = rendezvousClient2.Start(context.Background())
@ -98,8 +97,9 @@ func TestRendezvous(t *testing.T) {
defer cancel()
go rendezvousClient2.Discover(timedCtx, testTopic, 5)
time.Sleep(500 * time.Millisecond)
timer := time.After(5 * time.Second)
timer := time.After(3 * time.Second)
select {
case <-timer:
require.Fail(t, "no peer discovered")