mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-18 00:09:52 +00:00
Merge 41c0e56860006900b64c88ebb1a9b4ea44b4600d into 75864a705ea0b913d517a5f3640747f8709e9e53
This commit is contained in:
commit
baf6e372c1
@ -1,7 +1,7 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, times],
|
std/[sequtils, times, random],
|
||||||
chronos,
|
chronos,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/peerid,
|
libp2p/peerid,
|
||||||
@ -353,3 +353,289 @@ suite "Extended nim-libp2p Peer Store":
|
|||||||
peerStore[DisconnectBook][p1] == 0
|
peerStore[DisconnectBook][p1] == 0
|
||||||
peerStore[SourceBook][p1] == default(PeerOrigin)
|
peerStore[SourceBook][p1] == default(PeerOrigin)
|
||||||
peerStore[DirectionBook][p1] == default(PeerDirection)
|
peerStore[DirectionBook][p1] == default(PeerDirection)
|
||||||
|
peerStore[GriefBook][p1] == default(GriefData)
|
||||||
|
|
||||||
|
suite "Extended nim-libp2p Peer Store: grief scores":
  # Kept apart from the main peer store suite: apart from the very first test,
  # every call passes an explicit Moment instead of reading the system clock.
  var peerStore: PeerStore
  var p1, p2, p3: PeerId

  setup:
    peerStore = PeerStore.new(nil, capacity = 50)
    require p1.init(basePeerId & "1")
    require p2.init(basePeerId & "2")
    require p3.init(basePeerId & "3")

  let
    # Shorthand: one cooldown interval
    interval = GriefCooldownInterval
    # Fixed, non-zero time origin shared by all tests (zero would be treated
    # as "use the system clock" by the grief API)
    t0 = Moment.init(1000, Minute)

  test "new peer has grief score 0":
    check peerStore.getGriefScore(p1) == 0

  test "griefPeer increases score":
    peerStore.griefPeer(p1, 5, t0)

    check peerStore.getGriefScore(p1, t0) == 5

  test "griefPeer accumulates":
    for amount in [3, 2]:
      peerStore.griefPeer(p1, amount, t0)

    check peerStore.getGriefScore(p1, t0) == 5

  test "grief cools down by 1 point per interval":
    peerStore.griefPeer(p1, 5, t0)

    # 0 intervals -> 5, 1 interval -> 4, ... 5 intervals -> 0
    for elapsedIntervals in 0 .. 5:
      check peerStore.getGriefScore(p1, t0 + interval * elapsedIntervals) ==
        5 - elapsedIntervals

  test "grief floors at 0":
    peerStore.griefPeer(p1, 3, t0)

    # Ten intervals is far more than the three needed to cool down fully
    check peerStore.getGriefScore(p1, t0 + interval * 10) == 0

  test "cooldown preserves remainder":
    let
      # Two full intervals plus half of the third
      twoAndAHalf = t0 + interval * 2 + interval div 2
      # Exactly three intervals
      threeFull = t0 + interval * 3

    peerStore.griefPeer(p1, 5, t0)

    # Only the two complete intervals count: 5 - 2
    check peerStore.getGriefScore(p1, twoAndAHalf) == 3

    # The leftover half interval is not lost, so the third completes on time
    check peerStore.getGriefScore(p1, threeFull) == 2

  test "grief after full cooldown restarts cooldown time":
    let regriefAt = t0 + interval * 5

    peerStore.griefPeer(p1, 2, t0)

    # Five intervals wipe out a score of 2
    check peerStore.getGriefScore(p1, regriefAt) == 0

    # Offend again once fully cooled down
    peerStore.griefPeer(p1, 3, regriefAt)
    check peerStore.getGriefScore(p1, regriefAt) == 3

    # Cooldown is now measured from the second grief
    check peerStore.getGriefScore(p1, regriefAt + interval) == 2

  test "independent grief scores per peer":
    let later = t0 + interval * 2

    peerStore.griefPeer(p1, 10, t0)
    peerStore.griefPeer(p2, 3, t0)

    check peerStore.getGriefScore(p1, later) == 8
    check peerStore.getGriefScore(p2, later) == 1
    check peerStore.getGriefScore(p3, later) == 0

  test "grief with default amount is 1":
    peerStore.griefPeer(p1, now = t0)

    check peerStore.getGriefScore(p1, t0) == 1

  test "griefPeer with zero or negative amount is ignored":
    for amount in [5, 0, -3]:
      peerStore.griefPeer(p1, amount, t0)

    # Only the positive amount was applied
    check peerStore.getGriefScore(p1, t0) == 5

  test "grief added during partial cooldown does not reset cooldown time":
    let
      twoAndAHalf = t0 + interval * 2 + interval div 2
      threeFull = t0 + interval * 3
      fourFull = t0 + interval * 4

    peerStore.griefPeer(p1, 5, t0)

    # 2.5 intervals in: two points cooled off, half an interval pending
    check peerStore.getGriefScore(p1, twoAndAHalf) == 3

    # New grief on a non-zero score must keep the pending half interval
    peerStore.griefPeer(p1, 4, twoAndAHalf)
    check peerStore.getGriefScore(p1, twoAndAHalf) == 7

    # The pending half completes the third interval
    check peerStore.getGriefScore(p1, threeFull) == 6

    # One further full interval
    check peerStore.getGriefScore(p1, fourFull) == 5

  test "multiple reads without time change are idempotent":
    let readAt = t0 + interval * 3

    peerStore.griefPeer(p1, 10, t0)

    for _ in 1 .. 3:
      check peerStore.getGriefScore(p1, readAt) == 7

  test "interleaved grief and cooldown across multiple peers":
    let
      at3 = t0 + interval * 3
      at5 = t0 + interval * 5

    # Grief is staggered one interval apart: p1 at t0, p2 at t0+1I, p3 at t0+2I
    peerStore.griefPeer(p1, 6, t0)
    peerStore.griefPeer(p2, 4, t0 + interval)
    peerStore.griefPeer(p3, 2, t0 + interval * 2)

    # By t0+3I the peers have cooled by 3, 2 and 1 points respectively
    check peerStore.getGriefScore(p1, at3) == 3
    check peerStore.getGriefScore(p2, at3) == 2
    check peerStore.getGriefScore(p3, at3) == 1

    # p2 offends again at t0+3I: 2 remaining + 10 new = 12
    peerStore.griefPeer(p2, 10, at3)

    # At t0+5I: p1 is 6 - 5 = 1, p2 is 12 - 2 = 10, p3 has floored at 0
    check peerStore.getGriefScore(p1, at5) == 1
    check peerStore.getGriefScore(p2, at5) == 10
    check peerStore.getGriefScore(p3, at5) == 0
