fix(filter): bound per-Sub retry storm under sustained subscribe failures

Sustained subscribe failures saturated CPU, leaked 600+ subscriptionLoop
  goroutines, and twice panicked with `strings: Join output length overflow`.
  Five independent issues:

  - api/filter: errcnt budget was gated on `possibleRecursiveError`, which
    matched only `ErrNoPeersAvailable` / `swarm.ErrDialBackoff`. The dominant
    error class never incremented errcnt, so the 3-error-per-5s budget was
    dead code. Replaced gate with `shouldIncrementErrCnt(err)`: counts every
    non-nil error.

  - protocol/filter: WakuFilterLightNode.Subscribe flattened per-peer errors
    via `fmt.Errorf+strings.Join`, losing typed *FilterError and growing
    unboundedly. Replaced with typed `*SubscribeError` (PeerID, ContentTopics,
    Err) plus `HasRateLimitError()`; `Error()` is hard-capped. Concurrent
    per-peer appends now mutex-guarded.

  - api/filter: 60-s rate-limit backoff on `*SubscribeError.HasRateLimitError()`.
    `shouldHonourRateLimitBackoff(rateLimitedUntil, now)` gates ticker push and
    closing-channel checkAndResubscribe. Cleared on subscribe success.

  - api/filter: FilterManager.waitingToSubQueue was a cap-100 chan written and
    drained under the same lock, deadlocking the manager once full. Replaced
    with mutex-guarded slice.

  - api/filter: Sub.cleanup closed DataCh while multiplex forwarders could
    still be sending. Added multiplexWG awaited in cleanup; forwarder send is
    in a select with apiSub.ctx.Done() so it can't deadlock when
    subDetails.C is never closed (node-stop transitions).

  Tests (all under -race):
  - TestSub_CleanupRaceWithMultiplex (50 iter)
  - TestSub_CleanupDoesNotDeadlockWhenSubChannelStaysOpen
  - TestFilterManager_SubscribeFilter_DoesNotDeadlockWhenQueueFull
  - TestShouldIncrementErrCnt
This commit is contained in:
Alex Jbanca 2026-05-14 16:02:12 +03:00 committed by Siddarth Kumar
parent de84ba47f9
commit 2da08c3e14
6 changed files with 777 additions and 34 deletions

View File

