diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 54d6f281b..7637439a1 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -42,6 +42,9 @@ proc addSeen*(f: FloodSub, msgId: MessageID): bool = # Return true if the message has already been seen f.seen.put(f.seenSalt & msgId) +proc firstSeen*(f: FloodSub, msgId: MessageID): Moment = + f.seen.addedAt(f.seenSalt & msgId) + proc handleSubscribe*(f: FloodSub, peer: PubsubPeer, topic: string, diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 175e6956f..5f94f2f65 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -28,7 +28,7 @@ import ./pubsub, import stew/results export results -import ./gossipsub/[types, scoring, behavior] +import ./gossipsub/[types, scoring, behavior], ../../utils/heartbeat export types, scoring, behavior, pubsub @@ -381,12 +381,16 @@ method rpcHandler*(g: GossipSub, # remote attacking the hash function if g.addSeen(msgId): trace "Dropping already-seen message", msgId = shortLog(msgId), peer - # make sure to update score tho before continuing - # TODO: take into account meshMessageDeliveriesWindow - # score only if messages are not too old. - g.rewardDelivered(peer, msg.topicIDs, false) - g.validationSeen.withValue(msgIdSalted, seen): seen[].incl(peer) + var alreadyReceived = false + g.validationSeen.withValue(msgIdSalted, seen): + if seen[].containsOrIncl(peer): + # peer sent us this message twice + alreadyReceived = true + + if not alreadyReceived: + let delay = Moment.now() - g.firstSeen(msgId) + g.rewardDelivered(peer, msg.topicIDs, false, delay) libp2p_gossipsub_duplicate.inc() @@ -563,7 +567,7 @@ method publish*(g: GossipSub, return peers.len proc maintainDirectPeers(g: GossipSub) {.async.} = - while g.heartbeatRunning: + heartbeat "GossipSub DirectPeers", 1.minutes: for id, addrs in g.parameters.directPeers: let peer = g.peers.getOrDefault(id) if isNil(peer): @@ -579,8 +583,6 @@ proc maintainDirectPeers(g: GossipSub) {.async.} = except CatchableError as exc: debug "Direct peer error dialing", msg = exc.msg - await sleepAsync(1.minutes) - method start*(g: GossipSub) {.async.} = trace "gossipsub start" @@ -588,8 +590,8 @@ method start*(g: GossipSub) {.async.} = warn "Starting gossipsub twice" return - g.heartbeatRunning = true g.heartbeatFut = g.heartbeat() + g.scoringHeartbeatFut = g.scoringHeartbeat() g.directPeersLoop = g.maintainDirectPeers() method stop*(g: GossipSub) {.async.} = @@ -599,13 +601,10 @@ method stop*(g: GossipSub) {.async.} = return # stop heartbeat interval - g.heartbeatRunning = false g.directPeersLoop.cancel() - if not g.heartbeatFut.finished: - trace "awaiting last heartbeat" - await g.heartbeatFut - trace "heartbeat stopped" - g.heartbeatFut = nil + g.scoringHeartbeatFut.cancel() + g.heartbeatFut.cancel() + g.heartbeatFut = nil method initPubSub*(g: GossipSub) {.raises: [Defect, InitializationError].} = diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 66bd69ddb..17e830ec3 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -14,7 +14,7 @@ import chronos, chronicles, metrics import "."/[types, scoring] import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub] import "../rpc"/[messages] -import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope] +import "../../.."/[peerid, multiaddress, utility, switch, routing_record, signed_envelope, utils/heartbeat] declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache") declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) @@ -608,8 +608,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = peer.iWantBudget = IWantPeerBudget peer.iHaveBudget = IHavePeerBudget - g.updateScores() - var meshMetrics = MeshMetrics() for t in toSeq(g.topics.keys): @@ -663,12 +661,10 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = # {.pop.} # raises [Defect] proc heartbeat*(g: GossipSub) {.async.} = - while g.heartbeatRunning: + heartbeat "GossipSub", g.parameters.heartbeatInterval: trace "running heartbeat", instance = cast[int](g) g.onHeartbeat() for trigger in g.heartbeatEvents: trace "firing heartbeat event", instance = cast[int](g) trigger.fire() - - await sleepAsync(g.parameters.heartbeatInterval) diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 7a2bf21a2..872ad0b6b 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -13,7 +13,7 @@ import std/[tables, sets, options] import chronos, chronicles, metrics import "."/[types] import ".."/[pubsubpeer] -import "../../.."/[peerid, multiaddress, utility, switch] +import "../../.."/[peerid, multiaddress, utility, switch, utils/heartbeat] declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) @@ -254,6 +254,11 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated scores", peers = g.peers.len +proc scoringHeartbeat*(g: GossipSub) {.async.} = + heartbeat "Gossipsub scoring", g.parameters.decayInterval: + trace "running scoring heartbeat", instance = cast[int](g) + g.updateScores() + proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = for tt in topics: let t = tt @@ -269,7 +274,7 @@ proc addCapped*[T](stat: var T, diff, cap: T) = stat += min(diff, cap - stat) proc rewardDelivered*( - g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool) = + g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool, delay = ZeroDuration) = for tt in topics: let t = tt if t notin g.topics: @@ -279,6 +284,10 @@ proc rewardDelivered*( let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) # if in mesh add more delivery score + if delay > topicParams.meshMessageDeliveriesWindow: + # Too old + continue + g.withPeerStats(peer.peerId) do (stats: var PeerStats): stats.topicInfos.withValue(tt, tstats): if tstats[].inMesh: diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 5736c01fd..11279eacf 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -162,6 +162,7 @@ type mcache*: MCache # messages cache validationSeen*: ValidationSeenTable # peers who sent us message in validation heartbeatFut*: Future[void] # cancellation future for heartbeat interval + scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval heartbeatRunning*: bool peerStats*: Table[PeerId, PeerStats] diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index ab77e3581..ad47846fa 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -18,7 +18,7 @@ const Timeout* = 10.seconds # default timeout in ms type TimedEntry*[K] = ref object of RootObj key: K - expiresAt: Moment + addedAt: Moment next, prev: TimedEntry[K] TimedCache*[K] = object of RootObj @@ -27,7 +27,8 @@ type timeout: Duration func expire*(t: var TimedCache, now: Moment = Moment.now()) = - while t.head != nil and t.head.expiresAt < now: + let expirationLimit = now - t.timeout + while t.head != nil and t.head.addedAt < expirationLimit: t.entries.del(t.head.key) t.head.prev = nil t.head = t.head.next @@ -54,7 +55,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = var res = t.del(k) # Refresh existing item - let node = TimedEntry[K](key: k, expiresAt: now + t.timeout) + let node = TimedEntry[K](key: k, addedAt: now) if t.head == nil: t.tail = node @@ -62,7 +63,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = else: # search from tail because typically that's where we add when now grows var cur = t.tail - while cur != nil and node.expiresAt < cur.expiresAt: + while cur != nil and node.addedAt < cur.addedAt: cur = cur.prev if cur == nil: @@ -83,6 +84,10 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = func contains*[K](t: TimedCache[K], k: K): bool = k in t.entries +func addedAt*[K](t: TimedCache[K], k: K): Moment = + t.entries.getOrDefault(k).addedAt + + func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T = T( timeout: timeout diff --git a/libp2p/utils/heartbeat.nim b/libp2p/utils/heartbeat.nim new file mode 100644 index 000000000..3fad818a0 --- /dev/null +++ b/libp2p/utils/heartbeat.nim @@ -0,0 +1,27 @@ +# Nim-Libp2p +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [Defect].} + +import sequtils +import chronos, chronicles + +export chronicles + +template heartbeat*(name: string, interval: Duration, body: untyped): untyped = + var nextHeartbeat = Moment.now() + while true: + body + + nextHeartbeat += interval + let now = Moment.now() + if nextHeartbeat < now: + info "Missed heartbeat", heartbeat = name, delay = now - nextHeartbeat + nextHeartbeat = now + interval + await sleepAsync(nextHeartbeat - now) diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 4c960d1ce..70d44b865 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -218,7 +218,6 @@ suite "GossipSub": ### await subscribeNodes(nodes) proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard - nodes[0].subscribe("foobar", handler) nodes[1].subscribe("foobar", handler) await invalidDetected.wait(10.seconds) @@ -439,3 +438,60 @@ suite "GossipSub": it.switch.stop()))) await allFuturesThrowing(nodesFut) + + asyncTest "GossipSub scoring - decayInterval": + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + var gossip = GossipSub(nodes[0]) + # MacOs has some nasty jitter when sleeping + # (up to 7 ms), so we need some pretty long + # sleeps to be safe here + gossip.parameters.decayInterval = 300.milliseconds + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var handlerFut = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + handlerFut.complete() + + await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + tryPublish await nodes[0].publish("foobar", toBytes("hello")), 1 + + await handlerFut + + gossip.peerStats[nodes[1].peerInfo.peerId].topicInfos["foobar"].meshMessageDeliveries = 100 + gossip.topicParams["foobar"].meshMessageDeliveriesDecay = 0.9 + await sleepAsync(1500.milliseconds) + + # We should have decayed 5 times, though allowing 4..6 + check: + gossip.peerStats[nodes[1].peerInfo.peerId].topicInfos["foobar"].meshMessageDeliveries in 50.0 .. 66.0 + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) diff --git a/tests/testheartbeat.nim b/tests/testheartbeat.nim new file mode 100644 index 000000000..6a748f90f --- /dev/null +++ b/tests/testheartbeat.nim @@ -0,0 +1,55 @@ +import chronos + +import ../libp2p/utils/heartbeat +import ./helpers + +# MacOs has some nasty jitter when sleeping +# (up to 7 ms), so we skip test there +when not defined(macosx): + suite "Heartbeat": + + asyncTest "simple heartbeat": + var i = 0 + proc t() {.async.} = + heartbeat "shouldn't see this", 30.milliseconds: + i.inc() + let hb = t() + await sleepAsync(300.milliseconds) + await hb.cancelAndWait() + check: + i in 9..11 + + asyncTest "change heartbeat period on the fly": + var i = 0 + proc t() {.async.} = + var period = 30.milliseconds + heartbeat "shouldn't see this", period: + i.inc() + if i >= 4: + period = 75.milliseconds + let hb = t() + await sleepAsync(500.milliseconds) + await hb.cancelAndWait() + + # 4x 30 ms heartbeat = 120ms + # (500 ms - 120 ms) / 75ms = 5x 75ms + # total 9 + check: + i in 8..10 + + asyncTest "catch up on slow heartbeat": + var i = 0 + proc t() {.async.} = + heartbeat "this is normal", 30.milliseconds: + if i < 3: + await sleepAsync(150.milliseconds) + i.inc() + + let hb = t() + await sleepAsync(900.milliseconds) + await hb.cancelAndWait() + # 3x (150ms heartbeat + 30ms interval) = 540ms + # 360ms remaining, / 30ms = 12x + # total 15 + check: + i in 14..16 diff --git a/tests/testnative.nim b/tests/testnative.nim index 519e89854..1baf69abe 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -2,7 +2,8 @@ import testvarint, testconnection, testminprotobuf, teststreamseq, - testsemaphore + testsemaphore, + testheartbeat import testminasn1, testrsa,