|
||||||
|
|
||||||
|
suite "Extended nim-libp2p Peer Store: grief-based peer selection":
  # Exercises sortByGriefScore indirectly, through PeerManager.selectPeers.
  # Grief is applied and read back with the default (system) clock here.
  const testProto = "/test/grief/1.0.0"

  proc makePeer(port: int): RemotePeerInfo =
    ## Builds a peer with a newly generated key, a localhost TCP address on
    ## `port`, and `testProto` as its only protocol.
    let
      key = generateSecp256k1Key()
      id = PeerId.init(key.getPublicKey().tryGet()).tryGet()
      listenAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/" & $port).tryGet()
    RemotePeerInfo.init(peerId = id, addrs = @[listenAddr], protocols = @[testProto])

  test "all peers at grief 0 returns all peers (shuffled)":
    let
      switch = newTestSwitch()
      pm = PeerManager.new(switch)
      store = switch.peerStore
      peers = (1 .. 5).mapIt(makePeer(it + 10000))
    for peer in peers:
      store.addPeer(peer)

    let selected = pm.selectPeers(testProto)
    check selected.len == 5

  test "lower grief peers come before higher grief peers":
    let
      switch = newTestSwitch()
      pm = PeerManager.new(switch)
      store = switch.peerStore
      pA = makePeer(20001)
      pB = makePeer(20002)
      pC = makePeer(20003)
    for peer in [pA, pB, pC]:
      store.addPeer(peer)

    # Buckets are score div 5: pA stays at 0 (bucket 0), pB gets 5 (bucket 1),
    # pC gets 15 (bucket 3)
    store.griefPeer(pB.peerId, 5)
    store.griefPeer(pC.peerId, 15)

    # Repeat because ordering inside a bucket is random; across buckets the
    # order must be pA, pB, pC every time
    for _ in 0 ..< 20:
      let selected = pm.selectPeers(testProto)
      check selected.len == 3
      for idx, expected in [pA, pB, pC]:
        check selected[idx].peerId == expected.peerId

  test "peers within same bucket are interchangeable":
    let
      switch = newTestSwitch()
      pm = PeerManager.new(switch)
      store = switch.peerStore
      pA = makePeer(30001)
      pB = makePeer(30002)
    for peer in [pA, pB]:
      store.addPeer(peer)

    # Scores 1 and 4 both fall in bucket 0 (1 div 5 == 4 div 5 == 0)
    store.griefPeer(pA.peerId, 1)
    store.griefPeer(pB.peerId, 4)

    var
      sawAFirst = false
      sawBFirst = false
    for _ in 0 ..< 50:
      let selected = pm.selectPeers(testProto)
      check selected.len == 2
      let aLeads = selected[0].peerId == pA.peerId
      sawAFirst = sawAFirst or aLeads
      sawBFirst = sawBFirst or not aLeads

    # Sharing a bucket, each peer should have led at least once
    check sawAFirst
    check sawBFirst

  test "peers in different buckets never swap order":
    let
      switch = newTestSwitch()
      pm = PeerManager.new(switch)
      store = switch.peerStore
      pLow = makePeer(40001)
      pHigh = makePeer(40002)
    for peer in [pLow, pHigh]:
      store.addPeer(peer)

    # Score 1 lands in bucket 0, score 5 in bucket 1
    store.griefPeer(pLow.peerId, 1)
    store.griefPeer(pHigh.peerId, 5)

    for _ in 0 ..< 30:
      let selected = pm.selectPeers(testProto)
      check selected.len == 2
      check selected[0].peerId == pLow.peerId
      check selected[1].peerId == pHigh.peerId

  test "zero-grief peers always come before grieved peers":
    let
      switch = newTestSwitch()
      pm = PeerManager.new(switch)
      store = switch.peerStore
      pClean1 = makePeer(50001)
      pClean2 = makePeer(50002)
      pGrieved = makePeer(50003)
    for peer in [pClean1, pClean2, pGrieved]:
      store.addPeer(peer)

    store.griefPeer(pGrieved.peerId, 6)

    for _ in 0 ..< 20:
      let selected = pm.selectPeers(testProto)
      check selected.len == 3
      # Score 6 is bucket 1, so the grieved peer trails both bucket-0 peers
      check selected[2].peerId == pGrieved.peerId

  test "peers beyond MaxGriefBucket are excluded from selection":
    let
      switch = newTestSwitch()
      pm = PeerManager.new(switch)
      store = switch.peerStore
      pGood = makePeer(60001)
      pBad = makePeer(60002)
    for peer in [pGood, pBad]:
      store.addPeer(peer)

    # Score 20 is bucket 4 (20 div 5), which is above MaxGriefBucket
    store.griefPeer(pBad.peerId, 20)

    let selected = pm.selectPeers(testProto)
    check selected.len == 1
    check selected[0].peerId == pGood.peerId