@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"errors"
"sync"
"time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
@ -33,6 +33,16 @@ func (fc FilterConfig) String() string {
const filterSubLoopInterval = 5 * time.Second
const filterSubMaxErrCnt = 3
// filterRateLimitBackoff is how long the apiSub waits before re-issuing a
// subscribe attempt after at least one peer returned HTTP 429
// ("filter request rejected due rate limit exceeded"). The waku server uses
// 429 to ask clients to slow down; the previous implementation flattened the
// typed peer error into a plain *errors.errorString so the apiSub never saw
// the signal and kept retrying aggressively. With the typed *SubscribeError
// (see protocol/filter/subscribe_error.go) the apiSub now honors the signal
// by suppressing retries for filterRateLimitBackoff after a 429.
const filterRateLimitBackoff = 60 * time.Second
type Sub struct {
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
@ -47,6 +57,16 @@ type Sub struct {
resubscribeInProgress bool
id string
errcnt int
// rateLimitedUntil is set when subscribe() observes a *SubscribeError whose
// FailedPeers contain at least one HTTP 429. While time.Now().Before(rateLimitedUntil),
// subscriptionLoop suppresses retry triggers (ticker push and checkAndResubscribe).
// Cleared by a successful subscribe(). Read/written only from the subscriptionLoop
// goroutine; no lock needed.
rateLimitedUntil time.Time
// multiplexWG tracks per-subscription goroutines that forward envelopes from
// subDetails.C to DataCh. cleanup() must wait for them before close(DataCh)
// to avoid "send on closed channel" panics during teardown.
multiplexWG sync.WaitGroup
}
type subscribeParameters struct {
@ -119,6 +139,12 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
select {
case <-ticker.C:
apiSub.errcnt = 0 //reset errorCount
if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) {
apiSub.log.Debug("ticker push suppressed by rate-limit backoff",
zap.Time("rate-limited-until", apiSub.rateLimitedUntil),
)
continue
}
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
@ -128,17 +154,27 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
apiSub.cleanup()
return
case subId := <-apiSub.closing:
if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) {
apiSub.log.Debug("checkAndResubscribe suppressed by rate-limit backoff",
zap.Time("rate-limited-until", apiSub.rateLimitedUntil),
)
continue
}
if apiSub.errcnt < filterSubMaxErrCnt {
apiSub.resubscribeInProgress = true
//trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId)
} else {
apiSub.log.Debug("retry suppressed by errcnt bound",
zap.Int("errcnt", apiSub.errcnt),
zap.Int("filter-sub-max-err-cnt", filterSubMaxErrCnt),
)
}
}
}
}
func (apiSub *Sub) checkAndResubscribe(subId string) {
var failedPeer peer.ID
if subId != "" {
apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter))
@ -164,6 +200,9 @@ func (apiSub *Sub) cleanup() {
apiSub.log.Info("failed to unsubscribe filter", zap.Error(err))
}
}
// Wait for in-flight multiplex goroutines to exit before closing DataCh,
// otherwise they may panic sending to a closed channel.
apiSub.multiplexWG.Wait()
close(apiSub.DataCh)
}
@ -188,8 +227,29 @@ func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
apiSub.multiplex(subs)
}
func possibleRecursiveError(err error) bool {
return errors.Is(err, utils.ErrNoPeersAvailable) || errors.Is(err, swarm.ErrDialBackoff)
// shouldIncrementErrCnt decides whether an error returned to Sub.subscribe is
// charged against filterSubMaxErrCnt, the retry budget that subscriptionLoop
// resets on every ticker tick.
//
// Every non-nil error is charged. The predicate this replaces only matched
// utils.ErrNoPeersAvailable and swarm.ErrDialBackoff; the aggregate error
// produced by WakuFilterLightNode.Subscribe is neither of those sentinels, so
// it was never counted and the budget never limited retries.
func shouldIncrementErrCnt(err error) bool {
	if err == nil {
		// Success never consumes budget.
		return false
	}
	return true
}
// shouldHonourRateLimitBackoff reports whether now still falls inside the
// rate-limit backoff window that ends at rateLimitedUntil, i.e. whether retry
// triggers should be skipped.
//
// The window is half-open: at now == rateLimitedUntil it has already elapsed
// and the result is false. A zero-value rateLimitedUntil (backoff never set)
// yields false for any now that is not earlier than the zero time.
func shouldHonourRateLimitBackoff(rateLimitedUntil, now time.Time) bool {
	return rateLimitedUntil.After(now)
}
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
@ -206,8 +266,31 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)
if err != nil {
if possibleRecursiveError(err) {
apiSub.log.Warn("subscribe error",
zap.Error(err),
zap.Int("errcnt-before-inc", apiSub.errcnt),
)
// If any peer responded HTTP 429, enter a rate-limit backoff window.
// subscriptionLoop's gates will suppress retry triggers for
// filterRateLimitBackoff. The typed *SubscribeError comes from
// protocol/filter/client.go.
var subErr *filter.SubscribeError
if errors.As(err, &subErr) && subErr.HasRateLimitError() {
apiSub.rateLimitedUntil = time.Now().Add(filterRateLimitBackoff)
apiSub.log.Warn("rate-limited by peer, backing off",
zap.Duration("backoff", filterRateLimitBackoff),
zap.Time("until", apiSub.rateLimitedUntil),
zap.Int("failed-peers", len(subErr.FailedPeers)),
)
}
if shouldIncrementErrCnt(err) {
apiSub.errcnt++
apiSub.log.Debug("errcnt incremented",
zap.Int("new-errcnt", apiSub.errcnt),
zap.Error(err),
)
}
//Inform of error, so that resubscribe can be triggered if required
if len(apiSub.closing) < apiSub.Config.MaxPeers {
@ -221,19 +304,44 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
// TODO: Once filter error handling indicates specific error, this can be handled better.
return nil, err
}
// On full success, clear any prior rate-limit backoff so retries can resume
// normally if a fresh failure occurs later.
apiSub.rateLimitedUntil = time.Time{}
apiSub.log.Debug("subscribe success", zap.Int("subs-count", len(subs)))
return subs, nil
}
func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
// Multiplex onto single channel
// Goroutines will exit once sub channels are closed
// Goroutines exit when subDetails.C is closed or apiSub.ctx is done.
// cleanup() waits on multiplexWG before close(DataCh) to avoid a race.
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
apiSub.multiplexWG.Add(1)
go func(subDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanic()
defer apiSub.multiplexWG.Done()
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
for env := range subDetails.C {
apiSub.DataCh <- env
// Both the receive and the send must be cancelable via apiSub.ctx:
// during node teardown, UnsubscribeWithSubscription may early-return
// from ErrOnNotRunning() without calling sub.Close(), leaving
// subDetails.C open forever. A bare `for env := range subDetails.C`
// would then block here, multiplexWG.Wait() in cleanup() would block
// on it, and the whole filter shutdown would deadlock.
for {
select {
case env, ok := <-subDetails.C:
if !ok {
return
}
select {
case apiSub.DataCh <- env:
case <-apiSub.ctx.Done():
return
}
case <-apiSub.ctx.Done():
return
}
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {

View File

@ -42,7 +42,12 @@ type FilterManager struct {
filterSubBatchDuration time.Duration
incompleteFilterBatch map[string]filterConfig
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
// waitingToSubQueue holds filter batches that arrived while the node was offline.
// Always accessed under mgr.Lock(); a slice (rather than a bounded channel) avoids
// a deadlock where SubscribeFilter would block on a full channel while still
// holding mgr.Lock(), preventing the only drainer (checkAndProcessQueue, also
// invoked under the same lock) from running.
waitingToSubQueue []filterConfig
envProcessor EnevelopeProcessor
networkConnType byte
}
@ -77,7 +82,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
mgr.networkConnType = initNetworkConnType
//parsing the subscribe params only to read the batchInterval passed.
@ -142,8 +146,10 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
go mgr.subscribeAndRunLoop(afilter)
} else {
mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics)))
// queue existing batch as node is not online
mgr.waitingToSubQueue <- afilter
// queue existing batch as node is not online.
// Safe: caller holds mgr.Lock() so the append is atomic with respect
// to checkAndProcessQueue and other mutations.
mgr.waitingToSubQueue = append(mgr.waitingToSubQueue, afilter)
}
afilter = filterConfig{uuid.NewString(), cf}
mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf))
@ -183,23 +189,32 @@ func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}
// checkAndProcessQueue drains the offline-pending filter queue. For each batch
// that matches the given pubsubTopic (or always, when pubsubTopic == ""), a
// subscribe goroutine is spawned; non-matching batches are retained for a
// future call. Caller must hold mgr.Lock().
func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
if len(mgr.waitingToSubQueue) == 0 {
return
}
// Reuse the slice's backing array. Each iteration reads `af` (value copy)
// before any potential overwrite at the same index, so partitioning in place
// is safe.
remaining := mgr.waitingToSubQueue[:0]
for _, af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
remaining = append(remaining, af)
}
}
mgr.waitingToSubQueue = remaining
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
}
}
func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) {

View File

@ -0,0 +1,328 @@
package filter
import (
"context"
"errors"
"fmt"
"io"
"testing"
"time"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
pkgfilter "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
// TestSub_CleanupDoesNotDeadlockWhenSubChannelStaysOpen checks that cleanup()
// completes even though the channel of the subscription handed to multiplex()
// is never closed.
//
// This mirrors the node-stop scenario in which the unsubscribe path returns
// early without closing the subscription, so its channel stays open. The
// forwarder goroutine therefore has to exit on apiSub.ctx.Done(); if it only
// waited for the channel to close, multiplexWG.Wait() inside cleanup() would
// block forever. The test fails when cleanup() has not returned after 2s.
func TestSub_CleanupDoesNotDeadlockWhenSubChannelStaysOpen(t *testing.T) {
	apiSub := &Sub{
		DataCh: make(chan *protocol.Envelope, 16),
		subs:   make(subscription.SubscriptionSet),
		log:    zap.NewNop(),
	}
	apiSub.ctx, apiSub.cancel = context.WithCancel(context.Background())

	// The subscription's channel is deliberately left open for the whole test.
	neverClosed := make(chan *protocol.Envelope)
	details := &subscription.SubscriptionDetails{
		ID:      "test-deadlock",
		C:       neverClosed,
		Closing: make(chan bool, 1),
	}
	apiSub.multiplex([]*subscription.SubscriptionDetails{details})

	// Reset the subscription set so cleanup() has nothing to unsubscribe and
	// never touches the nil wf.
	apiSub.subs = make(subscription.SubscriptionSet)

	// Give the forwarder time to park on its receive.
	time.Sleep(10 * time.Millisecond)

	finished := make(chan struct{})
	go func() {
		apiSub.cancel()
		apiSub.cleanup()
		close(finished)
	}()

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatal("cleanup() did not return within 2s — likely deadlocked on an uncancelable receive from subDetails.C")
	case <-finished:
		// The forwarder observed ctx.Done(), multiplexWG.Wait() was released
		// and cleanup() returned.
	}
}
// TestSub_CleanupRaceWithMultiplex repeatedly provokes the race between
// Sub.cleanup() closing apiSub.DataCh and the forwarder goroutines started by
// Sub.multiplex() that send on that same channel.
//
// Before the WaitGroup + ctx-aware select, a forwarder parked on a send to the
// unbuffered DataCh would wake up on a closed channel once cleanup() ran, and
// the resulting panic (re-raised by utils.LogOnPanic) took the test binary
// down. Now the forwarder leaves through apiSub.ctx.Done(), multiplexWG.Wait()
// returns, and only then is DataCh closed.
//
// The failure mode is a runtime panic rather than something only the race
// detector reports, so the test carries no !race build constraint; running it
// under -race is still worthwhile.
func TestSub_CleanupRaceWithMultiplex(t *testing.T) {
	const iterations = 50
	for remaining := iterations; remaining > 0; remaining-- {
		runCleanupRaceIteration(t)
	}
}
// runCleanupRaceIteration performs one attempt at the close-vs-send race:
// it starts a forwarder that is guaranteed to block sending on DataCh, then
// cancels the context and runs cleanup(). Returning normally is the success
// signal; a regression shows up as a panic in the forwarder goroutine.
func runCleanupRaceIteration(t *testing.T) {
	t.Helper()

	// Nobody reads from the unbuffered DataCh, so the forwarder's very first
	// send blocks — the state the production race needs.
	apiSub := &Sub{
		DataCh: make(chan *protocol.Envelope),
		subs:   make(subscription.SubscriptionSet),
		log:    zap.NewNop(),
	}
	apiSub.ctx, apiSub.cancel = context.WithCancel(context.Background())

	// Queue envelopes up front so the forwarder always has something to send.
	feed := make(chan *protocol.Envelope, 100)
	for queued := 0; queued < 50; queued++ {
		feed <- &protocol.Envelope{}
	}
	details := &subscription.SubscriptionDetails{
		ID:      "test-sub",
		C:       feed,
		Closing: make(chan bool, 1),
	}

	// Run the production multiplex code.
	apiSub.multiplex([]*subscription.SubscriptionDetails{details})

	// Empty the subscription set so cleanup() skips the unsubscribe call on
	// the nil wf; only the close-vs-send race is under test. The goroutines
	// started by multiplex() keep their own reference to details.
	apiSub.subs = make(subscription.SubscriptionSet)

	// Let the forwarder start and block on the send to apiSub.DataCh.
	time.Sleep(10 * time.Millisecond)

	// Same sequence subscriptionLoop runs once apiSub.ctx is done.
	apiSub.cancel()
	apiSub.cleanup()

	// Getting here means close(DataCh) did not race with a pending send.
}
// TestFilterManager_SubscribeFilter_DoesNotDeadlockWhenQueueFull reproduces the
// capacity deadlock observed in production: SubscribeFilter holds mgr.Lock() and
// performs `mgr.waitingToSubQueue <- afilter` at filter_manager.go:146 while the
// node is offline. The queue is bounded (cap 100). The only drainer
// (checkAndProcessQueue) is itself invoked inside mgr.Lock() — so once the
// queue fills, the next send blocks forever, holding the lock, and the entire
// FilterManager (filter receive, messenger, notifications) freezes.
//
// Setup details:
// - Each call uses the same PubsubTopic and 50 ContentTopics. The second and
// every subsequent call sums to 50+50=100 > filterSubBatchSize(=90), which
// triggers the offline branch at filter_manager.go:143-147 — pushing one
// batch into waitingToSubQueue per call (after the first).
// - ~102 calls fill the cap-100 queue; the 103rd will block indefinitely
// under the unpatched code. We run 200 calls to be comfortably past the
// threshold even if filterSubBatchSize changes.
//
// Pre-fix: test hangs and fails at the 5s deadline.
// Post-fix (slice-backed queue): test completes in milliseconds.
func TestFilterManager_SubscribeFilter_DoesNotDeadlockWhenQueueFull(t *testing.T) {
mgr := &FilterManager{
logger: zap.NewNop(),
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
incompleteFilterBatch: make(map[string]filterConfig),
filterConfigs: make(appFilterMap),
filterSubscriptions: make(map[string]SubDetails),
// waitingToSubQueue is now a slice; zero-value nil is the empty queue.
}
done := make(chan struct{})
go func() {
defer close(done)
for i := 0; i < 200; i++ {
cf := protocol.ContentFilter{
PubsubTopic: "/waku/2/rs/16/32",
ContentTopics: makeNContentTopics(50, i),
}
mgr.SubscribeFilter(fmt.Sprintf("filter-%d", i), cf)
}
}()
select {
case <-done:
// Pass — no deadlock under offline overflow.
case <-time.After(5 * time.Second):
t.Fatal("FilterManager.SubscribeFilter deadlocked when waitingToSubQueue overflowed; " +
"a producer holding mgr.Lock() blocked on a bounded channel whose only drainer " +
"also needs mgr.Lock(). Fix: replace the bounded chan with a mutex-guarded slice.")
}
}
// makeNContentTopics builds a ContentTopicSet from n topic names of the form
// "/test/<seed>-<index>/proto". Because the seed is part of every name, sets
// built with different seeds do not share topics.
func makeNContentTopics(n, seed int) protocol.ContentTopicSet {
	names := make([]string, 0, n)
	for idx := 0; idx < n; idx++ {
		names = append(names, fmt.Sprintf("/test/%d-%d/proto", seed, idx))
	}
	return protocol.NewContentTopicSet(names...)
}
// TestShouldIncrementErrCnt pins the predicate that decides whether a
// Sub.subscribe error is charged against the retry budget filterSubMaxErrCnt.
//
// The earlier predicate, possibleRecursiveError, only recognised
// utils.ErrNoPeersAvailable and swarm.ErrDialBackoff, so the aggregate error
// returned by protocol/filter/client.go — which is neither sentinel — was
// never counted. shouldIncrementErrCnt counts every non-nil error, and the
// table below covers nil plus a spread of error shapes.
func TestShouldIncrementErrCnt(t *testing.T) {
	type scenario struct {
		label string
		in    error
		want  bool
	}

	scenarios := []scenario{
		{label: "nil → false", in: nil, want: false},
		{
			label: "plain *errors.errorString (the dominant production error)",
			in:    errors.New("subscriptions failed for contentTopics: /waku/1/0xabcdef/rfc26"),
			want:  true,
		},
		{
			label: "utils.ErrNoPeersAvailable (was counted before)",
			in:    utils.ErrNoPeersAvailable,
			want:  true,
		},
		{
			label: "swarm.ErrDialBackoff (was counted before)",
			in:    swarm.ErrDialBackoff,
			want:  true,
		},
		{
			label: "context.DeadlineExceeded (request timeout)",
			in:    context.DeadlineExceeded,
			want:  true,
		},
		{
			label: "wrapped via fmt.Errorf %w (io.EOF unwrap chain)",
			in:    fmt.Errorf("wrapped: %w", io.EOF),
			want:  true,
		},
		{
			label: "*FilterError{Code: 429} (rate limit from server)",
			in:    &pkgfilter.FilterError{Code: 429, Message: "rate limited"},
			want:  true,
		},
	}

	for idx := range scenarios {
		sc := scenarios[idx]
		t.Run(sc.label, func(t *testing.T) {
			got := shouldIncrementErrCnt(sc.in)
			require.Equal(t, sc.want, got)
		})
	}
}
// TestShouldHonourRateLimitBackoff covers the pure predicate that gates retry
// triggers in subscriptionLoop once a peer has responded with HTTP 429. While
// in the backoff window the subscriptionLoop must stop pushing to apiSub.closing
// and stop calling checkAndResubscribe — otherwise it would keep slamming peers
// that explicitly asked us to back off.
func TestShouldHonourRateLimitBackoff(t *testing.T) {
now := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)
cases := []struct {
name string
rateLimitedUntil time.Time
now time.Time
expect bool
}{
{
name: "zero rateLimitedUntil (never set) — false",
rateLimitedUntil: time.Time{},
now: now,
expect: false,
},
{
name: "now == rateLimitedUntil exactly — false (window has elapsed)",
rateLimitedUntil: now,
now: now,
expect: false,
},
{
name: "now == rateLimitedUntil - 1ns — true (still inside window)",
rateLimitedUntil: now.Add(1 * time.Nanosecond),
now: now,
expect: true,
},
{
name: "now within 30s of a 60s backoff — true",
rateLimitedUntil: now.Add(30 * time.Second),
now: now,
expect: true,
},
{
name: "now after the backoff window — false",
rateLimitedUntil: now.Add(-1 * time.Second),
now: now,
expect: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expect, shouldHonourRateLimitBackoff(tc.rateLimitedUntil, tc.now))
})
}
}
// TestSub_BackoffAfterRateLimit_GateLogic walks Sub.rateLimitedUntil through
// its three states — never set, set into the future, cleared — and checks the
// gate predicate against each. It is a unit-level check of the field's
// contract and sets the field by hand; it does not drive subscribe() or
// subscriptionLoop, which would need a stub for the concrete
// *filter.WakuFilterLightNode.
func TestSub_BackoffAfterRateLimit_GateLogic(t *testing.T) {
	apiSub := &Sub{log: zap.NewNop()}
	apiSub.ctx, apiSub.cancel = context.WithCancel(context.Background())
	defer apiSub.cancel()

	// gateClosed evaluates the predicate against the Sub's current deadline.
	gateClosed := func() bool {
		return shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now())
	}

	// Never rate-limited: retries are allowed.
	require.False(t, gateClosed(),
		"with zero rateLimitedUntil, backoff gate must allow retries")

	// What subscribe() does on a 429: arm the backoff window.
	apiSub.rateLimitedUntil = time.Now().Add(filterRateLimitBackoff)
	require.True(t, gateClosed(),
		"after setting rateLimitedUntil = now + filterRateLimitBackoff, gate must suppress retries")

	// What subscribe() does on success: clear the window.
	apiSub.rateLimitedUntil = time.Time{}
	require.False(t, gateClosed(),
		"after subscribe success clears rateLimitedUntil, gate must allow retries again")
}

