chore(waku2): bump go-waku to notice disconnection from peers faster

This commit is contained in:
Richard Ramos 2023-01-26 16:31:27 -04:00 committed by RichΛrd
parent dad7221f36
commit cccea5f14b
11 changed files with 79 additions and 51 deletions

2
go.mod
View File

@ -77,7 +77,7 @@ require (
github.com/gorilla/sessions v1.2.1
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/rmg/iso4217 v1.0.0
github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d
github.com/waku-org/go-waku v0.4.1-0.20230127175953-d4473e9c46e4
)
require (

4
go.sum
View File

@ -2067,8 +2067,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d h1:W+I90Y9avF506cmqr4jZX+HeKf5+MY8SMQRZhMR/k9g=
github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d/go.mod h1:sI14mN/sM8inIb2x2b462wydSEFyOyuDKI1cjiVIIpM=
github.com/waku-org/go-waku v0.4.1-0.20230127175953-d4473e9c46e4 h1:6WS5u6VRNc1SC5ORC5IdepUeo71JgaLoDcvMlDK7Jx8=
github.com/waku-org/go-waku v0.4.1-0.20230127175953-d4473e9c46e4/go.mod h1:sI14mN/sM8inIb2x2b462wydSEFyOyuDKI1cjiVIIpM=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg=
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk=
github.com/wealdtech/go-ens/v3 v3.5.0 h1:Huc9GxBgiGweCOGTYomvsg07K2QggAqZpZ5SuiZdC8o=

View File

@ -4,8 +4,10 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@ -248,11 +250,10 @@ func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
return enode.Filter(iterator, evaluateNode), nil
}
func (d *DiscoveryV5) iterate(ctx context.Context) {
func (d *DiscoveryV5) iterate(ctx context.Context) error {
iterator, err := d.Iterator()
if err != nil {
d.log.Debug("obtaining iterator", zap.Error(err))
return
return fmt.Errorf("obtaining iterator: %w", err)
}
defer iterator.Close()
@ -282,11 +283,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) {
if len(peerAddrs) != 0 {
select {
case <-ctx.Done():
return
return nil
case d.peerConnector.PeerChannel() <- peerAddrs[0]:
}
}
}
return nil
}
func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) {
@ -299,10 +302,11 @@ restartLoop:
for {
select {
case <-ch:
if d.listener == nil {
break
err := d.iterate(ctx)
if err != nil {
d.log.Debug("iterating discv5", zap.Error(err))
time.Sleep(2 * time.Second)
}
d.iterate(ctx)
ch <- struct{}{}
case <-ctx.Done():
close(ch)

View File

@ -2,6 +2,7 @@ package node
import (
"context"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
@ -50,12 +51,15 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
// Network's peers collection,
// contains only currently active peers
for _, p := range w.host.Network().Peers() {
pingWg := sync.WaitGroup{}
peersToPing := w.host.Network().Peers()
pingWg.Add(len(peersToPing))
for _, p := range peersToPing {
if p != w.host.ID() {
w.wg.Add(1)
go w.pingPeer(ctx, p)
go w.pingPeer(ctx, &pingWg, p)
}
}
pingWg.Wait()
lastTimeExecuted = w.timesource.Now()
case <-ctx.Done():
@ -65,10 +69,8 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
}
}
func (w *WakuNode) pingPeer(ctx context.Context, peer peer.ID) {
w.keepAliveMutex.Lock()
defer w.keepAliveMutex.Unlock()
defer w.wg.Done()
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peer peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
defer cancel()
@ -79,16 +81,21 @@ func (w *WakuNode) pingPeer(ctx context.Context, peer peer.ID) {
select {
case res := <-pr:
if res.Error != nil {
w.keepAliveMutex.Lock()
w.keepAliveFails[peer]++
w.keepAliveMutex.Unlock()
logger.Debug("could not ping", zap.Error(res.Error))
} else {
delete(w.keepAliveFails, peer)
}
case <-ctx.Done():
w.keepAliveMutex.Lock()
w.keepAliveFails[peer]++
w.keepAliveMutex.Unlock()
logger.Debug("could not ping (context done)", zap.Error(ctx.Err()))
}
w.keepAliveMutex.Lock()
if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected {
logger.Info("disconnecting peer")
if err := w.host.Network().ClosePeer(peer); err != nil {
@ -96,4 +103,5 @@ func (w *WakuNode) pingPeer(ctx context.Context, peer peer.ID) {
}
w.keepAliveFails[peer] = 0
}
w.keepAliveMutex.Unlock()
}

View File

@ -308,12 +308,14 @@ func (w *WakuNode) Start(ctx context.Context) error {
return err
}
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
if !w.opts.noDefaultWakuTopic {
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
}
}
w.store = w.storeFactory(w)
@ -573,17 +575,19 @@ func (w *WakuNode) startStore(ctx context.Context) error {
peerIDs = append(peerIDs, pID)
}
w.wg.Add(1)
go func() {
defer w.wg.Done()
if !w.opts.noDefaultWakuTopic {
w.wg.Add(1)
go func() {
defer w.wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
}
}
return nil
}

View File

@ -57,11 +57,12 @@ type WakuNodeParameters struct {
logger *zap.Logger
enableRelay bool
enableFilter bool
isFilterFullNode bool
filterOpts []filter.Option
wOpts []pubsub.Option
noDefaultWakuTopic bool
enableRelay bool
enableFilter bool
isFilterFullNode bool
filterOpts []filter.Option
wOpts []pubsub.Option
minRelayPeersToPublish int
@ -266,6 +267,15 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption {
}
}
// NoDefaultWakuTopic will stop the node from subscribing to the default
// pubsub topic automatically
func NoDefaultWakuTopic() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.noDefaultWakuTopic = true
return nil
}
}
// WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption
// accepts a list of WakuRelay gossipsub option to setup the protocol
func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {

View File

@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
@ -311,11 +312,10 @@ func (wakuPX *WakuPeerExchange) cleanCache() {
wakuPX.enrCache = r
}
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
iterator, err := wakuPX.disc.Iterator()
if err != nil {
wakuPX.log.Debug("obtaining iterator", zap.Error(err))
return
return fmt.Errorf("obtaining iterator: %w", err)
}
defer iterator.Close()
@ -346,6 +346,8 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
}
wakuPX.enrCacheMutex.Unlock()
}
return nil
}
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
@ -367,7 +369,11 @@ restartLoop:
for {
select {
case <-ch:
wakuPX.iterate(ctx)
err := wakuPX.iterate(ctx)
if err != nil {
wakuPX.log.Debug("iterating peer exchange", zap.Error(err))
time.Sleep(2 * time.Second)
}
ch <- struct{}{}
case <-ticker.C:
wakuPX.cleanCache()

View File

@ -206,9 +206,10 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
}
if historyResponseRPC.Response == nil {
logger.Error("empty response")
metrics.RecordStoreError(store.ctx, "emptyRpcResponseFailure")
return nil, ErrEmptyResponse
// Empty response
return &pb.HistoryResponse{
PagingInfo: &pb.PagingInfo{},
}, nil
}
metrics.RecordMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))

View File

@ -41,8 +41,6 @@ var (
ErrFailedQuery = errors.New("failed to resolve the query")
ErrFutureMessage = errors.New("message timestamp in the future")
ErrEmptyResponse = errors.New("empty store response")
)
type WakuSwap interface {

2
vendor/modules.txt vendored
View File

@ -985,7 +985,7 @@ github.com/vacp2p/mvds/transport
github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.3.2-0.20230117214048-e0ccdbe9665d
# github.com/waku-org/go-waku v0.4.1-0.20230127175953-d4473e9c46e4
## explicit; go 1.18
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence

View File

@ -1137,10 +1137,7 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, topics []common.TopicT
}
result, err := w.node.Store().Query(ctx, query, opts...)
if err != nil && errors.Is(err, store.ErrEmptyResponse) {
// No messages
return nil, nil
} else if err != nil {
if err != nil {
w.logger.Error("error querying storenode", zap.String("requestID", hexutil.Encode(requestID)), zap.String("peerID", peerID.String()), zap.Error(err))
signal.SendHistoricMessagesRequestFailed(requestID, peerID, err)
return nil, err