waku2 - restart discv5 when peers are low and use newer go-waku code to deal with discovery

This commit is contained in:
Richard Ramos 2023-01-16 09:00:07 -04:00 committed by RichΛrd
parent 1bab7ae056
commit 7de1753549
20 changed files with 628 additions and 343 deletions

4
go.mod
View File

@ -77,7 +77,7 @@ require (
github.com/gorilla/sessions v1.2.1
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/rmg/iso4217 v1.0.0
github.com/waku-org/go-waku v0.3.2-0.20230110124657-7d2a0ac0e25f
github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb
)
require (
@ -236,7 +236,7 @@ require (
github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.20.2 // indirect
github.com/urfave/cli/v2 v2.23.7 // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect

10
go.sum
View File

@ -104,7 +104,7 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
@ -2048,8 +2048,8 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/urfave/cli/v2 v2.20.2 h1:dKA0LUjznZpwmmbrc0pOgcLTEilnHeM8Av9Yng77gHM=
github.com/urfave/cli/v2 v2.20.2/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI=
github.com/urfave/cli/v2 v2.23.7 h1:YHDQ46s3VghFHFf1DdF+Sh7H4RqhcM+t0TmZRJx4oJY=
github.com/urfave/cli/v2 v2.23.7/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8 h1:aSQuY64yglPb7I6lZRXt/xWD4ExM1DZo8Gpb7xCz6zk=
github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8/go.mod h1:uUmtiahU7efOVl/5w5yk9jOze5xYpDZDrSrT8TvHXjQ=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
@ -2067,8 +2067,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.3.2-0.20230110124657-7d2a0ac0e25f h1:i49S329tLM10Rbrw+oW3kYQrf9HoumtEzE+CgXoB73U=
github.com/waku-org/go-waku v0.3.2-0.20230110124657-7d2a0ac0e25f/go.mod h1:ubX5lF4SsiC8tVjZXKQU94OwCYPkD3znJtkKn3aF8Fw=
github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb h1:66ae/38EApilmfSMMx49ySuM5T1LCNEbra4LTRbDPw0=
github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb/go.mod h1:sI14mN/sM8inIb2x2b462wydSEFyOyuDKI1cjiVIIpM=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk=
github.com/wealdtech/go-ens/v3 v3.5.0 h1:Huc9GxBgiGweCOGTYomvsg07K2QggAqZpZ5SuiZdC8o=

View File

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"time"
)
@ -20,6 +21,7 @@ var (
errInvalidActionType = NewExitError("ERROR invalid Action type. "+
fmt.Sprintf("Must be `func(*Context`)` or `func(*Context) error). %s", contactSysadmin)+
fmt.Sprintf("See %s", appActionDeprecationURL), 2)
ignoreFlagPrefix = "test." // this is to ignore test flags when adding flags from other packages
SuggestFlag SuggestFlagFunc = suggestFlag
SuggestCommand SuggestCommandFunc = suggestCommand
@ -103,12 +105,21 @@ type App struct {
// cli.go uses text/template to render templates. You can
// render custom help text by setting this variable.
CustomAppHelpTemplate string
// SliceFlagSeparator is used to customize the separator for SliceFlag, the default is ","
SliceFlagSeparator string
// DisableSliceFlagSeparator is used to disable SliceFlagSeparator, the default is false
DisableSliceFlagSeparator bool
// Boolean to enable short-option handling so user can combine several
// single-character bool arguments into one
// i.e. foobar -o -v -> foobar -ov
UseShortOptionHandling bool
// Enable suggestions for commands and flags
Suggest bool
// Allows global flags set by libraries which use flag.XXXVar(...) directly
// to be parsed through this library
AllowExtFlags bool
// Treat all flags as normal arguments if true
SkipFlagParsing bool
didSetup bool
@ -195,6 +206,16 @@ func (a *App) Setup() {
a.ErrWriter = os.Stderr
}
if a.AllowExtFlags {
// add global flags added by other packages
flag.VisitAll(func(f *flag.Flag) {
// skip test flags
if !strings.HasPrefix(f.Name, ignoreFlagPrefix) {
a.Flags = append(a.Flags, &extFlag{f})
}
})
}
var newCommands []*Command
for _, c := range a.Commands {
@ -241,6 +262,12 @@ func (a *App) Setup() {
if a.Metadata == nil {
a.Metadata = make(map[string]interface{})
}
if len(a.SliceFlagSeparator) != 0 {
defaultSliceFlagSeparator = a.SliceFlagSeparator
}
disableSliceFlagSeparator = a.DisableSliceFlagSeparator
}
func (a *App) newRootCommand() *Command {
@ -264,6 +291,7 @@ func (a *App) newRootCommand() *Command {
HelpName: a.HelpName,
CustomHelpTemplate: a.CustomAppHelpTemplate,
categories: a.categories,
SkipFlagParsing: a.SkipFlagParsing,
isRoot: true,
}
}

View File

@ -136,6 +136,10 @@ func (c *Command) setup(ctx *Context) {
newCmds = append(newCmds, scmd)
}
c.Subcommands = newCmds
if c.BashComplete == nil {
c.BashComplete = DefaultCompleteWithFlags(c)
}
}
func (c *Command) Run(cCtx *Context, arguments ...string) (err error) {
@ -148,11 +152,7 @@ func (c *Command) Run(cCtx *Context, arguments ...string) (err error) {
set, err := c.parseFlags(&a, cCtx.shellComplete)
cCtx.flagSet = set
if c.isRoot {
if checkCompletions(cCtx) {
return nil
}
} else if checkCommandCompletions(cCtx, c.Name) {
if checkCompletions(cCtx) {
return nil
}
@ -203,7 +203,7 @@ func (c *Command) Run(cCtx *Context, arguments ...string) (err error) {
cerr := cCtx.checkRequiredFlags(c.Flags)
if cerr != nil {
_ = ShowSubcommandHelp(cCtx)
_ = helpCommand.Action(cCtx)
return cerr
}
@ -252,7 +252,7 @@ func (c *Command) Run(cCtx *Context, arguments ...string) (err error) {
}
}
}
} else if cCtx.App.DefaultCommand != "" {
} else if c.isRoot && cCtx.App.DefaultCommand != "" {
if dc := cCtx.App.Command(cCtx.App.DefaultCommand); dc != c {
cmd = dc
}

View File

@ -82,7 +82,27 @@ func (cCtx *Context) IsSet(name string) bool {
func (cCtx *Context) LocalFlagNames() []string {
var names []string
cCtx.flagSet.Visit(makeFlagNameVisitor(&names))
return names
// Check the flags which have been set via env or file
if cCtx.Command != nil && cCtx.Command.Flags != nil {
for _, f := range cCtx.Command.Flags {
if f.IsSet() {
names = append(names, f.Names()...)
}
}
}
// Sort out the duplicates since flag could be set via multiple
// paths
m := map[string]struct{}{}
var unames []string
for _, name := range names {
if _, ok := m[name]; !ok {
m[name] = struct{}{}
unames = append(unames, name)
}
}
return unames
}
// FlagNames returns a slice of flag names used by the this context and all of
@ -90,7 +110,7 @@ func (cCtx *Context) LocalFlagNames() []string {
func (cCtx *Context) FlagNames() []string {
var names []string
for _, pCtx := range cCtx.Lineage() {
pCtx.flagSet.Visit(makeFlagNameVisitor(&names))
names = append(names, pCtx.LocalFlagNames()...)
}
return names
}

View File

@ -83,7 +83,7 @@ type ExitCoder interface {
type exitError struct {
exitCode int
message interface{}
err error
}
// NewExitError calls Exit to create a new ExitCoder.
@ -98,23 +98,38 @@ func NewExitError(message interface{}, exitCode int) ExitCoder {
//
// This is the simplest way to trigger a non-zero exit code for an App without
// having to call os.Exit manually. During testing, this behavior can be avoided
// by overiding the ExitErrHandler function on an App or the package-global
// by overriding the ExitErrHandler function on an App or the package-global
// OsExiter function.
func Exit(message interface{}, exitCode int) ExitCoder {
var err error
switch e := message.(type) {
case ErrorFormatter:
err = fmt.Errorf("%+v", message)
case error:
err = e
default:
err = fmt.Errorf("%+v", message)
}
return &exitError{
message: message,
err: err,
exitCode: exitCode,
}
}
func (ee *exitError) Error() string {
return fmt.Sprintf("%v", ee.message)
return ee.err.Error()
}
func (ee *exitError) ExitCode() int {
return ee.exitCode
}
func (ee *exitError) Unwrap() error {
return ee.err
}
// HandleExitCoder handles errors implementing ExitCoder by printing their
// message and calling OsExiter with the given exit code.
//

View File

@ -98,7 +98,7 @@ func (a *App) prepareFishCommands(commands []*Command, allCommands *[]string, pr
a.prepareFishFlags(command.VisibleFlags(), command.Names())...,
)
// recursevly iterate subcommands
// recursively iterate subcommands
if len(command.Subcommands) > 0 {
completions = append(
completions,

View File

@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"os"
"regexp"
"runtime"
"strings"
@ -14,6 +15,11 @@ import (
const defaultPlaceholder = "value"
var (
defaultSliceFlagSeparator = ","
disableSliceFlagSeparator = false
)
var (
slPfx = fmt.Sprintf("sl:::%d:::", time.Now().UTC().UnixNano())
@ -268,19 +274,23 @@ func prefixedNames(names []string, placeholder string) string {
return prefixed
}
func envFormat(envVars []string, prefix, sep, suffix string) string {
if len(envVars) > 0 {
return fmt.Sprintf(" [%s%s%s]", prefix, strings.Join(envVars, sep), suffix)
}
return ""
}
func defaultEnvFormat(envVars []string) string {
return envFormat(envVars, "$", ", $", "")
}
func withEnvHint(envVars []string, str string) string {
envText := ""
if len(envVars) > 0 {
prefix := "$"
suffix := ""
sep := ", $"
if runtime.GOOS == "windows" {
prefix = "%"
suffix = "%"
sep = "%, %"
}
envText = fmt.Sprintf(" [%s%s%s]", prefix, strings.Join(envVars, sep), suffix)
if runtime.GOOS != "windows" || os.Getenv("PSHOME") != "" {
envText = defaultEnvFormat(envVars)
} else {
envText = envFormat(envVars, "%", "%, %", "%")
}
return str + envText
}
@ -373,5 +383,9 @@ func flagFromEnvOrFile(envVars []string, filePath string) (value string, fromWhe
}
func flagSplitMultiValues(val string) []string {
return strings.Split(val, ",")
if disableSliceFlagSeparator {
return []string{val}
}
return strings.Split(val, defaultSliceFlagSeparator)
}

48
vendor/github.com/urfave/cli/v2/flag_ext.go generated vendored Normal file
View File

@ -0,0 +1,48 @@
package cli
import "flag"
type extFlag struct {
f *flag.Flag
}
func (e *extFlag) Apply(fs *flag.FlagSet) error {
fs.Var(e.f.Value, e.f.Name, e.f.Usage)
return nil
}
func (e *extFlag) Names() []string {
return []string{e.f.Name}
}
func (e *extFlag) IsSet() bool {
return false
}
func (e *extFlag) String() string {
return FlagStringer(e)
}
func (e *extFlag) IsVisible() bool {
return true
}
func (e *extFlag) TakesValue() bool {
return false
}
func (e *extFlag) GetUsage() string {
return e.f.Usage
}
func (e *extFlag) GetValue() string {
return e.f.Value.String()
}
func (e *extFlag) GetDefaultText() string {
return e.f.DefValue
}
func (e *extFlag) GetEnvVars() []string {
return nil
}

View File

@ -316,12 +316,21 @@ type App struct {
// cli.go uses text/template to render templates. You can
// render custom help text by setting this variable.
CustomAppHelpTemplate string
// SliceFlagSeparator is used to customize the separator for SliceFlag, the default is ","
SliceFlagSeparator string
// DisableSliceFlagSeparator is used to disable SliceFlagSeparator, the default is false
DisableSliceFlagSeparator bool
// Boolean to enable short-option handling so user can combine several
// single-character bool arguments into one
// i.e. foobar -o -v -> foobar -ov
UseShortOptionHandling bool
// Enable suggestions for commands and flags
Suggest bool
// Allows global flags set by libraries which use flag.XXXVar(...) directly
// to be parsed through this library
AllowExtFlags bool
// Treat all flags as normal arguments if true
SkipFlagParsing bool
// Has unexported fields.
}
@ -841,7 +850,7 @@ func Exit(message interface{}, exitCode int) ExitCoder
This is the simplest way to trigger a non-zero exit code for an App
without having to call os.Exit manually. During testing, this behavior
can be avoided by overiding the ExitErrHandler function on an App or the
can be avoided by overriding the ExitErrHandler function on an App or the
package-global OsExiter function.
func NewExitError(message interface{}, exitCode int) ExitCoder
@ -2430,6 +2439,7 @@ type InputSourceContext interface {
String(name string) (string, error)
StringSlice(name string) ([]string, error)
IntSlice(name string) ([]int, error)
Int64Slice(name string) ([]int64, error)
Generic(name string) (cli.Generic, error)
Bool(name string) (bool, error)
@ -2487,6 +2497,9 @@ func (f *Int64SliceFlag) Apply(set *flag.FlagSet) error
Apply saves the flagSet for later usage calls, then calls the wrapped
Int64SliceFlag.Apply
func (f *Int64SliceFlag) ApplyInputSourceValue(cCtx *cli.Context, isc InputSourceContext) error
ApplyInputSourceValue applies a Int64Slice value if required
type IntFlag struct {
*cli.IntFlag
// Has unexported fields.
@ -2547,6 +2560,10 @@ func (fsm *MapInputSource) Generic(name string) (cli.Generic, error)
func (fsm *MapInputSource) Int(name string) (int, error)
Int returns an int from the map if it exists otherwise returns 0
func (fsm *MapInputSource) Int64Slice(name string) ([]int64, error)
Int64Slice returns an []int64 from the map if it exists otherwise returns
nil
func (fsm *MapInputSource) IntSlice(name string) ([]int, error)
IntSlice returns an []int from the map if it exists otherwise returns nil

View File

@ -60,7 +60,7 @@ var helpCommand = &Command{
}
// Case 1 & 2
// Special case when running help on main app itself as opposed to indivdual
// Special case when running help on main app itself as opposed to individual
// commands/subcommands
if cCtx.parentContext.App == nil {
_ = ShowAppHelp(cCtx)
@ -188,7 +188,7 @@ func printFlagSuggestions(lastArg string, flags []Flag, writer io.Writer) {
// this will get total count utf8 letters in flag name
count := utf8.RuneCountInString(name)
if count > 2 {
count = 2 // resuse this count to generate single - or -- in flag completion
count = 2 // reuse this count to generate single - or -- in flag completion
}
// if flag name has more than one utf8 letter and last argument in cli has -- prefix then
// skip flag completion for short flags example -v or -x
@ -227,7 +227,7 @@ func DefaultCompleteWithFlags(cmd *Command) func(cCtx *Context) {
return
}
printCommandSuggestions(cCtx.App.Commands, cCtx.App.Writer)
printCommandSuggestions(cCtx.Command.Subcommands, cCtx.App.Writer)
}
}
@ -246,7 +246,7 @@ func ShowCommandHelp(ctx *Context, command string) error {
}
for _, c := range commands {
if c.HasName(command) {
if !ctx.App.HideHelpCommand && !c.HasName(helpName) && len(c.Subcommands) != 0 {
if !ctx.App.HideHelpCommand && !c.HasName(helpName) && len(c.Subcommands) != 0 && c.Command(helpName) == nil {
c.Subcommands = append(c.Subcommands, helpCommandDontUse)
}
if !ctx.App.HideHelp && HelpFlag != nil {
@ -308,15 +308,15 @@ func printVersion(cCtx *Context) {
// ShowCompletions prints the lists of commands within a given context
func ShowCompletions(cCtx *Context) {
a := cCtx.App
if a != nil && a.BashComplete != nil {
a.BashComplete(cCtx)
c := cCtx.Command
if c != nil && c.BashComplete != nil {
c.BashComplete(cCtx)
}
}
// ShowCommandCompletions prints the custom completions for a given command
func ShowCommandCompletions(ctx *Context, command string) {
c := ctx.App.Command(command)
c := ctx.Command.Command(command)
if c != nil {
if c.BashComplete != nil {
c.BashComplete(ctx)
@ -453,7 +453,7 @@ func checkCompletions(cCtx *Context) bool {
if args := cCtx.Args(); args.Present() {
name := args.First()
if cmd := cCtx.App.Command(name); cmd != nil {
if cmd := cCtx.Command.Command(name); cmd != nil {
// let the command handle the completion
return false
}
@ -463,15 +463,6 @@ func checkCompletions(cCtx *Context) bool {
return true
}
func checkCommandCompletions(c *Context, name string) bool {
if !c.shellComplete {
return false
}
ShowCommandCompletions(c, name)
return true
}
func subtract(a, b int) int {
return a - b
}

View File

@ -0,0 +1,167 @@
package v2
// Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go
import (
"context"
"errors"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
lru "github.com/hashicorp/golang-lru"
)
// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
cache *lru.TwoQueueCache
host host.Host
cancel context.CancelFunc
wg sync.WaitGroup
minPeers int
dialTimeout time.Duration
peerCh chan peer.AddrInfo
backoff backoff.BackoffFactory
mux sync.Mutex
logger *zap.Logger
}
// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
}
return &PeerConnectionStrategy{
cache: cache,
host: h,
wg: sync.WaitGroup{},
minPeers: minPeers,
dialTimeout: dialTimeout,
backoff: backoff,
logger: logger.Named("discovery-connector"),
peerCh: make(chan peer.AddrInfo),
}, nil
}
type connCacheData struct {
nextTry time.Time
strat backoff.BackoffStrategy
}
// PeerChannel exposes the channel on which discovered peers should be pushed
func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo {
return c.peerCh
}
// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
}
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.wg.Add(1)
go c.dialPeers(ctx)
return nil
}
func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
}
c.cancel()
c.wg.Wait()
}
func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()
maxGoRoutines := c.minPeers
if maxGoRoutines > 15 {
maxGoRoutines = 15
}
sem := make(chan struct{}, maxGoRoutines)
for {
select {
case pi, ok := <-c.peerCh:
if !ok {
return
}
if pi.ID == c.host.ID() || pi.ID == "" {
continue
}
c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
continue
}
tv.nextTry = now.Add(tv.strat.Delay())
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()
if c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}
sem <- struct{}{}
go func(pi peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil {
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}(pi)
ticker := time.NewTicker(1 * time.Second)
peerCntLoop:
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(c.host.Network().Peers()) < c.minPeers {
ticker.Stop()
break peerCntLoop
}
}
}
case <-ctx.Done():
if ctx.Err() != nil {
c.logger.Info("discovery: backoff connector context error", zap.Error(ctx.Err()))
}
return
}
}
}

View File

@ -3,12 +3,10 @@ package discv5
import (
"context"
"crypto/ecdsa"
"math/rand"
"errors"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-discover/discover"
@ -17,48 +15,32 @@ import (
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nat"
)
type DiscoveryV5 struct {
sync.RWMutex
discovery.Discovery
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
NAT nat.Interface
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
peerConnector PeerConnector
NAT nat.Interface
log *zap.Logger
started bool
cancel context.CancelFunc
wg *sync.WaitGroup
peerCache peerCache
}
type peerCache struct {
sync.RWMutex
recs map[peer.ID]PeerRecord
rng *rand.Rand
}
type PeerRecord struct {
expire int64
Peer peer.AddrInfo
Node enode.Node
}
type discV5Parameters struct {
autoUpdate bool
bootnodes []*enode.Node
udpPort int
udpPort uint
advertiseAddr *net.IP
}
@ -84,7 +66,7 @@ func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option {
}
}
func WithUDPPort(port int) DiscoveryV5Option {
func WithUDPPort(port uint) DiscoveryV5Option {
return func(params *discV5Parameters) {
params.udpPort = port
}
@ -96,9 +78,11 @@ func DefaultOptions() []DiscoveryV5Option {
}
}
const MaxPeersToDiscover = 600
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
optList = append(optList, opts...)
@ -114,29 +98,22 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
}
return &DiscoveryV5{
host: host,
params: params,
NAT: NAT,
wg: &sync.WaitGroup{},
peerCache: peerCache{
rng: rand.New(rand.NewSource(rand.Int63())),
recs: make(map[peer.ID]PeerRecord),
},
localnode: localnode,
host: host,
peerConnector: peerConnector,
params: params,
NAT: NAT,
wg: &sync.WaitGroup{},
localnode: localnode,
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
ValidNodeFn: func(n enode.Node) bool {
// TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func
return evaluateNode(&n)
},
V5Config: discover.V5Config{
ProtocolID: &protocolID,
},
},
udpAddr: &net.UDPAddr{
IP: net.IPv4zero,
Port: params.udpPort,
Port: int(params.udpPort),
},
log: logger,
}, nil
@ -194,6 +171,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
return err
}
d.wg.Add(1)
go d.runDiscoveryV5Loop(ctx)
return nil
@ -241,27 +219,15 @@ func isWakuNode(node *enode.Node) bool {
}
*/
func hasTCPPort(node *enode.Node) bool {
enrTCP := new(enr.TCP)
if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil {
if !enr.IsNotFound(err) {
utils.Logger().Named("discv5").Error("retrieving port for enr", logging.ENode("enr", node))
}
return false
}
return true
}
func evaluateNode(node *enode.Node) bool {
if node == nil || node.IP() == nil {
if node == nil {
return false
}
// TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
if /*!isWakuNode(node) ||*/ !hasTCPPort(node) {
/*if !isWakuNode(node) {
return false
}
}*/
_, err := utils.EnodeToPeerInfo(node)
@ -273,27 +239,25 @@ func evaluateNode(node *enode.Node) bool {
return true
}
func (d *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
if d.listener == nil {
return nil, errors.New("no discv5 listener")
}
// TODO: once discv5 spec introduces capability and topic discovery, implement this function
return 20 * time.Minute, nil
iterator := d.listener.RandomNodes()
return enode.Filter(iterator, evaluateNode), nil
}
func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int) {
defer d.wg.Done()
func (d *DiscoveryV5) iterate(ctx context.Context) {
iterator, err := d.Iterator()
if err != nil {
d.log.Error("obtaining iterator", zap.Error(err))
return
}
defer iterator.Close()
for {
if len(d.peerCache.recs) >= limit {
break
}
if ctx.Err() != nil {
break
}
@ -315,106 +279,33 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
continue
}
d.peerCache.Lock()
for _, p := range peerAddrs {
d.peerCache.recs[p.ID] = PeerRecord{
expire: time.Now().Unix() + 3600, // Expires in 1hr
Peer: p,
Node: *iterator.Node(),
}
}
d.peerCache.Unlock()
}
}
func (d *DiscoveryV5) removeExpiredPeers() int {
// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(d.peerCache.recs)
for p := range d.peerCache.recs {
rec := d.peerCache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(d.peerCache.recs, p)
if len(peerAddrs) != 0 {
d.peerConnector.PeerChannel() <- peerAddrs[0]
}
}
return newCacheSize
}
func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) {
iterator := d.listener.RandomNodes()
iterator = enode.Filter(iterator, evaluateNode)
defer iterator.Close()
defer d.wg.Done()
d.wg.Add(1)
ch := make(chan struct{}, 1)
ch <- struct{}{} // Initial execution
go d.iterate(ctx, iterator, MaxPeersToDiscover)
<-ctx.Done()
d.log.Warn("Discv5 loop stopped")
}
func (d *DiscoveryV5) FindNodes(ctx context.Context, topic string, opts ...discovery.Option) ([]PeerRecord, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}
limit := options.Limit
if limit == 0 || limit > MaxPeersToDiscover {
limit = MaxPeersToDiscover
}
// We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic
d.peerCache.Lock()
defer d.peerCache.Unlock()
d.removeExpiredPeers()
// Randomize and fill channel with available records
count := len(d.peerCache.recs)
if limit < count {
count = limit
}
perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count]
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
sendLst := make([]PeerRecord, count)
iter := 0
for k := range d.peerCache.recs {
if sendIndex, ok := permSet[iter]; ok {
sendLst[sendIndex] = d.peerCache.recs[k]
restartLoop:
for {
select {
case <-ch:
if d.listener == nil {
break
}
d.iterate(ctx)
ch <- struct{}{}
case <-ctx.Done():
close(ch)
break restartLoop
}
iter++
}
return sendLst, err
}
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
records, err := d.FindNodes(ctx, topic, opts...)
if err != nil {
return nil, err
}
chPeer := make(chan peer.AddrInfo, len(records))
for _, r := range records {
chPeer <- r.Peer
}
close(chPeer)
return chPeer, err
d.log.Warn("Discv5 loop stopped")
}
func (d *DiscoveryV5) IsStarted() bool {

View File

@ -26,24 +26,47 @@ func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error
return enode.NewLocalNode(db, priv), nil
}
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) error {
localnode.SetFallbackUDP(udpPort)
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, shouldAutoUpdate bool, log *zap.Logger) error {
localnode.SetFallbackUDP(int(udpPort))
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
localnode.SetStaticIP(ipAddr.IP)
if udpPort > 0 && udpPort <= math.MaxUint16 {
localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion]
}
if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 {
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion]
} else {
log.Error("setting tcpPort", zap.Int("port", ipAddr.Port))
if udpPort > math.MaxUint16 {
return errors.New("invalid udp port number")
}
if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
localnode.SetStaticIP(*advertiseAddr)
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
localnode.SetStaticIP(ipAddr.IP)
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
} else {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
}
if ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
// Adding websocket multiaddresses
@ -222,26 +245,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err
}
// TODO: make this optional depending on DNS Disc being enabled
if w.opts.privKey != nil && w.localNode != nil {
err := w.updateLocalNode(w.localNode, w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
// Restarting DiscV5
discV5 := w.DiscV5()
if discV5 != nil && discV5.IsStarted() {
w.log.Info("restarting discv5")
w.discoveryV5.Stop()
err = w.discoveryV5.Start(ctx)
if err != nil {
w.log.Error("could not restart discv5", zap.Error(err))
return err
}
}
}
err = w.updateLocalNode(w.localNode, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.opts.discV5autoUpdate, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
}
return nil

View File

@ -3,6 +3,7 @@ package node
import (
"context"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
@ -15,3 +16,8 @@ type ReceptorService interface {
Service
MessageChannel() chan *protocol.Envelope
}
type PeerConnectorService interface {
Service
PeerChannel() chan<- peer.AddrInfo
}

View File

@ -4,12 +4,12 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
@ -21,6 +21,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
@ -70,21 +71,20 @@ type WakuNode struct {
log *zap.Logger
timesource timesource.Timesource
relay Service
lightPush Service
swap Service
discoveryV5 Service
peerExchange Service
filter ReceptorService
store ReceptorService
rlnRelay RLNRelay
relay Service
lightPush Service
swap Service
peerConnector PeerConnectorService
discoveryV5 Service
peerExchange Service
filter ReceptorService
store ReceptorService
rlnRelay RLNRelay
wakuFlag utils.WakuEnrBitfield
localNode *enode.LocalNode
addrChan chan ma.Multiaddr
bcaster v2.Broadcaster
connectionNotif ConnectionNotifier
@ -166,7 +166,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
w.addrChan = make(chan ma.Multiaddr, 1024)
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
@ -181,16 +180,27 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.log.Error("creating localnode", zap.Error(err))
}
// Setup peer connection strategy
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*30, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
w.peerConnector, err = v2.NewPeerConnectionStrategy(host, cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
return nil, err
}
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...))
}
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log)
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.peerConnector, w.log)
if err != nil {
return nil, err
}
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
@ -228,14 +238,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
return w, nil
}
func (w *WakuNode) onAddrChange() {
for m := range w.addrChan {
_ = m
// TODO: determine if still needed. Otherwise remove
}
}
func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
defer w.wg.Done()
addrs := w.ListenAddresses()
@ -244,7 +247,6 @@ func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
for {
select {
case <-ctx.Done():
close(w.addrChan)
return
case <-first:
w.log.Info("listening", logging.MultiAddrs("multiaddr", addrs...))
@ -264,9 +266,6 @@ func (w *WakuNode) checkForAddressChanges(ctx context.Context) {
if diff {
addrs = newAddrs
w.log.Info("listening addresses update received", logging.MultiAddrs("multiaddr", addrs...))
for _, addr := range addrs {
w.addrChan <- addr
}
_ = w.setupENR(ctx, addrs)
}
}
@ -281,16 +280,21 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
w.host.Network().Notify(w.connectionNotif)
w.wg.Add(2)
w.wg.Add(3)
go w.connectednessListener(ctx)
go w.checkForAddressChanges(ctx)
go w.onAddrChange()
go w.watchMultiaddressChanges(ctx)
go w.watchENRChanges(ctx)
if w.opts.keepAliveInterval > time.Duration(0) {
w.wg.Add(1)
go w.startKeepAlive(ctx, w.opts.keepAliveInterval)
}
err := w.peerConnector.Start(ctx)
if err != nil {
return err
}
if w.opts.enableNTP {
err := w.timesource.Start(ctx)
if err != nil {
@ -339,7 +343,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.bcaster.Register(nil, w.filter.MessageChannel())
}
err := w.setupENR(ctx, w.ListenAddresses())
err = w.setupENR(ctx, w.ListenAddresses())
if err != nil {
return err
}
@ -386,6 +390,8 @@ func (w *WakuNode) Stop() {
w.discoveryV5.Stop()
}
w.peerConnector.Stop()
_ = w.stopRlnRelay()
w.timesource.Stop()
@ -405,6 +411,31 @@ func (w *WakuNode) ID() string {
return w.host.ID().Pretty()
}
func (w *WakuNode) watchENRChanges(ctx context.Context) {
defer w.wg.Done()
timer := time.NewTicker(1 * time.Second)
var prevNodeVal string
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if w.localNode != nil {
currNodeVal := w.localNode.Node().String()
if prevNodeVal != currNodeVal {
if prevNodeVal == "" {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
} else {
w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node()))
}
prevNodeVal = currNodeVal
}
}
}
}
}
// ListenAddresses returns all the multiaddresses used by the host
func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty()))
@ -517,7 +548,7 @@ func (w *WakuNode) mountDiscV5() error {
}
var err error
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...)
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...)
return err
}

View File

@ -74,10 +74,11 @@ type WakuNodeParameters struct {
swapDisconnectThreshold int
swapPaymentThreshold int
discoveryMinPeers int
enableDiscV5 bool
udpPort int
udpPort uint
discV5bootnodes []*enode.Node
discV5Opts []pubsub.DiscoverOpt
discV5autoUpdate bool
enablePeerExchange bool
@ -108,6 +109,7 @@ type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithDiscoverParams(150),
WithLogger(utils.Logger()),
}
@ -281,13 +283,19 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)
}
}
func WithDiscoverParams(minPeers int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.discoveryMinPeers = minPeers
return nil
}
}
// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableDiscV5 = true
params.udpPort = udpPort
params.discV5bootnodes = bootnodes
params.discV5Opts = discoverOpts
params.discV5autoUpdate = autoUpdate
return nil
}