View File

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"net/http"
"strings"
"sync"
"time"
@ -403,7 +402,8 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
return nil, err
}
failedContentTopics := []string{}
var failedPeersMu sync.Mutex
failedPeers := []PeerSubscribeFailure{}
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeers peer.IDSlice
@ -429,7 +429,17 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
wf.metrics.RecordError(peerNotFoundFailure)
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
selectErr := err
if selectErr == nil {
selectErr = errors.New("no peers selected")
}
failedPeersMu.Lock()
failedPeers = append(failedPeers, PeerSubscribeFailure{
PeerID: "",
ContentTopics: append([]string(nil), cTopics...),
Err: selectErr,
})
failedPeersMu.Unlock()
continue
}
var cFilter protocol.ContentFilter
@ -456,7 +466,13 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
failedPeersMu.Lock()
failedPeers = append(failedPeers, PeerSubscribeFailure{
PeerID: ID,
ContentTopics: append([]string(nil), cTopics...),
Err: err,
})
failedPeersMu.Unlock()
} else {
wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID))
tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter)
@ -471,11 +487,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
}
}
if len(failedContentTopics) > 0 {
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
return subscriptions, nil
if len(failedPeers) > 0 {
return subscriptions, &SubscribeError{FailedPeers: failedPeers}
}
return subscriptions, nil
}
// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol

