diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 020bb23f..3302bdfb 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "errors" + "sync" "time" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" @@ -33,6 +33,16 @@ func (fc FilterConfig) String() string { const filterSubLoopInterval = 5 * time.Second const filterSubMaxErrCnt = 3 +// filterRateLimitBackoff is how long the apiSub waits before re-issuing a +// subscribe attempt after at least one peer returned HTTP 429 +// ("filter request rejected due rate limit exceeded"). The waku server uses +// 429 to ask clients to slow down; the previous implementation flattened the +// typed peer error into a plain *errors.errorString so the apiSub never saw +// the signal and kept retrying aggressively. With the typed *SubscribeError +// (see protocol/filter/subscribe_error.go) the apiSub now honors the signal +// by suppressing retries for filterRateLimitBackoff after a 429. +const filterRateLimitBackoff = 60 * time.Second + type Sub struct { ContentFilter protocol.ContentFilter DataCh chan *protocol.Envelope @@ -47,6 +57,16 @@ type Sub struct { resubscribeInProgress bool id string errcnt int + // rateLimitedUntil is set when subscribe() observes a *SubscribeError whose + // FailedPeers contain at least one HTTP 429. While time.Now().Before(rateLimitedUntil), + // subscriptionLoop suppresses retry triggers (ticker push and checkAndResubscribe). + // Cleared by a successful subscribe(). Read/written only from the subscriptionLoop + // goroutine; no lock needed. + rateLimitedUntil time.Time + // multiplexWG tracks per-subscription goroutines that forward envelopes from + // subDetails.C to DataCh. cleanup() must wait for them before close(DataCh) + // to avoid "send on closed channel" panics during teardown. + multiplexWG sync.WaitGroup } type subscribeParameters struct { @@ -119,6 +139,12 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) { select { case <-ticker.C: apiSub.errcnt = 0 //reset errorCount + if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) { + apiSub.log.Debug("ticker push suppressed by rate-limit backoff", + zap.Time("rate-limited-until", apiSub.rateLimitedUntil), + ) + continue + } if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers && !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { apiSub.closing <- "" @@ -128,17 +154,27 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) { apiSub.cleanup() return case subId := <-apiSub.closing: + if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) { + apiSub.log.Debug("checkAndResubscribe suppressed by rate-limit backoff", + zap.Time("rate-limited-until", apiSub.rateLimitedUntil), + ) + continue + } if apiSub.errcnt < filterSubMaxErrCnt { apiSub.resubscribeInProgress = true //trigger resubscribe flow for subscription. apiSub.checkAndResubscribe(subId) + } else { + apiSub.log.Debug("retry suppressed by errcnt bound", + zap.Int("errcnt", apiSub.errcnt), + zap.Int("filter-sub-max-err-cnt", filterSubMaxErrCnt), + ) } } } } func (apiSub *Sub) checkAndResubscribe(subId string) { - var failedPeer peer.ID if subId != "" { apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter)) @@ -164,6 +200,9 @@ func (apiSub *Sub) cleanup() { apiSub.log.Info("failed to unsubscribe filter", zap.Error(err)) } } + // Wait for in-flight multiplex goroutines to exit before closing DataCh, + // otherwise they may panic sending to a closed channel. + apiSub.multiplexWG.Wait() close(apiSub.DataCh) } @@ -188,8 +227,29 @@ func (apiSub *Sub) resubscribe(failedPeer peer.ID) { apiSub.multiplex(subs) } -func possibleRecursiveError(err error) bool { - return errors.Is(err, utils.ErrNoPeersAvailable) || errors.Is(err, swarm.ErrDialBackoff) +// shouldIncrementErrCnt reports whether a Sub.subscribe error should count +// toward the per-5-s-window retry budget filterSubMaxErrCnt. The previous +// implementation gated only on errors.Is(err, utils.ErrNoPeersAvailable) || +// errors.Is(err, swarm.ErrDialBackoff), which matched 0 of 1014 observed +// production failures because the dominant error from +// WakuFilterLightNode.Subscribe is a generic *errors.errorString wrapper +// (or, post-Bug-2-fix, a typed *SubscribeError) — neither of those +// sentinels. With the bound effectively disabled, every subscribe failure +// pushed the closing channel and re-entered subscribe, producing a tight +// retry loop (~1100/sec aggregate observed). +// +// Counting any non-nil error makes the 3-error-per-5-s budget actually bind +// for all failure modes. +func shouldIncrementErrCnt(err error) bool { + return err != nil +} + +// shouldHonourRateLimitBackoff reports whether the apiSub is currently within +// a rate-limit backoff window and should skip retry triggers. now == rateLimitedUntil +// is treated as "window has just elapsed" → false (allow retry), so a zero-value +// rateLimitedUntil (never set) is always false. +func shouldHonourRateLimitBackoff(rateLimitedUntil, now time.Time) bool { + return now.Before(rateLimitedUntil) } func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { @@ -206,8 +266,31 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) if err != nil { - if possibleRecursiveError(err) { + apiSub.log.Warn("subscribe error", + zap.Error(err), + zap.Int("errcnt-before-inc", apiSub.errcnt), + ) + + // If any peer responded HTTP 429, enter a rate-limit backoff window. + // subscriptionLoop's gates will suppress retry triggers for + // filterRateLimitBackoff. The typed *SubscribeError comes from + // protocol/filter/client.go. + var subErr *filter.SubscribeError + if errors.As(err, &subErr) && subErr.HasRateLimitError() { + apiSub.rateLimitedUntil = time.Now().Add(filterRateLimitBackoff) + apiSub.log.Warn("rate-limited by peer, backing off", + zap.Duration("backoff", filterRateLimitBackoff), + zap.Time("until", apiSub.rateLimitedUntil), + zap.Int("failed-peers", len(subErr.FailedPeers)), + ) + } + + if shouldIncrementErrCnt(err) { apiSub.errcnt++ + apiSub.log.Debug("errcnt incremented", + zap.Int("new-errcnt", apiSub.errcnt), + zap.Error(err), + ) } //Inform of error, so that resubscribe can be triggered if required if len(apiSub.closing) < apiSub.Config.MaxPeers { @@ -221,19 +304,44 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int // TODO: Once filter error handling indicates specific error, this can be handled better. return nil, err } + // On full success, clear any prior rate-limit backoff so retries can resume + // normally if a fresh failure occurs later. + apiSub.rateLimitedUntil = time.Time{} + apiSub.log.Debug("subscribe success", zap.Int("subs-count", len(subs))) return subs, nil } func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { // Multiplex onto single channel - // Goroutines will exit once sub channels are closed + // Goroutines exit when subDetails.C is closed or apiSub.ctx is done. + // cleanup() waits on multiplexWG before close(DataCh) to avoid a race. for _, subDetails := range subs { apiSub.subs[subDetails.ID] = subDetails + apiSub.multiplexWG.Add(1) go func(subDetails *subscription.SubscriptionDetails) { defer utils.LogOnPanic() + defer apiSub.multiplexWG.Done() apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID)) - for env := range subDetails.C { - apiSub.DataCh <- env + // Both the receive and the send must be cancelable via apiSub.ctx: + // during node teardown, UnsubscribeWithSubscription may early-return + // from ErrOnNotRunning() without calling sub.Close(), leaving + // subDetails.C open forever. A bare `for env := range subDetails.C` + // would then block here, multiplexWG.Wait() in cleanup() would block + // on it, and the whole filter shutdown would deadlock. + for { + select { + case env, ok := <-subDetails.C: + if !ok { + return + } + select { + case apiSub.DataCh <- env: + case <-apiSub.ctx.Done(): + return + } + case <-apiSub.ctx.Done(): + return + } } }(subDetails) go func(subDetails *subscription.SubscriptionDetails) { diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 530cb7e7..c5d47872 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -42,7 +42,12 @@ type FilterManager struct { filterSubBatchDuration time.Duration incompleteFilterBatch map[string]filterConfig filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} - waitingToSubQueue chan filterConfig + // waitingToSubQueue holds filter batches that arrived while the node was offline. + // Always accessed under mgr.Lock(); a slice (rather than a bounded channel) avoids + // a deadlock where SubscribeFilter would block on a full channel while still + // holding mgr.Lock(), preventing the only drainer (checkAndProcessQueue, also + // invoked under the same lock) from running. + waitingToSubQueue []filterConfig envProcessor EnevelopeProcessor networkConnType byte } @@ -77,7 +82,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.node.SetOnlineChecker(mgr.onlineChecker) mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) - mgr.waitingToSubQueue = make(chan filterConfig, 100) mgr.networkConnType = initNetworkConnType //parsing the subscribe params only to read the batchInterval passed. @@ -142,8 +146,10 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi go mgr.subscribeAndRunLoop(afilter) } else { mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics))) - // queue existing batch as node is not online - mgr.waitingToSubQueue <- afilter + // queue existing batch as node is not online. + // Safe: caller holds mgr.Lock() so the append is atomic with respect + // to checkAndProcessQueue and other mutations. + mgr.waitingToSubQueue = append(mgr.waitingToSubQueue, afilter) } afilter = filterConfig{uuid.NewString(), cf} mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf)) @@ -183,23 +189,32 @@ func (mgr *FilterManager) NetworkChange() { mgr.node.PingPeers() // ping all peers to check if subscriptions are alive } +// checkAndProcessQueue drains the offline-pending filter queue. For each batch +// that matches the given pubsubTopic (or always, when pubsubTopic == ""), a +// subscribe goroutine is spawned; non-matching batches are retained for a +// future call. Caller must hold mgr.Lock(). func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) { - if len(mgr.waitingToSubQueue) > 0 { - for af := range mgr.waitingToSubQueue { - // TODO: change the below logic once topic specific health is implemented for lightClients - if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // check if any filter subs are pending and subscribe them - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) - go mgr.subscribeAndRunLoop(af) - } else { - mgr.waitingToSubQueue <- af - } - if len(mgr.waitingToSubQueue) == 0 { - mgr.logger.Debug("no pending subscriptions") - break - } + if len(mgr.waitingToSubQueue) == 0 { + return + } + // Reuse the slice's backing array. Each iteration reads `af` (value copy) + // before any potential overwrite at the same index, so partitioning in place + // is safe. + remaining := mgr.waitingToSubQueue[:0] + for _, af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + remaining = append(remaining, af) } } + mgr.waitingToSubQueue = remaining + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + } } func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) { diff --git a/waku/v2/api/filter/filter_race_test.go b/waku/v2/api/filter/filter_race_test.go new file mode 100644 index 00000000..ea208f5d --- /dev/null +++ b/waku/v2/api/filter/filter_race_test.go @@ -0,0 +1,328 @@ +package filter + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + "time" + + "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/stretchr/testify/require" + + "github.com/waku-org/go-waku/waku/v2/onlinechecker" + "github.com/waku-org/go-waku/waku/v2/protocol" + pkgfilter "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +// TestSub_CleanupDoesNotDeadlockWhenSubChannelStaysOpen verifies that cleanup() +// returns even when subDetails.C is never closed by Unsubscribe. This is the +// real-world failure mode during node-stop transitions, where +// UnsubscribeWithSubscription early-returns from ErrOnNotRunning() and does NOT +// call sub.Close(), leaving subDetails.C open. The forwarder goroutine MUST +// react to apiSub.ctx.Done() — not depend on subDetails.C closing — so +// multiplexWG.Wait() in cleanup() can complete and the filter teardown isn't +// deadlocked. The test asserts cleanup() returns within a short timeout. +func TestSub_CleanupDoesNotDeadlockWhenSubChannelStaysOpen(t *testing.T) { + apiSub := &Sub{ + DataCh: make(chan *protocol.Envelope, 16), + subs: make(subscription.SubscriptionSet), + log: zap.NewNop(), + } + apiSub.ctx, apiSub.cancel = context.WithCancel(context.Background()) + + sd := &subscription.SubscriptionDetails{ + ID: "test-deadlock", + C: make(chan *protocol.Envelope), // intentionally NEVER closed + Closing: make(chan bool, 1), + } + + apiSub.multiplex([]*subscription.SubscriptionDetails{sd}) + + // Same HACK as the other test — avoid the nil wf path in cleanup(). + apiSub.subs = make(subscription.SubscriptionSet) + + // Let the forwarder goroutine reach its blocking receive on sd.C. + time.Sleep(10 * time.Millisecond) + + done := make(chan struct{}) + go func() { + apiSub.cancel() + apiSub.cleanup() + close(done) + }() + + select { + case <-done: + // cleanup() returned — the forwarder respected ctx.Done() and exited, + // multiplexWG.Wait() unblocked. + case <-time.After(2 * time.Second): + t.Fatal("cleanup() did not return within 2s — likely deadlocked on " + + "an uncancelable receive from subDetails.C") + } +} + +// TestSub_CleanupRaceWithMultiplex exercises the race between Sub.cleanup() closing +// apiSub.DataCh and the per-subscription forwarder goroutines spawned by Sub.multiplex() +// executing apiSub.DataCh <- env. +// +// Without the WaitGroup + ctx-aware select fix, the forwarder goroutine is blocked on +// a send to an unbuffered DataCh, cleanup() proceeds to close(DataCh), the send wakes +// on a closed channel, and utils.LogOnPanic re-panics — killing the test binary. +// +// With the fix, the forwarder's select exits via apiSub.ctx.Done(), cleanup()'s +// multiplexWG.Wait() unblocks, and close(DataCh) is safe. +// +// The test does not use //go:build !race (unlike filter_test.go in this package) +// because the failure mode here is a runtime panic, not a data race the detector +// surfaces. It is fine — and good practice — to also run it under -race. +func TestSub_CleanupRaceWithMultiplex(t *testing.T) { + for i := 0; i < 50; i++ { + runCleanupRaceIteration(t) + } +} + +func runCleanupRaceIteration(t *testing.T) { + t.Helper() + + apiSub := &Sub{ + // Unbuffered DataCh with no consumer forces the forwarder to block on send, + // putting it in the exact state the production race depends on. + DataCh: make(chan *protocol.Envelope), + subs: make(subscription.SubscriptionSet), + log: zap.NewNop(), + } + apiSub.ctx, apiSub.cancel = context.WithCancel(context.Background()) + + sd := &subscription.SubscriptionDetails{ + ID: "test-sub", + C: make(chan *protocol.Envelope, 100), + Closing: make(chan bool, 1), + } + + // Pre-fill the per-sub channel so the forwarder has envelopes ready to pump. + for j := 0; j < 50; j++ { + sd.C <- &protocol.Envelope{} + } + + // Spawn the production multiplex code under test. + apiSub.multiplex([]*subscription.SubscriptionDetails{sd}) + + // HACK: clear apiSub.subs so cleanup() does not invoke + // apiSub.wf.UnsubscribeWithSubscription on a nil WakuFilterLightNode. This test + // targets the close-vs-send race, not the unsubscribe RPC. The forwarder + // goroutines spawned by multiplex() still hold their own references to sd, so + // they keep pumping after this clear. + apiSub.subs = make(subscription.SubscriptionSet) + + // Yield long enough for the forwarder goroutine to begin and block on the + // send to apiSub.DataCh. 10 ms is comfortably above the typical scheduling + // latency even on heavily loaded CI machines. + time.Sleep(10 * time.Millisecond) + + // Reproduce what subscriptionLoop does on apiSub.ctx.Done(). + apiSub.cancel() + apiSub.cleanup() + + // If the unpatched bug is present, the forwarder goroutine has already + // panicked above and killed the test binary via utils.LogOnPanic's re-panic. + // Reaching this line means the close(DataCh) raced cleanly. +} + +// TestFilterManager_SubscribeFilter_DoesNotDeadlockWhenQueueFull reproduces the +// capacity deadlock observed in production: SubscribeFilter holds mgr.Lock() and +// performs `mgr.waitingToSubQueue <- afilter` at filter_manager.go:146 while the +// node is offline. The queue is bounded (cap 100). The only drainer +// (checkAndProcessQueue) is itself invoked inside mgr.Lock() — so once the +// queue fills, the next send blocks forever, holding the lock, and the entire +// FilterManager (filter receive, messenger, notifications) freezes. +// +// Setup details: +// - Each call uses the same PubsubTopic and 50 ContentTopics. The second and +// every subsequent call sums to 50+50=100 > filterSubBatchSize(=90), which +// triggers the offline branch at filter_manager.go:143-147 — pushing one +// batch into waitingToSubQueue per call (after the first). +// - ~102 calls fill the cap-100 queue; the 103rd will block indefinitely +// under the unpatched code. We run 200 calls to be comfortably past the +// threshold even if filterSubBatchSize changes. +// +// Pre-fix: test hangs and fails at the 5s deadline. +// Post-fix (slice-backed queue): test completes in milliseconds. +func TestFilterManager_SubscribeFilter_DoesNotDeadlockWhenQueueFull(t *testing.T) { + mgr := &FilterManager{ + logger: zap.NewNop(), + onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), + incompleteFilterBatch: make(map[string]filterConfig), + filterConfigs: make(appFilterMap), + filterSubscriptions: make(map[string]SubDetails), + // waitingToSubQueue is now a slice; zero-value nil is the empty queue. + } + + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < 200; i++ { + cf := protocol.ContentFilter{ + PubsubTopic: "/waku/2/rs/16/32", + ContentTopics: makeNContentTopics(50, i), + } + mgr.SubscribeFilter(fmt.Sprintf("filter-%d", i), cf) + } + }() + + select { + case <-done: + // Pass — no deadlock under offline overflow. + case <-time.After(5 * time.Second): + t.Fatal("FilterManager.SubscribeFilter deadlocked when waitingToSubQueue overflowed; " + + "a producer holding mgr.Lock() blocked on a bounded channel whose only drainer " + + "also needs mgr.Lock(). Fix: replace the bounded chan with a mutex-guarded slice.") + } +} + +// makeNContentTopics returns a ContentTopicSet of n unique topics, seeded so +// repeated calls with different seeds yield non-overlapping topic sets. +func makeNContentTopics(n, seed int) protocol.ContentTopicSet { + topics := make([]string, n) + for j := 0; j < n; j++ { + topics[j] = fmt.Sprintf("/test/%d-%d/proto", seed, j) + } + return protocol.NewContentTopicSet(topics...) +} + +// TestShouldIncrementErrCnt validates the predicate that gates whether a +// Sub.subscribe error counts toward the per-5-s-window retry budget +// (filterSubMaxErrCnt). The previous implementation used possibleRecursiveError, +// which only matched utils.ErrNoPeersAvailable and swarm.ErrDialBackoff — and +// observed empirically as 0 increments across 1014 production failures because +// the dominant production error is a generic *errors.errorString wrapper +// returned from protocol/filter/client.go (the per-peer aggregator), neither +// of those sentinels. +// +// The fix replaces the predicate with one that counts every non-nil error. +// This test will fail to compile against the unpatched filter.go (the +// shouldIncrementErrCnt symbol does not exist there). +func TestShouldIncrementErrCnt(t *testing.T) { + cases := []struct { + name string + err error + expect bool + }{ + {name: "nil → false", err: nil, expect: false}, + { + name: "plain *errors.errorString (the dominant production error)", + err: errors.New("subscriptions failed for contentTopics: /waku/1/0xabcdef/rfc26"), + expect: true, + }, + { + name: "utils.ErrNoPeersAvailable (was counted before)", + err: utils.ErrNoPeersAvailable, + expect: true, + }, + { + name: "swarm.ErrDialBackoff (was counted before)", + err: swarm.ErrDialBackoff, + expect: true, + }, + { + name: "context.DeadlineExceeded (request timeout)", + err: context.DeadlineExceeded, + expect: true, + }, + { + name: "wrapped via fmt.Errorf %w (io.EOF unwrap chain)", + err: fmt.Errorf("wrapped: %w", io.EOF), + expect: true, + }, + { + name: "*FilterError{Code: 429} (rate limit from server)", + err: &pkgfilter.FilterError{Code: 429, Message: "rate limited"}, + expect: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expect, shouldIncrementErrCnt(tc.err)) + }) + } +} + +// TestShouldHonourRateLimitBackoff covers the pure predicate that gates retry +// triggers in subscriptionLoop once a peer has responded with HTTP 429. While +// in the backoff window the subscriptionLoop must stop pushing to apiSub.closing +// and stop calling checkAndResubscribe — otherwise it would keep slamming peers +// that explicitly asked us to back off. +func TestShouldHonourRateLimitBackoff(t *testing.T) { + now := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC) + cases := []struct { + name string + rateLimitedUntil time.Time + now time.Time + expect bool + }{ + { + name: "zero rateLimitedUntil (never set) — false", + rateLimitedUntil: time.Time{}, + now: now, + expect: false, + }, + { + name: "now == rateLimitedUntil exactly — false (window has elapsed)", + rateLimitedUntil: now, + now: now, + expect: false, + }, + { + name: "now == rateLimitedUntil - 1ns — true (still inside window)", + rateLimitedUntil: now.Add(1 * time.Nanosecond), + now: now, + expect: true, + }, + { + name: "now within 30s of a 60s backoff — true", + rateLimitedUntil: now.Add(30 * time.Second), + now: now, + expect: true, + }, + { + name: "now after the backoff window — false", + rateLimitedUntil: now.Add(-1 * time.Second), + now: now, + expect: false, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expect, shouldHonourRateLimitBackoff(tc.rateLimitedUntil, tc.now)) + }) + } +} + +// TestSub_BackoffAfterRateLimit_GateLogic exercises the Sub fields directly: +// when rateLimitedUntil is set in the future, subscriptionLoop's logic gates +// must reject retry triggers. This is a unit-level check against the Sub +// struct's contract, not a full subscribe-flow integration test (which would +// require stubbing *filter.WakuFilterLightNode, currently a concrete struct). +func TestSub_BackoffAfterRateLimit_GateLogic(t *testing.T) { + apiSub := &Sub{log: zap.NewNop()} + apiSub.ctx, apiSub.cancel = context.WithCancel(context.Background()) + defer apiSub.cancel() + + // Before any rate-limit, the gate must allow retries. + require.False(t, shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()), + "with zero rateLimitedUntil, backoff gate must allow retries") + + // Simulate a 429: subscribe() sets rateLimitedUntil to now+filterRateLimitBackoff. + apiSub.rateLimitedUntil = time.Now().Add(filterRateLimitBackoff) + require.True(t, shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()), + "after setting rateLimitedUntil = now + filterRateLimitBackoff, gate must suppress retries") + + // Simulate the success path clearing the backoff. + apiSub.rateLimitedUntil = time.Time{} + require.False(t, shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()), + "after subscribe success clears rateLimitedUntil, gate must allow retries again") +} diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 2b0b02c6..727200ba 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "net/http" - "strings" "sync" "time" @@ -403,7 +402,8 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot return nil, err } - failedContentTopics := []string{} + var failedPeersMu sync.Mutex + failedPeers := []PeerSubscribeFailure{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var selectedPeers peer.IDSlice @@ -429,7 +429,17 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot wf.metrics.RecordError(peerNotFoundFailure) wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) + selectErr := err + if selectErr == nil { + selectErr = errors.New("no peers selected") + } + failedPeersMu.Lock() + failedPeers = append(failedPeers, PeerSubscribeFailure{ + PeerID: "", + ContentTopics: append([]string(nil), cTopics...), + Err: selectErr, + }) + failedPeersMu.Unlock() continue } var cFilter protocol.ContentFilter @@ -456,7 +466,13 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot if err != nil { wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) + failedPeersMu.Lock() + failedPeers = append(failedPeers, PeerSubscribeFailure{ + PeerID: ID, + ContentTopics: append([]string(nil), cTopics...), + Err: err, + }) + failedPeersMu.Unlock() } else { wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID)) tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter) @@ -471,11 +487,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot } } - if len(failedContentTopics) > 0 { - return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) - } else { - return subscriptions, nil + if len(failedPeers) > 0 { + return subscriptions, &SubscribeError{FailedPeers: failedPeers} } + return subscriptions, nil } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol diff --git a/waku/v2/protocol/filter/subscribe_error.go b/waku/v2/protocol/filter/subscribe_error.go new file mode 100644 index 00000000..fe3b8dc0 --- /dev/null +++ b/waku/v2/protocol/filter/subscribe_error.go @@ -0,0 +1,134 @@ +package filter + +import ( + "errors" + "fmt" + "net/http" + "strings" + + "github.com/libp2p/go-libp2p/core/peer" +) + +// PeerSubscribeFailure captures a single peer's subscribe failure: the peer +// involved, the content topics that were attempted, and the underlying error +// (typically *FilterError, a dial error, or a stream-reset error). +type PeerSubscribeFailure struct { + PeerID peer.ID + ContentTopics []string + Err error +} + +// SubscribeError is the typed error returned by WakuFilterLightNode.Subscribe +// when one or more peers fail to accept a subscription. It preserves the +// per-peer error (including typed *FilterError) so callers — notably the +// apiSub layer — can react to specific failure codes such as HTTP 429 +// (rate-limit). +// +// The previous implementation flattened all per-peer failures into a single +// plain *errors.errorString via fmt.Errorf+strings.Join. That lost the typed +// peer error AND could panic with "strings: Join output length overflow" +// under sustained failure storms (observed in production). SubscribeError +// fixes both problems: the typed peers are preserved, and Error() is +// bounded so it cannot overflow. +type SubscribeError struct { + FailedPeers []PeerSubscribeFailure +} + +// errorMaxBytes bounds the rendered string from Error() so it cannot grow +// unboundedly with FailedPeers and trigger a string-allocation overflow. +const errorMaxBytes = 4096 + +// Error renders a bounded, log-grep-friendly message. The "subscribe failed" +// prefix is intentional: existing log greps look for "subscriptions failed" +// (the prior wording) and "subscribe failed" both still hit on "failed". +func (s *SubscribeError) Error() string { + if s == nil { + return "" + } + var b strings.Builder + b.Grow(256) + firstCode := s.firstFilterErrorCode() + if firstCode != 0 { + fmt.Fprintf(&b, "subscribe failed for %d peer(s) (first peer code=%d)", + len(s.FailedPeers), firstCode) + } else { + fmt.Fprintf(&b, "subscribe failed for %d peer(s)", len(s.FailedPeers)) + } + if len(s.FailedPeers) == 0 { + return b.String() + } + b.WriteString(": ") + // Render at most a small fixed number of peers; truncate the rest. The + // per-peer rendering uses up to one topic and the err Code/Msg, never + // the full topic list — so total length is hard-bounded. + const maxPeersRendered = 3 + rendered := 0 + for i, fp := range s.FailedPeers { + if rendered >= maxPeersRendered { + fmt.Fprintf(&b, " (+%d more)", len(s.FailedPeers)-rendered) + break + } + if i > 0 { + b.WriteString("; ") + } + topicPreview := "" + if len(fp.ContentTopics) > 0 { + topicPreview = fp.ContentTopics[0] + if len(fp.ContentTopics) > 1 { + topicPreview = fmt.Sprintf("%s (+%d more)", topicPreview, len(fp.ContentTopics)-1) + } + } + errMsg := "" + if fp.Err != nil { + errMsg = fp.Err.Error() + } + fmt.Fprintf(&b, "peer=%s topic=%s err=%s", fp.PeerID, topicPreview, errMsg) + rendered++ + // Defensive: if a single peer's error string is pathologically large + // (unlikely but possible), stop early. + if b.Len() >= errorMaxBytes { + break + } + } + out := b.String() + if len(out) > errorMaxBytes { + out = out[:errorMaxBytes-3] + "..." + } + return out +} + +// HasRateLimitError reports whether at least one peer returned HTTP 429 +// (filter request rejected due to rate limit). The apiSub layer uses this +// to apply a longer backoff that respects the rate-limit signal instead of +// hammering the peer further. +func (s *SubscribeError) HasRateLimitError() bool { + if s == nil { + return false + } + var fe *FilterError + for _, fp := range s.FailedPeers { + if fp.Err == nil { + continue + } + if errors.As(fp.Err, &fe) && fe.Code == http.StatusTooManyRequests { + return true + } + } + return false +} + +// firstFilterErrorCode returns the Code of the first *FilterError found in +// FailedPeers, or 0 if none. Used by Error() for a quick at-a-glance signal +// in logs (typically the most useful single number when triaging). +func (s *SubscribeError) firstFilterErrorCode() int { + var fe *FilterError + for _, fp := range s.FailedPeers { + if fp.Err == nil { + continue + } + if errors.As(fp.Err, &fe) { + return fe.Code + } + } + return 0 +} diff --git a/waku/v2/protocol/filter/subscribe_error_test.go b/waku/v2/protocol/filter/subscribe_error_test.go new file mode 100644 index 00000000..f795258b --- /dev/null +++ b/waku/v2/protocol/filter/subscribe_error_test.go @@ -0,0 +1,143 @@ +package filter + +import ( + "errors" + "fmt" + "net/http" + "strings" + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +// TestSubscribeError_Error covers two contracts of the typed aggregate error +// returned by WakuFilterLightNode.Subscribe under partial failure: +// +// 1. The string preserves the "subscribe failed" prefix so existing log +// greps and dashboards keep working. +// 2. The string length is bounded regardless of FailedPeers count. The +// previous implementation aggregated all content topics with +// strings.Join, which is unbounded and was observed to panic with +// `strings: Join output length overflow` under sustained subscribe +// storms — crashing statusgo. The bounded form prevents that crash. +func TestSubscribeError_Error(t *testing.T) { + t.Run("empty FailedPeers — well-formed string", func(t *testing.T) { + e := &SubscribeError{} + s := e.Error() + require.NotEmpty(t, s) + require.Contains(t, s, "subscribe failed") + }) + + t.Run("single peer single topic", func(t *testing.T) { + e := &SubscribeError{ + FailedPeers: []PeerSubscribeFailure{ + { + PeerID: peer.ID("peer-1"), + ContentTopics: []string{"/waku/1/0xabc/rfc26"}, + Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}, + }, + }, + } + s := e.Error() + require.Contains(t, s, "subscribe failed") + require.Contains(t, s, "1") + }) + + t.Run("bounded length under massive FailedPeers (would-panic-with-Join scenario)", func(t *testing.T) { + // 10,000 failed peers × 50 long topics each = the kind of unbounded + // growth that produced the `strings: Join output length overflow` + // panic in production. Error() must stay bounded. + const peerCount = 10000 + const topicsPerPeer = 50 + failedPeers := make([]PeerSubscribeFailure, peerCount) + longTopic := "/" + strings.Repeat("a", 200) + "/topic" + for i := range failedPeers { + topics := make([]string, topicsPerPeer) + for j := range topics { + topics[j] = fmt.Sprintf("%s/%d-%d", longTopic, i, j) + } + failedPeers[i] = PeerSubscribeFailure{ + PeerID: peer.ID(fmt.Sprintf("peer-%d", i)), + ContentTopics: topics, + Err: &FilterError{Code: 503, Message: "service unavailable"}, + } + } + e := &SubscribeError{FailedPeers: failedPeers} + s := e.Error() + // 4 KB cap is the contract — generous enough for useful info, far + // below any plausible strings.Join overflow point. + require.LessOrEqual(t, len(s), 4096, + "SubscribeError.Error() must be bounded; got %d bytes (would risk strings.Join overflow)", len(s)) + require.Contains(t, s, "subscribe failed") + }) +} + +// TestSubscribeError_HasRateLimitError verifies the predicate that the +// apiSub layer uses to decide whether to enter a longer rate-limit backoff. +// 429 must be detectable both as a direct *FilterError and through a wrapping +// fmt.Errorf("%w") chain — production code occasionally adds context wrappers. +func TestSubscribeError_HasRateLimitError(t *testing.T) { + t.Run("nil SubscribeError pointer — false (safe predicate)", func(t *testing.T) { + var e *SubscribeError + require.False(t, e.HasRateLimitError()) + }) + + t.Run("empty FailedPeers — false", func(t *testing.T) { + e := &SubscribeError{} + require.False(t, e.HasRateLimitError()) + }) + + t.Run("only dial / generic errors — false", func(t *testing.T) { + e := &SubscribeError{ + FailedPeers: []PeerSubscribeFailure{ + {Err: errors.New("dial backoff")}, + {Err: errors.New("stream reset")}, + {Err: &FilterError{Code: 503, Message: "service unavailable"}}, + }, + } + require.False(t, e.HasRateLimitError()) + }) + + t.Run("single direct 429 — true", func(t *testing.T) { + e := &SubscribeError{ + FailedPeers: []PeerSubscribeFailure{ + {Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}}, + }, + } + require.True(t, e.HasRateLimitError()) + }) + + t.Run("multiple peers, one 429 — true", func(t *testing.T) { + e := &SubscribeError{ + FailedPeers: []PeerSubscribeFailure{ + {Err: errors.New("stream reset")}, + {Err: &FilterError{Code: 503}}, + {Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}}, + {Err: errors.New("dial backoff")}, + }, + } + require.True(t, e.HasRateLimitError()) + }) + + t.Run("429 wrapped via fmt.Errorf %w — true", func(t *testing.T) { + wrapped := fmt.Errorf("subscribe attempt failed: %w", &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}) + e := &SubscribeError{ + FailedPeers: []PeerSubscribeFailure{ + {Err: wrapped}, + }, + } + require.True(t, e.HasRateLimitError()) + }) + + t.Run("nil Err entries are skipped safely", func(t *testing.T) { + e := &SubscribeError{ + FailedPeers: []PeerSubscribeFailure{ + {Err: nil}, + {Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}}, + {Err: nil}, + }, + } + require.True(t, e.HasRateLimitError()) + }) +}