|
||||||
|
|||||||
@ -203,6 +203,7 @@ suite "Waku Peer Exchange":
|
|||||||
# Start and mount peer exchange
|
# Start and mount peer exchange
|
||||||
await allFutures([node1.start(), node2.start()])
|
await allFutures([node1.start(), node2.start()])
|
||||||
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
|
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
|
||||||
|
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchangeClient()])
|
||||||
|
|
||||||
# Create connection
|
# Create connection
|
||||||
let connOpt = await node2.peerManager.dialPeer(
|
let connOpt = await node2.peerManager.dialPeer(
|
||||||
|
|||||||
@ -215,27 +215,64 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
|
|||||||
|
|
||||||
trace "recovered peers from storage", amount = amount
|
trace "recovered peers from storage", amount = amount
|
||||||
|
|
||||||
|
proc griefPeer*(pm: PeerManager, peerId: PeerId, amount: int = 1) =
  ## Adds `amount` grief points to `peerId` in the peer store of `pm`'s switch.
  ## Does nothing when `pm` is nil.
  if pm.isNil:
    return

  pm.switch.peerStore.griefPeer(peerId, amount)
|
||||||
|
|
||||||
|
proc sortByGriefScore(pm: PeerManager, peers: var seq[RemotePeerInfo]) =
  ## Reorders `peers` in place so that peers in lower grief buckets
  ## (`score div GriefBucketSize`) come first, in random order within each
  ## bucket. Peers whose bucket is above `MaxGriefBucket` are removed.
  ## When no peer has a positive score, `peers` is only shuffled.
  ## NOTE: shuffling defaultPeerStoreCapacity (750 currently) on demand is
  ## negligible, but if that increases, might be worth exploring different
  ## data structures.
  let store = pm.switch.peerStore

  # Pair each peer with the bucket of its current (cooled-down) grief score
  var bucketed = newSeqOfCap[(int, RemotePeerInfo)](peers.len)
  var sawGrief = false
  for peer in peers:
    let score = store.getGriefScore(peer.peerId)
    sawGrief = sawGrief or (score > 0)
    bucketed.add((score div GriefBucketSize, peer))

  # Nobody is grieved: a plain shuffle is all that is needed
  if not sawGrief:
    shuffle(peers)
    return

  proc byBucket(a, b: (int, RemotePeerInfo)): int =
    cmp(a[0], b[0])

  # Randomise first; the stable sort then keeps that random order among peers
  # that share a bucket
  shuffle(bucketed)
  bucketed.sort(byBucket, order = SortOrder.Ascending)

  # Keep only the peers at or below the highest selectable bucket
  peers = bucketed.filterIt(it[0] <= MaxGriefBucket).mapIt(it[1])
|
||||||
|
|
||||||
proc selectPeers*(
    pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): seq[RemotePeerInfo] =
  ## Returns the peers that support `proto` (and, when `shard` is given, that
  ## shard), ordered by grief bucket ascending and shuffled within each bucket.
  ## Peers above the maximum grief bucket are left out by `sortByGriefScore`.
  ## Returns an empty seq when `shard` cannot be parsed.
  var candidates = pm.switch.peerStore.getPeersByProtocol(proto)

  trace "Selecting peers from peerstore",
    protocol = proto,
    num_peers = candidates.len,
    address = cast[uint](pm.switch.peerStore)

  if shard.isSome():
    let shardInfo = RelayShard.parse(shard.get()).valueOr:
      trace "Failed to parse shard from pubsub topic", topic = shard.get()
      return @[]

    # A peer matches through its ENR or through its explicit shard list
    candidates.keepItIf(
      (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
        (it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
    )

  pm.sortByGriefScore(candidates)
  return candidates
|
|
||||||
|
|
||||||
proc selectPeer*(
|
proc selectPeer*(
|
||||||
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
|
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
|
||||||
@ -640,7 +677,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
|||||||
|
|
||||||
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||||
|
|
||||||
shuffle(outsideBackoffPeers)
|
pm.sortByGriefScore(outsideBackoffPeers)
|
||||||
|
|
||||||
var index = 0
|
var index = 0
|
||||||
var numPendingConnReqs =
|
var numPendingConnReqs =
|
||||||
|
|||||||
@ -43,6 +43,10 @@ type
|
|||||||
# Keeps track of peer shards
|
# Keeps track of peer shards
|
||||||
ShardBook* = ref object of PeerBook[seq[uint16]]
|
ShardBook* = ref object of PeerBook[seq[uint16]]
|
||||||
|
|
||||||
|
# Keeps track of peer grief: (score, lastCooldownTime)
|
||||||
|
GriefData* = tuple[score: int, cooldownTime: Moment]
|
||||||
|
GriefBook* = ref object of PeerBook[GriefData]
|
||||||
|
|
||||||
proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
||||||
let addresses =
|
let addresses =
|
||||||
if peerStore[LastSeenBook][peerId].isSome():
|
if peerStore[LastSeenBook][peerId].isSome():
|
||||||
@ -69,6 +73,8 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
|||||||
direction: peerStore[DirectionBook][peerId],
|
direction: peerStore[DirectionBook][peerId],
|
||||||
lastFailedConn: peerStore[LastFailedConnBook][peerId],
|
lastFailedConn: peerStore[LastFailedConnBook][peerId],
|
||||||
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
|
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
|
||||||
|
griefScore: peerStore[GriefBook][peerId].score,
|
||||||
|
griefCooldownTime: peerStore[GriefBook][peerId].cooldownTime,
|
||||||
mixPubKey:
|
mixPubKey:
|
||||||
if peerStore[MixPubKeyBook][peerId] != default(Curve25519Key):
|
if peerStore[MixPubKeyBook][peerId] != default(Curve25519Key):
|
||||||
some(peerStore[MixPubKeyBook][peerId])
|
some(peerStore[MixPubKeyBook][peerId])
|
||||||
@ -143,6 +149,9 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin
|
|||||||
peerStore[LastFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.lastFailedConn)
|
peerStore[LastFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.lastFailedConn)
|
||||||
discard
|
discard
|
||||||
peerStore[NumberFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.numberFailedConn)
|
peerStore[NumberFailedConnBook].book.hasKeyOrPut(peer.peerId, peer.numberFailedConn)
|
||||||
|
discard peerStore[GriefBook].book.hasKeyOrPut(
|
||||||
|
peer.peerId, (score: peer.griefScore, cooldownTime: peer.griefCooldownTime)
|
||||||
|
)
|
||||||
if peer.enr.isSome():
|
if peer.enr.isSome():
|
||||||
peerStore[ENRBook][peer.peerId] = peer.enr.get()
|
peerStore[ENRBook][peer.peerId] = peer.enr.get()
|
||||||
|
|
||||||
@ -241,3 +250,78 @@ template forEnrPeers*(
|
|||||||
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
|
let peerOrigin {.inject.} = sourceBook.book.getOrDefault(pid, UnknownOrigin)
|
||||||
let peerEnrRecord {.inject.} = enrRecord
|
let peerEnrRecord {.inject.} = enrRecord
|
||||||
body
|
body
|
||||||
|
|
||||||
|
#~~~~~~~~~~~~~~~~~~#
|
||||||
|
# Grief Management #
|
||||||
|
#~~~~~~~~~~~~~~~~~~#
|
||||||
|
|
||||||
|
const
  # Time needed to cool off a single grief point: a peer with grief score N
  # is back at 0 after N * GriefCooldownInterval.
  GriefCooldownInterval* = chronos.minutes(1)

  GriefBucketSize* = 5
    ## width of a grief bucket (score div GriefBucketSize); peers in the same
    ## bucket sort equally
  MaxGriefBucket* = 3 ## peers in a bucket above this are excluded from selection

const
  # Grief amounts applied per kind of failure
  MinGriefScore* = 1 ## stream errors, timeouts
  LowGriefScore* = 2 ## non-success response codes
  MediumGriefScore* = 3 ## decode failures, protocol violations
  HighGriefScore* = 5 ## requestId mismatch, active misbehavior

# Entry assumed for peers that have no GriefBook record
const defaultGriefData: GriefData = (score: 0, cooldownTime: Moment.init(0, Second))
|
||||||
|
|
||||||
|
proc resolveGriefScore(
    peerStore: PeerStore, peerId: PeerId, now: Moment = Moment.init(0, Second)
): int =
  ## Lazily resolves the grief score for a peer by applying cooldown based on
  ## elapsed time: one point is removed per whole `GriefCooldownInterval`
  ## elapsed since the stored cooldown time, never going below 0.
  ## When at least one interval has elapsed, the stored score and cooldown
  ## time are updated in place.
  ## Returns the resolved (current) grief score; 0 for peers with no entry or
  ## a non-positive stored score.
  ## Pass now for testing; default (0) uses the system clock.
  var data = peerStore[GriefBook].book.getOrDefault(peerId, defaultGriefData)
  if data.score <= 0:
    return 0

  let clock =
    if now == Moment.init(0, Second):
      Moment.now()
    else:
      now
  let elapsed = clock - data.cooldownTime

  # Count whole intervals at nanosecond resolution. Dividing minute counts
  # would divide by zero for intervals under a minute and would truncate
  # intervals that are not a whole number of minutes.
  let cooldowns = int(elapsed.nanoseconds div GriefCooldownInterval.nanoseconds)

  if cooldowns > 0:
    data.score = max(data.score - cooldowns, 0)
    # Advance by exactly the consumed cooldown time, preserving remainder
    data.cooldownTime = data.cooldownTime + GriefCooldownInterval * cooldowns
    peerStore[GriefBook][peerId] = data

  return data.score
|
||||||
|
|
||||||
|
proc griefPeer*(
    peerStore: PeerStore,
    peerId: PeerId,
    amount: int = 1,
    now: Moment = Moment.init(0, Second),
) =
  ## Increases the grief score of a peer by the given amount.
  ## Zero or negative amounts are ignored.
  ## If the peer's resolved score is 0 when grief is added, the cooldown time
  ## is (re)started at the current time; otherwise the running cooldown time
  ## is left untouched.
  ## Pass now for testing; default (0) uses the system clock.
  if amount <= 0:
    return

  let clock =
    if now == Moment.init(0, Second):
      Moment.now()
    else:
      now

  # Resolving first applies (and stores) any pending cooldown, so the entry
  # read back below carries the up-to-date cooldown time
  let currentScore = peerStore.resolveGriefScore(peerId, clock)
  var data = peerStore[GriefBook].book.getOrDefault(peerId, defaultGriefData)

  # Build on the resolved score rather than the raw stored one: a negative
  # stored score resolves to 0 and must not swallow part of the new grief
  data.score = currentScore + amount

  # If peer was at 0 (no prior cooldown running), start cooldown from now
  if currentScore == 0:
    data.cooldownTime = clock

  peerStore[GriefBook][peerId] = data
|
||||||
|
|
||||||
|
proc getGriefScore*(
    peerStore: PeerStore, peerId: PeerId, now: Moment = Moment.init(0, Second)
): int =
  ## Current grief score of `peerId` with cooldown applied. Reading can update
  ## the stored entry, because the cooldown is resolved lazily on access.
  ## Pass now for testing; default (0) uses the system clock.
  peerStore.resolveGriefScore(peerId, now)
|
||||||
|
|||||||
@ -62,6 +62,8 @@ type RemotePeerInfo* = ref object
|
|||||||
direction*: PeerDirection
|
direction*: PeerDirection
|
||||||
lastFailedConn*: Moment
|
lastFailedConn*: Moment
|
||||||
numberFailedConn*: int
|
numberFailedConn*: int
|
||||||
|
griefScore*: int
|
||||||
|
griefCooldownTime*: Moment
|
||||||
|
|
||||||
func `$`*(remotePeerInfo: RemotePeerInfo): string =
|
func `$`*(remotePeerInfo: RemotePeerInfo): string =
|
||||||
$remotePeerInfo.peerId
|
$remotePeerInfo.peerId
|
||||||
@ -87,6 +89,8 @@ proc init*(
|
|||||||
direction: PeerDirection = UnknownDirection,
|
direction: PeerDirection = UnknownDirection,
|
||||||
lastFailedConn: Moment = Moment.init(0, Second),
|
lastFailedConn: Moment = Moment.init(0, Second),
|
||||||
numberFailedConn: int = 0,
|
numberFailedConn: int = 0,
|
||||||
|
griefScore: int = 0,
|
||||||
|
griefCooldownTime: Moment = Moment.init(0, Second),
|
||||||
mixPubKey: Option[Curve25519Key] = none(Curve25519Key),
|
mixPubKey: Option[Curve25519Key] = none(Curve25519Key),
|
||||||
): T =
|
): T =
|
||||||
RemotePeerInfo(
|
RemotePeerInfo(
|
||||||
@ -104,6 +108,8 @@ proc init*(
|
|||||||
direction: direction,
|
direction: direction,
|
||||||
lastFailedConn: lastFailedConn,
|
lastFailedConn: lastFailedConn,
|
||||||
numberFailedConn: numberFailedConn,
|
numberFailedConn: numberFailedConn,
|
||||||
|
griefScore: griefScore,
|
||||||
|
griefCooldownTime: griefCooldownTime,
|
||||||
mixPubKey: mixPubKey,
|
mixPubKey: mixPubKey,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -11,7 +11,8 @@ import
|
|||||||
stew/byteutils
|
stew/byteutils
|
||||||
import
|
import
|
||||||
waku/
|
waku/
|
||||||
[node/peer_manager, waku_core, events/delivery_events, common/broker/broker_context],
|
[node/peer_manager, waku_core, events/delivery_events,
|
||||||
|
common/broker/broker_context],
|
||||||
./common,
|
./common,
|
||||||
./protocol_metrics,
|
./protocol_metrics,
|
||||||
./rpc_codec,
|
./rpc_codec,
|
||||||
@ -64,6 +65,7 @@ proc sendSubscribeRequest(
|
|||||||
"exception in waku_filter_v2 client writeLP: " & getCurrentExceptionMsg()
|
"exception in waku_filter_v2 client writeLP: " & getCurrentExceptionMsg()
|
||||||
trace "exception in waku_filter_v2 client writeLP", error = getCurrentExceptionMsg()
|
trace "exception in waku_filter_v2 client writeLP", error = getCurrentExceptionMsg()
|
||||||
waku_filter_errors.inc(labelValues = [errMsg])
|
waku_filter_errors.inc(labelValues = [errMsg])
|
||||||
|
wfc.peerManager.griefPeer(servicePeer.peerId, MinGriefScore) # stream error: transient
|
||||||
return err(FilterSubscribeError.badResponse(errMsg))
|
return err(FilterSubscribeError.badResponse(errMsg))
|
||||||
|
|
||||||
var respBuf: seq[byte]
|
var respBuf: seq[byte]
|
||||||
@ -74,11 +76,13 @@ proc sendSubscribeRequest(
|
|||||||
"exception in waku_filter_v2 client readLp: " & getCurrentExceptionMsg()
|
"exception in waku_filter_v2 client readLp: " & getCurrentExceptionMsg()
|
||||||
trace "exception in waku_filter_v2 client readLp", error = getCurrentExceptionMsg()
|
trace "exception in waku_filter_v2 client readLp", error = getCurrentExceptionMsg()
|
||||||
waku_filter_errors.inc(labelValues = [errMsg])
|
waku_filter_errors.inc(labelValues = [errMsg])
|
||||||
|
wfc.peerManager.griefPeer(servicePeer.peerId, MinGriefScore) # stream error: transient
|
||||||
return err(FilterSubscribeError.badResponse(errMsg))
|
return err(FilterSubscribeError.badResponse(errMsg))
|
||||||
|
|
||||||
let response = FilterSubscribeResponse.decode(respBuf).valueOr:
|
let response = FilterSubscribeResponse.decode(respBuf).valueOr:
|
||||||
trace "Failed to decode filter subscribe response", servicePeer
|
trace "Failed to decode filter subscribe response", servicePeer
|
||||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
wfc.peerManager.griefPeer(servicePeer.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||||
return err(FilterSubscribeError.badResponse(decodeRpcFailure))
|
return err(FilterSubscribeError.badResponse(decodeRpcFailure))
|
||||||
|
|
||||||
# DOS protection rate limit checks does not know about request id
|
# DOS protection rate limit checks does not know about request id
|
||||||
@ -86,11 +90,13 @@ proc sendSubscribeRequest(
|
|||||||
response.requestId != filterSubscribeRequest.requestId:
|
response.requestId != filterSubscribeRequest.requestId:
|
||||||
trace "Filter subscribe response requestId mismatch", servicePeer, response
|
trace "Filter subscribe response requestId mismatch", servicePeer, response
|
||||||
waku_filter_errors.inc(labelValues = [requestIdMismatch])
|
waku_filter_errors.inc(labelValues = [requestIdMismatch])
|
||||||
|
wfc.peerManager.griefPeer(servicePeer.peerId, HighGriefScore) # requestId mismatch: misbehavior
|
||||||
return err(FilterSubscribeError.badResponse(requestIdMismatch))
|
return err(FilterSubscribeError.badResponse(requestIdMismatch))
|
||||||
|
|
||||||
if response.statusCode != 200:
|
if response.statusCode != 200:
|
||||||
trace "Filter subscribe error response", servicePeer, response
|
trace "Filter subscribe error response", servicePeer, response
|
||||||
waku_filter_errors.inc(labelValues = [errorResponse])
|
waku_filter_errors.inc(labelValues = [errorResponse])
|
||||||
|
wfc.peerManager.griefPeer(servicePeer.peerId, LowGriefScore) # non-success response: rejection
|
||||||
let cause =
|
let cause =
|
||||||
if response.statusDesc.isSome():
|
if response.statusDesc.isSome():
|
||||||
response.statusDesc.get()
|
response.statusDesc.get()
|
||||||
@ -188,6 +194,7 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
|
|||||||
let msgPush = MessagePush.decode(buf).valueOr:
|
let msgPush = MessagePush.decode(buf).valueOr:
|
||||||
error "Failed to decode message push", peerId = conn.peerId, error = $error
|
error "Failed to decode message push", peerId = conn.peerId, error = $error
|
||||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
wfc.peerManager.griefPeer(conn.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||||
return
|
return
|
||||||
|
|
||||||
let msg_hash =
|
let msg_hash =
|
||||||
|
|||||||
@ -59,6 +59,7 @@ proc sendPushRequest(
|
|||||||
buffer = await connection.readLp(DefaultMaxRpcSize.int)
|
buffer = await connection.readLp(DefaultMaxRpcSize.int)
|
||||||
except LPStreamRemoteClosedError:
|
except LPStreamRemoteClosedError:
|
||||||
error "Failed to read response from peer", error = getCurrentExceptionMsg()
|
error "Failed to read response from peer", error = getCurrentExceptionMsg()
|
||||||
|
wl.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream closed: transient
|
||||||
return lightpushResultInternalError(
|
return lightpushResultInternalError(
|
||||||
"Failed to read response from peer: " & getCurrentExceptionMsg()
|
"Failed to read response from peer: " & getCurrentExceptionMsg()
|
||||||
)
|
)
|
||||||
@ -66,15 +67,20 @@ proc sendPushRequest(
|
|||||||
let response = LightpushResponse.decode(buffer).valueOr:
|
let response = LightpushResponse.decode(buffer).valueOr:
|
||||||
error "failed to decode response"
|
error "failed to decode response"
|
||||||
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
|
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
wl.peerManager.griefPeer(connection.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||||
return lightpushResultInternalError(decodeRpcFailure)
|
return lightpushResultInternalError(decodeRpcFailure)
|
||||||
|
|
||||||
if response.requestId != req.requestId and
|
if response.requestId != req.requestId and
|
||||||
response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS:
|
response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS:
|
||||||
error "response failure, requestId mismatch",
|
error "response failure, requestId mismatch",
|
||||||
requestId = req.requestId, responseRequestId = response.requestId
|
requestId = req.requestId, responseRequestId = response.requestId
|
||||||
|
wl.peerManager.griefPeer(connection.peerId, HighGriefScore) # requestId mismatch: misbehavior
|
||||||
return lightpushResultInternalError("response failure, requestId mismatch")
|
return lightpushResultInternalError("response failure, requestId mismatch")
|
||||||
|
|
||||||
return toPushResult(response)
|
let pushResult = toPushResult(response)
|
||||||
|
if pushResult.isErr():
|
||||||
|
wl.peerManager.griefPeer(connection.peerId, LowGriefScore) # non-success response: rejection
|
||||||
|
return pushResult
|
||||||
|
|
||||||
proc publish*(
|
proc publish*(
|
||||||
wl: WakuLightPushClient,
|
wl: WakuLightPushClient,
|
||||||
|
|||||||
@ -32,6 +32,7 @@ proc request*(
|
|||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
error "exception when handling peer exchange request", error = exc.msg
|
error "exception when handling peer exchange request", error = exc.msg
|
||||||
waku_px_client_errors.inc(labelValues = ["error_sending_or_receiving_px_req"])
|
waku_px_client_errors.inc(labelValues = ["error_sending_or_receiving_px_req"])
|
||||||
|
wpx.peerManager.griefPeer(conn.peerId, MinGriefScore) # stream error: transient
|
||||||
callResult = (
|
callResult = (
|
||||||
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
|
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
|
||||||
status_desc: some($exc.msg),
|
status_desc: some($exc.msg),
|
||||||
@ -46,6 +47,7 @@ proc request*(
|
|||||||
|
|
||||||
let decoded = PeerExchangeRpc.decode(buffer).valueOr:
|
let decoded = PeerExchangeRpc.decode(buffer).valueOr:
|
||||||
error "peer exchange request error decoding buffer", error = $error
|
error "peer exchange request error decoding buffer", error = $error
|
||||||
|
wpx.peerManager.griefPeer(conn.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||||
return err(
|
return err(
|
||||||
(
|
(
|
||||||
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
|
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
|
||||||
@ -54,6 +56,7 @@ proc request*(
|
|||||||
)
|
)
|
||||||
if decoded.response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
|
if decoded.response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
|
||||||
error "peer exchange request error", status_code = decoded.response.status_code
|
error "peer exchange request error", status_code = decoded.response.status_code
|
||||||
|
wpx.peerManager.griefPeer(conn.peerId, LowGriefScore) # non-success response: rejection
|
||||||
return err(
|
return err(
|
||||||
(
|
(
|
||||||
status_code: decoded.response.status_code,
|
status_code: decoded.response.status_code,
|
||||||
|
|||||||
@ -8,7 +8,8 @@ import
|
|||||||
metrics,
|
metrics,
|
||||||
bearssl/rand
|
bearssl/rand
|
||||||
import
|
import
|
||||||
../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
|
../node/peer_manager, ../utils/requests,
|
||||||
|
./protocol_metrics, ./common, ./rpc_codec
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "waku store client"
|
topics = "waku store client"
|
||||||
@ -42,20 +43,24 @@ proc sendStoreRequest(
|
|||||||
let writeRes = catch:
|
let writeRes = catch:
|
||||||
await connection.writeLP(req.encode().buffer)
|
await connection.writeLP(req.encode().buffer)
|
||||||
if writeRes.isErr():
|
if writeRes.isErr():
|
||||||
|
self.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream error: transient
|
||||||
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: writeRes.error.msg))
|
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: writeRes.error.msg))
|
||||||
|
|
||||||
let readRes = catch:
|
let readRes = catch:
|
||||||
await connection.readLp(DefaultMaxRpcSize.int)
|
await connection.readLp(DefaultMaxRpcSize.int)
|
||||||
|
|
||||||
let buf = readRes.valueOr:
|
let buf = readRes.valueOr:
|
||||||
|
self.peerManager.griefPeer(connection.peerId, MinGriefScore) # stream error: transient
|
||||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: error.msg))
|
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: error.msg))
|
||||||
|
|
||||||
let res = StoreQueryResponse.decode(buf).valueOr:
|
let res = StoreQueryResponse.decode(buf).valueOr:
|
||||||
waku_store_errors.inc(labelValues = [DecodeRpcFailure])
|
waku_store_errors.inc(labelValues = [DecodeRpcFailure])
|
||||||
|
self.peerManager.griefPeer(connection.peerId, MediumGriefScore) # decode failure: protocol violation
|
||||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: DecodeRpcFailure))
|
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: DecodeRpcFailure))
|
||||||
|
|
||||||
if res.statusCode != uint32(StatusCode.SUCCESS):
|
if res.statusCode != uint32(StatusCode.SUCCESS):
|
||||||
waku_store_errors.inc(labelValues = [NoSuccessStatusCode])
|
waku_store_errors.inc(labelValues = [NoSuccessStatusCode])
|
||||||
|
self.peerManager.griefPeer(connection.peerId, LowGriefScore) # non-success response: rejection
|
||||||
return err(StoreError.new(res.statusCode, res.statusDesc))
|
return err(StoreError.new(res.statusCode, res.statusDesc))
|
||||||
|
|
||||||
if req.pubsubTopic.isSome():
|
if req.pubsubTopic.isSome():
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user