feat: rendezvous client

This commit is contained in:
Richard Ramos 2023-03-09 18:42:50 -04:00 committed by RichΛrd
parent 21ad496d42
commit 2b30726c14
7 changed files with 127 additions and 17 deletions

View File

@ -363,10 +363,9 @@ var (
Destination: &options.DiscV5.AutoUpdate,
EnvVars: []string{"WAKUNODE2_DISCV5_ENR_AUTO_UPDATE"},
})
Rendezvous = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "rendezvous",
Usage: "Enable rendezvous protocol server for peer discovery",
Usage: "Enable rendezvous protocol for peer discovery",
Destination: &options.Rendezvous.Enable,
EnvVars: []string{"WAKUNODE2_RENDEZVOUS"},
})
@ -378,6 +377,12 @@ var (
},
EnvVars: []string{"WAKUNODE2_RENDEZVOUSNODE"},
})
RendezvousServer = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "rendezvous-server",
Usage: "Enable rendezvous protocol server so other peers can use this node for discovery",
Destination: &options.Rendezvous.Server,
EnvVars: []string{"WAKUNODE2_RENDEZVOUS_SERVER"},
})
PeerExchange = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "peer-exchange",
Usage: "Enable waku peer exchange protocol (responder side)",

View File

@ -75,6 +75,7 @@ func main() {
DNSDiscoveryNameServer,
Rendezvous,
RendezvousNode,
RendezvousServer,
MetricsServer,
MetricsServerAddress,
MetricsServerPort,

View File

@ -59,7 +59,7 @@ func failOnErr(err error, msg string) {
}
func requiresDB(options Options) bool {
return options.Store.Enable || options.Rendezvous.Enable
return options.Store.Enable || options.Rendezvous.Server
}
const dialTimeout = 7 * time.Second
@ -243,6 +243,10 @@ func Execute(options Options) {
}
if options.Rendezvous.Enable {
nodeOpts = append(nodeOpts, node.WithRendezvous(options.Rendezvous.Nodes))
}
if options.Rendezvous.Server {
rdb := rendezvous.NewDB(ctx, db, logger)
nodeOpts = append(nodeOpts, node.WithRendezvousServer(rdb))
}

View File

@ -141,6 +141,7 @@ type PeerExchangeOptions struct {
type RendezvousOptions struct {
Enable bool
Server bool
Nodes []multiaddr.Multiaddr
}

View File

@ -214,7 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err
}
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.rendezvousDB, w.peerConnector, w.log)
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, w.opts.rendezvousNodes, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...)
@ -393,7 +393,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
}
if w.opts.enableRendezvous {
if w.opts.enableRendezvousServer || w.opts.enableRendezvous {
err := w.rendezvous.Start(ctx)
if err != nil {
return err
@ -425,7 +425,7 @@ func (w *WakuNode) Stop() {
defer w.identificationEventSub.Close()
defer w.addressChangesSub.Close()
if w.opts.enableRendezvous {
if w.opts.enableRendezvousServer || w.opts.enableRendezvous {
w.rendezvous.Stop()
}

View File

@ -81,7 +81,10 @@ type WakuNodeParameters struct {
messageProvider store.MessageProvider
enableRendezvous bool
rendezvousDB *rendezvous.DB
rendezvousNodes []multiaddr.Multiaddr
enableRendezvousServer bool
rendezvousDB *rendezvous.DB
swapMode int
swapDisconnectThreshold int
@ -437,11 +440,20 @@ func WithWebsockets(address string, port int) WakuNodeOption {
}
}
// WithRendezvous is a WakuOption used to enable rendezvous as a discovery
func WithRendezvous(rendezvousPoints []multiaddr.Multiaddr) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRendezvous = true
params.rendezvousNodes = rendezvousPoints
return nil
}
}
// WithRendezvousServer is a WakuOption used to set the node as a rendezvous
// point, using an specific storage for the peer information
func WithRendezvousServer(db *rendezvous.DB) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRendezvous = true
params.enableRendezvousServer = true
params.rendezvousDB = db
return nil
}

View File

@ -2,36 +2,51 @@ package rendezvous
import (
"context"
"math"
"sync"
"time"
rvs "github.com/berty/go-libp2p-rendezvous"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
const RendezvousID = rvs.RendezvousProto
type Rendezvous struct {
host host.Host
peerConnector PeerConnector
host host.Host
enableServer bool
db *DB
rendezvousSvc *rvs.RendezvousService
log *zap.Logger
discoverPeers bool
rendezvousPoints []multiaddr.Multiaddr
peerConnector PeerConnector
log *zap.Logger
wg sync.WaitGroup
cancel context.CancelFunc
}
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
func NewRendezvous(host host.Host, db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendevousPoints []multiaddr.Multiaddr, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
return &Rendezvous{
host: host,
db: db,
peerConnector: peerConnector,
log: logger,
host: host,
enableServer: enableServer,
db: db,
discoverPeers: discoverPeers,
rendezvousPoints: rendevousPoints,
peerConnector: peerConnector,
log: logger,
}
}
@ -41,12 +56,84 @@ func (r *Rendezvous) Start(ctx context.Context) error {
return err
}
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
if r.enableServer {
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
}
if r.discoverPeers {
r.wg.Add(1)
go r.register(ctx)
}
// TODO: Execute discovery and push nodes to peer connector. If asking for peers fail, add timeout and exponential backoff
r.log.Info("rendezvous protocol started")
return nil
}
const registerBackoff = 200 * time.Millisecond
const registerMaxRetries = 7
func (r *Rendezvous) callRegister(ctx context.Context, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) {
ttl, err := rendezvousClient.Register(ctx, relay.DefaultWakuTopic, rvs.DefaultTTL) // TODO: determine which topic to use
var t <-chan time.Time
if err != nil {
r.log.Error("registering rendezvous client", zap.Error(err))
backoff := registerBackoff * time.Duration(math.Exp2(float64(retries)))
t = time.After(backoff)
retries++
} else {
t = time.After(ttl)
}
return t, retries
}
func (r *Rendezvous) register(ctx context.Context) {
for _, m := range r.rendezvousPoints {
r.wg.Add(1)
go func(m multiaddr.Multiaddr) {
r.wg.Done()
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
if err != nil {
r.log.Error("error obtaining peerID", zap.Error(err))
return
}
peerID, err := peer.Decode(peerIDStr)
if err != nil {
r.log.Error("error obtaining peerID", zap.Error(err))
return
}
rendezvousClient := rvs.NewRendezvousClient(r.host, peerID)
retries := 0
var t <-chan time.Time
t, retries = r.callRegister(ctx, rendezvousClient, retries)
for {
select {
case <-ctx.Done():
return
case <-t:
t, retries = r.callRegister(ctx, rendezvousClient, retries)
if retries >= registerMaxRetries {
return
}
}
}
}(m)
}
}
func (r *Rendezvous) Stop() {
r.cancel()
r.wg.Wait()
r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil
}