Gossipsub scoring fixes (#709)

* Use decayInterval as a scoring heartbeat period
* Take mesh delivery window into account
This commit is contained in:
Tanguy 2022-05-11 10:38:43 +02:00 committed by GitHub
parent 32ca1898d9
commit 991549f391
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 182 additions and 30 deletions

View File

@ -42,6 +42,9 @@ proc addSeen*(f: FloodSub, msgId: MessageID): bool =
# Return true if the message has already been seen
f.seen.put(f.seenSalt & msgId)
proc firstSeen*(f: FloodSub, msgId: MessageID): Moment =
f.seen.addedAt(f.seenSalt & msgId)
proc handleSubscribe*(f: FloodSub,
peer: PubsubPeer,
topic: string,

View File

@ -28,7 +28,7 @@ import ./pubsub,
import stew/results
export results
import ./gossipsub/[types, scoring, behavior]
import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat
export types, scoring, behavior, pubsub
@ -381,12 +381,16 @@ method rpcHandler*(g: GossipSub,
# remote attacking the hash function
if g.addSeen(msgId):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer
# make sure to update score tho before continuing
# TODO: take into account meshMessageDeliveriesWindow
# score only if messages are not too old.
g.rewardDelivered(peer, msg.topicIDs, false)
g.validationSeen.withValue(msgIdSalted, seen): seen[].incl(peer)
var alreadyReceived = false
g.validationSeen.withValue(msgIdSalted, seen):
if seen[].containsOrIncl(peer):
# peer sent us this message twice
alreadyReceived = true
if not alreadyReceived:
let delay = Moment.now() - g.firstSeen(msgId)
g.rewardDelivered(peer, msg.topicIDs, false, delay)
libp2p_gossipsub_duplicate.inc()
@ -563,7 +567,7 @@ method publish*(g: GossipSub,
return peers.len
proc maintainDirectPeers(g: GossipSub) {.async.} =
while g.heartbeatRunning:
heartbeat "GossipSub DirectPeers", 1.minutes:
for id, addrs in g.parameters.directPeers:
let peer = g.peers.getOrDefault(id)
if isNil(peer):
@ -579,8 +583,6 @@ proc maintainDirectPeers(g: GossipSub) {.async.} =
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg
await sleepAsync(1.minutes)
method start*(g: GossipSub) {.async.} =
trace "gossipsub start"
@ -588,8 +590,8 @@ method start*(g: GossipSub) {.async.} =
warn "Starting gossipsub twice"
return
g.heartbeatRunning = true
g.heartbeatFut = g.heartbeat()
g.scoringHeartbeatFut = g.scoringHeartbeat()
g.directPeersLoop = g.maintainDirectPeers()
method stop*(g: GossipSub) {.async.} =
@ -599,13 +601,10 @@ method stop*(g: GossipSub) {.async.} =
return
# stop heartbeat interval
g.heartbeatRunning = false
g.directPeersLoop.cancel()
if not g.heartbeatFut.finished:
trace "awaiting last heartbeat"
await g.heartbeatFut
trace "heartbeat stopped"
g.heartbeatFut = nil
g.scoringHeartbeatFut.cancel()
g.heartbeatFut.cancel()
g.heartbeatFut = nil
method initPubSub*(g: GossipSub)
{.raises: [Defect, InitializationError].} =

View File

@ -14,7 +14,7 @@ import chronos, chronicles, metrics
import "."/[types, scoring]
import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
import "../rpc"/[messages]
import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope]
import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope, utils/heartbeat]
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
@ -608,8 +608,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
g.updateScores()
var meshMetrics = MeshMetrics()
for t in toSeq(g.topics.keys):
@ -663,12 +661,10 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
# {.pop.} # raises [Defect]
proc heartbeat*(g: GossipSub) {.async.} =
while g.heartbeatRunning:
heartbeat "GossipSub", g.parameters.heartbeatInterval:
trace "running heartbeat", instance = cast[int](g)
g.onHeartbeat()
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()
await sleepAsync(g.parameters.heartbeatInterval)

View File

@ -13,7 +13,7 @@ import std/[tables, sets, options]
import chronos, chronicles, metrics
import "."/[types]
import ".."/[pubsubpeer]
import "../../.."/[peerid, multiaddress, utility, switch]
import "../../.."/[peerid, multiaddress, utility, switch, utils/heartbeat]
declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
@ -254,6 +254,11 @@ proc updateScores*(g: GossipSub) = # avoid async
trace "updated scores", peers = g.peers.len
proc scoringHeartbeat*(g: GossipSub) {.async.} =
heartbeat "Gossipsub scoring", g.parameters.decayInterval:
trace "running scoring heartbeat", instance = cast[int](g)
g.updateScores()
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
for tt in topics:
let t = tt
@ -269,7 +274,7 @@ proc addCapped*[T](stat: var T, diff, cap: T) =
stat += min(diff, cap - stat)
proc rewardDelivered*(
g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool) =
g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool, delay = ZeroDuration) =
for tt in topics:
let t = tt
if t notin g.topics:
@ -279,6 +284,10 @@ proc rewardDelivered*(
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score
if delay > topicParams.meshMessageDeliveriesWindow:
# Too old
continue
g.withPeerStats(peer.peerId) do (stats: var PeerStats):
stats.topicInfos.withValue(tt, tstats):
if tstats[].inMesh:

View File

@ -162,6 +162,7 @@ type
mcache*: MCache # messages cache
validationSeen*: ValidationSeenTable # peers who sent us message in validation
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval
heartbeatRunning*: bool
peerStats*: Table[PeerId, PeerStats]

View File

@ -18,7 +18,7 @@ const Timeout* = 10.seconds # default timeout in ms
type
TimedEntry*[K] = ref object of RootObj
key: K
expiresAt: Moment
addedAt: Moment
next, prev: TimedEntry[K]
TimedCache*[K] = object of RootObj
@ -27,7 +27,8 @@ type
timeout: Duration
func expire*(t: var TimedCache, now: Moment = Moment.now()) =
while t.head != nil and t.head.expiresAt < now:
let expirationLimit = now - t.timeout
while t.head != nil and t.head.addedAt < expirationLimit:
t.entries.del(t.head.key)
t.head.prev = nil
t.head = t.head.next
@ -54,7 +55,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
var res = t.del(k) # Refresh existing item
let node = TimedEntry[K](key: k, expiresAt: now + t.timeout)
let node = TimedEntry[K](key: k, addedAt: now)
if t.head == nil:
t.tail = node
@ -62,7 +63,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
else:
# search from tail because typically that's where we add when now grows
var cur = t.tail
while cur != nil and node.expiresAt < cur.expiresAt:
while cur != nil and node.addedAt < cur.addedAt:
cur = cur.prev
if cur == nil:
@ -83,6 +84,10 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries
func addedAt*[K](t: TimedCache[K], k: K): Moment =
t.entries.getOrDefault(k).addedAt
func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T =
T(
timeout: timeout

View File

@ -0,0 +1,27 @@
# Nim-Libp2p
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [Defect].}
import sequtils
import chronos, chronicles
export chronicles
template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
var nextHeartbeat = Moment.now()
while true:
body
nextHeartbeat += interval
let now = Moment.now()
if nextHeartbeat < now:
info "Missed heartbeat", heartbeat = name, delay = now - nextHeartbeat
nextHeartbeat = now + interval
await sleepAsync(nextHeartbeat - now)

View File

@ -218,7 +218,6 @@ suite "GossipSub":
### await subscribeNodes(nodes)
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
await invalidDetected.wait(10.seconds)
@ -439,3 +438,60 @@ suite "GossipSub":
it.switch.stop())))
await allFuturesThrowing(nodesFut)
asyncTest "GossipSub scoring - decayInterval":
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
var gossip = GossipSub(nodes[0])
# MacOs has some nasty jitter when sleeping
# (up to 7 ms), so we need some pretty long
# sleeps to be safe here
gossip.parameters.decayInterval = 300.milliseconds
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
var handlerFut = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
handlerFut.complete()
await subscribeNodes(nodes)
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
tryPublish await nodes[0].publish("foobar", toBytes("hello")), 1
await handlerFut
gossip.peerStats[nodes[1].peerInfo.peerId].topicInfos["foobar"].meshMessageDeliveries = 100
gossip.topicParams["foobar"].meshMessageDeliveriesDecay = 0.9
await sleepAsync(1500.milliseconds)
# We should have decayed 5 times, though allowing 4..6
check:
gossip.peerStats[nodes[1].peerInfo.peerId].topicInfos["foobar"].meshMessageDeliveries in 50.0 .. 66.0
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())

