feat: use circuit relay in service node

This commit is contained in:
Richard Ramos 2023-05-12 17:52:42 -04:00 committed by RichΛrd
parent ceed9c7d59
commit 9594e54d36
8 changed files with 105 additions and 2 deletions

View File

@ -167,6 +167,13 @@ var (
Usage: "Display listening addresses according to current configuration",
Destination: &options.ShowAddresses,
})
CircuitRelay = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "circuit-relay",
Usage: "Enable circuit relay service",
Value: true,
Destination: &options.CircuitRelay,
EnvVars: []string{"WAKUNODE2_CIRCUIT_RELAY"},
})
LogLevel = cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "log-level",
Aliases: []string{"l"},

View File

@ -40,6 +40,7 @@ func main() {
IPAddress,
ExtMultiaddresses,
ShowAddresses,
CircuitRelay,
LogLevel,
LogEncoding,
LogOutput,

1
go.mod
View File

@ -35,6 +35,7 @@ require (
require (
github.com/berty/go-libp2p-rendezvous v0.4.1
github.com/cenkalti/backoff/v4 v4.1.2
github.com/go-chi/chi/v5 v5.0.0
github.com/lib/pq v1.10.4
github.com/waku-org/go-noise v0.0.4

1
go.sum
View File

@ -247,6 +247,7 @@ github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0Bsq
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=

View File

@ -133,6 +133,11 @@ func Execute(options Options) {
libp2pOpts = append(libp2pOpts, libp2p.NATPortMap()) // Attempt to open ports using uPNP for NATed hosts.)
}
// Node can be a circuit relay server
if options.CircuitRelay {
libp2pOpts = append(libp2pOpts, libp2p.EnableRelayService())
}
if options.UserAgent != "" {
libp2pOpts = append(libp2pOpts, libp2p.UserAgent(options.UserAgent))
}

View File

@ -155,6 +155,7 @@ type Options struct {
KeepAlive time.Duration
AdvertiseAddresses []multiaddr.Multiaddr
ShowAddresses bool
CircuitRelay bool
LogLevel string
LogEncoding string
LogOutput string

View File

@ -9,6 +9,7 @@ import (
"sync"
"time"
backoffv4 "github.com/cenkalti/backoff/v4"
golog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"go.uber.org/zap"
@ -23,6 +24,8 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
@ -90,6 +93,7 @@ type WakuNode struct {
rlnRelay RLNRelay
wakuFlag enr.WakuEnrBitfield
circuitRelayNodes chan peer.AddrInfo
localNode *enode.LocalNode
@ -177,6 +181,34 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.wg = &sync.WaitGroup{}
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo)
// Use circuit relay with nodes received on circuitRelayNodes channel
params.libP2POpts = append(params.libP2POpts, libp2p.EnableAutoRelayWithPeerSource(
func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
r := make(chan peer.AddrInfo)
go func() {
defer close(r)
for ; numPeers != 0; numPeers-- {
select {
case v, ok := <-w.circuitRelayNodes:
if !ok {
return
}
select {
case r <- v:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return r
},
autorelay.WithMinInterval(0),
))
if params.enableNTP {
w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log)
@ -305,10 +337,11 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.enrChangeCh = make(chan struct{}, 10)
w.wg.Add(3)
w.wg.Add(4)
go w.connectednessListener(ctx)
go w.watchMultiaddressChanges(ctx)
go w.watchENRChanges(ctx)
go w.findRelayNodes(ctx)
err = w.bcaster.Start(ctx)
if err != nil {
@ -812,3 +845,56 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
}
return peers, nil
}
func (w *WakuNode) findRelayNodes(ctx context.Context) {
defer w.wg.Done()
// Feed peers more often right after the bootstrap, then backoff
bo := backoffv4.NewExponentialBackOff()
bo.InitialInterval = 15 * time.Second
bo.Multiplier = 3
bo.MaxInterval = 1 * time.Hour
bo.MaxElapsedTime = 0 // never stop
t := backoffv4.NewTicker(bo)
defer t.Stop()
for {
select {
case <-t.C:
case <-ctx.Done():
return
}
peers, err := w.Peers()
if err != nil {
w.log.Error("failed to fetch peers", zap.Error(err))
continue
}
// Shuffle peers
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
for _, p := range peers {
info := w.Host().Peerstore().PeerInfo(p.ID)
supportedProtocols, err := w.Host().Peerstore().SupportsProtocols(p.ID, proto.ProtoIDv2Hop)
if err != nil {
w.log.Error("could not check supported protocols", zap.Error(err))
continue
}
if len(supportedProtocols) == 0 {
continue
}
select {
case <-ctx.Done():
w.log.Debug("context done, auto-relay has enough peers")
return
case w.circuitRelayNodes <- info:
w.log.Debug("published auto-relay peer info", zap.Any("peer-id", p.ID))
}
}
}
}

View File

@ -524,6 +524,7 @@ var DefaultLibP2POptions = []libp2p.Option{
),
libp2p.EnableNATService(),
libp2p.ConnectionManager(newConnManager(200, 300, connmgr.WithGracePeriod(0))),
libp2p.EnableHolePunching(),
}
func newConnManager(lo int, hi int, opts ...connmgr.Option) *connmgr.BasicConnMgr {