View File

@ -32,7 +32,6 @@ import (
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
const MaxCacheSize = 1000
const CacheCleanWindow = 200
const dialTimeout = 7 * time.Second
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -40,7 +39,7 @@ var (
)
type peerRecord struct {
node enode.Node
node *enode.Node
idx int
}
@ -50,29 +49,35 @@ type WakuPeerExchange struct {
log *zap.Logger
cancel context.CancelFunc
started bool
wg sync.WaitGroup
cancel context.CancelFunc
wg sync.WaitGroup
peerConnector PeerConnector
peerCh chan peer.AddrInfo
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCacheMutex sync.RWMutex
rng *rand.Rand
}
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange {
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange)
wakuPX.h = h
wakuPX.disc = disc
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = make(map[enode.ID]peerRecord)
wakuPX.rng = rand.New(rand.NewSource(rand.Int63()))
return wakuPX
wakuPX.peerConnector = peerConnector
return wakuPX, nil
}
// Start inits the peer exchange protocol
func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
if wakuPX.started {
if wakuPX.cancel != nil {
return errors.New("peer exchange already started")
}
@ -80,14 +85,14 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
wakuPX.cancel = cancel
wakuPX.started = true
wakuPX.peerCh = make(chan peer.AddrInfo)
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
wakuPX.log.Info("Peer exchange protocol started")
wakuPX.wg.Add(1)
wakuPX.wg.Add(2)
go wakuPX.runPeerExchangeDiscv5Loop(ctx)
go wakuPX.handleNewPeers(ctx)
return nil
}
@ -115,28 +120,36 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
return err
}
if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected {
peers = append(peers, *peerInfo)
}
peers = append(peers, *peerInfo)
}
if len(peers) != 0 {
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
for _, p := range peers {
func(p peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
err := wakuPX.h.Connect(ctx, p)
if err != nil {
log.Info("connecting to peer", zap.String("peer", p.ID.Pretty()), zap.Error(err))
}
}(p)
}
wakuPX.wg.Add(1)
go func() {
defer wakuPX.wg.Done()
for _, p := range peers {
wakuPX.peerCh <- p
}
}()
}
return nil
}
func (wakuPX *WakuPeerExchange) handleNewPeers(ctx context.Context) {
defer wakuPX.wg.Done()
for {
select {
case <-ctx.Done():
return
case p := <-wakuPX.peerCh:
wakuPX.peerConnector.PeerChannel() <- p
}
}
}
func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
@ -202,8 +215,9 @@ func (wakuPX *WakuPeerExchange) Stop() {
if wakuPX.cancel == nil {
return
}
wakuPX.cancel()
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
wakuPX.cancel()
close(wakuPX.peerCh)
wakuPX.wg.Wait()
}
@ -307,28 +321,41 @@ func (wakuPX *WakuPeerExchange) cleanCache() {
wakuPX.enrCache = r
}
func (wakuPX *WakuPeerExchange) findPeers(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
peerRecords, err := wakuPX.disc.FindNodes(ctx, "")
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
iterator, err := wakuPX.disc.Iterator()
if err != nil {
wakuPX.log.Error("finding peers", zap.Error(err))
wakuPX.log.Error("obtaining iterator", zap.Error(err))
return
}
defer iterator.Close()
cnt := 0
wakuPX.enrCacheMutex.Lock()
for _, p := range peerRecords {
cnt++
wakuPX.enrCache[p.Node.ID()] = peerRecord{
idx: len(wakuPX.enrCache),
node: p.Node,
for {
if ctx.Err() != nil {
break
}
exists := iterator.Next()
if !exists {
break
}
addresses, err := utils.Multiaddress(iterator.Node())
if err != nil {
wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err))
continue
}
if len(addresses) == 0 {
continue
}
wakuPX.enrCacheMutex.Lock()
wakuPX.enrCache[iterator.Node().ID()] = peerRecord{
idx: len(wakuPX.enrCache),
node: iterator.Node(),
}
wakuPX.enrCacheMutex.Unlock()
}
wakuPX.enrCacheMutex.Unlock()
wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", cnt))
wakuPX.cleanCache()
}
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
@ -340,24 +367,23 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
return
}
wakuPX.log.Info("starting peer exchange discovery v5 loop")
ch := make(chan struct{}, 1)
ch <- struct{}{} // Initial execution
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// This loop "competes" with the loop in wakunode2
// For the purpose of collecting px peers, 30 sec intervals should be enough
wakuPX.findPeers(ctx)
restartLoop:
for {
select {
case <-ctx.Done():
return
case <-ch:
wakuPX.iterate(ctx)
ch <- struct{}{}
case <-ticker.C:
wakuPX.findPeers(ctx)
wakuPX.cleanCache()
case <-ctx.Done():
close(ch)
break restartLoop
}
}
}

