From 9594e54d361e40b8c10c07c40bb5971b4aee2678 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 12 May 2023 17:52:42 -0400 Subject: [PATCH] feat: use circuit relay in service node --- cmd/waku/flags.go | 7 +++ cmd/waku/main.go | 1 + go.mod | 1 + go.sum | 1 + waku/node.go | 5 +++ waku/options.go | 1 + waku/v2/node/wakunode2.go | 90 ++++++++++++++++++++++++++++++++++++- waku/v2/node/wakuoptions.go | 1 + 8 files changed, 105 insertions(+), 2 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 1bf0617f..cf6ad149 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -167,6 +167,13 @@ var ( Usage: "Display listening addresses according to current configuration", Destination: &options.ShowAddresses, }) + CircuitRelay = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "circuit-relay", + Usage: "Enable circuit relay service", + Value: true, + Destination: &options.CircuitRelay, + EnvVars: []string{"WAKUNODE2_CIRCUIT_RELAY"}, + }) LogLevel = cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{ Name: "log-level", Aliases: []string{"l"}, diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 1b0d857f..5046bff9 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -40,6 +40,7 @@ func main() { IPAddress, ExtMultiaddresses, ShowAddresses, + CircuitRelay, LogLevel, LogEncoding, LogOutput, diff --git a/go.mod b/go.mod index 4309f8c0..0fbb4d8c 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( require ( github.com/berty/go-libp2p-rendezvous v0.4.1 + github.com/cenkalti/backoff/v4 v4.1.2 github.com/go-chi/chi/v5 v5.0.0 github.com/lib/pq v1.10.4 github.com/waku-org/go-noise v0.0.4 diff --git a/go.sum b/go.sum index 4cf9a609..011f449d 100644 --- a/go.sum +++ b/go.sum @@ -247,6 +247,7 @@ github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0Bsq github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/waku/node.go b/waku/node.go index 9368b99d..70965a49 100644 --- a/waku/node.go +++ b/waku/node.go @@ -133,6 +133,11 @@ func Execute(options Options) { libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) // Attempt to open ports using uPNP for NATed hosts.) } + // Node can be a circuit relay server + if options.CircuitRelay { + libp2pOpts = append(libp2pOpts, libp2p.EnableRelayService()) + } + if options.UserAgent != "" { libp2pOpts = append(libp2pOpts, libp2p.UserAgent(options.UserAgent)) } diff --git a/waku/options.go b/waku/options.go index da69f0f2..a004e038 100644 --- a/waku/options.go +++ b/waku/options.go @@ -155,6 +155,7 @@ type Options struct { KeepAlive time.Duration AdvertiseAddresses []multiaddr.Multiaddr ShowAddresses bool + CircuitRelay bool LogLevel string LogEncoding string LogOutput string diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 02515660..96bb219a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -9,6 +9,7 @@ import ( "sync" "time" + backoffv4 "github.com/cenkalti/backoff/v4" golog "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "go.uber.org/zap" @@ -23,6 +24,8 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" + "github.com/libp2p/go-libp2p/p2p/host/autorelay" + "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" @@ -89,7 +92,8 @@ type WakuNode struct { store ReceptorService rlnRelay RLNRelay - wakuFlag enr.WakuEnrBitfield + wakuFlag enr.WakuEnrBitfield + circuitRelayNodes chan peer.AddrInfo localNode *enode.LocalNode @@ -177,6 +181,34 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.wg = &sync.WaitGroup{} w.keepAliveFails = make(map[peer.ID]int) w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay) + w.circuitRelayNodes = make(chan peer.AddrInfo) + + // Use circuit relay with nodes received on circuitRelayNodes channel + params.libP2POpts = append(params.libP2POpts, libp2p.EnableAutoRelayWithPeerSource( + func(ctx context.Context, numPeers int) <-chan peer.AddrInfo { + r := make(chan peer.AddrInfo) + go func() { + defer close(r) + for ; numPeers != 0; numPeers-- { + select { + case v, ok := <-w.circuitRelayNodes: + if !ok { + return + } + select { + case r <- v: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + return r + }, + autorelay.WithMinInterval(0), + )) if params.enableNTP { w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log) @@ -305,10 +337,11 @@ func (w *WakuNode) Start(ctx context.Context) error { w.enrChangeCh = make(chan struct{}, 10) - w.wg.Add(3) + w.wg.Add(4) go w.connectednessListener(ctx) go w.watchMultiaddressChanges(ctx) go w.watchENRChanges(ctx) + go w.findRelayNodes(ctx) err = w.bcaster.Start(ctx) if err != nil { @@ -812,3 +845,56 @@ func (w *WakuNode) Peers() ([]*Peer, error) { } return peers, nil } + +func (w *WakuNode) findRelayNodes(ctx context.Context) { + defer w.wg.Done() + + // Feed peers more often right after the bootstrap, then backoff + bo := backoffv4.NewExponentialBackOff() + bo.InitialInterval = 15 * time.Second + bo.Multiplier = 3 + bo.MaxInterval = 1 * time.Hour + bo.MaxElapsedTime = 0 // never stop + t := backoffv4.NewTicker(bo) + defer t.Stop() + for { + select { + case <-t.C: + case <-ctx.Done(): + return + } + + peers, err := w.Peers() + if err != nil { + w.log.Error("failed to fetch peers", zap.Error(err)) + continue + } + + // Shuffle peers + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) + + for _, p := range peers { + info := w.Host().Peerstore().PeerInfo(p.ID) + + supportedProtocols, err := w.Host().Peerstore().SupportsProtocols(p.ID, proto.ProtoIDv2Hop) + if err != nil { + w.log.Error("could not check supported protocols", zap.Error(err)) + continue + } + + if len(supportedProtocols) == 0 { + continue + } + + select { + case <-ctx.Done(): + w.log.Debug("context done, auto-relay has enough peers") + return + + case w.circuitRelayNodes <- info: + w.log.Debug("published auto-relay peer info", zap.Any("peer-id", p.ID)) + } + } + } +} diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 4720a4df..4891fd9b 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -524,6 +524,7 @@ var DefaultLibP2POptions = []libp2p.Option{ ), libp2p.EnableNATService(), libp2p.ConnectionManager(newConnManager(200, 300, connmgr.WithGracePeriod(0))), + libp2p.EnableHolePunching(), } func newConnManager(lo int, hi int, opts ...connmgr.Option) *connmgr.BasicConnMgr {