From 41c0e56860006900b64c88ebb1a9b4ea44b4600d Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Fri, 27 Mar 2026 19:57:28 -0300 Subject: [PATCH] initial impl of peer grief (deprioritization) for PeerStore --- tests/test_peer_store_extended.nim | 288 ++++++++++++++++++++- tests/waku_peer_exchange/test_protocol.nim | 1 + waku/node/peer_manager/peer_manager.nim | 51 +++- waku/node/peer_manager/waku_peer_store.nim | 84 ++++++ waku/waku_core/peers.nim | 6 + waku/waku_filter_v2/client.nim | 9 +- waku/waku_lightpush/client.nim | 8 +- waku/waku_peer_exchange/client.nim | 3 + waku/waku_store/client.nim | 7 +- 9 files changed, 446 insertions(+), 11 deletions(-) diff --git a/tests/test_peer_store_extended.nim b/tests/test_peer_store_extended.nim index 16926c7c2..6fedef768 100644 --- a/tests/test_peer_store_extended.nim +++ b/tests/test_peer_store_extended.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils, times], + std/[sequtils, times, random], chronos, libp2p/crypto/crypto, libp2p/peerid, @@ -353,3 +353,289 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p1] == 0 peerStore[SourceBook][p1] == default(PeerOrigin) peerStore[DirectionBook][p1] == default(PeerDirection) + peerStore[GriefBook][p1] == default(GriefData) + + suite "Extended nim-libp2p Peer Store: grief scores": + # These tests mock the clock and work better as a separate suite + var peerStore: PeerStore + var p1, p2, p3: PeerId + + setup: + peerStore = PeerStore.new(nil, capacity = 50) + require p1.init(basePeerId & "1") + require p2.init(basePeerId & "2") + require p3.init(basePeerId & "3") + + # Shorthand: one cooldown interval + let interval = GriefCooldownInterval + + test "new peer has grief score 0": + check peerStore.getGriefScore(p1) == 0 + + test "griefPeer increases score": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 5, t0) + check peerStore.getGriefScore(p1, t0) == 5 + + test "griefPeer accumulates": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 3, t0) + peerStore.griefPeer(p1, 2, t0) + check peerStore.getGriefScore(p1, t0) == 5 + + test "grief cools down by 1 point per interval": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 5, t0) + + check peerStore.getGriefScore(p1, t0) == 5 + check peerStore.getGriefScore(p1, t0 + interval * 1) == 4 + check peerStore.getGriefScore(p1, t0 + interval * 2) == 3 + check peerStore.getGriefScore(p1, t0 + interval * 3) == 2 + check peerStore.getGriefScore(p1, t0 + interval * 4) == 1 + check peerStore.getGriefScore(p1, t0 + interval * 5) == 0 + + test "grief floors at 0": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 3, t0) + + # Well past full cooldown, should be 0 + check peerStore.getGriefScore(p1, t0 + interval * 10) == 0 + + test "cooldown preserves remainder": + let t0 = Moment.init(1000, Minute) + # Half an interval past 2 full intervals + let tHalf = t0 + interval * 2 + interval div 2 + # Complete the 3rd interval + let t3 = t0 + interval * 3 + + peerStore.griefPeer(p1, 5, t0) + + # After 2.5 intervals, score should be 3 + check peerStore.getGriefScore(p1, tHalf) == 3 + + # After completing the 3rd interval, score should be 2 + check peerStore.getGriefScore(p1, t3) == 2 + + test "grief after full cooldown restarts cooldown time": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 2, t0) + + # Fully cool down + check peerStore.getGriefScore(p1, t0 + interval * 5) == 0 + + # Grief again + let t1 = t0 + interval * 5 + peerStore.griefPeer(p1, 3, t1) + check 
peerStore.getGriefScore(p1, t1) == 3 + + # 1 interval after second grief + check peerStore.getGriefScore(p1, t1 + interval) == 2 + + test "independent grief scores per peer": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 10, t0) + peerStore.griefPeer(p2, 3, t0) + + check peerStore.getGriefScore(p1, t0 + interval * 2) == 8 + check peerStore.getGriefScore(p2, t0 + interval * 2) == 1 + check peerStore.getGriefScore(p3, t0 + interval * 2) == 0 + + test "grief with default amount is 1": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, now = t0) + check peerStore.getGriefScore(p1, t0) == 1 + + test "griefPeer with zero or negative amount is ignored": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 5, t0) + peerStore.griefPeer(p1, 0, t0) + peerStore.griefPeer(p1, -3, t0) + check peerStore.getGriefScore(p1, t0) == 5 + + test "grief added during partial cooldown does not reset cooldown time": + let t0 = Moment.init(1000, Minute) + let tHalf = t0 + interval * 2 + interval div 2 + let t3 = t0 + interval * 3 + let t4 = t0 + interval * 4 + + peerStore.griefPeer(p1, 5, t0) + + # At 2.5 intervals: 2 consumed, score 3, half-interval remainder + check peerStore.getGriefScore(p1, tHalf) == 3 + + # Add more grief — cooldown time should NOT reset, remainder preserved + peerStore.griefPeer(p1, 4, tHalf) + check peerStore.getGriefScore(p1, tHalf) == 7 + + # Remainder completes another interval + check peerStore.getGriefScore(p1, t3) == 6 + + # And one more full interval + check peerStore.getGriefScore(p1, t4) == 5 + + test "multiple reads without time change are idempotent": + let t0 = Moment.init(1000, Minute) + + peerStore.griefPeer(p1, 10, t0) + + check peerStore.getGriefScore(p1, t0 + interval * 3) == 7 + check peerStore.getGriefScore(p1, t0 + interval * 3) == 7 + check peerStore.getGriefScore(p1, t0 + interval * 3) == 7 + + test "interleaved grief and cooldown across multiple peers": + let t0 = Moment.init(1000, Minute) + + # Stagger grief: p1 at t0, p2 at t0+1interval, p3 at t0+2interval + peerStore.griefPeer(p1, 6, t0) + peerStore.griefPeer(p2, 4, t0 + interval) + peerStore.griefPeer(p3, 2, t0 + interval * 2) + + # At t0+3*interval: p1 lost 3, p2 lost 2, p3 lost 1 + check peerStore.getGriefScore(p1, t0 + interval * 3) == 3 + check peerStore.getGriefScore(p2, t0 + interval * 3) == 2 + check peerStore.getGriefScore(p3, t0 + interval * 3) == 1 + + # Grief p2 again at t0+3I + peerStore.griefPeer(p2, 10, t0 + interval * 3) + + # At t0+5*interval: p1 lost 5 total, p2 lost 2 more since re-grief, p3 floored at 0 + check peerStore.getGriefScore(p1, t0 + interval * 5) == 1 + check peerStore.getGriefScore(p2, t0 + interval * 5) == 10 + check peerStore.getGriefScore(p3, t0 + interval * 5) == 0 + + suite "Extended nim-libp2p Peer Store: grief-based peer selection": + # Tests for sortByGriefScore via selectPeers + const testProto = "/test/grief/1.0.0" + + proc makePeer(port: int): RemotePeerInfo = + let key = generateSecp256k1Key() + RemotePeerInfo.init( + peerId = PeerId.init(key.getPublicKey().tryGet()).tryGet(), + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/" & $port).tryGet()], + protocols = @[testProto], + ) + + test "all peers at grief 0 returns all peers (shuffled)": + let switch = newTestSwitch() + let pm = PeerManager.new(switch) + let peerStore = switch.peerStore + let peers = (1..5).mapIt(makePeer(it + 10000)) + for p in peers: + peerStore.addPeer(p) + + let selected = pm.selectPeers(testProto) + check selected.len == 5 + + test "lower grief peers come 
before higher grief peers": + let switch = newTestSwitch() + let pm = PeerManager.new(switch) + let peerStore = switch.peerStore + let pA = makePeer(20001) + let pB = makePeer(20002) + let pC = makePeer(20003) + peerStore.addPeer(pA) + peerStore.addPeer(pB) + peerStore.addPeer(pC) + + # pA: grief 0 (bucket 0), pB: grief 5 (bucket 1), pC: grief 15 (bucket 3) + peerStore.griefPeer(pB.peerId, 5) + peerStore.griefPeer(pC.peerId, 15) + + # Run multiple times to account for shuffle within buckets + for i in 0 ..< 20: + let selected = pm.selectPeers(testProto) + check selected.len == 3 + # pA (bucket 0) must always be first + check selected[0].peerId == pA.peerId + # pB (bucket 1) must always come before pC (bucket 3) + check selected[1].peerId == pB.peerId + check selected[2].peerId == pC.peerId + + test "peers within same bucket are interchangeable": + let switch = newTestSwitch() + let pm = PeerManager.new(switch) + let peerStore = switch.peerStore + let pA = makePeer(30001) + let pB = makePeer(30002) + peerStore.addPeer(pA) + peerStore.addPeer(pB) + + # Both within bucket 0 (scores 1 and 4, both div 5 == 0) + peerStore.griefPeer(pA.peerId, 1) + peerStore.griefPeer(pB.peerId, 4) + + var sawAFirst = false + var sawBFirst = false + for i in 0 ..< 50: + let selected = pm.selectPeers(testProto) + check selected.len == 2 + if selected[0].peerId == pA.peerId: + sawAFirst = true + else: + sawBFirst = true + + # Both orderings should appear since they're in the same bucket + check sawAFirst + check sawBFirst + + test "peers in different buckets never swap order": + let switch = newTestSwitch() + let pm = PeerManager.new(switch) + let peerStore = switch.peerStore + let pLow = makePeer(40001) + let pHigh = makePeer(40002) + peerStore.addPeer(pLow) + peerStore.addPeer(pHigh) + + # pLow in bucket 0 (score 1), pHigh in bucket 1 (score 5) + peerStore.griefPeer(pLow.peerId, 1) + peerStore.griefPeer(pHigh.peerId, 5) + + for i in 0 ..< 30: + let selected = pm.selectPeers(testProto) + check selected.len == 2 + check selected[0].peerId == pLow.peerId + check selected[1].peerId == pHigh.peerId + + test "zero-grief peers always come before grieved peers": + let switch = newTestSwitch() + let pm = PeerManager.new(switch) + let peerStore = switch.peerStore + let pClean1 = makePeer(50001) + let pClean2 = makePeer(50002) + let pGrieved = makePeer(50003) + peerStore.addPeer(pClean1) + peerStore.addPeer(pClean2) + peerStore.addPeer(pGrieved) + + peerStore.griefPeer(pGrieved.peerId, 6) + + for i in 0 ..< 20: + let selected = pm.selectPeers(testProto) + check selected.len == 3 + # Grieved peer (bucket 1) must be last; clean peers (bucket 0) first + check selected[2].peerId == pGrieved.peerId + + test "peers beyond MaxGriefBucket are excluded from selection": + let switch = newTestSwitch() + let pm = PeerManager.new(switch) + let peerStore = switch.peerStore + let pGood = makePeer(60001) + let pBad = makePeer(60002) + peerStore.addPeer(pGood) + peerStore.addPeer(pBad) + + # pBad in bucket 4 (score 20, 20 div 5 = 4 > MaxGriefBucket) + peerStore.griefPeer(pBad.peerId, 20) + + let selected = pm.selectPeers(testProto) + check selected.len == 1 + check selected[0].peerId == pGood.peerId diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 74cdba110..94c4143e0 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -203,6 +203,7 @@ suite "Waku Peer Exchange": # Start and mount peer exchange await allFutures([node1.start(), 
node2.start()]) await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchangeClient()]) # Create connection let connOpt = await node2.peerManager.dialPeer( diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e3eb8d75b..a994f652c 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -215,27 +215,64 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} = trace "recovered peers from storage", amount = amount +proc griefPeer*(pm: PeerManager, peerId: PeerId, amount: int = 1) = + if not pm.isNil: + pm.switch.peerStore.griefPeer(peerId, amount) + +proc sortByGriefScore(pm: PeerManager, peers: var seq[RemotePeerInfo]) = + ## Sorts peers by grief score ascending, with random shuffling within each + ## score tier. Peers with lower grief are preferred. + ## NOTE: shuffling defaultPeerStoreCapacity (750 currently) on demand is + ## negligible, but if that increases, might be worth exploring different + ## data structures. + let peerStore = pm.switch.peerStore + + # Resolve grief scores for all peers + var anyGrief = false + var scored: seq[(int, RemotePeerInfo)] + for p in peers: + let score = peerStore.getGriefScore(p.peerId) + if score > 0: + anyGrief = true + scored.add((score, p)) + + # Fast path: if all peers are at 0, just shuffle + if not anyGrief: + shuffle(peers) + return + + # Shuffle first so that within-bucket order is random + shuffle(scored) + + # Stable sort by bucket preserves the random order within each bucket + scored.sort( + proc(a, b: (int, RemotePeerInfo)): int = + cmp(a[0] div GriefBucketSize, b[0] div GriefBucketSize), + order = SortOrder.Ascending) + + # Drop peers beyond the max grief bucket + peers = scored.filterIt(it[0] div GriefBucketSize <= MaxGriefBucket).mapIt(it[1]) + proc selectPeers*( pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) ): seq[RemotePeerInfo] = ## Returns all peers that support the given protocol (and optionally shard), - ## shuffled randomly. Callers can further filter or pick from this list. - var peers = pm.switch.peerStore.getPeersByProtocol(proto) + ## sorted by grief score ascending (shuffled within each score tier). 
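+  ## Peers whose grief bucket exceeds MaxGriefBucket are dropped from the result.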
+ result = pm.switch.peerStore.getPeersByProtocol(proto) trace "Selecting peers from peerstore", - protocol = proto, num_peers = peers.len, address = cast[uint](pm.switch.peerStore) + protocol = proto, num_peers = result.len, address = cast[uint](pm.switch.peerStore) if shard.isSome(): let shardInfo = RelayShard.parse(shard.get()).valueOr: trace "Failed to parse shard from pubsub topic", topic = shard.get() return @[] - peers.keepItIf( + result.keepItIf( (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or (it.shards.len > 0 and it.shards.contains(shardInfo.shardId)) ) - shuffle(peers) - return peers + pm.sortByGriefScore(result) proc selectPeer*( pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) @@ -640,7 +677,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} = var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) - shuffle(outsideBackoffPeers) + pm.sortByGriefScore(outsideBackoffPeers) var index = 0 var numPendingConnReqs = diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index 93ac9ad2e..2bc2ec9e7 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -43,6 +43,10 @@ type # Keeps track of peer shards ShardBook* = ref object of PeerBook[seq[uint16]] + # Keeps track of peer grief: (score, lastCooldownTime) + GriefData* = tuple[score: int, cooldownTime: Moment] + GriefBook* = ref object of PeerBook[GriefData] + proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = let addresses = if peerStore[LastSeenBook][peerId].isSome(): @@ -69,6 +73,8 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = direction: peerStore[DirectionBook][peerId], lastFailedConn: peerStore[LastFailedConnBook][peerId], numberFailedConn: peerStore[NumberFailedConnBook][peerId], + griefScore: peerStore[GriefBook][peerId].score, + griefCooldownTime: peerStore[GriefBook][peerId].cooldownTime, mixPubKey: if peerStore[MixPubKeyBook][peerId] != default(Curve25519Key): some(peerStore[MixPubKeyBook][peerId]) @@ -143,6 +149,9 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin peerStore[LastFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.lastFailedConn) discard peerStore[NumberFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.numberFailedConn) + discard peerStore[GriefBook].book.hasKeyOrPut( + peer.peerId, (score: peer.griefScore, cooldownTime: peer.griefCooldownTime) + ) if peer.enr.isSome(): peerStore[ENRBook][peer.peerId] = peer.enr.get() @@ -241,3 +250,78 @@ template forEnrPeers*( let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin) let peerEnrRecord {.inject.} = enrRecord body + +#~~~~~~~~~~~~~~~~~~# +# Grief Management # +#~~~~~~~~~~~~~~~~~~# + +const + # Each grief point represents this much cooldown time. + # A peer with grief score N will fully cool down after N * GriefCooldownInterval. 
+ GriefCooldownInterval* = chronos.minutes(1) + GriefBucketSize* = 5 ## peers within this many points sort equally + MaxGriefBucket* = 3 ## peers in bucket > this are excluded from selection + + MinGriefScore* = 1 ## stream errors, timeouts + LowGriefScore* = 2 ## non-success response codes + MediumGriefScore* = 3 ## decode failures, protocol violations + HighGriefScore* = 5 ## requestId mismatch, active misbehavior + +const defaultGriefData: GriefData = (score: 0, cooldownTime: Moment.init(0, Second)) + +proc resolveGriefScore( + peerStore: PeerStore, peerId: PeerId, + now: Moment = Moment.init(0, Second) +): int = + ## Lazily resolves the grief score for a peer by applying cooldown based on + ## elapsed time. Updates the stored score and cooldown time in place. + ## Returns the resolved (current) grief score. + ## Pass now for testing; default (0) uses the system clock. + var data = peerStore[GriefBook].book.getOrDefault(peerId, defaultGriefData) + if data.score <= 0: + return 0 + + let clock = if now == Moment.init(0, Second): Moment.now() else: now + let elapsed = clock - data.cooldownTime + let cooldowns = int(elapsed.minutes div GriefCooldownInterval.minutes) + + if cooldowns > 0: + data.score = max(data.score - cooldowns, 0) + # Advance by exactly the consumed cooldown time, preserving remainder + data.cooldownTime = + data.cooldownTime + chronos.minutes(int64(cooldowns) * GriefCooldownInterval.minutes) + peerStore[GriefBook][peerId] = data + + return data.score + +proc griefPeer*( + peerStore: PeerStore, peerId: PeerId, amount: int = 1, + now: Moment = Moment.init(0, Second) +) = + ## Increases the grief score of a peer by the given amount. + ## If this is the first grief for a peer at score 0, initializes the cooldown time. + ## Pass now for testing; default (0) uses the system clock. + if amount <= 0: + return + + let clock = if now == Moment.init(0, Second): Moment.now() else: now + + # resolveGriefScore already wrote back the cooled-down data, so read it once + let currentScore = peerStore.resolveGriefScore(peerId, clock) + var data = peerStore[GriefBook].book.getOrDefault(peerId, defaultGriefData) + + data.score += amount + + # If peer was at 0 (no prior cooldown running), start cooldown from now + if currentScore == 0: + data.cooldownTime = clock + + peerStore[GriefBook][peerId] = data + +proc getGriefScore*( + peerStore: PeerStore, peerId: PeerId, + now: Moment = Moment.init(0, Second) +): int = + ## Returns the current grief score of a peer after applying cooldown. + ## Pass now for testing; default (0) uses the system clock. 
+ return peerStore.resolveGriefScore(peerId, now) diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index c4b8b593e..9a97ee832 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -62,6 +62,8 @@ type RemotePeerInfo* = ref object direction*: PeerDirection lastFailedConn*: Moment numberFailedConn*: int + griefScore*: int + griefCooldownTime*: Moment func `$`*(remotePeerInfo: RemotePeerInfo): string = $remotePeerInfo.peerId @@ -87,6 +89,8 @@ proc init*( direction: PeerDirection = UnknownDirection, lastFailedConn: Moment = Moment.init(0, Second), numberFailedConn: int = 0, + griefScore: int = 0, + griefCooldownTime: Moment = Moment.init(0, Second), mixPubKey: Option[Curve25519Key] = none(Curve25519Key), ): T = RemotePeerInfo( @@ -104,6 +108,8 @@ proc init*( direction: direction, lastFailedConn: lastFailedConn, numberFailedConn: numberFailedConn, + griefScore: griefScore, + griefCooldownTime: griefCooldownTime, mixPubKey: mixPubKey, ) diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 265bf5e7b..559dc48dd 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -11,7 +11,8 @@ import stew/byteutils import waku/ - [node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context], + [node/peer_manager, waku_core, events/delivery_events, + common/broker/broker_context], ./common, ./protocol_metrics, ./rpc_codec, @@ -64,6 +65,7 @@ proc sendSubscribeRequest( "exception in waku_filter_v2 client writeLP: " & getCurrentExceptionMsg() trace "exception in waku_filter_v2 client writeLP", error = getCurrentExceptionMsg() waku_filter_errors.inc(labelValues = [errMsg]) + wfc.peerManager.griefPeer(servicePeer.peerId, MinGriefScore) # stream error: transient return err(FilterSubscribeError.badResponse(errMsg)) var respBuf: seq[byte] @@ -74,11 +76,13 @@ proc sendSubscribeRequest( "exception in waku_filter_v2 client readLp: " & getCurrentExceptionMsg() trace "exception in waku_filter_v2 client readLp", error = getCurrentExceptionMsg() waku_filter_errors.inc(labelValues = [errMsg]) + wfc.peerManager.griefPeer(servicePeer.peerId, MinGriefScore) # stream error: transient return err(FilterSubscribeError.badResponse(errMsg)) let response = FilterSubscribeResponse.decode(respBuf).valueOr: trace "Failed to decode filter subscribe response", servicePeer waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + wfc.peerManager.griefPeer(servicePeer.peerId, MediumGriefScore) # decode failure: protocol violation return err(FilterSubscribeError.badResponse(decodeRpcFailure)) # DOS protection rate limit checks does not know about request id @@ -86,11 +90,13 @@ proc sendSubscribeRequest( response.requestId != filterSubscribeRequest.requestId: trace "Filter subscribe response requestId mismatch", servicePeer, response waku_filter_errors.inc(labelValues = [requestIdMismatch]) + wfc.peerManager.griefPeer(servicePeer.peerId, HighGriefScore) # requestId mismatch: misbehavior return err(FilterSubscribeError.badResponse(requestIdMismatch)) if response.statusCode != 200: trace "Filter subscribe error response", servicePeer, response waku_filter_errors.inc(labelValues = [errorResponse]) + wfc.peerManager.griefPeer(servicePeer.peerId, LowGriefScore) # non-success response: rejection let cause = if response.statusDesc.isSome(): response.statusDesc.get() @@ -188,6 +194,7 @@ proc initProtocolHandler(wfc: WakuFilterClient) = let msgPush = MessagePush.decode(buf).valueOr: error "Failed to decode message push", peerId = 
conn.peerId, error = $error waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + wfc.peerManager.griefPeer(conn.peerId, MediumGriefScore) # decode failure: protocol violation return let msg_hash = diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index fd12c49d2..b8aab34cd 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -59,6 +59,7 @@ proc sendPushRequest( buffer = await connection.readLp(DefaultMaxRpcSize.int) except LPStreamRemoteClosedError: error "Failed to read response from peer", error = getCurrentExceptionMsg() + wl.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream closed: transient return lightpushResultInternalError( "Failed to read response from peer: " & getCurrentExceptionMsg() ) @@ -66,15 +67,20 @@ proc sendPushRequest( let response = LightpushResponse.decode(buffer).valueOr: error "failed to decode response" waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure]) + wl.peerManager.griefPeer(connection.peerId, MediumGriefScore) # decode failure: protocol violation return lightpushResultInternalError(decodeRpcFailure) if response.requestId != req.requestId and response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS: error "response failure, requestId mismatch", requestId = req.requestId, responseRequestId = response.requestId + wl.peerManager.griefPeer(connection.peerId, HighGriefScore) # requestId mismatch: misbehavior return lightpushResultInternalError("response failure, requestId mismatch") - return toPushResult(response) + let pushResult = toPushResult(response) + if pushResult.isErr(): + wl.peerManager.griefPeer(connection.peerId, LowGriefScore) # non-success response: rejection + return pushResult proc publish*( wl: WakuLightPushClient, diff --git a/waku/waku_peer_exchange/client.nim b/waku/waku_peer_exchange/client.nim index 15426c464..b9f81afe8 100644 --- a/waku/waku_peer_exchange/client.nim +++ b/waku/waku_peer_exchange/client.nim @@ -32,6 +32,7 @@ proc request*( except CatchableError as exc: error "exception when handling peer exchange request", error = exc.msg waku_px_client_errors.inc(labelValues = ["error_sending_or_receiving_px_req"]) + wpx.peerManager.griefPeer(conn.peerId, MinGriefScore) # stream error: transient callResult = ( status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, status_desc: some($exc.msg), @@ -46,6 +47,7 @@ proc request*( let decoded = PeerExchangeRpc.decode(buffer).valueOr: error "peer exchange request error decoding buffer", error = $error + wpx.peerManager.griefPeer(conn.peerId, MediumGriefScore) # decode failure: protocol violation return err( ( status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, @@ -54,6 +56,7 @@ proc request*( ) if decoded.response.status_code != PeerExchangeResponseStatusCode.SUCCESS: error "peer exchange request error", status_code = decoded.response.status_code + wpx.peerManager.griefPeer(conn.peerId, LowGriefScore) # non-success response: rejection return err( ( status_code: decoded.response.status_code, diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 5b261af47..16e8c6e41 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -8,7 +8,8 @@ import metrics, bearssl/rand import - ../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec + ../node/peer_manager, ../utils/requests, + ./protocol_metrics, ./common, ./rpc_codec logScope: topics = "waku store client" @@ -42,20 +43,24 @@ proc sendStoreRequest( let writeRes = catch: await 
connection.writeLP(req.encode().buffer) if writeRes.isErr(): + self.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream error: transient return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: writeRes.error.msg)) let readRes = catch: await connection.readLp(DefaultMaxRpcSize.int) let buf = readRes.valueOr: + self.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream error: transient return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: error.msg)) let res = StoreQueryResponse.decode(buf).valueOr: waku_store_errors.inc(labelValues = [DecodeRpcFailure]) + self.peerManager.griefPeer(connection.peerId, MediumGriefScore) # decode failure: protocol violation return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: DecodeRpcFailure)) if res.statusCode != uint32(StatusCode.SUCCESS): waku_store_errors.inc(labelValues = [NoSuccessStatusCode]) + self.peerManager.griefPeer(connection.peerId, LowGriefScore) # non-success response: rejection return err(StoreError.new(res.statusCode, res.statusDesc)) if req.pubsubTopic.isSome():
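
Note for reviewers (illustrative only, not part of the diff): a minimal sketch
of how the grief API composes with a mocked chronos clock, in the style of the
new tests above. The import path, the proc name `griefDemo`, and
`examplePeerId` are assumptions of the sketch, not code from this patch.

  import chronos, libp2p/peerid
  import waku/node/peer_manager/waku_peer_store

  proc griefDemo(peerStore: PeerStore, examplePeerId: PeerId) =
    let t0 = Moment.init(1000, Minute)
    # Charge a decode failure, as the client changes above do.
    peerStore.griefPeer(examplePeerId, MediumGriefScore, t0)
    doAssert peerStore.getGriefScore(examplePeerId, t0) == 3
    # One point cools down per GriefCooldownInterval (1 minute) ...
    doAssert peerStore.getGriefScore(examplePeerId, t0 + GriefCooldownInterval * 2) == 1
    # ... and the score floors at 0 rather than going negative.
    doAssert peerStore.getGriefScore(examplePeerId, t0 + GriefCooldownInterval * 5) == 0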