View File

@ -0,0 +1,134 @@
package filter
import (
	"errors"
	"fmt"
	"net/http"
	"strings"
	"unicode/utf8"

	"github.com/libp2p/go-libp2p/core/peer"
)
// PeerSubscribeFailure captures a single peer's subscribe failure: the peer
// involved, the content topics that were attempted, and the underlying error
// (typically *FilterError, a dial error, or a stream-reset error).
type PeerSubscribeFailure struct {
// PeerID identifies the peer the attempt was made against. It is empty
// when the failure happened during peer selection, before any peer was
// chosen.
PeerID peer.ID
// ContentTopics lists the content topics of the failed attempt. Error()
// renders only the first one plus a count of the rest.
ContentTopics []string
// Err is the underlying cause. It is inspected with errors.As to detect
// a *FilterError; a nil Err is rendered as "<nil>" by Error().
Err error
}
// SubscribeError is the typed error returned by WakuFilterLightNode.Subscribe
// when one or more peers fail to accept a subscription. It preserves the
// per-peer error (including typed *FilterError) so callers — notably the
// apiSub layer — can react to specific failure codes such as HTTP 429
// (rate-limit).
//
// The previous implementation flattened all per-peer failures into a single
// plain *errors.errorString via fmt.Errorf+strings.Join. That lost the typed
// peer error AND could panic with "strings: Join output length overflow"
// under sustained failure storms (observed in production). SubscribeError
// fixes both problems: the typed peers are preserved, and Error() is
// bounded so it cannot overflow.
type SubscribeError struct {
// FailedPeers holds one entry per failed attempt. Subscribe only returns
// a SubscribeError when this slice is non-empty, and it returns it
// together with whatever subscriptions did succeed.
FailedPeers []PeerSubscribeFailure
}
// errorMaxBytes is the maximum length, in bytes, of the string returned by
// SubscribeError.Error(). Longer output is cut and ends in "...", so the
// message cannot grow unboundedly with FailedPeers and trigger a
// string-allocation overflow.
const errorMaxBytes = 4096
// Error renders a compact, log-grep-friendly summary of the failure: the
// number of failed peers, the code of the first *FilterError found (when
// there is one), and details for at most maxPeersRendered peers. The message
// starts with "subscribe failed", so greps for "failed" that matched the
// earlier "subscriptions failed" wording still match. A nil receiver renders
// as "<nil SubscribeError>".
//
// The result is never longer than errorMaxBytes bytes. Oversized output is
// cut and suffixed with "..."; the cut is moved back to a rune boundary so a
// multi-byte character in a topic or in a peer's error text is never split
// into invalid UTF-8.
func (s *SubscribeError) Error() string {
	if s == nil {
		return "<nil SubscribeError>"
	}

	var b strings.Builder
	b.Grow(256)

	if firstCode := s.firstFilterErrorCode(); firstCode != 0 {
		fmt.Fprintf(&b, "subscribe failed for %d peer(s) (first peer code=%d)",
			len(s.FailedPeers), firstCode)
	} else {
		fmt.Fprintf(&b, "subscribe failed for %d peer(s)", len(s.FailedPeers))
	}
	if len(s.FailedPeers) == 0 {
		return b.String()
	}
	b.WriteString(": ")

	// Only the first few peers are spelled out, each with a single topic
	// preview and its error text; the remainder is summarised as a count.
	const maxPeersRendered = 3
	rendered := 0
	for i, fp := range s.FailedPeers {
		if rendered >= maxPeersRendered {
			fmt.Fprintf(&b, " (+%d more)", len(s.FailedPeers)-rendered)
			break
		}
		if i > 0 {
			b.WriteString("; ")
		}

		topicPreview := "<none>"
		if len(fp.ContentTopics) > 0 {
			topicPreview = fp.ContentTopics[0]
			if len(fp.ContentTopics) > 1 {
				topicPreview = fmt.Sprintf("%s (+%d more)", topicPreview, len(fp.ContentTopics)-1)
			}
		}

		errMsg := "<nil>"
		if fp.Err != nil {
			errMsg = fp.Err.Error()
		}

		fmt.Fprintf(&b, "peer=%s topic=%s err=%s", fp.PeerID, topicPreview, errMsg)
		rendered++

		// A single topic or error string can be arbitrarily long; once the
		// cap is reached nothing rendered after it would survive truncation.
		if b.Len() >= errorMaxBytes {
			break
		}
	}

	out := b.String()
	if len(out) > errorMaxBytes {
		// Reserve three bytes for the ellipsis, then back up until the cut
		// sits on the first byte of a rune so no character is left half-written.
		cut := errorMaxBytes - 3
		for cut > 0 && !utf8.RuneStart(out[cut]) {
			cut--
		}
		out = out[:cut] + "..."
	}
	return out
}
// HasRateLimitError reports whether any failed peer's error is, or wraps, a
// *FilterError carrying HTTP 429 (Too Many Requests). Callers use it to back
// off for longer instead of retrying straight away. A nil receiver reports
// false.
func (s *SubscribeError) HasRateLimitError() bool {
	if s == nil {
		return false
	}
	for idx := range s.FailedPeers {
		cause := s.FailedPeers[idx].Err
		if cause == nil {
			continue
		}
		var filterErr *FilterError
		if !errors.As(cause, &filterErr) {
			continue
		}
		if filterErr.Code == http.StatusTooManyRequests {
			return true
		}
	}
	return false
}
// firstFilterErrorCode scans FailedPeers in order and returns the Code of the
// first *FilterError found in any peer's error chain, or 0 when no peer
// carries one. Error() uses it to put a single at-a-glance number in the log
// line. Peers with a nil Err are skipped.
func (s *SubscribeError) firstFilterErrorCode() int {
	for i := range s.FailedPeers {
		candidate := s.FailedPeers[i].Err
		if candidate == nil {
			continue
		}
		var match *FilterError
		if errors.As(candidate, &match) {
			return match.Code
		}
	}
	return 0
}

