From 0e02c21a806485b254a7f8980fcd7f90c6120180 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 24 Sep 2024 17:02:17 -0700 Subject: [PATCH] chore_: bump go-waku --- go.mod | 2 +- go.sum | 4 +- .../go-waku/waku/v2/api/filter/filter.go | 15 +++--- .../waku/v2/api/filter/filter_manager.go | 14 ++++-- .../go-waku/waku/v2/node/wakunode2.go | 6 ++- .../go-waku/waku/v2/node/wakuoptions.go | 14 ++++++ .../waku/v2/peermanager/peer_connector.go | 10 ++-- .../waku/v2/peermanager/peer_manager.go | 29 +++++++++++ .../go-waku/waku/v2/protocol/filter/client.go | 4 +- .../go-waku/waku/v2/protocol/filter/server.go | 8 ++-- .../legacy_store/waku_store_client.go | 5 +- .../v2/protocol/lightpush/waku_lightpush.go | 5 +- .../waku/v2/protocol/peer_exchange/client.go | 6 +-- .../go-waku/waku/v2/protocol/relay/config.go | 5 ++ .../go-waku/waku/v2/protocol/store/client.go | 48 +++++++++++++++---- .../go-waku/waku/v2/protocol/store/options.go | 9 ++++ .../waku/v2/protocol/store/pb/validation.go | 6 +-- .../go-waku/waku/v2/protocol/store/result.go | 4 +- .../waku-org/go-waku/waku/v2/utils/peer.go | 5 ++ vendor/modules.txt | 2 +- 20 files changed, 146 insertions(+), 55 deletions(-) diff --git a/go.mod b/go.mod index c03523339..398c867fe 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da + github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index d89cbbe3b..f93566395 100644 --- a/go.sum +++ b/go.sum @@ -2136,8 +2136,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ= -github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a h1:aPT10FgDIUdsnAqy9y5Vzng/dqcr2Qyz1sXOyB7T6ik= +github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go index f8123704f..b8cf14550 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go @@ -27,6 +27,8 @@ func (fc FilterConfig) String() string { return string(jsonStr) } +const filterSubLoopInterval = 5 * time.Second + type Sub struct { ContentFilter protocol.ContentFilter DataCh chan *protocol.Envelope @@ -69,13 +71,7 @@ func defaultOptions() []SubscribeOptions { } // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) { - optList := append(defaultOptions(), opts...) - params := new(subscribeParameters) - for _, opt := range optList { - opt(params) - } - +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf @@ -95,8 +91,9 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte sub.multiplex(subs) } } - - go sub.subscriptionLoop(params.batchInterval) + // filter subscription loop is to check if target subscriptions for a filter are active and if not + // trigger resubscribe. + go sub.subscriptionLoop(filterSubLoopInterval) return sub, nil } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index e4b6e524d..84261882e 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -31,7 +31,7 @@ type appFilterMap map[string]filterConfig type FilterManager struct { sync.Mutex ctx context.Context - opts []SubscribeOptions + params *subscribeParameters minPeersPerFilter int onlineChecker *onlinechecker.DefaultOnlineChecker filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details @@ -64,7 +64,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx - mgr.opts = opts mgr.logger = logger mgr.minPeersPerFilter = minPeersPerFilter mgr.envProcessor = envProcessor @@ -72,10 +71,17 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.node = node mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) mgr.node.SetOnlineChecker(mgr.onlineChecker) - mgr.filterSubBatchDuration = 5 * time.Second mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + + //parsing the subscribe params only to read the batchInterval passed. + mgr.params = new(subscribeParameters) + opts = append(defaultOptions(), opts...) + for _, opt := range opts { + opt(mgr.params) + } + mgr.filterSubBatchDuration = mgr.params.batchInterval go mgr.startFilterSubLoop() return mgr } @@ -153,7 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} - sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...) + sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} mgr.Unlock() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index f9dc443fc..380880019 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) - w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) + w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit) if params.storeFactory != nil { w.storeFactory = params.storeFactory @@ -752,7 +752,9 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID) + if w.peermanager != nil { + w.peermanager.HandleDialError(err, info.ID) + } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 445065de6..112cafe61 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -38,6 +38,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // Default UserAgent @@ -94,6 +95,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider legacy_store.MessageProvider + storeRateLimit rate.Limit + enableRendezvousPoint bool rendezvousDB *rendezvous.DB @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), + WithWakuStoreRateLimit(8), // Value currently set in status.staging } // MultiAddresses return the list of multiaddresses configured in the node @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { } } +// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will +// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming +// the store protocol from a storenode +func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeRateLimit = value + return nil + } +} + // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider. func WithWakuStore() WakuNodeOption { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go index bd844b20c..ac130ef0b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go @@ -4,7 +4,6 @@ package peermanager import ( "context" - "errors" "math/rand" "sync" "sync/atomic" @@ -277,11 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) - if err != nil && !errors.Is(err, context.Canceled) { - c.addConnectionBackoff(pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID) - c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) + if err != nil { + c.pm.HandleDialError(err, pi.ID) + } else { + c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) } - c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) <-sem } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index 2ac489a04..d548923d9 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -87,6 +88,7 @@ type PeerManager struct { TopicHealthNotifCh chan<- TopicHealthStatus rttCache *FastestPeerSelector RelayEnabled bool + evtDialError event.Emitter } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) { go pm.connectivityLoop(ctx) } go pm.peerStoreLoop(ctx) + + if pm.host != nil { + var err error + pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError)) + if err != nil { + pm.logger.Error("failed to create dial error emitter", zap.Error(err)) + } + } } func (pm *PeerManager) peerStoreLoop(ctx context.Context) { @@ -719,3 +729,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) } + +func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + if pm.peerConnector != nil { + pm.peerConnector.addConnectionBackoff(peerID) + } + if pm.host != nil { + pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID) + } + pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err)) + if pm.evtDialError != nil { + emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID}) + if emitterErr != nil { + pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) + } + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index a9d2b496d..41b497848 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index 9a2e25d6b..2b17de4b8 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -14,7 +14,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -38,6 +38,7 @@ type ( log *zap.Logger *service.CommonService subscriptions *SubscribersMap + pm *peermanager.PeerManager maxSubscriptions int } @@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi wf.maxSubscriptions = params.MaxSubscribers if params.pm != nil { params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField) + wf.pm = params.pm } return wf } @@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(pushTimeoutFailure) } else { wf.metrics.RecordError(dialFailure) - if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wf.pm != nil { + wf.pm.HandleDialError(err, peerID) } } logger.Error("opening peer stream", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go index 03f7c9b21..ef971f003 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/waku_store_client.go @@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) - if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + if store.pm != nil { + store.pm.HandleDialError(err, selectedPeer) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 8200fddfc..f0c005a64 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) - if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peerID) + if wakuLP.pm != nil { + wakuLP.pm.HandleDialError(err, peerID) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go index 8075e2de8..8a9d8e127 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/client.go @@ -76,8 +76,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { - if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(params.selectedPeer) + if wakuPX.pm != nil { + wakuPX.pm.HandleDialError(err, params.selectedPeer) } return err } @@ -123,13 +123,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb } if params.clusterID != 0 { - wakuPX.log.Debug("clusterID is non zero, filtering by shard") rs, err := wenr.RelaySharding(enrRecord) if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) { wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard)) continue } - wakuPX.log.Debug("peer matches filter", zap.Int("shard", params.shard)) } enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go index f0f41f800..bc1ea9334 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/config.go @@ -13,6 +13,10 @@ import ( var DefaultRelaySubscriptionBufferSize int = 1024 +// trying to match value here https://github.com/vacp2p/nim-libp2p/pull/1077 +// note that nim-libp2p has 2 peer queues 1 for priority and other non-priority, whereas go-libp2p seems to have single peer-queue +var DefaultPeerOutboundQSize int = 1024 + type RelaySubscribeParameters struct { dontConsume bool cacheSize uint @@ -109,6 +113,7 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option { pubsub.WithSeenMessagesTTL(2 * time.Minute), pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds), pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second), + pubsub.WithPeerOutboundQueueSize(DefaultPeerOutboundQSize), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 92c47ff4c..f7427b979 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/time/rate" "google.golang.org/protobuf/proto" ) @@ -69,14 +70,19 @@ type WakuStore struct { timesource timesource.Timesource log *zap.Logger pm *peermanager.PeerManager + + defaultRatelimit rate.Limit + rateLimiters map[peer.ID]*rate.Limiter } // NewWakuStore is used to instantiate a StoreV3 client -func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore { s := new(WakuStore) s.log = log.Named("store-client") s.timesource = timesource s.pm = pm + s.defaultRatelimit = defaultRatelimit + s.rateLimiters = make(map[peer.ID]*rate.Limiter) if pm != nil { pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField) @@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return nil, err } - response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt return len(result.messages) != 0, nil } -func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { +func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) { if r.IsComplete() { return &Result{ store: s, @@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { }, nil } + params := new(Parameters) + params.selectedPeer = r.PeerID() + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } + } + storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) storeRequest.PaginationCursor = r.Cursor() - response, err := s.queryFrom(ctx, storeRequest, r.PeerID()) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -245,16 +262,27 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } -func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) { - logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) +func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) { + logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) logger.Debug("sending store request") - stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) + if !params.skipRatelimit { + rateLimiter, ok := s.rateLimiters[params.selectedPeer] + if !ok { + rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1) + s.rateLimiters[params.selectedPeer] = rateLimiter + } + err := rateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + } + + stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300) if err != nil { - logger.Error("creating stream to peer", zap.Error(err)) - if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + if s.pm != nil { + s.pm.HandleDialError(err, params.selectedPeer) } return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go index b38afd53a..b8deba47c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/options.go @@ -19,6 +19,7 @@ type Parameters struct { pageLimit uint64 forward bool includeData bool + skipRatelimit bool } type RequestOption func(*Parameters) error @@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption { } } +// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429)) +func SkipRateLimit() RequestOption { + return func(params *Parameters) error { + params.skipRatelimit = true + return nil + } +} + // Default options to be used when querying a store node for results func DefaultOptions() []RequestOption { return []RequestOption{ diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go index f54dea90e..542c6a052 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go @@ -2,6 +2,7 @@ package pb import ( "errors" + "fmt" ) // MaxContentTopics is the maximum number of allowed contenttopics in a query @@ -10,7 +11,6 @@ const MaxContentTopics = 10 var ( errMissingRequestID = errors.New("missing RequestId field") errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic") - errRequestIDMismatch = errors.New("requestID in response does not match request") errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed") errEmptyContentTopic = errors.New("one or more content topics specified is empty") errMissingPubsubTopic = errors.New("missing PubsubTopic field") @@ -57,8 +57,8 @@ func (x *StoreQueryRequest) Validate() error { } func (x *StoreQueryResponse) Validate(requestID string) error { - if x.RequestId != "" && x.RequestId != requestID { - return errRequestIDMismatch + if x.RequestId != "" && x.RequestId != "N/A" && x.RequestId != requestID { + return fmt.Errorf("requestID %s in response does not match requestID in request %s", x.RequestId, requestID) } if x.StatusCode == nil { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go index 5ea4765ec..604d6453c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go @@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context) error { +func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { if r.cursor == nil { r.done = true r.messages = nil return nil } - newResult, err := r.store.next(ctx, r) + newResult, err := r.store.next(ctx, r, opts...) if err != nil { return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go b/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go index 8321dc3e3..b732fa149 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/utils/peer.go @@ -5,6 +5,11 @@ import ( "github.com/multiformats/go-multiaddr" ) +type DialError struct { + Err error + PeerID peer.ID +} + // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) diff --git a/vendor/modules.txt b/vendor/modules.txt index 634b75422..21d0505d6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1007,7 +1007,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da +# github.com/waku-org/go-waku v0.8.1-0.20240923214107-798c9c5d819a ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests