diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 8b24d189..ab880d48 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -363,10 +363,9 @@ var ( Destination: &options.DiscV5.AutoUpdate, EnvVars: []string{"WAKUNODE2_DISCV5_ENR_AUTO_UPDATE"}, }) - Rendezvous = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "rendezvous", - Usage: "Enable rendezvous protocol server for peer discovery", + Usage: "Enable rendezvous protocol for peer discovery", Destination: &options.Rendezvous.Enable, EnvVars: []string{"WAKUNODE2_RENDEZVOUS"}, }) @@ -378,6 +377,12 @@ var ( }, EnvVars: []string{"WAKUNODE2_RENDEZVOUSNODE"}, }) + RendezvousServer = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "rendezvous-server", + Usage: "Enable rendezvous protocol server so other peers can use this node for discovery", + Destination: &options.Rendezvous.Server, + EnvVars: []string{"WAKUNODE2_RENDEZVOUS_SERVER"}, + }) PeerExchange = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "peer-exchange", Usage: "Enable waku peer exchange protocol (responder side)", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index b024b943..e728cbeb 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -75,6 +75,7 @@ func main() { DNSDiscoveryNameServer, Rendezvous, RendezvousNode, + RendezvousServer, MetricsServer, MetricsServerAddress, MetricsServerPort, diff --git a/waku/node.go b/waku/node.go index 8c720198..bdd9d481 100644 --- a/waku/node.go +++ b/waku/node.go @@ -59,7 +59,7 @@ func failOnErr(err error, msg string) { } func requiresDB(options Options) bool { - return options.Store.Enable || options.Rendezvous.Enable + return options.Store.Enable || options.Rendezvous.Server } const dialTimeout = 7 * time.Second @@ -243,6 +243,10 @@ func Execute(options Options) { } if options.Rendezvous.Enable { + nodeOpts = append(nodeOpts, node.WithRendezvous(options.Rendezvous.Nodes)) + } + + if options.Rendezvous.Server { rdb := rendezvous.NewDB(ctx, db, logger) nodeOpts = append(nodeOpts, node.WithRendezvousServer(rdb)) } diff --git a/waku/options.go b/waku/options.go index 659e10b9..d401ad5c 100644 --- a/waku/options.go +++ b/waku/options.go @@ -141,6 +141,7 @@ type PeerExchangeOptions struct { type RendezvousOptions struct { Enable bool + Server bool Nodes []multiaddr.Multiaddr } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index a02b7293..b7c532f5 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -214,7 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { return nil, err } - w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.rendezvousDB, w.peerConnector, w.log) + w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, w.opts.rendezvousNodes, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...) @@ -393,7 +393,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } } - if w.opts.enableRendezvous { + if w.opts.enableRendezvousServer || w.opts.enableRendezvous { err := w.rendezvous.Start(ctx) if err != nil { return err @@ -425,7 +425,7 @@ func (w *WakuNode) Stop() { defer w.identificationEventSub.Close() defer w.addressChangesSub.Close() - if w.opts.enableRendezvous { + if w.opts.enableRendezvousServer || w.opts.enableRendezvous { w.rendezvous.Stop() } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index f5078156..e615b608 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -81,7 +81,10 @@ type WakuNodeParameters struct { messageProvider store.MessageProvider enableRendezvous bool - rendezvousDB *rendezvous.DB + rendezvousNodes []multiaddr.Multiaddr + + enableRendezvousServer bool + rendezvousDB *rendezvous.DB swapMode int swapDisconnectThreshold int @@ -437,11 +440,20 @@ func WithWebsockets(address string, port int) WakuNodeOption { } } +// WithRendezvous is a WakuOption used to enable rendezvous as a discovery +func WithRendezvous(rendezvousPoints []multiaddr.Multiaddr) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableRendezvous = true + params.rendezvousNodes = rendezvousPoints + return nil + } +} + // WithRendezvousServer is a WakuOption used to set the node as a rendezvous // point, using an specific storage for the peer information func WithRendezvousServer(db *rendezvous.DB) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.enableRendezvous = true + params.enableRendezvousServer = true params.rendezvousDB = db return nil } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 4c498771..aab7c3ca 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -2,36 +2,51 @@ package rendezvous import ( "context" + "math" + "sync" + "time" rvs "github.com/berty/go-libp2p-rendezvous" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) const RendezvousID = rvs.RendezvousProto type Rendezvous struct { - host host.Host - peerConnector PeerConnector + host host.Host + + enableServer bool db *DB rendezvousSvc *rvs.RendezvousService - log *zap.Logger + discoverPeers bool + rendezvousPoints []multiaddr.Multiaddr + peerConnector PeerConnector + + log *zap.Logger + wg sync.WaitGroup + cancel context.CancelFunc } type PeerConnector interface { PeerChannel() chan<- peer.AddrInfo } -func NewRendezvous(host host.Host, db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { +func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendevousPoints []multiaddr.Multiaddr, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") return &Rendezvous{ - host: host, - db: db, - peerConnector: peerConnector, - log: logger, + host: host, + enableServer: enableServer, + db: db, + discoverPeers: discoverPeers, + rendezvousPoints: rendevousPoints, + peerConnector: peerConnector, + log: logger, } } @@ -41,12 +56,84 @@ func (r *Rendezvous) Start(ctx context.Context) error { return err } - r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) + ctx, cancel := context.WithCancel(ctx) + + r.cancel = cancel + + if r.enableServer { + r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db) + } + + if r.discoverPeers { + r.wg.Add(1) + go r.register(ctx) + } + + // TODO: Execute discovery and push nodes to peer connector. If asking for peers fail, add timeout and exponential backoff + r.log.Info("rendezvous protocol started") return nil } +const registerBackoff = 200 * time.Millisecond +const registerMaxRetries = 7 + +func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) { + ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use + var t <-chan time.Time + if err != nil { + r.log.Error("registering rendezvous client", zap.Error(err)) + backoff := registerBackoff * time.Duration(math.Exp2(float64(retries))) + t = time.After(backoff) + retries++ + } else { + t = time.After(ttl) + } + + return t, retries +} + +func (r *Rendezvous) register(ctx context.Context) { + for _, m := range r.rendezvousPoints { + r.wg.Add(1) + go func(m multiaddr.Multiaddr) { + r.wg.Done() + + peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) + if err != nil { + r.log.Error("error obtaining peerID", zap.Error(err)) + return + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + r.log.Error("error obtaining peerID", zap.Error(err)) + return + } + + rendezvousClient := rvs.NewRendezvousClient(r.host, peerID) + retries := 0 + var t <-chan time.Time + + t, retries = r.callRegister(ctx, rendezvousClient, retries) + for { + select { + case <-ctx.Done(): + return + case <-t: + t, retries = r.callRegister(ctx, rendezvousClient, retries) + if retries >= registerMaxRetries { + return + } + } + } + }(m) + } +} + func (r *Rendezvous) Stop() { + r.cancel() + r.wg.Wait() r.host.RemoveStreamHandler(rvs.RendezvousProto) r.rendezvousSvc = nil }