chore_: bump go-waku
commit 0e02c21a80
parent d794e43347
go.mod | 2 +-

@@ -95,7 +95,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
+	github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1
go.sum | 4 ++--

@@ -2136,8 +2136,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
 github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ=
-github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
+github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a h1:aPT10FgDIUdsnAqy9y5Vzng/dqcr2Qyz1sXOyB7T6ik=
+github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
@@ -27,6 +27,8 @@ func (fc FilterConfig) String() string {
 	return string(jsonStr)
 }
 
+const filterSubLoopInterval = 5 * time.Second
+
 type Sub struct {
 	ContentFilter protocol.ContentFilter
 	DataCh        chan *protocol.Envelope
@@ -69,13 +71,7 @@ func defaultOptions() []SubscribeOptions {
 }
 
 // Subscribe
-func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) {
-	optList := append(defaultOptions(), opts...)
-	params := new(subscribeParameters)
-	for _, opt := range optList {
-		opt(params)
-	}
-
+func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
 	sub := new(Sub)
 	sub.id = uuid.NewString()
 	sub.wf = wf
@@ -95,8 +91,9 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
 			sub.multiplex(subs)
 		}
 	}
-	go sub.subscriptionLoop(params.batchInterval)
+	// filter subscription loop is to check if target subscriptions for a filter are active and if not
+	// trigger resubscribe.
+	go sub.subscriptionLoop(filterSubLoopInterval)
 	return sub, nil
 }
 
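Note: Subscribe no longer applies the variadic SubscribeOptions itself; callers now hand in an already-parsed *subscribeParameters, and the resubscribe check runs at the fixed filterSubLoopInterval instead of a caller-supplied interval. A minimal before/after sketch of the call site (mgr.params is built once in NewFilterManager, see the filter-manager hunks below):

    // Before: each call re-applied the option list internally.
    sub, err := Subscribe(ctx, wf, contentFilter, config, logger, opts...)

    // After: the caller passes one pre-parsed params struct.
    sub, err := Subscribe(ctx, wf, contentFilter, config, logger, mgr.params)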
@@ -31,7 +31,7 @@ type appFilterMap map[string]filterConfig
 type FilterManager struct {
 	sync.Mutex
 	ctx                 context.Context
-	opts                []SubscribeOptions
+	params              *subscribeParameters
 	minPeersPerFilter   int
 	onlineChecker       *onlinechecker.DefaultOnlineChecker
 	filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
@@ -64,7 +64,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
 	// This fn is being mocked in test
 	mgr := new(FilterManager)
 	mgr.ctx = ctx
-	mgr.opts = opts
 	mgr.logger = logger
 	mgr.minPeersPerFilter = minPeersPerFilter
 	mgr.envProcessor = envProcessor
@@ -72,10 +71,17 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
 	mgr.node = node
 	mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
 	mgr.node.SetOnlineChecker(mgr.onlineChecker)
-	mgr.filterSubBatchDuration = 5 * time.Second
 	mgr.incompleteFilterBatch = make(map[string]filterConfig)
 	mgr.filterConfigs = make(appFilterMap)
 	mgr.waitingToSubQueue = make(chan filterConfig, 100)
+
+	//parsing the subscribe params only to read the batchInterval passed.
+	mgr.params = new(subscribeParameters)
+	opts = append(defaultOptions(), opts...)
+	for _, opt := range opts {
+		opt(mgr.params)
+	}
+	mgr.filterSubBatchDuration = mgr.params.batchInterval
 	go mgr.startFilterSubLoop()
 	return mgr
 }
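Note: option parsing now happens exactly once, in NewFilterManager, mainly to read batchInterval. A minimal sketch of the functional-options pattern this relies on; the option name WithBatchInterval is hypothetical, invented here only to illustrate:

    type subscribeParameters struct {
    	batchInterval time.Duration
    }

    // SubscribeOptions mutates the parameter struct it is applied to.
    type SubscribeOptions func(*subscribeParameters)

    // Hypothetical option, shown only to illustrate the pattern.
    func WithBatchInterval(d time.Duration) SubscribeOptions {
    	return func(p *subscribeParameters) { p.batchInterval = d }
    }

Folding defaultOptions() plus the caller's opts over mgr.params yields one struct that both the batching loop and every later Subscribe call share.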
@@ -153,7 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
 func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
 	ctx, cancel := context.WithCancel(mgr.ctx)
 	config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
-	sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...)
+	sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
 	mgr.Lock()
 	mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
 	mgr.Unlock()
@@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
 	w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
 
-	w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
+	w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)
 
 	if params.storeFactory != nil {
 		w.storeFactory = params.storeFactory
@@ -752,7 +752,9 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
 func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
 	err := w.host.Connect(ctx, info)
 	if err != nil {
-		w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
+		if w.peermanager != nil {
+			w.peermanager.HandleDialError(err, info.ID)
+		}
 		return err
 	}
 
@@ -38,6 +38,7 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/utils"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"golang.org/x/time/rate"
 )
 
 // Default UserAgent
@@ -94,6 +95,8 @@ type WakuNodeParameters struct {
 	enableStore     bool
 	messageProvider legacy_store.MessageProvider
 
+	storeRateLimit rate.Limit
+
 	enableRendezvousPoint bool
 	rendezvousDB          *rendezvous.DB
 
@@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
 	WithCircuitRelayParams(2*time.Second, 3*time.Minute),
 	WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
 	WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
+	WithWakuStoreRateLimit(8), // Value currently set in status.staging
 }
 
 // MultiAddresses return the list of multiaddresses configured in the node
@@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
 	}
 }
 
+// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
+// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
+// the store protocol from a storenode
+func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
+	return func(params *WakuNodeParameters) error {
+		params.storeRateLimit = value
+		return nil
+	}
+}
+
 // WithWakuStore enables the Waku V2 Store protocol and if the messages should
 // be stored or not in a message provider.
 func WithWakuStore() WakuNodeOption {
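Note: the new option plugs the rate limit into NewWakuStore via WakuNodeParameters. A hedged usage sketch, assuming go-waku's public node constructor as shown in the hunks above:

    import (
    	"golang.org/x/time/rate"

    	"github.com/waku-org/go-waku/waku/v2/node"
    )

    // Allow at most 4 store queries per second per storenode,
    // overriding the default of 8 set in DefaultWakuNodeOptions.
    wakuNode, err := node.New(node.WithWakuStoreRateLimit(rate.Limit(4)))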
@@ -4,7 +4,6 @@ package peermanager
 
 import (
 	"context"
-	"errors"
 	"math/rand"
 	"sync"
 	"sync/atomic"
@@ -277,11 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
 	ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
 	defer cancel()
 	err := c.host.Connect(ctx, pi)
-	if err != nil && !errors.Is(err, context.Canceled) {
-		c.addConnectionBackoff(pi.ID)
-		c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
-		c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
+	if err != nil {
+		c.pm.HandleDialError(err, pi.ID)
+	} else {
+		c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
 	}
-	c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
 	<-sem
 }
@@ -23,6 +23,7 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
 	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
 	"github.com/waku-org/go-waku/waku/v2/service"
+	"github.com/waku-org/go-waku/waku/v2/utils"
 
 	"go.uber.org/zap"
 )
|
@ -87,6 +88,7 @@ type PeerManager struct {
|
||||||
TopicHealthNotifCh chan<- TopicHealthStatus
|
TopicHealthNotifCh chan<- TopicHealthStatus
|
||||||
rttCache *FastestPeerSelector
|
rttCache *FastestPeerSelector
|
||||||
RelayEnabled bool
|
RelayEnabled bool
|
||||||
|
evtDialError event.Emitter
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerSelection provides various options based on which Peer is selected from a list of peers.
|
// PeerSelection provides various options based on which Peer is selected from a list of peers.
|
||||||
|
@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) {
|
||||||
go pm.connectivityLoop(ctx)
|
go pm.connectivityLoop(ctx)
|
||||||
}
|
}
|
||||||
go pm.peerStoreLoop(ctx)
|
go pm.peerStoreLoop(ctx)
|
||||||
|
|
||||||
|
if pm.host != nil {
|
||||||
|
var err error
|
||||||
|
pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError))
|
||||||
|
if err != nil {
|
||||||
|
pm.logger.Error("failed to create dial error emitter", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
|
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
|
||||||
|
@@ -719,3 +729,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
 	// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
 	pm.serviceSlots.getPeers(proto).add(peerID)
 }
+
+func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
+	if err == nil || errors.Is(err, context.Canceled) {
+		return
+	}
+	if pm.peerConnector != nil {
+		pm.peerConnector.addConnectionBackoff(peerID)
+	}
+	if pm.host != nil {
+		pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
+	}
+	pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
+	if pm.evtDialError != nil {
+		emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
+		if emitterErr != nil {
+			pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
+		}
+	}
+}
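Note: HandleDialError becomes the single place where a failed dial triggers connection backoff, peerstore conn-failure bookkeeping, a warn log, and a DialError event on the host's event bus. A sketch of a consumer using the standard libp2p event-bus API; the watcher function itself is hypothetical:

    import (
    	"github.com/libp2p/go-libp2p/core/host"

    	"github.com/waku-org/go-waku/waku/v2/utils"
    )

    func watchDialErrors(h host.Host) error {
    	sub, err := h.EventBus().Subscribe(new(utils.DialError))
    	if err != nil {
    		return err
    	}
    	go func() {
    		defer sub.Close()
    		for evt := range sub.Out() {
    			dialErr := evt.(utils.DialError)
    			// React to failed dials, e.g. surface connectivity issues to the app.
    			_ = dialErr.PeerID
    		}
    	}()
    	return nil
    }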
@@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
 	stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
 	if err != nil {
 		wf.metrics.RecordError(dialFailure)
-		if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(peerID)
+		if wf.pm != nil {
+			wf.pm.HandleDialError(err, peerID)
 		}
 		return err
 	}
@@ -14,7 +14,7 @@ import (
 	"github.com/libp2p/go-msgio/pbio"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/waku-org/go-waku/logging"
-	"github.com/waku-org/go-waku/waku/v2/peerstore"
+	"github.com/waku-org/go-waku/waku/v2/peermanager"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
 	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@@ -38,6 +38,7 @@ type (
 		log *zap.Logger
 		*service.CommonService
 		subscriptions *SubscribersMap
+		pm            *peermanager.PeerManager
 
 		maxSubscriptions int
 	}
@@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
 	wf.maxSubscriptions = params.MaxSubscribers
 	if params.pm != nil {
 		params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField)
+		wf.pm = params.pm
 	}
 	return wf
 }
@@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
 			wf.metrics.RecordError(pushTimeoutFailure)
 		} else {
 			wf.metrics.RecordError(dialFailure)
-			if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
-				ps.AddConnFailure(peerID)
+			if wf.pm != nil {
+				wf.pm.HandleDialError(err, peerID)
 			}
 		}
 		logger.Error("opening peer stream", zap.Error(err))
@@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
 
 	stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
 	if err != nil {
-		logger.Error("creating stream to peer", zap.Error(err))
 		store.metrics.RecordError(dialFailure)
-		if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(selectedPeer)
+		if store.pm != nil {
+			store.pm.HandleDialError(err, selectedPeer)
 		}
 		return nil, err
 	}
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go (generated, vendored) | 5 ++---
@@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
 
 	stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
 	if err != nil {
-		logger.Error("creating stream to peer", zap.Error(err))
 		wakuLP.metrics.RecordError(dialFailure)
-		if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(peerID)
+		if wakuLP.pm != nil {
+			wakuLP.pm.HandleDialError(err, peerID)
 		}
 		return nil, err
 	}
@@ -76,8 +76,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
 
 	stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
 	if err != nil {
-		if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(params.selectedPeer)
+		if wakuPX.pm != nil {
+			wakuPX.pm.HandleDialError(err, params.selectedPeer)
 		}
 		return err
 	}
|
@ -123,13 +123,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.clusterID != 0 {
|
if params.clusterID != 0 {
|
||||||
wakuPX.log.Debug("clusterID is non zero, filtering by shard")
|
|
||||||
rs, err := wenr.RelaySharding(enrRecord)
|
rs, err := wenr.RelaySharding(enrRecord)
|
||||||
if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) {
|
if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) {
|
||||||
wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard))
|
wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wakuPX.log.Debug("peer matches filter", zap.Int("shard", params.shard))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
||||||
|
|
|
@@ -13,6 +13,10 @@ import (
 
 var DefaultRelaySubscriptionBufferSize int = 1024
 
+// trying to match value here https://github.com/vacp2p/nim-libp2p/pull/1077
+// note that nim-libp2p has 2 peer queues 1 for priority and other non-priority, whereas go-libp2p seems to have single peer-queue
+var DefaultPeerOutboundQSize int = 1024
+
 type RelaySubscribeParameters struct {
 	dontConsume bool
 	cacheSize   uint
@@ -109,6 +113,7 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option {
 		pubsub.WithSeenMessagesTTL(2 * time.Minute),
 		pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
 		pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
+		pubsub.WithPeerOutboundQueueSize(DefaultPeerOutboundQSize),
 	}
 }
 
@@ -19,6 +19,7 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
 	"github.com/waku-org/go-waku/waku/v2/timesource"
 	"go.uber.org/zap"
+	"golang.org/x/time/rate"
 	"google.golang.org/protobuf/proto"
 )
@@ -69,14 +70,19 @@ type WakuStore struct {
 	timesource timesource.Timesource
 	log        *zap.Logger
 	pm         *peermanager.PeerManager
+
+	defaultRatelimit rate.Limit
+	rateLimiters     map[peer.ID]*rate.Limiter
 }
 
 // NewWakuStore is used to instantiate a StoreV3 client
-func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
+func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
 	s := new(WakuStore)
 	s.log = log.Named("store-client")
 	s.timesource = timesource
 	s.pm = pm
+	s.defaultRatelimit = defaultRatelimit
+	s.rateLimiters = make(map[peer.ID]*rate.Limiter)
 
 	if pm != nil {
 		pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
@@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
 		return nil, err
 	}
 
-	response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
+	response, err := s.queryFrom(ctx, storeRequest, params)
 	if err != nil {
 		return nil, err
 	}
@@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
 	return len(result.messages) != 0, nil
 }
 
-func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
+func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
 	if r.IsComplete() {
 		return &Result{
 			store: s,
|
@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params := new(Parameters)
|
||||||
|
params.selectedPeer = r.PeerID()
|
||||||
|
optList := DefaultOptions()
|
||||||
|
optList = append(optList, opts...)
|
||||||
|
for _, opt := range optList {
|
||||||
|
err := opt(params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
|
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
|
||||||
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
storeRequest.PaginationCursor = r.Cursor()
|
storeRequest.PaginationCursor = r.Cursor()
|
||||||
|
|
||||||
response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
|
response, err := s.queryFrom(ctx, storeRequest, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -245,16 +262,27 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
 
 }
 
-func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
-	logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
+func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
+	logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
 
 	logger.Debug("sending store request")
 
-	stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
+	if !params.skipRatelimit {
+		rateLimiter, ok := s.rateLimiters[params.selectedPeer]
+		if !ok {
+			rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
+			s.rateLimiters[params.selectedPeer] = rateLimiter
+		}
+		err := rateLimiter.Wait(ctx)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
 	if err != nil {
-		logger.Error("creating stream to peer", zap.Error(err))
-		if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
-			ps.AddConnFailure(selectedPeer)
+		if s.pm != nil {
+			s.pm.HandleDialError(err, params.selectedPeer)
 		}
 		return nil, err
 	}
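Note: the client lazily creates one limiter per storenode with burst 1, so queries to the same peer are spaced out client-side rather than rejected server-side. A standalone illustration of the golang.org/x/time/rate semantics used above (assumes context, fmt, time, and golang.org/x/time/rate imports and a ctx in scope):

    limiter := rate.NewLimiter(rate.Limit(8), 1) // refill 8 tokens/s, burst of 1
    start := time.Now()
    for i := 0; i < 3; i++ {
    	// Wait blocks until a token is available (or ctx is done),
    	// so after the first call requests come ~125ms apart.
    	if err := limiter.Wait(ctx); err != nil {
    		return err
    	}
    	fmt.Printf("request %d at %v\n", i, time.Since(start).Round(time.Millisecond))
    }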
@@ -19,6 +19,7 @@ type Parameters struct {
 	pageLimit     uint64
 	forward       bool
 	includeData   bool
+	skipRatelimit bool
 }
 
 type RequestOption func(*Parameters) error
@@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption {
 	}
 }
 
+// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
+func SkipRateLimit() RequestOption {
+	return func(params *Parameters) error {
+		params.skipRatelimit = true
+		return nil
+	}
+}
+
 // Default options to be used when querying a store node for results
 func DefaultOptions() []RequestOption {
 	return []RequestOption{
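Note: SkipRateLimit is the per-request escape hatch for the limiter added in queryFrom. A hedged usage sketch; s is a *store.WakuStore handle obtained from the node, and criteria is assumed to be prepared elsewhere:

    // Bypasses the client-side limiter; the storenode may then legitimately
    // answer with TOO_MANY_REQUESTS (429).
    result, err := s.Request(ctx, criteria, store.SkipRateLimit())
    if err != nil {
    	return err
    }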
@@ -2,6 +2,7 @@ package pb
 
 import (
 	"errors"
+	"fmt"
 )
 
 // MaxContentTopics is the maximum number of allowed contenttopics in a query
|
@ -10,7 +11,6 @@ const MaxContentTopics = 10
|
||||||
var (
|
var (
|
||||||
errMissingRequestID = errors.New("missing RequestId field")
|
errMissingRequestID = errors.New("missing RequestId field")
|
||||||
errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic")
|
errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic")
|
||||||
errRequestIDMismatch = errors.New("requestID in response does not match request")
|
|
||||||
errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed")
|
errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed")
|
||||||
errEmptyContentTopic = errors.New("one or more content topics specified is empty")
|
errEmptyContentTopic = errors.New("one or more content topics specified is empty")
|
||||||
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
|
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
|
||||||
|
@@ -57,8 +57,8 @@ func (x *StoreQueryRequest) Validate() error {
 }
 
 func (x *StoreQueryResponse) Validate(requestID string) error {
-	if x.RequestId != "" && x.RequestId != requestID {
-		return errRequestIDMismatch
+	if x.RequestId != "" && x.RequestId != "N/A" && x.RequestId != requestID {
+		return fmt.Errorf("requestID %s in response does not match requestID in request %s", x.RequestId, requestID)
 	}
 
 	if x.StatusCode == nil {
@@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
 	return r.storeResponse
 }
 
-func (r *Result) Next(ctx context.Context) error {
+func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
 	if r.cursor == nil {
 		r.done = true
 		r.messages = nil
 		return nil
 	}
 
-	newResult, err := r.store.next(ctx, r)
+	newResult, err := r.store.next(ctx, r, opts...)
 	if err != nil {
 		return err
 	}
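Note: Next now also accepts RequestOption, so per-page options such as SkipRateLimit can be applied while paginating. A hedged pagination sketch; r is the *Result from an earlier Request call, and the Messages() accessor is assumed from the existing Result API:

    for {
    	for _, msg := range r.Messages() {
    		_ = msg // process the current page
    	}
    	if r.IsComplete() {
    		break
    	}
    	if err := r.Next(ctx, store.SkipRateLimit()); err != nil {
    		return err
    	}
    }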
@@ -5,6 +5,11 @@ import (
 	"github.com/multiformats/go-multiaddr"
 )
 
+type DialError struct {
+	Err    error
+	PeerID peer.ID
+}
+
 // GetPeerID is used to extract the peerID from a multiaddress
 func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
 	peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
@@ -1007,7 +1007,7 @@ github.com/waku-org/go-discover/discover/v5wire
 github.com/waku-org/go-libp2p-rendezvous
 github.com/waku-org/go-libp2p-rendezvous/db
 github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
+# github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a
 ## explicit; go 1.21
 github.com/waku-org/go-waku/logging
 github.com/waku-org/go-waku/tests