View File

@ -0,0 +1,143 @@
package filter
import (
"errors"
"fmt"
"net/http"
"strings"
"testing"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
// TestSubscribeError_Error covers the contracts of the typed aggregate error
// returned by WakuFilterLightNode.Subscribe under partial failure:
//
//  1. The string preserves the "subscribe failed" prefix so existing log
//     greps and dashboards keep working.
//  2. A single failure is rendered with its peer count, the first filter
//     error code and a topic preview.
//  3. The string length is bounded regardless of FailedPeers count. The
//     previous implementation aggregated all content topics with
//     strings.Join, which is unbounded and was observed to panic with
//     `strings: Join output length overflow` under sustained subscribe
//     storms — crashing statusgo. The bounded form prevents that crash.
//  4. A nil *SubscribeError renders a placeholder instead of panicking.
func TestSubscribeError_Error(t *testing.T) {
	t.Run("nil receiver — placeholder string", func(t *testing.T) {
		var e *SubscribeError
		require.Equal(t, "<nil SubscribeError>", e.Error())
	})

	t.Run("empty FailedPeers — well-formed string", func(t *testing.T) {
		e := &SubscribeError{}
		s := e.Error()
		require.NotEmpty(t, s)
		require.Contains(t, s, "subscribe failed")
	})

	t.Run("single peer single topic", func(t *testing.T) {
		e := &SubscribeError{
			FailedPeers: []PeerSubscribeFailure{
				{
					PeerID:        peer.ID("peer-1"),
					ContentTopics: []string{"/waku/1/0xabc/rfc26"},
					Err:           &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"},
				},
			},
		}
		s := e.Error()
		// Assert on the rendered fields themselves. A bare Contains(s, "1")
		// would be satisfied by the topic alone and prove nothing about the
		// peer count.
		require.Contains(t, s, "subscribe failed for 1 peer(s)")
		require.Contains(t, s, "first peer code=429")
		require.Contains(t, s, "topic=/waku/1/0xabc/rfc26")
	})

	t.Run("bounded length under massive FailedPeers (would-panic-with-Join scenario)", func(t *testing.T) {
		// 10,000 failed peers × 50 long topics each = the kind of unbounded
		// growth that produced the `strings: Join output length overflow`
		// panic in production. Error() must stay bounded.
		const peerCount = 10000
		const topicsPerPeer = 50
		failedPeers := make([]PeerSubscribeFailure, peerCount)
		longTopic := "/" + strings.Repeat("a", 200) + "/topic"
		for i := range failedPeers {
			topics := make([]string, topicsPerPeer)
			for j := range topics {
				topics[j] = fmt.Sprintf("%s/%d-%d", longTopic, i, j)
			}
			failedPeers[i] = PeerSubscribeFailure{
				PeerID:        peer.ID(fmt.Sprintf("peer-%d", i)),
				ContentTopics: topics,
				Err:           &FilterError{Code: 503, Message: "service unavailable"},
			}
		}
		e := &SubscribeError{FailedPeers: failedPeers}
		s := e.Error()
		// 4 KB cap is the contract — generous enough for useful info, far
		// below any plausible strings.Join overflow point.
		require.LessOrEqual(t, len(s), 4096,
			"SubscribeError.Error() must be bounded; got %d bytes (would risk strings.Join overflow)", len(s))
		require.Contains(t, s, "subscribe failed")
	})
}
// TestSubscribeError_HasRateLimitError verifies the predicate that the
// apiSub layer uses to decide whether to enter a longer rate-limit backoff.
// 429 must be detectable both as a direct *FilterError and through a wrapping
// fmt.Errorf("%w") chain — production code occasionally adds context wrappers.
func TestSubscribeError_HasRateLimitError(t *testing.T) {
t.Run("nil SubscribeError pointer — false (safe predicate)", func(t *testing.T) {
var e *SubscribeError
require.False(t, e.HasRateLimitError())
})
t.Run("empty FailedPeers — false", func(t *testing.T) {
e := &SubscribeError{}
require.False(t, e.HasRateLimitError())
})
t.Run("only dial / generic errors — false", func(t *testing.T) {
e := &SubscribeError{
FailedPeers: []PeerSubscribeFailure{
{Err: errors.New("dial backoff")},
{Err: errors.New("stream reset")},
{Err: &FilterError{Code: 503, Message: "service unavailable"}},
},
}
require.False(t, e.HasRateLimitError())
})
t.Run("single direct 429 — true", func(t *testing.T) {
e := &SubscribeError{
FailedPeers: []PeerSubscribeFailure{
{Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}},
},
}
require.True(t, e.HasRateLimitError())
})
t.Run("multiple peers, one 429 — true", func(t *testing.T) {
e := &SubscribeError{
FailedPeers: []PeerSubscribeFailure{
{Err: errors.New("stream reset")},
{Err: &FilterError{Code: 503}},
{Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}},
{Err: errors.New("dial backoff")},
},
}
require.True(t, e.HasRateLimitError())
})
t.Run("429 wrapped via fmt.Errorf %w — true", func(t *testing.T) {
wrapped := fmt.Errorf("subscribe attempt failed: %w", &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"})
e := &SubscribeError{
FailedPeers: []PeerSubscribeFailure{
{Err: wrapped},
},
}
require.True(t, e.HasRateLimitError())
})
t.Run("nil Err entries are skipped safely", func(t *testing.T) {
e := &SubscribeError{
FailedPeers: []PeerSubscribeFailure{
{Err: nil},
{Err: &FilterError{Code: http.StatusTooManyRequests, Message: "rate limited"}},
{Err: nil},
},
}
require.True(t, e.HasRateLimitError())
})
}