refactor: rendezvous

This commit is contained in:
Richard Ramos 2023-06-23 11:50:32 -04:00 committed by richΛrd
parent fff255d0ef
commit cd358c7bd6
5 changed files with 90 additions and 32 deletions

View File

@ -345,6 +345,40 @@ func Execute(options Options) {
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
failOnErr(err, "Error subscring to topic") failOnErr(err, "Error subscring to topic")
sub.Unsubscribe() sub.Unsubscribe()
if options.Rendezvous.Enable {
// Register the node in rendezvous point
// TODO: we have to determine how discovery would work with relay subscriptions.
// It might make sense to use pubsub.WithDiscovery option of gossipsub and
// register DiscV5, PeerExchange and Rendezvous. This should be an
// application concern instead of having (i.e. ./build/waku or the status app)
// instead of having the wakunode being the one deciding to advertise which
// topics/shards it supports
wakuNode.Rendezvous().Register(ctx, nodeTopic)
go func(nodeTopic string) {
desiredOutDegree := wakuNode.Relay().Params().D
t := time.NewTicker(7 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic))
peersToFind := desiredOutDegree - peerCnt
if peersToFind <= 0 {
continue
}
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
wakuNode.Rendezvous().Discover(ctx, nodeTopic, peersToFind)
cancel()
}
}
}(nodeTopic)
}
} }
for _, protectedTopic := range options.Relay.ProtectedTopics { for _, protectedTopic := range options.Relay.ProtectedTopics {

View File

@ -274,7 +274,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
rendezvousPoints = append(rendezvousPoints, peerID) rendezvousPoints = append(rendezvousPoints, peerID)
} }
w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log) w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, rendezvousPoints, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...)
@ -474,7 +474,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
} }
w.rendezvous.SetHost(host) w.rendezvous.SetHost(host)
if w.opts.enableRendezvousServer || w.opts.enableRendezvous { if w.opts.enableRendezvousServer {
err := w.rendezvous.Start(ctx) err := w.rendezvous.Start(ctx)
if err != nil { if err != nil {
return err return err
@ -504,7 +504,7 @@ func (w *WakuNode) Stop() {
defer w.identificationEventSub.Close() defer w.identificationEventSub.Close()
defer w.addressChangesSub.Close() defer w.addressChangesSub.Close()
if w.opts.enableRendezvousServer || w.opts.enableRendezvous { if w.opts.enableRendezvousServer {
w.rendezvous.Stop() w.rendezvous.Stop()
} }
@ -652,6 +652,14 @@ func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange {
return nil return nil
} }
// Rendezvous is used to access any operation related to Rendezvous
func (w *WakuNode) Rendezvous() *rendezvous.Rendezvous {
if result, ok := w.rendezvous.(*rendezvous.Rendezvous); ok {
return result
}
return nil
}
// Broadcaster is used to access the message broadcaster that is used to push // Broadcaster is used to access the message broadcaster that is used to push
// messages to different protocols // messages to different protocols
func (w *WakuNode) Broadcaster() relay.Broadcaster { func (w *WakuNode) Broadcaster() relay.Broadcaster {

View File

@ -82,10 +82,9 @@ type WakuNodeParameters struct {
resumeNodes []multiaddr.Multiaddr resumeNodes []multiaddr.Multiaddr
messageProvider store.MessageProvider messageProvider store.MessageProvider
enableRendezvous bool
rendezvousNodes []multiaddr.Multiaddr rendezvousNodes []multiaddr.Multiaddr
enableRendezvousServer bool enableRendezvousServer bool
rendezvousDB *rendezvous.DB rendezvousDB *rendezvous.DB
discoveryMinPeers int discoveryMinPeers int
@ -487,7 +486,6 @@ func WithWebsockets(address string, port int) WakuNodeOption {
// WithRendezvous is a WakuOption used to enable rendezvous as a discovery // WithRendezvous is a WakuOption used to enable rendezvous as a discovery
func WithRendezvous(rendezvousPoints []multiaddr.Multiaddr) WakuNodeOption { func WithRendezvous(rendezvousPoints []multiaddr.Multiaddr) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableRendezvous = true
params.rendezvousNodes = rendezvousPoints params.rendezvousNodes = rendezvousPoints
return nil return nil
} }

View File

@ -2,6 +2,7 @@ package rendezvous
import ( import (
"context" "context"
"fmt"
"math" "math"
"math/rand" "math/rand"
"sync" "sync"
@ -12,7 +13,7 @@ import (
rvs "github.com/waku-org/go-libp2p-rendezvous" rvs "github.com/waku-org/go-libp2p-rendezvous"
v2 "github.com/waku-org/go-waku/waku/v2" v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -32,7 +33,6 @@ type Rendezvous struct {
db *DB db *DB
rendezvousSvc *rvs.RendezvousService rendezvousSvc *rvs.RendezvousService
discoverPeers bool
rendezvousPoints []*rendezvousPoint rendezvousPoints []*rendezvousPoint
peerConnector PeerConnector peerConnector PeerConnector
@ -45,7 +45,7 @@ type PeerConnector interface {
PeerChannel() chan<- v2.PeerData PeerChannel() chan<- v2.PeerData
} }
func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { func NewRendezvous(enableServer bool, db *DB, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous") logger := log.Named("rendezvous")
var rendevousPoints []*rendezvousPoint var rendevousPoints []*rendezvousPoint
@ -58,7 +58,6 @@ func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoin
return &Rendezvous{ return &Rendezvous{
enableServer: enableServer, enableServer: enableServer,
db: db, db: db,
discoverPeers: discoverPeers,
rendezvousPoints: rendevousPoints, rendezvousPoints: rendevousPoints,
peerConnector: peerConnector, peerConnector: peerConnector,
log: logger, log: logger,
@ -84,14 +83,6 @@ func (r *Rendezvous) Start(ctx context.Context) error {
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
} }
r.wg.Add(1)
go r.register(ctx)
if r.discoverPeers {
r.wg.Add(1)
go r.discover(ctx)
}
r.log.Info("rendezvous protocol started") r.log.Info("rendezvous protocol started")
return nil return nil
} }
@ -103,7 +94,7 @@ func (r *Rendezvous) getRandomServer() *rendezvousPoint {
return r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec return r.rendezvousPoints[rand.Intn(len(r.rendezvousPoints))] // nolint: gosec
} }
func (r *Rendezvous) discover(ctx context.Context) { func (r *Rendezvous) Discover(ctx context.Context, topic string, numPeers int) {
defer r.wg.Done() defer r.wg.Done()
for { for {
select { select {
@ -114,7 +105,7 @@ func (r *Rendezvous) discover(ctx context.Context) {
rendezvousClient := rvs.NewRendezvousClient(r.host, server.id) rendezvousClient := rvs.NewRendezvousClient(r.host, server.id)
addrInfo, cookie, err := rendezvousClient.Discover(ctx, relay.DefaultWakuTopic, 5, server.cookie) addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, server.cookie)
if err != nil { if err != nil {
r.log.Error("could not discover new peers", zap.Error(err)) r.log.Error("could not discover new peers", zap.Error(err))
cookie = nil cookie = nil
@ -146,8 +137,13 @@ func (r *Rendezvous) discover(ctx context.Context) {
} }
} }
func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { func (r *Rendezvous) DiscoverShard(ctx context.Context, cluster uint16, shard uint16, numPeers int) {
ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use namespace := ShardToNamespace(cluster, shard)
r.Discover(ctx, namespace, numPeers)
}
func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, topic string, retries int) (<-chan time.Time, int) {
ttl, err := rendezvousClient.Register(ctx, topic, rvs.DefaultTTL)
var t <-chan time.Time var t <-chan time.Time
if err != nil { if err != nil {
r.log.Error("registering rendezvous client", zap.Error(err)) r.log.Error("registering rendezvous client", zap.Error(err))
@ -161,9 +157,7 @@ func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.Rend
return t, retries return t, retries
} }
func (r *Rendezvous) register(ctx context.Context) { func (r *Rendezvous) Register(ctx context.Context, topic string) {
defer r.wg.Done()
for _, m := range r.rendezvousPoints { for _, m := range r.rendezvousPoints {
r.wg.Add(1) r.wg.Add(1)
go func(m *rendezvousPoint) { go func(m *rendezvousPoint) {
@ -173,13 +167,13 @@ func (r *Rendezvous) register(ctx context.Context) {
retries := 0 retries := 0
var t <-chan time.Time var t <-chan time.Time
t, retries = r.callRegister(ctx, rendezvousClient, retries) t, retries = r.callRegister(ctx, rendezvousClient, topic, retries)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-t: case <-t:
t, retries = r.callRegister(ctx, rendezvousClient, retries) t, retries = r.callRegister(ctx, rendezvousClient, topic, retries)
if retries >= registerMaxRetries { if retries >= registerMaxRetries {
return return
} }
@ -189,9 +183,24 @@ func (r *Rendezvous) register(ctx context.Context) {
} }
} }
func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16) {
namespace := ShardToNamespace(cluster, shard)
r.Register(ctx, namespace)
}
func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards) {
for _, idx := range rs.Indices {
go r.RegisterShard(ctx, rs.Cluster, idx)
}
}
func (r *Rendezvous) Stop() { func (r *Rendezvous) Stop() {
r.cancel() r.cancel()
r.wg.Wait() r.wg.Wait()
r.host.RemoveStreamHandler(rvs.RendezvousProto) r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil r.rendezvousSvc = nil
} }
func ShardToNamespace(cluster uint16, shard uint16) string {
return fmt.Sprintf("rs/%d/%d", cluster, shard)
}

View File

@ -32,6 +32,8 @@ func NewPeerConn() PeerConn {
return x return x
} }
const testTopic = "test"
func TestRendezvous(t *testing.T) { func TestRendezvous(t *testing.T) {
port1, err := tests.FindFreePort(t, "", 5) port1, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err) require.NoError(t, err)
@ -46,7 +48,7 @@ func TestRendezvous(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
rdb := NewDB(context.Background(), db, utils.Logger()) rdb := NewDB(context.Background(), db, utils.Logger())
rendezvousPoint := NewRendezvous(true, rdb, false, nil, nil, utils.Logger()) rendezvousPoint := NewRendezvous(true, rdb, nil, nil, utils.Logger())
rendezvousPoint.SetHost(host1) rendezvousPoint.SetHost(host1)
err = rendezvousPoint.Start(context.Background()) err = rendezvousPoint.Start(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -67,12 +69,14 @@ func TestRendezvous(t *testing.T) {
err = host2.Peerstore().AddProtocols(info.ID, RendezvousID) err = host2.Peerstore().AddProtocols(info.ID, RendezvousID)
require.NoError(t, err) require.NoError(t, err)
rendezvousClient1 := NewRendezvous(false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger()) rendezvousClient1 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, nil, utils.Logger())
rendezvousClient1.SetHost(host2) rendezvousClient1.SetHost(host2)
err = rendezvousClient1.Start(context.Background()) err = rendezvousClient1.Start(context.Background())
require.NoError(t, err) require.NoError(t, err)
defer rendezvousClient1.Stop() defer rendezvousClient1.Stop()
rendezvousClient1.Register(context.Background(), testTopic)
port3, err := tests.FindFreePort(t, "", 5) port3, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err) require.NoError(t, err)
host3, err := tests.MakeHost(context.Background(), port3, rand.Reader) host3, err := tests.MakeHost(context.Background(), port3, rand.Reader)
@ -84,12 +88,17 @@ func TestRendezvous(t *testing.T) {
myPeerConnector := NewPeerConn() myPeerConnector := NewPeerConn()
rendezvousClient2 := NewRendezvous(false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) rendezvousClient2 := NewRendezvous(false, nil, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger())
rendezvousClient2.SetHost(host3) rendezvousClient2.SetHost(host3)
err = rendezvousClient2.Start(context.Background()) err = rendezvousClient2.Start(context.Background())
require.NoError(t, err) require.NoError(t, err)
defer rendezvousClient2.Stop() defer rendezvousClient2.Stop()
timedCtx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
go rendezvousClient2.Discover(timedCtx, testTopic, 5)
timer := time.After(5 * time.Second) timer := time.After(5 * time.Second)
select { select {
case <-timer: case <-timer: