Restart discovery when it fails
When discovery fails to be seeded with bootstrap/fallback nodes, it never recovers. This commit changes the behavior so that status-go retries fetching bootnodes, and restarts discovery when that happens.
This commit is contained in:
parent
f98e65258a
commit
8593866862
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/waku"
|
||||
wakucommon "github.com/status-im/status-go/waku/common"
|
||||
|
@ -256,6 +257,7 @@ func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
|
|||
func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {}
|
||||
|
||||
type wakuFilterWrapper struct {
|
||||
filter *wakucommon.Filter
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
wakucommon "github.com/status-im/status-go/wakuv2/common"
|
||||
|
@ -272,6 +273,10 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
|
|||
return w.waku.SubscribeToConnStatusChanges(), nil
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) {
|
||||
w.waku.ConnectionChanged(state)
|
||||
}
|
||||
|
||||
type wakuV2FilterWrapper struct {
|
||||
filter *wakucommon.Filter
|
||||
id string
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/pborman/uuid"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
)
|
||||
|
||||
type ConnStatus struct {
|
||||
|
@ -149,4 +150,7 @@ type Waku interface {
|
|||
|
||||
// MarkP2PMessageAsProcessed tells the waku layer that a P2P message has been processed
|
||||
MarkP2PMessageAsProcessed(common.Hash)
|
||||
|
||||
// ConnectionChanged is called whenever the client knows its connection status has changed
|
||||
ConnectionChanged(connection.State)
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -80,7 +80,7 @@ require github.com/fogleman/gg v1.3.0
|
|||
require (
|
||||
github.com/gorilla/sessions v1.2.1
|
||||
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
|
||||
github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743
|
||||
github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2065,8 +2065,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
|
|||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
|
||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
|
||||
github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743 h1:Q6bNLLCE7+OGrRlsmcrglKeURXmaagemIWTrvrJTgK4=
|
||||
github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743/go.mod h1:MzmxeUFKOSGqI+3ditwJVmiDXtWW7p4vZhmFeAcwKyI=
|
||||
github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4 h1:lzIDkSLbaAHzYlGgZB9BEWFEbVFZLmFAvzE2fiN61pg=
|
||||
github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4/go.mod h1:MzmxeUFKOSGqI+3ditwJVmiDXtWW7p4vZhmFeAcwKyI=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk=
|
||||
github.com/waku-org/noise v1.0.2 h1:7WmlhpJ0eliBzwzKz6SoTqQznaEU2IuebHF3oCekqqs=
|
||||
|
|
|
@ -643,7 +643,11 @@ func (s *MessengerContactRequestSuite) TestAcceptLatestContactRequestForContact(
|
|||
resp, err = WaitOnMessengerResponse(
|
||||
theirMessenger,
|
||||
func(r *MessengerResponse) bool {
|
||||
return len(r.Contacts) > 0 && len(r.Messages()) > 0 && len(r.ActivityCenterNotifications()) > 0
|
||||
contactRequests, _, err := theirMessenger.PendingContactRequests("", 10)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return len(contactRequests) == 1
|
||||
},
|
||||
"no messages",
|
||||
)
|
||||
|
|
|
@ -82,7 +82,9 @@ func (s *MessengerGroupChatSuite) createEmptyGroupChat(creator *Messenger, name
|
|||
func (s *MessengerGroupChatSuite) verifyGroupChatCreated(member *Messenger, expectedChatActive bool) {
|
||||
response, err := WaitOnMessengerResponse(
|
||||
member,
|
||||
func(r *MessengerResponse) bool { return len(r.Chats()) > 0 },
|
||||
func(r *MessengerResponse) bool {
|
||||
return len(r.Chats()) == 1 && r.Chats()[0].Active == expectedChatActive
|
||||
},
|
||||
"chat invitation not received",
|
||||
)
|
||||
s.Require().NoError(err)
|
||||
|
|
|
@ -800,6 +800,7 @@ func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
|
|||
}
|
||||
|
||||
func (m *Messenger) ConnectionChanged(state connection.State) {
|
||||
m.transport.ConnectionChanged(state)
|
||||
if !m.connectionState.Offline && state.Offline {
|
||||
m.sender.StopDatasync()
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
)
|
||||
|
@ -79,8 +80,9 @@ type Transport struct {
|
|||
|
||||
// NewTransport returns a new Transport.
|
||||
// TODO: leaving a chat should verify that for a given public key
|
||||
// there are no other chats. It may happen that we leave a private chat
|
||||
// but still have a public chat for a given public key.
|
||||
//
|
||||
// there are no other chats. It may happen that we leave a private chat
|
||||
// but still have a public chat for a given public key.
|
||||
func NewTransport(
|
||||
waku types.Waku,
|
||||
privateKey *ecdsa.PrivateKey,
|
||||
|
@ -654,3 +656,7 @@ func (t *Transport) MarkP2PMessageAsProcessed(hash common.Hash) {
|
|||
func (t *Transport) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) {
|
||||
return t.waku.SubscribeToConnStatusChanges()
|
||||
}
|
||||
|
||||
func (t *Transport) ConnectionChanged(state connection.State) {
|
||||
t.waku.ConnectionChanged(state)
|
||||
}
|
||||
|
|
|
@ -64,13 +64,28 @@ func WithDB(db *sql.DB) DBOption {
|
|||
}
|
||||
}
|
||||
|
||||
type ConnectionPoolOptions struct {
|
||||
MaxOpenConnections int
|
||||
MaxIdleConnections int
|
||||
ConnectionMaxLifetime time.Duration
|
||||
ConnectionMaxIdleTime time.Duration
|
||||
}
|
||||
|
||||
// WithDriver is a DBOption that will open a *sql.DB connection
|
||||
func WithDriver(driverName string, datasourceName string) DBOption {
|
||||
func WithDriver(driverName string, datasourceName string, connectionPoolOptions ...ConnectionPoolOptions) DBOption {
|
||||
return func(d *DBStore) error {
|
||||
db, err := sql.Open(driverName, datasourceName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(connectionPoolOptions) != 0 {
|
||||
db.SetConnMaxIdleTime(connectionPoolOptions[0].ConnectionMaxIdleTime)
|
||||
db.SetConnMaxLifetime(connectionPoolOptions[0].ConnectionMaxLifetime)
|
||||
db.SetMaxIdleConns(connectionPoolOptions[0].MaxIdleConnections)
|
||||
db.SetMaxOpenConns(connectionPoolOptions[0].MaxOpenConnections)
|
||||
}
|
||||
|
||||
d.db = db
|
||||
return nil
|
||||
}
|
||||
|
@ -119,31 +134,8 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Disable concurrent access as not supported by the driver
|
||||
result.db.SetMaxOpenConns(1)
|
||||
|
||||
var seq string
|
||||
var name string
|
||||
var file string // file will be empty if DB is :memory"
|
||||
err := result.db.QueryRow("PRAGMA database_list").Scan(&seq, &name, &file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// readers do not block writers and faster i/o operations
|
||||
// https://www.sqlite.org/draft/wal.html
|
||||
// must be set after db is encrypted
|
||||
var mode string
|
||||
err = result.db.QueryRow("PRAGMA journal_mode=WAL").Scan(&mode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if mode != WALMode && file != "" {
|
||||
return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode)
|
||||
}
|
||||
|
||||
if result.enableMigrations {
|
||||
err = migrations.Migrate(result.db)
|
||||
err := migrations.Migrate(result.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -8,9 +8,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||
"github.com/libp2p/go-libp2p/core/discovery"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
@ -18,6 +15,10 @@ import (
|
|||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||
)
|
||||
|
||||
type DiscoveryV5 struct {
|
||||
|
@ -26,23 +27,20 @@ type DiscoveryV5 struct {
|
|||
discovery.Discovery
|
||||
|
||||
params *discV5Parameters
|
||||
ctx context.Context
|
||||
host host.Host
|
||||
config discover.Config
|
||||
udpAddr *net.UDPAddr
|
||||
listener *discover.UDPv5
|
||||
localnode *enode.LocalNode
|
||||
NAT nat.Interface
|
||||
quit chan struct{}
|
||||
started bool
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
wg *sync.WaitGroup
|
||||
started bool
|
||||
cancel context.CancelFunc
|
||||
wg *sync.WaitGroup
|
||||
|
||||
peerCache peerCache
|
||||
discoverCtx context.Context
|
||||
discoverCancelFunc context.CancelFunc
|
||||
peerCache peerCache
|
||||
}
|
||||
|
||||
type peerCache struct {
|
||||
|
@ -100,7 +98,7 @@ func DefaultOptions() []DiscoveryV5Option {
|
|||
|
||||
const MaxPeersToDiscover = 600
|
||||
|
||||
func NewDiscoveryV5(ctx context.Context, host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
|
||||
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
|
||||
params := new(discV5Parameters)
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, opts...)
|
||||
|
@ -116,7 +114,6 @@ func NewDiscoveryV5(ctx context.Context, host host.Host, priv *ecdsa.PrivateKey,
|
|||
}
|
||||
|
||||
return &DiscoveryV5{
|
||||
ctx: ctx,
|
||||
host: host,
|
||||
params: params,
|
||||
NAT: NAT,
|
||||
|
@ -149,7 +146,7 @@ func (d *DiscoveryV5) Node() *enode.Node {
|
|||
return d.localnode.Node()
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) listen() error {
|
||||
func (d *DiscoveryV5) listen(ctx context.Context) error {
|
||||
conn, err := net.ListenUDP("udp", d.udpAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -160,7 +157,7 @@ func (d *DiscoveryV5) listen() error {
|
|||
d.wg.Add(1)
|
||||
go func() {
|
||||
defer d.wg.Done()
|
||||
nat.Map(d.NAT, d.quit, "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
|
||||
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
|
||||
}()
|
||||
|
||||
}
|
||||
|
@ -182,45 +179,35 @@ func (d *DiscoveryV5) listen() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) Start() error {
|
||||
func (d *DiscoveryV5) Start(ctx context.Context) error {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
if d.started {
|
||||
return nil
|
||||
}
|
||||
d.wg.Wait() // Waiting for any go routines to stop
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
d.wg.Wait() // Waiting for other go routines to stop
|
||||
|
||||
d.quit = make(chan struct{}, 1)
|
||||
d.cancel = cancel
|
||||
d.started = true
|
||||
|
||||
err := d.listen()
|
||||
err := d.listen(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create cancellable
|
||||
d.discoverCtx, d.discoverCancelFunc = context.WithCancel(d.ctx)
|
||||
go d.runDiscoveryV5Loop()
|
||||
go d.runDiscoveryV5Loop(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
|
||||
return d.listener.SetFallbackNodes(nodes)
|
||||
return d.listener.SetFallbackNodes(nodes)
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) Stop() {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
if !d.started {
|
||||
return
|
||||
}
|
||||
|
||||
close(d.quit)
|
||||
d.discoverCancelFunc()
|
||||
d.cancel()
|
||||
|
||||
d.listener.Close()
|
||||
d.listener = nil
|
||||
|
@ -294,7 +281,7 @@ func (d *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discover
|
|||
return 20 * time.Minute, nil
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan struct{}) {
|
||||
func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int) {
|
||||
defer d.wg.Done()
|
||||
|
||||
for {
|
||||
|
@ -302,7 +289,7 @@ func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan st
|
|||
break
|
||||
}
|
||||
|
||||
if d.discoverCtx.Err() != nil {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -333,8 +320,6 @@ func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan st
|
|||
}
|
||||
d.peerCache.Unlock()
|
||||
}
|
||||
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) removeExpiredPeers() int {
|
||||
|
@ -353,21 +338,16 @@ func (d *DiscoveryV5) removeExpiredPeers() int {
|
|||
return newCacheSize
|
||||
}
|
||||
|
||||
func (d *DiscoveryV5) runDiscoveryV5Loop() {
|
||||
func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) {
|
||||
iterator := d.listener.RandomNodes()
|
||||
iterator = enode.Filter(iterator, evaluateNode)
|
||||
defer iterator.Close()
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
d.wg.Add(1)
|
||||
|
||||
go d.iterate(iterator, MaxPeersToDiscover, doneCh)
|
||||
go d.iterate(ctx, iterator, MaxPeersToDiscover)
|
||||
|
||||
select {
|
||||
case <-d.discoverCtx.Done():
|
||||
case <-doneCh:
|
||||
}
|
||||
<-ctx.Done()
|
||||
|
||||
d.log.Warn("Discv5 loop stopped")
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ func (w *WakuNode) connectednessListener() {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-w.quit:
|
||||
case <-w.ctx.Done():
|
||||
return
|
||||
case <-w.protocolEventSub.Out():
|
||||
case <-w.identificationEventSub.Out():
|
||||
|
|
|
@ -54,7 +54,8 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
|
|||
}
|
||||
|
||||
lastTimeExecuted = w.timesource.Now()
|
||||
case <-w.quit:
|
||||
case <-w.ctx.Done():
|
||||
w.log.Info("stopping ping protocol")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -241,7 +241,7 @@ func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error {
|
|||
if w.discoveryV5 != nil && w.discoveryV5.IsStarted() {
|
||||
w.log.Info("restarting discv5")
|
||||
w.discoveryV5.Stop()
|
||||
err = w.discoveryV5.Start()
|
||||
err = w.discoveryV5.Start(w.ctx)
|
||||
if err != nil {
|
||||
w.log.Error("could not restart discv5", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -8,11 +8,12 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"go.uber.org/zap"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
|
@ -94,9 +95,8 @@ type WakuNode struct {
|
|||
keepAliveMutex sync.Mutex
|
||||
keepAliveFails map[peer.ID]int
|
||||
|
||||
ctx context.Context
|
||||
ctx context.Context // TODO: remove this
|
||||
cancel context.CancelFunc
|
||||
quit chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
|
||||
// Channel passed to WakuNode constructor
|
||||
|
@ -170,7 +170,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
|
|||
w.ctx = ctx
|
||||
w.opts = params
|
||||
w.log = params.logger.Named("node2")
|
||||
w.quit = make(chan struct{})
|
||||
w.wg = &sync.WaitGroup{}
|
||||
w.addrChan = make(chan ma.Multiaddr, 1024)
|
||||
w.keepAliveFails = make(map[peer.ID]int)
|
||||
|
@ -235,7 +234,7 @@ func (w *WakuNode) checkForAddressChanges() {
|
|||
first <- struct{}{}
|
||||
for {
|
||||
select {
|
||||
case <-w.quit:
|
||||
case <-w.ctx.Done():
|
||||
close(w.addrChan)
|
||||
return
|
||||
case <-first:
|
||||
|
@ -268,7 +267,7 @@ func (w *WakuNode) checkForAddressChanges() {
|
|||
// Start initializes all the protocols that were setup in the WakuNode
|
||||
func (w *WakuNode) Start() error {
|
||||
if w.opts.enableNTP {
|
||||
err := w.timesource.Start()
|
||||
err := w.timesource.Start(w.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -357,9 +356,7 @@ func (w *WakuNode) Start() error {
|
|||
|
||||
// Stop stops the WakuNode and closess all connections to the host
|
||||
func (w *WakuNode) Stop() {
|
||||
defer w.cancel()
|
||||
|
||||
close(w.quit)
|
||||
w.cancel()
|
||||
|
||||
w.bcaster.Close()
|
||||
|
||||
|
@ -523,14 +520,14 @@ func (w *WakuNode) mountDiscV5() error {
|
|||
}
|
||||
|
||||
var err error
|
||||
w.discoveryV5, err = discv5.NewDiscoveryV5(w.ctx, w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...)
|
||||
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *WakuNode) mountPeerExchange() error {
|
||||
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.ctx, w.host, w.discoveryV5, w.log)
|
||||
return w.peerExchange.Start()
|
||||
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.discoveryV5, w.log)
|
||||
return w.peerExchange.Start(w.ctx)
|
||||
}
|
||||
|
||||
func (w *WakuNode) startStore() error {
|
||||
|
@ -673,6 +670,12 @@ func (w *WakuNode) PeerStats() PeerStats {
|
|||
return p
|
||||
}
|
||||
|
||||
// Set the bootnodes on discv5
|
||||
func (w *WakuNode) SetDiscV5Bootnodes(nodes []*enode.Node) error {
|
||||
w.opts.discV5bootnodes = nodes
|
||||
return w.discoveryV5.SetBootnodes(nodes)
|
||||
}
|
||||
|
||||
// Peers return the list of peers, addresses, protocols supported and connection status
|
||||
func (w *WakuNode) Peers() ([]*Peer, error) {
|
||||
var peers []*Peer
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
|
||||
|
@ -81,7 +80,7 @@ func (w *WakuNode) mountRlnRelay() error {
|
|||
|
||||
// mount the rln relay protocol in the on-chain/dynamic mode
|
||||
var err error
|
||||
w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
|
||||
w.rlnRelay, err = rln.RlnRelayDynamic(w.ctx, w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -317,8 +317,9 @@ func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
|
|||
}
|
||||
|
||||
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
|
||||
// be stored or not in a message provider
|
||||
func WithWakuStore(shouldStoreMessages bool, resumeNodes []multiaddr.Multiaddr) WakuNodeOption {
|
||||
// be stored or not in a message provider. If resumeNodes are specified, the
|
||||
// store will attempt to resume message history using those nodes
|
||||
func WithWakuStore(shouldStoreMessages bool, resumeNodes ...multiaddr.Multiaddr) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enableStore = true
|
||||
params.storeMsgs = shouldStoreMessages
|
||||
|
|
125
vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go
generated
vendored
125
vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go
generated
vendored
|
@ -46,24 +46,21 @@ type peerRecord struct {
|
|||
|
||||
type WakuPeerExchange struct {
|
||||
h host.Host
|
||||
ctx context.Context
|
||||
disc *discv5.DiscoveryV5
|
||||
|
||||
log *zap.Logger
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
log *zap.Logger
|
||||
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
|
||||
enrCacheMutex sync.RWMutex
|
||||
rng *rand.Rand
|
||||
|
||||
started bool
|
||||
}
|
||||
|
||||
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
|
||||
func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange {
|
||||
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange {
|
||||
wakuPX := new(WakuPeerExchange)
|
||||
wakuPX.ctx = ctx
|
||||
wakuPX.h = h
|
||||
wakuPX.disc = disc
|
||||
wakuPX.log = log.Named("wakupx")
|
||||
|
@ -73,19 +70,21 @@ func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.Discover
|
|||
}
|
||||
|
||||
// Start inits the peer exchange protocol
|
||||
func (wakuPX *WakuPeerExchange) Start() error {
|
||||
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest)
|
||||
func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
|
||||
wakuPX.wg.Wait() // Waiting for any go routines to stop
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
wakuPX.cancel = cancel
|
||||
|
||||
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
|
||||
wakuPX.log.Info("Peer exchange protocol started")
|
||||
wakuPX.started = true
|
||||
wakuPX.quit = make(chan struct{}, 1)
|
||||
|
||||
wakuPX.wg.Add(1)
|
||||
go wakuPX.runPeerExchangeDiscv5Loop()
|
||||
go wakuPX.runPeerExchangeDiscv5Loop(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse) error {
|
||||
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error {
|
||||
var peers []peer.AddrInfo
|
||||
for _, p := range response.PeerInfos {
|
||||
enrRecord := &enr.Record{}
|
||||
|
@ -118,7 +117,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse
|
|||
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
|
||||
for _, p := range peers {
|
||||
func(p peer.AddrInfo) {
|
||||
ctx, cancel := context.WithTimeout(wakuPX.ctx, dialTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
|
||||
defer cancel()
|
||||
err := wakuPX.h.Connect(ctx, p)
|
||||
if err != nil {
|
||||
|
@ -131,35 +130,37 @@ func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse
|
|||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) onRequest(s network.Stream) {
|
||||
defer s.Close()
|
||||
logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
||||
requestRPC := &pb.PeerExchangeRPC{}
|
||||
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
|
||||
err := reader.ReadMsg(requestRPC)
|
||||
if err != nil {
|
||||
logger.Error("reading request", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "decodeRpcFailure")
|
||||
return
|
||||
}
|
||||
|
||||
if requestRPC.Query != nil {
|
||||
logger.Info("request received")
|
||||
err := wakuPX.respond(requestRPC.Query.NumPeers, s.Conn().RemotePeer())
|
||||
func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) {
|
||||
return func(s network.Stream) {
|
||||
defer s.Close()
|
||||
logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
||||
requestRPC := &pb.PeerExchangeRPC{}
|
||||
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
|
||||
err := reader.ReadMsg(requestRPC)
|
||||
if err != nil {
|
||||
logger.Error("responding", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
|
||||
logger.Error("reading request", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(ctx, "decodeRpcFailure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if requestRPC.Response != nil {
|
||||
logger.Info("response received")
|
||||
err := wakuPX.handleResponse(requestRPC.Response)
|
||||
if err != nil {
|
||||
logger.Error("handling response", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
|
||||
return
|
||||
if requestRPC.Query != nil {
|
||||
logger.Info("request received")
|
||||
err := wakuPX.respond(ctx, requestRPC.Query.NumPeers, s.Conn().RemotePeer())
|
||||
if err != nil {
|
||||
logger.Error("responding", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(ctx, "pxFailure")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if requestRPC.Response != nil {
|
||||
logger.Info("response received")
|
||||
err := wakuPX.handleResponse(ctx, requestRPC.Response)
|
||||
if err != nil {
|
||||
logger.Error("handling response", zap.Error(err))
|
||||
metrics.RecordPeerExchangeError(ctx, "pxFailure")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -176,7 +177,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
|||
}
|
||||
|
||||
if params.selectedPeer == "" {
|
||||
metrics.RecordPeerExchangeError(wakuPX.ctx, "dialError")
|
||||
metrics.RecordPeerExchangeError(ctx, "dialError")
|
||||
return ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
|
@ -186,35 +187,27 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
|||
},
|
||||
}
|
||||
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(requestRPC, params.selectedPeer)
|
||||
}
|
||||
|
||||
// IsStarted returns if the peer exchange protocol has been mounted or not
|
||||
func (wakuPX *WakuPeerExchange) IsStarted() bool {
|
||||
return wakuPX.started
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(ctx, requestRPC, params.selectedPeer)
|
||||
}
|
||||
|
||||
// Stop unmounts the peer exchange protocol
|
||||
func (wakuPX *WakuPeerExchange) Stop() {
|
||||
if wakuPX.started {
|
||||
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
|
||||
wakuPX.started = false
|
||||
close(wakuPX.quit)
|
||||
wakuPX.wg.Wait()
|
||||
}
|
||||
wakuPX.cancel()
|
||||
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
|
||||
wakuPX.wg.Wait()
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRPC, peerID peer.ID) error {
|
||||
func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(ctx context.Context, rpc *pb.PeerExchangeRPC, peerID peer.ID) error {
|
||||
logger := wakuPX.log.With(logging.HostID("peer", peerID))
|
||||
|
||||
// We connect first so dns4 addresses are resolved (NewStream does not do it)
|
||||
err := wakuPX.h.Connect(wakuPX.ctx, wakuPX.h.Peerstore().PeerInfo(peerID))
|
||||
err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(peerID))
|
||||
if err != nil {
|
||||
logger.Error("connecting peer", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
connOpt, err := wakuPX.h.NewStream(wakuPX.ctx, peerID, PeerExchangeID_v20alpha1)
|
||||
connOpt, err := wakuPX.h.NewStream(ctx, peerID, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
return err
|
||||
|
@ -231,7 +224,7 @@ func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRP
|
|||
return nil
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error {
|
||||
func (wakuPX *WakuPeerExchange) respond(ctx context.Context, numPeers uint64, peerID peer.ID) error {
|
||||
records, err := wakuPX.getENRsFromCache(numPeers)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -241,7 +234,7 @@ func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error {
|
|||
responseRPC.Response = new(pb.PeerExchangeResponse)
|
||||
responseRPC.Response.PeerInfos = records
|
||||
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(responseRPC, peerID)
|
||||
return wakuPX.sendPeerExchangeRPCToPeer(ctx, responseRPC, peerID)
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) {
|
||||
|
@ -304,12 +297,8 @@ func (wakuPX *WakuPeerExchange) cleanCache() {
|
|||
wakuPX.enrCache = r
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) findPeers() {
|
||||
if !wakuPX.disc.IsStarted() {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(wakuPX.ctx, 2*time.Second)
|
||||
func (wakuPX *WakuPeerExchange) findPeers(ctx context.Context) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
peerRecords, err := wakuPX.disc.FindNodes(ctx, "")
|
||||
if err != nil {
|
||||
|
@ -332,7 +321,7 @@ func (wakuPX *WakuPeerExchange) findPeers() {
|
|||
wakuPX.cleanCache()
|
||||
}
|
||||
|
||||
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() {
|
||||
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
|
||||
defer wakuPX.wg.Done()
|
||||
|
||||
// Runs a discv5 loop adding new peers to the px peer cache
|
||||
|
@ -349,15 +338,15 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() {
|
|||
// This loop "competes" with the loop in wakunode2
|
||||
// For the purpose of collecting px peers, 30 sec intervals should be enough
|
||||
|
||||
wakuPX.findPeers()
|
||||
wakuPX.findPeers(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-wakuPX.quit:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
wakuPX.findPeers()
|
||||
wakuPX.findPeers(ctx)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -247,7 +247,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr
|
|||
w.bcaster.Register(&topic, subscription.C)
|
||||
}
|
||||
|
||||
go w.subscribeToTopic(topic, subscription, sub)
|
||||
go w.subscribeToTopic(ctx, topic, subscription, sub)
|
||||
|
||||
return subscription, nil
|
||||
}
|
||||
|
@ -307,8 +307,8 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
|
|||
return msgChannel
|
||||
}
|
||||
|
||||
func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) {
|
||||
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
|
||||
func (w *WakuRelay) subscribeToTopic(ctx context.Context, t string, subscription *Subscription, sub *pubsub.Subscription) {
|
||||
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "relay"))
|
||||
if err != nil {
|
||||
w.log.Error("creating tag map", zap.Error(err))
|
||||
return
|
||||
|
|
|
@ -33,7 +33,7 @@ func register(ctx context.Context, idComm r.IDCommitment, ethAccountPrivateKey *
|
|||
}
|
||||
defer backend.Close()
|
||||
|
||||
chainID, err := backend.ChainID(context.Background())
|
||||
chainID, err := backend.ChainID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -256,10 +256,13 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
|||
result := &Result{
|
||||
Messages: response.Messages,
|
||||
query: q,
|
||||
cursor: response.PagingInfo.Cursor,
|
||||
peerId: params.selectedPeer,
|
||||
}
|
||||
|
||||
if response.PagingInfo != nil {
|
||||
result.cursor = response.PagingInfo.Cursor
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package timesource
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -133,8 +134,8 @@ type NTPTimeSource struct {
|
|||
timeQuery ntpQuery // for ease of testing
|
||||
log *zap.Logger
|
||||
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.RWMutex
|
||||
latestOffset time.Duration
|
||||
|
@ -162,9 +163,11 @@ func (s *NTPTimeSource) updateOffset() error {
|
|||
|
||||
// runPeriodically runs periodically the given function based on NTPTimeSource
|
||||
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
|
||||
func (s *NTPTimeSource) runPeriodically(fn func() error) error {
|
||||
func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) error {
|
||||
var period time.Duration
|
||||
s.quit = make(chan struct{})
|
||||
|
||||
s.log.Info("starting service")
|
||||
|
||||
// we try to do it synchronously so that user can have reliable messages right away
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
|
@ -177,7 +180,8 @@ func (s *NTPTimeSource) runPeriodically(fn func() error) error {
|
|||
period = s.fastNTPSyncPeriod
|
||||
}
|
||||
|
||||
case <-s.quit:
|
||||
case <-ctx.Done():
|
||||
s.log.Info("stopping service")
|
||||
s.wg.Done()
|
||||
return
|
||||
}
|
||||
|
@ -188,16 +192,16 @@ func (s *NTPTimeSource) runPeriodically(fn func() error) error {
|
|||
}
|
||||
|
||||
// Start runs a goroutine that updates local offset every updatePeriod.
|
||||
func (s *NTPTimeSource) Start() error {
|
||||
return s.runPeriodically(s.updateOffset)
|
||||
func (s *NTPTimeSource) Start(ctx context.Context) error {
|
||||
s.wg.Wait() // Waiting for other go routines to stop
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
s.cancel = cancel
|
||||
return s.runPeriodically(ctx, s.updateOffset)
|
||||
}
|
||||
|
||||
// Stop goroutine that updates time source.
|
||||
func (s *NTPTimeSource) Stop() error {
|
||||
if s.quit == nil {
|
||||
return nil
|
||||
}
|
||||
close(s.quit)
|
||||
s.cancel()
|
||||
s.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
package timesource
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Timesource interface {
|
||||
Now() time.Time
|
||||
Start() error
|
||||
Start(ctx context.Context) error
|
||||
Stop() error
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package timesource
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type WallClockTimeSource struct {
|
||||
}
|
||||
|
@ -13,7 +16,7 @@ func (t *WallClockTimeSource) Now() time.Time {
|
|||
return time.Now()
|
||||
}
|
||||
|
||||
func (t *WallClockTimeSource) Start() error {
|
||||
func (t *WallClockTimeSource) Start(ctx context.Context) error {
|
||||
// Do nothing
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -990,7 +990,7 @@ github.com/vacp2p/mvds/transport
|
|||
github.com/waku-org/go-discover/discover
|
||||
github.com/waku-org/go-discover/discover/v4wire
|
||||
github.com/waku-org/go-discover/discover/v5wire
|
||||
# github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743
|
||||
# github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4
|
||||
## explicit; go 1.18
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/waku/persistence
|
||||
|
|
128
wakuv2/waku.go
128
wakuv2/waku.go
|
@ -25,6 +25,7 @@ import (
|
|||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"runtime"
|
||||
|
@ -62,6 +63,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/signal"
|
||||
"github.com/status-im/status-go/timesource"
|
||||
|
@ -77,6 +79,8 @@ import (
|
|||
const messageQueueLimit = 1024
|
||||
const requestTimeout = 30 * time.Second
|
||||
const autoRelayMinInterval = 2 * time.Second
|
||||
const bootnodesQueryBackoffMs = 200
|
||||
const bootnodesMaxRetries = 7
|
||||
|
||||
type settings struct {
|
||||
LightClient bool // Indicates if the node is a light client
|
||||
|
@ -132,12 +136,29 @@ type Waku struct {
|
|||
|
||||
// NTP Synced timesource
|
||||
timesource *timesource.NTPTimeSource
|
||||
|
||||
// seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery
|
||||
// bootnodes successfully
|
||||
seededBootnodesForDiscV5 bool
|
||||
|
||||
// offline indicates whether we have detected connectivity
|
||||
offline bool
|
||||
|
||||
// connectionChanged is channel that notifies when connectivity has changed
|
||||
connectionChanged chan struct{}
|
||||
|
||||
// discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery
|
||||
discV5BootstrapNodes []string
|
||||
}
|
||||
|
||||
// New creates a WakuV2 client ready to communicate through the LibP2P network.
|
||||
func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *sql.DB, timesource *timesource.NTPTimeSource) (*Waku, error) {
|
||||
var err error
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
logger, err = zap.NewDevelopment()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cfg = setDefaults(cfg)
|
||||
|
@ -154,6 +175,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
sendQueue: make(chan *pb.WakuMessage, 1000),
|
||||
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
|
||||
quit: make(chan struct{}),
|
||||
connectionChanged: make(chan struct{}),
|
||||
wg: sync.WaitGroup{},
|
||||
dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode),
|
||||
dnsAddressCacheLock: &sync.RWMutex{},
|
||||
|
@ -161,6 +183,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
storeMsgIDsMu: sync.RWMutex{},
|
||||
timeSource: time.Now,
|
||||
logger: logger,
|
||||
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
|
||||
}
|
||||
|
||||
// Disabling light client mode if using status.prod or undefined
|
||||
|
@ -181,7 +204,6 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
waku.filterMsgChannel = make(chan *protocol.Envelope, 1024)
|
||||
|
||||
var privateKey *ecdsa.PrivateKey
|
||||
var err error
|
||||
if nodeKey != "" {
|
||||
privateKey, err = crypto.HexToECDSA(nodeKey)
|
||||
} else {
|
||||
|
@ -226,8 +248,10 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
if cfg.EnableDiscV5 {
|
||||
bootnodes, err := waku.getDiscV5BootstrapNodes(ctx, cfg.DiscV5BootstrapNodes)
|
||||
if err != nil {
|
||||
logger.Error("failed to get bootstrap nodes", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit))))
|
||||
|
||||
// Peer exchange requires DiscV5 to run (might change in future versions of the protocol)
|
||||
|
@ -276,7 +300,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
}
|
||||
|
||||
if cfg.EnableDiscV5 {
|
||||
err := waku.node.DiscV5().Start()
|
||||
err := waku.node.DiscV5().Start(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -358,12 +382,16 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string)
|
|||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
w.seededBootnodesForDiscV5 = len(result) > 0
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup)
|
||||
|
||||
func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) {
|
||||
w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress))
|
||||
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
|
@ -372,15 +400,17 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
|
|||
|
||||
discNodes, ok := w.dnsAddressCache[enrtreeAddress]
|
||||
if !ok {
|
||||
// NOTE: Temporary fix for DNS resolution on android/ios, as gomobile does not support it
|
||||
// NOTE: Temporary fix for DNS resolution on android/ios, as gomobile does not support it
|
||||
discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrtreeAddress, dnsdisc.WithNameserver("1.1.1.1"))
|
||||
if err != nil {
|
||||
w.logger.Warn("dns discovery error ", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], discoveredNodes...)
|
||||
discNodes = w.dnsAddressCache[enrtreeAddress]
|
||||
if len(discoveredNodes) != 0 {
|
||||
w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], discoveredNodes...)
|
||||
discNodes = w.dnsAddressCache[enrtreeAddress]
|
||||
}
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
|
@ -1093,6 +1123,7 @@ func (w *Waku) Start() error {
|
|||
}
|
||||
|
||||
go w.broadcast()
|
||||
go w.seedBootnodesForDiscV5()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1104,6 +1135,7 @@ func (w *Waku) Stop() error {
|
|||
w.node.Stop()
|
||||
close(w.quit)
|
||||
close(w.filterMsgChannel)
|
||||
close(w.connectionChanged)
|
||||
w.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
@ -1256,7 +1288,7 @@ func (w *Waku) StartDiscV5() error {
|
|||
return errors.New("discv5 is not setup")
|
||||
}
|
||||
|
||||
return w.node.DiscV5().Start()
|
||||
return w.node.DiscV5().Start(context.Background())
|
||||
}
|
||||
|
||||
func (w *Waku) StopDiscV5() error {
|
||||
|
@ -1268,6 +1300,88 @@ func (w *Waku) StopDiscV5() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (w *Waku) ConnectionChanged(state connection.State) {
|
||||
if !state.Offline && w.offline {
|
||||
select {
|
||||
case w.connectionChanged <- struct{}{}:
|
||||
}
|
||||
}
|
||||
|
||||
w.offline = !state.Offline
|
||||
}
|
||||
|
||||
// seedBootnodesForDiscV5 tries to fetch bootnodes
|
||||
// from an ENR periodically.
|
||||
// It backs off exponentially until maxRetries, at which point it restarts from 0
|
||||
// It also restarts if there's a connection change signalled from the client
|
||||
func (w *Waku) seedBootnodesForDiscV5() {
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
var lastTry = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
var retries = 0
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if w.seededBootnodesForDiscV5 {
|
||||
w.logger.Info("stopped querying bootnodes")
|
||||
return
|
||||
}
|
||||
now := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
backoff := bootnodesQueryBackoffMs * int64(math.Exp2(float64(retries)))
|
||||
|
||||
if lastTry+backoff < now {
|
||||
err := w.restartDiscV5()
|
||||
if err != nil {
|
||||
w.logger.Warn("failed to restart discv5", zap.Error(err))
|
||||
}
|
||||
|
||||
lastTry = now
|
||||
retries++
|
||||
// We reset the retries after a while and restart
|
||||
if retries > bootnodesMaxRetries {
|
||||
retries = 0
|
||||
}
|
||||
|
||||
}
|
||||
// If we go online, trigger immediately
|
||||
case <-w.connectionChanged:
|
||||
now := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
backoff := bootnodesQueryBackoffMs * int64(math.Exp2(float64(retries)))
|
||||
// check we haven't run too eagerly, in case connection
|
||||
// is flapping
|
||||
if lastTry+backoff < now {
|
||||
err := w.restartDiscV5()
|
||||
if err != nil {
|
||||
w.logger.Warn("failed to restart discv5", zap.Error(err))
|
||||
}
|
||||
|
||||
}
|
||||
retries = 0
|
||||
lastTry = now
|
||||
|
||||
case <-w.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Restart discv5, re-retrieving bootstrap nodes
|
||||
func (w *Waku) restartDiscV5() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(bootnodes) == 0 {
|
||||
return errors.New("failed to fetch bootnodes")
|
||||
}
|
||||
|
||||
w.logger.Info("restarting discv5 with nodes", zap.Any("nodes", bootnodes))
|
||||
return w.node.SetDiscV5Bootnodes(bootnodes)
|
||||
}
|
||||
|
||||
func (w *Waku) AddStorePeer(address string) (string, error) {
|
||||
addr, err := multiaddr.NewMultiaddr(address)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
package wakuv2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
)
|
||||
|
||||
var testENRBootstrap = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.nodes.status.im"
|
||||
|
||||
func TestDiscoveryV5(t *testing.T) {
|
||||
config := &Config{}
|
||||
config.EnableDiscV5 = true
|
||||
config.DiscV5BootstrapNodes = []string{testENRBootstrap}
|
||||
config.DiscoveryLimit = 20
|
||||
config.UDPPort = 9001
|
||||
w, err := New("", "", config, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, w.Start())
|
||||
|
||||
err = tt.RetryWithBackOff(func() error {
|
||||
if len(w.Peers()) == 0 {
|
||||
return errors.New("no peers discovered")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotEqual(t, 0, len(w.Peers()))
|
||||
require.NoError(t, w.Stop())
|
||||
}
|
||||
|
||||
func TestRestartDiscoveryV5(t *testing.T) {
|
||||
config := &Config{}
|
||||
config.EnableDiscV5 = true
|
||||
// Use wrong discv5 bootstrap address, to simulate being offline
|
||||
config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"}
|
||||
config.DiscoveryLimit = 20
|
||||
config.UDPPort = 9002
|
||||
w, err := New("", "", config, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, w.Start())
|
||||
|
||||
require.False(t, w.seededBootnodesForDiscV5)
|
||||
|
||||
options := func(b *backoff.ExponentialBackOff) {
|
||||
b.MaxElapsedTime = 2 * time.Second
|
||||
}
|
||||
|
||||
// Sanity check, not great, but it's probably helpful
|
||||
err = tt.RetryWithBackOff(func() error {
|
||||
if len(w.Peers()) == 0 {
|
||||
return errors.New("no peers discovered")
|
||||
}
|
||||
return nil
|
||||
}, options)
|
||||
|
||||
require.Error(t, err)
|
||||
|
||||
w.discV5BootstrapNodes = []string{testENRBootstrap}
|
||||
|
||||
options = func(b *backoff.ExponentialBackOff) {
|
||||
b.MaxElapsedTime = 30 * time.Second
|
||||
}
|
||||
|
||||
err = tt.RetryWithBackOff(func() error {
|
||||
if len(w.Peers()) == 0 {
|
||||
return errors.New("no peers discovered")
|
||||
}
|
||||
return nil
|
||||
}, options)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, w.seededBootnodesForDiscV5)
|
||||
require.NotEqual(t, 0, len(w.Peers()))
|
||||
require.NoError(t, w.Stop())
|
||||
}
|
Loading…
Reference in New Issue