4
vendor/modules.txt vendored
View File

@ -964,7 +964,7 @@ github.com/tsenart/tb
## explicit; go 1.14
github.com/tyler-smith/go-bip39
github.com/tyler-smith/go-bip39/wordlists
# github.com/urfave/cli/v2 v2.20.2
# github.com/urfave/cli/v2 v2.23.7
## explicit; go 1.18
github.com/urfave/cli/v2
# github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8
@ -985,7 +985,7 @@ github.com/vacp2p/mvds/transport
github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.3.2-0.20230110124657-7d2a0ac0e25f
# github.com/waku-org/go-waku v0.3.2-0.20230116133230-a4230afc62fb
## explicit; go 1.18
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence

View File

@ -70,7 +70,6 @@ import (
"github.com/status-im/status-go/wakuv2/common"
"github.com/status-im/status-go/wakuv2/persistence"
"github.com/libp2p/go-libp2p/core/discovery"
node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
@ -263,6 +262,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
node.WithHostAddress(hostAddr),
node.WithConnectionStatusChannel(connStatusChan),
node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second),
node.WithDiscoverParams(cfg.DiscoveryLimit),
node.WithLogger(logger),
}
@ -273,7 +273,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
return nil, err
}
opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit))))
opts = append(opts, node.WithDiscoveryV5(uint(cfg.UDPPort), bootnodes, cfg.AutoUpdate))
// Peer exchange requires DiscV5 to run (might change in future versions of the protocol)
if cfg.PeerExchange {
@ -330,6 +330,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
go func() {
defer waku.wg.Done()
isConnected := false
for {
select {
case <-waku.quit:
@ -346,6 +348,20 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
}
waku.connStatusMu.Unlock()
signal.SendPeerStats(latestConnStatus)
// Restarting DiscV5
if !latestConnStatus.IsOnline && isConnected {
isConnected = false
waku.node.DiscV5().Stop()
} else if latestConnStatus.IsOnline && !isConnected {
isConnected = true
if !waku.node.DiscV5().IsStarted() {
err := waku.node.DiscV5().Start(ctx)
if err != nil {
waku.logger.Error("Could not start DiscV5", zap.Error(err))
}
}
}
}
}
}()