55
tests/testheartbeat.nim Normal file
View File

@ -0,0 +1,55 @@
import chronos
import ../libp2p/utils/heartbeat
import ./helpers
# MacOs has some nasty jitter when sleeping
# (up to 7 ms), so we skip test there
when not defined(macosx):
suite "Heartbeat":
asyncTest "simple heartbeat":
var i = 0
proc t() {.async.} =
heartbeat "shouldn't see this", 30.milliseconds:
i.inc()
let hb = t()
await sleepAsync(300.milliseconds)
await hb.cancelAndWait()
check:
i in 9..11
asyncTest "change heartbeat period on the fly":
var i = 0
proc t() {.async.} =
var period = 30.milliseconds
heartbeat "shouldn't see this", period:
i.inc()
if i >= 4:
period = 75.milliseconds
let hb = t()
await sleepAsync(500.milliseconds)
await hb.cancelAndWait()
# 4x 30 ms heartbeat = 120ms
# (500 ms - 120 ms) / 75ms = 5x 75ms
# total 9
check:
i in 8..10
asyncTest "catch up on slow heartbeat":
var i = 0
proc t() {.async.} =
heartbeat "this is normal", 30.milliseconds:
if i < 3:
await sleepAsync(150.milliseconds)
i.inc()
let hb = t()
await sleepAsync(900.milliseconds)
await hb.cancelAndWait()
# 3x (150ms heartbeat + 30ms interval) = 540ms
# 360ms remaining, / 30ms = 12x
# total 15
check:
i in 14..16

View File

@ -2,7 +2,8 @@ import testvarint,
testconnection,
testminprotobuf,
teststreamseq,
testsemaphore
testsemaphore,
testheartbeat
import testminasn1,
testrsa,