mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-04-14 04:03:20 +00:00
Merge 41c0e56860006900b64c88ebb1a9b4ea44b4600d into f5762af4c4834972d98cdda961b796c7e30613bf
This commit is contained in:
commit
3a02e777b3
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, times],
|
||||
std/[sequtils, times, random],
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/peerid,
|
||||
@ -353,3 +353,289 @@ suite "Extended nim-libp2p Peer Store":
|
||||
peerStore[DisconnectBook][p1] == 0
|
||||
peerStore[SourceBook][p1] == default(PeerOrigin)
|
||||
peerStore[DirectionBook][p1] == default(PeerDirection)
|
||||
peerStore[GriefBook][p1] == default(GriefData)
|
||||
|
||||
suite "Extended nim-libp2p Peer Store: grief scores":
|
||||
# These tests mock the clock and work better as a separate suite
|
||||
var peerStore: PeerStore
|
||||
var p1, p2, p3: PeerId
|
||||
|
||||
setup:
|
||||
peerStore = PeerStore.new(nil, capacity = 50)
|
||||
require p1.init(basePeerId & "1")
|
||||
require p2.init(basePeerId & "2")
|
||||
require p3.init(basePeerId & "3")
|
||||
|
||||
# Shorthand: one cooldown interval
|
||||
let interval = GriefCooldownInterval
|
||||
|
||||
test "new peer has grief score 0":
|
||||
check peerStore.getGriefScore(p1) == 0
|
||||
|
||||
test "griefPeer increases score":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 5, t0)
|
||||
check peerStore.getGriefScore(p1, t0) == 5
|
||||
|
||||
test "griefPeer accumulates":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 3, t0)
|
||||
peerStore.griefPeer(p1, 2, t0)
|
||||
check peerStore.getGriefScore(p1, t0) == 5
|
||||
|
||||
test "grief cools down by 1 point per interval":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 5, t0)
|
||||
|
||||
check peerStore.getGriefScore(p1, t0) == 5
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 1) == 4
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 2) == 3
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 3) == 2
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 4) == 1
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 5) == 0
|
||||
|
||||
test "grief floors at 0":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 3, t0)
|
||||
|
||||
# Well past full cooldown, should be 0
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 10) == 0
|
||||
|
||||
test "cooldown preserves remainder":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
# Half an interval past 2 full intervals
|
||||
let tHalf = t0 + interval * 2 + interval div 2
|
||||
# Complete the 3rd interval
|
||||
let t3 = t0 + interval * 3
|
||||
|
||||
peerStore.griefPeer(p1, 5, t0)
|
||||
|
||||
# After 2.5 intervals, score should be 3
|
||||
check peerStore.getGriefScore(p1, tHalf) == 3
|
||||
|
||||
# After completing the 3rd interval, score should be 2
|
||||
check peerStore.getGriefScore(p1, t3) == 2
|
||||
|
||||
test "grief after full cooldown restarts cooldown time":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 2, t0)
|
||||
|
||||
# Fully cool down
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 5) == 0
|
||||
|
||||
# Grief again
|
||||
let t1 = t0 + interval * 5
|
||||
peerStore.griefPeer(p1, 3, t1)
|
||||
check peerStore.getGriefScore(p1, t1) == 3
|
||||
|
||||
# 1 interval after second grief
|
||||
check peerStore.getGriefScore(p1, t1 + interval) == 2
|
||||
|
||||
test "independent grief scores per peer":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 10, t0)
|
||||
peerStore.griefPeer(p2, 3, t0)
|
||||
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 2) == 8
|
||||
check peerStore.getGriefScore(p2, t0 + interval * 2) == 1
|
||||
check peerStore.getGriefScore(p3, t0 + interval * 2) == 0
|
||||
|
||||
test "grief with default amount is 1":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, now = t0)
|
||||
check peerStore.getGriefScore(p1, t0) == 1
|
||||
|
||||
test "griefPeer with zero or negative amount is ignored":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 5, t0)
|
||||
peerStore.griefPeer(p1, 0, t0)
|
||||
peerStore.griefPeer(p1, -3, t0)
|
||||
check peerStore.getGriefScore(p1, t0) == 5
|
||||
|
||||
test "grief added during partial cooldown does not reset cooldown time":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
let tHalf = t0 + interval * 2 + interval div 2
|
||||
let t3 = t0 + interval * 3
|
||||
let t4 = t0 + interval * 4
|
||||
|
||||
peerStore.griefPeer(p1, 5, t0)
|
||||
|
||||
# At 2.5 intervals: 2 consumed, score 3, half-interval remainder
|
||||
check peerStore.getGriefScore(p1, tHalf) == 3
|
||||
|
||||
# Add more grief — cooldown time should NOT reset, remainder preserved
|
||||
peerStore.griefPeer(p1, 4, tHalf)
|
||||
check peerStore.getGriefScore(p1, tHalf) == 7
|
||||
|
||||
# Remainder completes another interval
|
||||
check peerStore.getGriefScore(p1, t3) == 6
|
||||
|
||||
# And one more full interval
|
||||
check peerStore.getGriefScore(p1, t4) == 5
|
||||
|
||||
test "multiple reads without time change are idempotent":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
peerStore.griefPeer(p1, 10, t0)
|
||||
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 3) == 7
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 3) == 7
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 3) == 7
|
||||
|
||||
test "interleaved grief and cooldown across multiple peers":
|
||||
let t0 = Moment.init(1000, Minute)
|
||||
|
||||
# Stagger grief: p1 at t0, p2 at t0+1interval, p3 at t0+2interval
|
||||
peerStore.griefPeer(p1, 6, t0)
|
||||
peerStore.griefPeer(p2, 4, t0 + interval)
|
||||
peerStore.griefPeer(p3, 2, t0 + interval * 2)
|
||||
|
||||
# At t0+3*interval: p1 lost 3, p2 lost 2, p3 lost 1
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 3) == 3
|
||||
check peerStore.getGriefScore(p2, t0 + interval * 3) == 2
|
||||
check peerStore.getGriefScore(p3, t0 + interval * 3) == 1
|
||||
|
||||
# Grief p2 again at t0+3I
|
||||
peerStore.griefPeer(p2, 10, t0 + interval * 3)
|
||||
|
||||
# At t0+5*interval: p1 lost 5 total, p2 lost 2 more since re-grief, p3 floored at 0
|
||||
check peerStore.getGriefScore(p1, t0 + interval * 5) == 1
|
||||
check peerStore.getGriefScore(p2, t0 + interval * 5) == 10
|
||||
check peerStore.getGriefScore(p3, t0 + interval * 5) == 0
|
||||
|
||||
suite "Extended nim-libp2p Peer Store: grief-based peer selection":
|
||||
# Tests for sortByGriefScore via selectPeers
|
||||
const testProto = "/test/grief/1.0.0"
|
||||
|
||||
proc makePeer(port: int): RemotePeerInfo =
|
||||
let key = generateSecp256k1Key()
|
||||
RemotePeerInfo.init(
|
||||
peerId = PeerId.init(key.getPublicKey().tryGet()).tryGet(),
|
||||
addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/" & $port).tryGet()],
|
||||
protocols = @[testProto],
|
||||
)
|
||||
|
||||
test "all peers at grief 0 returns all peers (shuffled)":
|
||||
let switch = newTestSwitch()
|
||||
let pm = PeerManager.new(switch)
|
||||
let peerStore = switch.peerStore
|
||||
let peers = (1..5).mapIt(makePeer(it + 10000))
|
||||
for p in peers:
|
||||
peerStore.addPeer(p)
|
||||
|
||||
let selected = pm.selectPeers(testProto)
|
||||
check selected.len == 5
|
||||
|
||||
test "lower grief peers come before higher grief peers":
|
||||
let switch = newTestSwitch()
|
||||
let pm = PeerManager.new(switch)
|
||||
let peerStore = switch.peerStore
|
||||
let pA = makePeer(20001)
|
||||
let pB = makePeer(20002)
|
||||
let pC = makePeer(20003)
|
||||
peerStore.addPeer(pA)
|
||||
peerStore.addPeer(pB)
|
||||
peerStore.addPeer(pC)
|
||||
|
||||
# pA: grief 0 (bucket 0), pB: grief 5 (bucket 1), pC: grief 15 (bucket 3)
|
||||
peerStore.griefPeer(pB.peerId, 5)
|
||||
peerStore.griefPeer(pC.peerId, 15)
|
||||
|
||||
# Run multiple times to account for shuffle within buckets
|
||||
for i in 0 ..< 20:
|
||||
let selected = pm.selectPeers(testProto)
|
||||
check selected.len == 3
|
||||
# pA (bucket 0) must always be first
|
||||
check selected[0].peerId == pA.peerId
|
||||
# pB (bucket 1) must always come before pC (bucket 3)
|
||||
check selected[1].peerId == pB.peerId
|
||||
check selected[2].peerId == pC.peerId
|
||||
|
||||
test "peers within same bucket are interchangeable":
|
||||
let switch = newTestSwitch()
|
||||
let pm = PeerManager.new(switch)
|
||||
let peerStore = switch.peerStore
|
||||
let pA = makePeer(30001)
|
||||
let pB = makePeer(30002)
|
||||
peerStore.addPeer(pA)
|
||||
peerStore.addPeer(pB)
|
||||
|
||||
# Both within bucket 0 (scores 1 and 4, both div 5 == 0)
|
||||
peerStore.griefPeer(pA.peerId, 1)
|
||||
peerStore.griefPeer(pB.peerId, 4)
|
||||
|
||||
var sawAFirst = false
|
||||
var sawBFirst = false
|
||||
for i in 0 ..< 50:
|
||||
let selected = pm.selectPeers(testProto)
|
||||
check selected.len == 2
|
||||
if selected[0].peerId == pA.peerId:
|
||||
sawAFirst = true
|
||||
else:
|
||||
sawBFirst = true
|
||||
|
||||
# Both orderings should appear since they're in the same bucket
|
||||
check sawAFirst
|
||||
check sawBFirst
|
||||
|
||||
test "peers in different buckets never swap order":
|
||||
let switch = newTestSwitch()
|
||||
let pm = PeerManager.new(switch)
|
||||
let peerStore = switch.peerStore
|
||||
let pLow = makePeer(40001)
|
||||
let pHigh = makePeer(40002)
|
||||
peerStore.addPeer(pLow)
|
||||
peerStore.addPeer(pHigh)
|
||||
|
||||
# pLow in bucket 0 (score 1), pHigh in bucket 1 (score 5)
|
||||
peerStore.griefPeer(pLow.peerId, 1)
|
||||
peerStore.griefPeer(pHigh.peerId, 5)
|
||||
|
||||
for i in 0 ..< 30:
|
||||
let selected = pm.selectPeers(testProto)
|
||||
check selected.len == 2
|
||||
check selected[0].peerId == pLow.peerId
|
||||
check selected[1].peerId == pHigh.peerId
|
||||
|
||||
test "zero-grief peers always come before grieved peers":
|
||||
let switch = newTestSwitch()
|
||||
let pm = PeerManager.new(switch)
|
||||
let peerStore = switch.peerStore
|
||||
let pClean1 = makePeer(50001)
|
||||
let pClean2 = makePeer(50002)
|
||||
let pGrieved = makePeer(50003)
|
||||
peerStore.addPeer(pClean1)
|
||||
peerStore.addPeer(pClean2)
|
||||
peerStore.addPeer(pGrieved)
|
||||
|
||||
peerStore.griefPeer(pGrieved.peerId, 6)
|
||||
|
||||
for i in 0 ..< 20:
|
||||
let selected = pm.selectPeers(testProto)
|
||||
check selected.len == 3
|
||||
# Grieved peer (bucket 1) must be last; clean peers (bucket 0) first
|
||||
check selected[2].peerId == pGrieved.peerId
|
||||
|
||||
test "peers beyond MaxGriefBucket are excluded from selection":
|
||||
let switch = newTestSwitch()
|
||||
let pm = PeerManager.new(switch)
|
||||
let peerStore = switch.peerStore
|
||||
let pGood = makePeer(60001)
|
||||
let pBad = makePeer(60002)
|
||||
peerStore.addPeer(pGood)
|
||||
peerStore.addPeer(pBad)
|
||||
|
||||
# pBad in bucket 4 (score 20, 20 div 5 = 4 > MaxGriefBucket)
|
||||
peerStore.griefPeer(pBad.peerId, 20)
|
||||
|
||||
let selected = pm.selectPeers(testProto)
|
||||
check selected.len == 1
|
||||
check selected[0].peerId == pGood.peerId
|
||||
|
||||
@ -203,6 +203,7 @@ suite "Waku Peer Exchange":
|
||||
# Start and mount peer exchange
|
||||
await allFutures([node1.start(), node2.start()])
|
||||
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
|
||||
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchangeClient()])
|
||||
|
||||
# Create connection
|
||||
let connOpt = await node2.peerManager.dialPeer(
|
||||
|
||||
@ -215,27 +215,64 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
|
||||
|
||||
trace "recovered peers from storage", amount = amount
|
||||
|
||||
proc griefPeer*(pm: PeerManager, peerId: PeerId, amount: int = 1) =
|
||||
if not pm.isNil:
|
||||
pm.switch.peerStore.griefPeer(peerId, amount)
|
||||
|
||||
proc sortByGriefScore(pm: PeerManager, peers: var seq[RemotePeerInfo]) =
|
||||
## Sorts peers by grief score ascending, with random shuffling within each
|
||||
## score tier. Peers with lower grief are preferred.
|
||||
## NOTE: shuffling defaultPeerStoreCapacity (750 currently) on demand is
|
||||
## negligible, but if that increases, might be worth exploring different
|
||||
## data structures.
|
||||
let peerStore = pm.switch.peerStore
|
||||
|
||||
# Resolve grief scores for all peers
|
||||
var anyGrief = false
|
||||
var scored: seq[(int, RemotePeerInfo)]
|
||||
for p in peers:
|
||||
let score = peerStore.getGriefScore(p.peerId)
|
||||
if score > 0:
|
||||
anyGrief = true
|
||||
scored.add((score, p))
|
||||
|
||||
# Fast path: if all peers are at 0, just shuffle
|
||||
if not anyGrief:
|
||||
shuffle(peers)
|
||||
return
|
||||
|
||||
# Shuffle first so that within-bucket order is random
|
||||
shuffle(scored)
|
||||
|
||||
# Stable sort by bucket preserves the random order within each bucket
|
||||
scored.sort(
|
||||
proc(a, b: (int, RemotePeerInfo)): int =
|
||||
cmp(a[0] div GriefBucketSize, b[0] div GriefBucketSize),
|
||||
order = SortOrder.Ascending)
|
||||
|
||||
# Drop peers beyond the max grief bucket
|
||||
peers = scored.filterIt(it[0] div GriefBucketSize <= MaxGriefBucket).mapIt(it[1])
|
||||
|
||||
proc selectPeers*(
|
||||
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
|
||||
): seq[RemotePeerInfo] =
|
||||
## Returns all peers that support the given protocol (and optionally shard),
|
||||
## shuffled randomly. Callers can further filter or pick from this list.
|
||||
var peers = pm.switch.peerStore.getPeersByProtocol(proto)
|
||||
## sorted by grief score ascending (shuffled within each score tier).
|
||||
result = pm.switch.peerStore.getPeersByProtocol(proto)
|
||||
trace "Selecting peers from peerstore",
|
||||
protocol = proto, num_peers = peers.len, address = cast[uint](pm.switch.peerStore)
|
||||
protocol = proto, num_peers = result.len, address = cast[uint](pm.switch.peerStore)
|
||||
|
||||
if shard.isSome():
|
||||
let shardInfo = RelayShard.parse(shard.get()).valueOr:
|
||||
trace "Failed to parse shard from pubsub topic", topic = shard.get()
|
||||
return @[]
|
||||
|
||||
peers.keepItIf(
|
||||
result.keepItIf(
|
||||
(it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
|
||||
(it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
|
||||
)
|
||||
|
||||
shuffle(peers)
|
||||
return peers
|
||||
pm.sortByGriefScore(result)
|
||||
|
||||
proc selectPeer*(
|
||||
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
|
||||
@ -640,7 +677,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||
|
||||
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||
|
||||
shuffle(outsideBackoffPeers)
|
||||
pm.sortByGriefScore(outsideBackoffPeers)
|
||||
|
||||
var index = 0
|
||||
var numPendingConnReqs =
|
||||
|
||||
@ -43,6 +43,10 @@ type
|
||||
# Keeps track of peer shards
|
||||
ShardBook* = ref object of PeerBook[seq[uint16]]
|
||||
|
||||
# Keeps track of peer grief: (score, lastCooldownTime)
|
||||
GriefData* = tuple[score: int, cooldownTime: Moment]
|
||||
GriefBook* = ref object of PeerBook[GriefData]
|
||||
|
||||
proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
||||
let addresses =
|
||||
if peerStore[LastSeenBook][peerId].isSome():
|
||||
@ -69,6 +73,8 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
||||
direction: peerStore[DirectionBook][peerId],
|
||||
lastFailedConn: peerStore[LastFailedConnBook][peerId],
|
||||
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
|
||||
griefScore: peerStore[GriefBook][peerId].score,
|
||||
griefCooldownTime: peerStore[GriefBook][peerId].cooldownTime,
|
||||
mixPubKey:
|
||||
if peerStore[MixPubKeyBook][peerId] != default(Curve25519Key):
|
||||
some(peerStore[MixPubKeyBook][peerId])
|
||||
@ -143,6 +149,9 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin
|
||||
peerStore[LastFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.lastFailedConn)
|
||||
discard
|
||||
peerStore[NumberFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.numberFailedConn)
|
||||
discard peerStore[GriefBook].book.hasKeyOrPut(
|
||||
peer.peerId, (score: peer.griefScore, cooldownTime: peer.griefCooldownTime)
|
||||
)
|
||||
if peer.enr.isSome():
|
||||
peerStore[ENRBook][peer.peerId] = peer.enr.get()
|
||||
|
||||
@ -241,3 +250,78 @@ template forEnrPeers*(
|
||||
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
|
||||
let peerEnrRecord {.inject.} = enrRecord
|
||||
body
|
||||
|
||||
#~~~~~~~~~~~~~~~~~~#
|
||||
# Grief Management #
|
||||
#~~~~~~~~~~~~~~~~~~#
|
||||
|
||||
const
|
||||
# Each grief point represents this much cooldown time.
|
||||
# A peer with grief score N will fully cool down after N * GriefCooldownInterval.
|
||||
GriefCooldownInterval* = chronos.minutes(1)
|
||||
GriefBucketSize* = 5 ## peers within this many points sort equally
|
||||
MaxGriefBucket* = 3 ## peers in bucket > this are excluded from selection
|
||||
|
||||
MinGriefScore* = 1 ## stream errors, timeouts
|
||||
LowGriefScore* = 2 ## non-success response codes
|
||||
MediumGriefScore* = 3 ## decode failures, protocol violations
|
||||
HighGriefScore* = 5 ## requestId mismatch, active misbehavior
|
||||
|
||||
const defaultGriefData: GriefData = (score: 0, cooldownTime: Moment.init(0, Second))
|
||||
|
||||
proc resolveGriefScore(
|
||||
peerStore: PeerStore, peerId: PeerId,
|
||||
now: Moment = Moment.init(0, Second)
|
||||
): int =
|
||||
## Lazily resolves the grief score for a peer by applying cooldown based on
|
||||
## elapsed time. Updates the stored score and cooldown time in place.
|
||||
## Returns the resolved (current) grief score.
|
||||
## Pass now for testing; default (0) uses the system clock.
|
||||
var data = peerStore[GriefBook].book.getOrDefault(peerId, defaultGriefData)
|
||||
if data.score <= 0:
|
||||
return 0
|
||||
|
||||
let clock = if now == Moment.init(0, Second): Moment.now() else: now
|
||||
let elapsed = clock - data.cooldownTime
|
||||
let cooldowns = int(elapsed.minutes div GriefCooldownInterval.minutes)
|
||||
|
||||
if cooldowns > 0:
|
||||
data.score = max(data.score - cooldowns, 0)
|
||||
# Advance by exactly the consumed cooldown time, preserving remainder
|
||||
data.cooldownTime =
|
||||
data.cooldownTime + chronos.minutes(int64(cooldowns) * GriefCooldownInterval.minutes)
|
||||
peerStore[GriefBook][peerId] = data
|
||||
|
||||
return data.score
|
||||
|
||||
proc griefPeer*(
|
||||
peerStore: PeerStore, peerId: PeerId, amount: int = 1,
|
||||
now: Moment = Moment.init(0, Second)
|
||||
) =
|
||||
## Increases the grief score of a peer by the given amount.
|
||||
## If this is the first grief for a peer at score 0, initializes the cooldown time.
|
||||
## Pass now for testing; default (0) uses the system clock.
|
||||
if amount <= 0:
|
||||
return
|
||||
|
||||
let clock = if now == Moment.init(0, Second): Moment.now() else: now
|
||||
|
||||
# resolveGriefScore already wrote back the cooled-down data, so read it once
|
||||
let currentScore = peerStore.resolveGriefScore(peerId, clock)
|
||||
var data = peerStore[GriefBook].book.getOrDefault(peerId, defaultGriefData)
|
||||
|
||||
data.score += amount
|
||||
|
||||
# If peer was at 0 (no prior cooldown running), start cooldown from now
|
||||
if currentScore == 0:
|
||||
data.cooldownTime = clock
|
||||
|
||||
peerStore[GriefBook][peerId] = data
|
||||
|
||||
proc getGriefScore*(
|
||||
peerStore: PeerStore, peerId: PeerId,
|
||||
now: Moment = Moment.init(0, Second)
|
||||
): int =
|
||||
## Returns the current grief score of a peer after applying cooldown.
|
||||
## Pass now for testing; default (0) uses the system clock.
|
||||
return peerStore.resolveGriefScore(peerId, now)
|
||||
|
||||
@ -62,6 +62,8 @@ type RemotePeerInfo* = ref object
|
||||
direction*: PeerDirection
|
||||
lastFailedConn*: Moment
|
||||
numberFailedConn*: int
|
||||
griefScore*: int
|
||||
griefCooldownTime*: Moment
|
||||
|
||||
func `$`*(remotePeerInfo: RemotePeerInfo): string =
|
||||
$remotePeerInfo.peerId
|
||||
@ -87,6 +89,8 @@ proc init*(
|
||||
direction: PeerDirection = UnknownDirection,
|
||||
lastFailedConn: Moment = Moment.init(0, Second),
|
||||
numberFailedConn: int = 0,
|
||||
griefScore: int = 0,
|
||||
griefCooldownTime: Moment = Moment.init(0, Second),
|
||||
mixPubKey: Option[Curve25519Key] = none(Curve25519Key),
|
||||
): T =
|
||||
RemotePeerInfo(
|
||||
@ -104,6 +108,8 @@ proc init*(
|
||||
direction: direction,
|
||||
lastFailedConn: lastFailedConn,
|
||||
numberFailedConn: numberFailedConn,
|
||||
griefScore: griefScore,
|
||||
griefCooldownTime: griefCooldownTime,
|
||||
mixPubKey: mixPubKey,
|
||||
)
|
||||
|
||||
|
||||
@ -11,7 +11,8 @@ import
|
||||
stew/byteutils
|
||||
import
|
||||
waku/
|
||||
[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context],
|
||||
[node/peer_manager, waku_core, events/delivery_events,
|
||||
common/broker/broker_context],
|
||||
./common,
|
||||
./protocol_metrics,
|
||||
./rpc_codec,
|
||||
@ -64,6 +65,7 @@ proc sendSubscribeRequest(
|
||||
"exception in waku_filter_v2 client writeLP: " & getCurrentExceptionMsg()
|
||||
trace "exception in waku_filter_v2 client writeLP", error = getCurrentExceptionMsg()
|
||||
waku_filter_errors.inc(labelValues = [errMsg])
|
||||
wfc.peerManager.griefPeer(servicePeer.peerId, MinGriefScore) # stream error: transient
|
||||
return err(FilterSubscribeError.badResponse(errMsg))
|
||||
|
||||
var respBuf: seq[byte]
|
||||
@ -74,11 +76,13 @@ proc sendSubscribeRequest(
|
||||
"exception in waku_filter_v2 client readLp: " & getCurrentExceptionMsg()
|
||||
trace "exception in waku_filter_v2 client readLp", error = getCurrentExceptionMsg()
|
||||
waku_filter_errors.inc(labelValues = [errMsg])
|
||||
wfc.peerManager.griefPeer(servicePeer.peerId, MinGriefScore) # stream error: transient
|
||||
return err(FilterSubscribeError.badResponse(errMsg))
|
||||
|
||||
let response = FilterSubscribeResponse.decode(respBuf).valueOr:
|
||||
trace "Failed to decode filter subscribe response", servicePeer
|
||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||
wfc.peerManager.griefPeer(servicePeer.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||
return err(FilterSubscribeError.badResponse(decodeRpcFailure))
|
||||
|
||||
# DOS protection rate limit checks does not know about request id
|
||||
@ -86,11 +90,13 @@ proc sendSubscribeRequest(
|
||||
response.requestId != filterSubscribeRequest.requestId:
|
||||
trace "Filter subscribe response requestId mismatch", servicePeer, response
|
||||
waku_filter_errors.inc(labelValues = [requestIdMismatch])
|
||||
wfc.peerManager.griefPeer(servicePeer.peerId, HighGriefScore) # requestId mismatch: misbehavior
|
||||
return err(FilterSubscribeError.badResponse(requestIdMismatch))
|
||||
|
||||
if response.statusCode != 200:
|
||||
trace "Filter subscribe error response", servicePeer, response
|
||||
waku_filter_errors.inc(labelValues = [errorResponse])
|
||||
wfc.peerManager.griefPeer(servicePeer.peerId, LowGriefScore) # non-success response: rejection
|
||||
let cause =
|
||||
if response.statusDesc.isSome():
|
||||
response.statusDesc.get()
|
||||
@ -188,6 +194,7 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
|
||||
let msgPush = MessagePush.decode(buf).valueOr:
|
||||
error "Failed to decode message push", peerId = conn.peerId, error = $error
|
||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||
wfc.peerManager.griefPeer(conn.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||
return
|
||||
|
||||
let msg_hash =
|
||||
|
||||
@ -59,6 +59,7 @@ proc sendPushRequest(
|
||||
buffer = await connection.readLp(DefaultMaxRpcSize.int)
|
||||
except LPStreamRemoteClosedError:
|
||||
error "Failed to read response from peer", error = getCurrentExceptionMsg()
|
||||
wl.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream closed: transient
|
||||
return lightpushResultInternalError(
|
||||
"Failed to read response from peer: " & getCurrentExceptionMsg()
|
||||
)
|
||||
@ -66,15 +67,20 @@ proc sendPushRequest(
|
||||
let response = LightpushResponse.decode(buffer).valueOr:
|
||||
error "failed to decode response"
|
||||
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
|
||||
wl.peerManager.griefPeer(connection.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||
return lightpushResultInternalError(decodeRpcFailure)
|
||||
|
||||
if response.requestId != req.requestId and
|
||||
response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS:
|
||||
error "response failure, requestId mismatch",
|
||||
requestId = req.requestId, responseRequestId = response.requestId
|
||||
wl.peerManager.griefPeer(connection.peerId, HighGriefScore) # requestId mismatch: misbehavior
|
||||
return lightpushResultInternalError("response failure, requestId mismatch")
|
||||
|
||||
return toPushResult(response)
|
||||
let pushResult = toPushResult(response)
|
||||
if pushResult.isErr():
|
||||
wl.peerManager.griefPeer(connection.peerId, LowGriefScore) # non-success response: rejection
|
||||
return pushResult
|
||||
|
||||
proc publish*(
|
||||
wl: WakuLightPushClient,
|
||||
|
||||
@ -32,6 +32,7 @@ proc request*(
|
||||
except CatchableError as exc:
|
||||
error "exception when handling peer exchange request", error = exc.msg
|
||||
waku_px_client_errors.inc(labelValues = ["error_sending_or_receiving_px_req"])
|
||||
wpx.peerManager.griefPeer(conn.peerId, MinGriefScore) # stream error: transient
|
||||
callResult = (
|
||||
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
|
||||
status_desc: some($exc.msg),
|
||||
@ -46,6 +47,7 @@ proc request*(
|
||||
|
||||
let decoded = PeerExchangeRpc.decode(buffer).valueOr:
|
||||
error "peer exchange request error decoding buffer", error = $error
|
||||
wpx.peerManager.griefPeer(conn.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||
return err(
|
||||
(
|
||||
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
|
||||
@ -54,6 +56,7 @@ proc request*(
|
||||
)
|
||||
if decoded.response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
|
||||
error "peer exchange request error", status_code = decoded.response.status_code
|
||||
wpx.peerManager.griefPeer(conn.peerId, LowGriefScore) # non-success response: rejection
|
||||
return err(
|
||||
(
|
||||
status_code: decoded.response.status_code,
|
||||
|
||||
@ -8,7 +8,8 @@ import
|
||||
metrics,
|
||||
bearssl/rand
|
||||
import
|
||||
../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
|
||||
../node/peer_manager, ../utils/requests,
|
||||
./protocol_metrics, ./common, ./rpc_codec
|
||||
|
||||
logScope:
|
||||
topics = "waku store client"
|
||||
@ -42,20 +43,24 @@ proc sendStoreRequest(
|
||||
let writeRes = catch:
|
||||
await connection.writeLP(req.encode().buffer)
|
||||
if writeRes.isErr():
|
||||
self.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream error: transient
|
||||
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: writeRes.error.msg))
|
||||
|
||||
let readRes = catch:
|
||||
await connection.readLp(DefaultMaxRpcSize.int)
|
||||
|
||||
let buf = readRes.valueOr:
|
||||
self.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream error: transient
|
||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: error.msg))
|
||||
|
||||
let res = StoreQueryResponse.decode(buf).valueOr:
|
||||
waku_store_errors.inc(labelValues = [DecodeRpcFailure])
|
||||
self.peerManager.griefPeer(connection.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: DecodeRpcFailure))
|
||||
|
||||
if res.statusCode != uint32(StatusCode.SUCCESS):
|
||||
waku_store_errors.inc(labelValues = [NoSuccessStatusCode])
|
||||
self.peerManager.griefPeer(connection.peerId, LowGriefScore) # non-success response: rejection
|
||||
return err(StoreError.new(res.statusCode, res.statusDesc))
|
||||
|
||||
if req.pubsubTopic.isSome():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user