From 05e789a34f6f1c3d037f85232c3d14d6a5e3932f Mon Sep 17 00:00:00 2001
From: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Date: Sat, 19 Dec 2020 23:43:32 +0900
Subject: [PATCH] Gossipsub refactor (#490)

* refactor peerStats, re-enable scores for testing
* remove gossip 1.0
* cleanup
* codecov matrix fixes
* restore previous score on onNewPeer
* fix coverage n checks
* unsubscribeAll gossipsub fixes
* refactor unsub/sub
* refactor onNewPeer and fix score flow
* disable scores by default (change in tests later)
* fix tests, enable scores in tests
* fix wrongly merged test
* ensure topic removal from topics table
* small typo fix
* testinterop fixes
---
 .github/workflows/codecov.yml           |   8 +-
 codecov.yml                             |   4 +-
 libp2p.nimble                           |   3 -
 libp2p/protocols/pubsub/floodsub.nim    |  12 +-
 libp2p/protocols/pubsub/gossipsub.nim   | 146 +++---
 libp2p/protocols/pubsub/gossipsub10.nim | 637 ------------------------
 libp2p/protocols/pubsub/pubsub.nim      |   9 +-
 tests/pubsub/testfloodsub.nim           |  16 +-
 tests/pubsub/testgossipinternal10.nim   | 347 -------------
 tests/pubsub/testgossipsub.nim          |  39 +-
 tests/pubsub/utils.nim                  |  17 +-
 tests/testinterop.nim                   |   4 +-
 12 files changed, 129 insertions(+), 1113 deletions(-)
 delete mode 100644 libp2p/protocols/pubsub/gossipsub10.nim
 delete mode 100644 tests/pubsub/testgossipinternal10.nim

diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml
index f2d2221a6..a156877e5 100644
--- a/.github/workflows/codecov.yml
+++ b/.github/workflows/codecov.yml
@@ -10,15 +10,11 @@ jobs:
         nim-options: [
           "",
           "-d:libp2p_pubsub_anonymize=true -d:libp2p_pubsub_sign=false -d:libp2p_pubsub_verify=false",
-          "-d:libp2p_pubsub_sign=true -d:libp2p_pubsub_verify=true",
-          "-d:fallback_gossipsub_10",
-          "-d:fallback_gossipsub_10 -d:libp2p_pubsub_anonymize=true -d:libp2p_pubsub_sign=false -d:libp2p_pubsub_verify=false",
-          "-d:fallback_gossipsub_10 -d:libp2p_pubsub_sign=true -d:libp2p_pubsub_verify=true"
+          "-d:libp2p_pubsub_sign=true -d:libp2p_pubsub_verify=true"
         ]
         test-program: [
           "tests/pubsub/testpubsub",
-          "tests/pubsub/testgossipinternal",
-          "tests/pubsub/testgossipinternal10"
+          "tests/pubsub/testgossipinternal"
         ]
     steps:
       - uses: actions/checkout@v2
diff --git a/codecov.yml b/codecov.yml
index 06a9122d8..25835b9a9 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -5,10 +5,10 @@ codecov:
   # notice that this number is for PRs;
   # like this we disabled notify on pure branches report
   # which is fine I guess
-  after_n_builds: 45
+  after_n_builds: 25
 comment:
   layout: "reach, diff, flags, files"
-  after_n_builds: 45 # must be the number of coverage report builds
+  after_n_builds: 25 # must be the number of coverage report builds
 coverage:
   status:
     project:
diff --git a/libp2p.nimble b/libp2p.nimble
index 8ecd0647d..12d0b7509 100644
--- a/libp2p.nimble
+++ b/libp2p.nimble
@@ -51,9 +51,6 @@ task testpubsub, "Runs pubsub tests":
   runTest("pubsub/testpubsub")
   runTest("pubsub/testpubsub", sign = false, verify = false)
   runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:libp2p_pubsub_anonymize=true")
-  runTest("pubsub/testgossipinternal10", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
-  runTest("pubsub/testpubsub", moreoptions = "-d:fallback_gossipsub_10")
-  runTest("pubsub/testpubsub", sign = false, verify = false, moreoptions = "-d:fallback_gossipsub_10")
 
task testpubsub_slim, "Runs pubsub tests":
  runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
diff --git
a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 446e6c8d5..a604b7380 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -37,9 +37,9 @@ method subscribeTopic*(f: FloodSub, peer topic - # this is a workaround for a race condition + # this is a workaround for a race condition # that can happen if we disconnect the peer very early - # in the future we might use this as a test case + # in the future we might use this as a test case # and eventually remove this workaround if subscribe and peer.peerId notin f.peers: trace "ignoring unknown peer" @@ -183,14 +183,14 @@ method publish*(f: FloodSub, return peers.len method unsubscribe*(f: FloodSub, - topics: seq[TopicPair]) {.async.} = - await procCall PubSub(f).unsubscribe(topics) + topics: seq[TopicPair]) = + procCall PubSub(f).unsubscribe(topics) for p in f.peers.values: f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) -method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = - await procCall PubSub(f).unsubscribeAll(topic) +method unsubscribeAll*(f: FloodSub, topic: string) = + procCall PubSub(f).unsubscribeAll(topic) for p in f.peers.values: f.sendSubs(p, @[topic], false) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5e75505c0..fc078c617 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -106,6 +106,7 @@ type PeerStats* = object topicInfos*: Table[string, TopicInfo] expire*: Moment # updated on disconnect, to retain scores until expire + score*: float64 # a copy of the score to keep in case the peer is disconnected GossipSubParams* = object explicit: bool @@ -157,7 +158,7 @@ type heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatRunning: bool - peerStats: Table[PubSubPeer, PeerStats] + peerStats: Table[PeerID, PeerStats] parameters*: GossipSubParams topicParams*: Table[string, TopicParams] directPeersLoop: Future[void] @@ -246,7 +247,7 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = proc init*(_: type[TopicParams]): TopicParams = TopicParams( - topicWeight: 0.0, # disable score + topicWeight: 0.0, # disabled by default timeInMeshWeight: 0.01, timeInMeshQuantum: 1.seconds, timeInMeshCap: 10.0, @@ -306,19 +307,21 @@ method init*(g: GossipSub) = g.codecs &= GossipSubCodec g.codecs &= GossipSubCodec_10 +proc initPeerStats(g: GossipSub, peer: PubSubPeer) = + g.peerStats[peer.peerId] = PeerStats() + peer.iWantBudget = IWantPeerBudget + peer.iHaveBudget = IHavePeerBudget + method onNewPeer(g: GossipSub, peer: PubSubPeer) = - if peer notin g.peerStats: - # new peer - g.peerStats[peer] = PeerStats() - peer.iWantBudget = IWantPeerBudget - peer.iHaveBudget = IHavePeerBudget - return + if peer.peerId notin g.peerStats: + g.initPeerStats(peer) else: # we knew this peer - discard + # restore previously stored score + peer.score = g.peerStats[peer.peerId].score proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = - g.peerStats.withValue(p, stats): + g.peerStats.withValue(p.peerId, stats): var info = stats.topicInfos.getOrDefault(topic) info.graftTime = Moment.now() info.meshTime = 0.seconds @@ -327,15 +330,15 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = # mgetOrPut does not work, so we gotta do this without referencing stats.topicInfos[topic] = info - assert(g.peerStats[p].topicInfos[topic].inMesh == true) + assert(g.peerStats[p.peerId].topicInfos[topic].inMesh == true) trace 
"grafted", peer=p, topic do: - g.onNewPeer(p) + g.initPeerStats(p) g.grafted(p, topic) proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = - g.peerStats.withValue(p, stats): + g.peerStats.withValue(p.peerId, stats): when not defined(release): g.prunedPeers.incl(p) @@ -682,18 +685,21 @@ proc updateScores(g: GossipSub) = # avoid async trace "updating scores", peers = g.peers.len let now = Moment.now() - var evicting: seq[PubSubPeer] + var evicting: seq[PeerID] - for peer, stats in g.peerStats.mpairs: - trace "updating peer score", peer - var n_topics = 0 - var is_grafted = 0 - - if not peer.connected: + for peerId, stats in g.peerStats.mpairs: + let peer = g.peers.getOrDefault(peerId) + if isNil(peer) or not(peer.connected): if now > stats.expire: - evicting.add(peer) + evicting.add(peerId) trace "evicted peer from memory", peer - continue + continue + + trace "updating peer score", peer + + var + n_topics = 0 + is_grafted = 0 # Per topic for topic, topicParams in g.topicParams: @@ -771,6 +777,9 @@ proc updateScores(g: GossipSub) = # avoid async if peer.behaviourPenalty < g.parameters.decayToZero: peer.behaviourPenalty = 0 + # copy into stats the score to keep until expired + stats.score = peer.score + assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted for peer in evicting: @@ -899,7 +908,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(t).int64, labelValues = [t]) - g.peerStats.withValue(pubSubPeer, stats): + g.peerStats.withValue(pubSubPeer.peerId, stats): stats[].expire = Moment.now() + g.parameters.retainScore for topic, info in stats[].topicInfos.mpairs: info.firstMessageDeliveries = 0 @@ -927,8 +936,6 @@ method subscribeTopic*(g: GossipSub, if subscribe: trace "peer subscribed to topic" - # populate scoring structs and such - g.onNewPeer(peer) # subscribe remote peer to the topic discard g.gossipsub.addPeer(topic, peer) if peer.peerId in g.parameters.directPeers: @@ -959,13 +966,13 @@ proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = # ensure we init a new topic if unknown let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) # update stats - g.peerStats.withValue(peer, stats): + g.peerStats.withValue(peer.peerId, stats): stats[].topicInfos.withValue(t, tstats): tstats[].invalidMessageDeliveries += 1 do: # if we have no stats populate! stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1) do: # if we have no stats populate! 
- g.peerStats[peer] = + g.peerStats[peer.peerId] = block: var stats = PeerStats() stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1) @@ -1012,8 +1019,8 @@ proc handleGraft(g: GossipSub, continue - if peer notin g.peerStats: - g.onNewPeer(peer) + if peer.peerId notin g.peerStats: + g.initPeerStats(peer) # not in the spec exactly, but let's avoid way too low score peers # other clients do it too also was an audit recommendation @@ -1132,7 +1139,7 @@ method rpcHandler*(g: GossipSub, for t in msg.topicIDs: # for every topic in the message let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) # if in mesh add more delivery score - g.peerStats.withValue(peer, pstats): + g.peerStats.withValue(peer.peerId, pstats): pstats[].topicInfos.withValue(t, stats): if stats[].inMesh: # TODO: take into account meshMessageDeliveriesWindow @@ -1143,7 +1150,7 @@ method rpcHandler*(g: GossipSub, do: # make sure we don't loose this information pstats[].topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) do: # make sure we don't loose this information - g.peerStats[peer] = + g.peerStats[peer.peerId] = block: var stats = PeerStats() stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) @@ -1190,7 +1197,7 @@ method rpcHandler*(g: GossipSub, for t in msg.topicIDs: # for every topic in the message let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - g.peerStats.withValue(peer, pstats): + g.peerStats.withValue(peer.peerId, pstats): pstats[].topicInfos.withValue(t, stats): # contribute to peer score first delivery stats[].firstMessageDeliveries += 1 @@ -1205,7 +1212,7 @@ method rpcHandler*(g: GossipSub, do: # make sure we don't loose this information pstats[].topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) do: # make sure we don't loose this information - g.peerStats[peer] = + g.peerStats[peer.peerId] = block: var stats = PeerStats() stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) @@ -1234,7 +1241,6 @@ method rpcHandler*(g: GossipSub, if respControl.graft.len > 0 or respControl.prune.len > 0 or respControl.ihave.len > 0 or messages.len > 0: - trace "sending control message", msg = shortLog(respControl), peer g.send( peer, @@ -1242,8 +1248,8 @@ method rpcHandler*(g: GossipSub, method subscribe*(g: GossipSub, topic: string, - handler: TopicHandler) {.async.} = - await procCall PubSub(g).subscribe(topic, handler) + handler: TopicHandler) = + procCall PubSub(g).subscribe(topic, handler) # if we have a fanout on this topic break it if topic in g.fanout: @@ -1251,42 +1257,48 @@ method subscribe*(g: GossipSub, g.rebalanceMesh(topic) +method unsubscribeAll*(g: GossipSub, topic: string) = + var + msg = RPCMsg.withSubs(@[topic], subscribe = false) + gpeers = g.gossipsub.getOrDefault(topic) + + if topic in g.mesh: + let mpeers = g.mesh.getOrDefault(topic) + + # remove mesh peers from gpeers, we send 2 different messages + gpeers = gpeers - mpeers + # send to peers NOT in mesh first + g.broadcast(toSeq(gpeers), msg) + + g.mesh.del(topic) + + for peer in mpeers: + trace "pruning unsubscribeAll call peer", peer, score = peer.score + g.pruned(peer, topic) + + msg.control = + some(ControlMessage(prune: + @[ControlPrune(topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)])) + + # send to peers IN mesh now + g.broadcast(toSeq(mpeers), msg) + else: + g.broadcast(toSeq(gpeers), msg) + + # finally let's remove from g.topics, do that by calling PubSub + procCall 
PubSub(g).unsubscribeAll(topic) + method unsubscribe*(g: GossipSub, - topics: seq[TopicPair]) {.async.} = - await procCall PubSub(g).unsubscribe(topics) + topics: seq[TopicPair]) = + procCall PubSub(g).unsubscribe(topics) for (topic, handler) in topics: # delete from mesh only if no handlers are left + # (handlers are removed in pubsub unsubscribe above) if topic notin g.topics: - if topic in g.mesh: - let peers = g.mesh[topic] - g.mesh.del(topic) - g.topicParams.del(topic) - for peer in peers: - trace "pruning unsubscribe call peer", peer, score = peer.score - g.pruned(peer, topic) - let prune = RPCMsg(control: some(ControlMessage( - prune: @[ControlPrune( - topicID: topic, - peers: g.peerExchangeList(topic), - backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - g.broadcast(toSeq(peers), prune) - -method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = - await procCall PubSub(g).unsubscribeAll(topic) - - if topic in g.mesh: - let peers = g.mesh.getOrDefault(topic) - g.mesh.del(topic) - for peer in peers: - trace "pruning unsubscribeAll call peer", peer, score = peer.score - g.pruned(peer, topic) - let prune = RPCMsg(control: some(ControlMessage( - prune: @[ControlPrune( - topicID: topic, - peers: g.peerExchangeList(topic), - backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - g.broadcast(toSeq(peers), prune) + g.unsubscribeAll(topic) method publish*(g: GossipSub, topic: string, diff --git a/libp2p/protocols/pubsub/gossipsub10.nim b/libp2p/protocols/pubsub/gossipsub10.nim deleted file mode 100644 index 3d22d76f0..000000000 --- a/libp2p/protocols/pubsub/gossipsub10.nim +++ /dev/null @@ -1,637 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2019 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -# TODO: this module is temporary to allow -# for quick switchover fro 1.1 to 1.0. -# This should be removed once 1.1 is stable -# enough. 
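
The peer-stats changes above in gossipsub.nim key PeerStats by PeerID instead of by the PubSubPeer object, mirror the live score into the stored entry, and have onNewPeer restore that score when a known peer reconnects, so a disconnect/reconnect cycle no longer resets a peer's reputation. A minimal stand-alone sketch of that save/restore flow, using simplified stand-in types (PeerId modelled as a string) rather than the real libp2p ones:

import std/tables

type
  PeerId = string                    # stand-in for libp2p's PeerID
  PubSubPeer = ref object
    peerId: PeerId
    score: float64
  PeerStats = object
    score: float64                   # copy kept while the peer is disconnected

var peerStats = initTable[PeerId, PeerStats]()

proc initPeerStats(peer: PubSubPeer) =
  peerStats[peer.peerId] = PeerStats()

proc onNewPeer(peer: PubSubPeer) =
  if peer.peerId notin peerStats:
    initPeerStats(peer)
  else:
    # known peer: restore the score it had before it disconnected
    peer.score = peerStats[peer.peerId].score

proc updateScore(peer: PubSubPeer, delta: float64) =
  peer.score += delta
  peerStats[peer.peerId].score = peer.score   # keep the stored copy in sync

when isMainModule:
  var p = PubSubPeer(peerId: "peer-A")
  onNewPeer(p)
  updateScore(p, -3.5)
  # simulate a reconnect arriving as a fresh PubSubPeer object
  var p2 = PubSubPeer(peerId: "peer-A")
  onNewPeer(p2)
  assert p2.score == -3.5
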
- -import std/[options, random, sequtils, sets, tables] -import chronos, chronicles, metrics -import ./pubsub, - ./floodsub, - ./pubsubpeer, - ./peertable, - ./mcache, - ./timedcache, - ./rpc/[messages, message], - ../protocol, - ../../stream/connection, - ../../peerinfo, - ../../peerid, - ../../utility - -logScope: - topics = "libp2p gossipsub" - -const GossipSubCodec* = "/meshsub/1.0.0" - -# overlay parameters -const GossipSubD* = 6 -const GossipSubDlo* = 4 -const GossipSubDhi* = 12 - -# gossip parameters -const GossipSubHistoryLength* = 5 -const GossipSubHistoryGossip* = 3 - -# heartbeat interval -const GossipSubHeartbeatInitialDelay* = 100.millis -const GossipSubHeartbeatInterval* = 1.seconds - -# fanout ttl -const GossipSubFanoutTTL* = 1.minutes - -type - GossipSub* = ref object of FloodSub - mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic - fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic - gossipsub*: PeerTable # peers that are subscribed to a topic - lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics - gossip*: Table[string, seq[ControlIHave]] # pending gossip - control*: Table[string, ControlMessage] # pending control messages - mcache*: MCache # messages cache - heartbeatFut: Future[void] # cancellation future for heartbeat interval - heartbeatRunning: bool - heartbeatEvents*: seq[AsyncEvent] - parameters*: GossipSubParams - - GossipSubParams* = object - # stubs - explicit: bool - pruneBackoff*: Duration - floodPublish*: bool - gossipFactor*: float64 - dScore*: int - dOut*: int - dLazy*: int - - gossipThreshold*: float64 - publishThreshold*: float64 - graylistThreshold*: float64 - acceptPXThreshold*: float64 - opportunisticGraftThreshold*: float64 - decayInterval*: Duration - decayToZero*: float64 - retainScore*: Duration - - appSpecificWeight*: float64 - ipColocationFactorWeight*: float64 - ipColocationFactorThreshold*: float64 - behaviourPenaltyWeight*: float64 - behaviourPenaltyDecay*: float64 - - directPeers*: seq[PeerId] - -proc init*(G: type[GossipSubParams]): G = discard - -when defined(libp2p_expensive_metrics): - declareGauge(libp2p_gossipsub_peers_per_topic_mesh, - "gossipsub peers per topic in mesh", - labels = ["topic"]) - - declareGauge(libp2p_gossipsub_peers_per_topic_fanout, - "gossipsub peers per topic in fanout", - labels = ["topic"]) - - declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, - "gossipsub peers per topic in gossipsub", - labels = ["topic"]) - -method init*(g: GossipSub) = - proc handler(conn: Connection, proto: string) {.async.} = - ## main protocol handler that gets triggered on every - ## connection for a protocol string - ## e.g. ``/floodsub/1.0.0``, etc... - ## - try: - await g.handleConn(conn, proto) - except CancelledError: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. 
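
The gossipsub unsubscribeAll introduced above splits a topic's known peers into mesh and non-mesh sets and sends them different control traffic: peers outside the mesh only see the unsubscription, while mesh peers additionally receive a PRUNE carrying peer-exchange and backoff data. A reduced model of that flow with stand-in types; the real method also calls pruned() for score bookkeeping and finally delegates to PubSub.unsubscribeAll to drop the topic and its handlers:

import std/[sets, tables]

type
  Peer = string                      # stand-in for PubSubPeer
  Msg = object
    unsub: bool                      # "I unsubscribed from the topic"
    prune: bool                      # message also carries a PRUNE control entry

var
  gossipsub = initTable[string, HashSet[Peer]]()  # every known peer per topic
  mesh = initTable[string, HashSet[Peer]]()       # full-message (mesh) peers
  sent: seq[(Peer, Msg)]                          # records what would be broadcast

proc broadcast(peers: HashSet[Peer], msg: Msg) =
  for p in peers: sent.add((p, msg))

proc unsubscribeAll(topic: string) =
  var gpeers = gossipsub.getOrDefault(topic, initHashSet[Peer]())
  if topic in mesh:
    let mpeers = mesh.getOrDefault(topic, initHashSet[Peer]())
    gpeers.excl(mpeers)                               # peers outside the mesh...
    broadcast(gpeers, Msg(unsub: true))               # ...only learn we unsubscribed
    mesh.del(topic)
    broadcast(mpeers, Msg(unsub: true, prune: true))  # mesh peers also get PRUNE
  else:
    broadcast(gpeers, Msg(unsub: true))

when isMainModule:
  gossipsub["foobar"] = ["a", "b", "c"].toHashSet
  mesh["foobar"] = ["a"].toHashSet
  unsubscribeAll("foobar")
  assert sent.len == 3                                # every known peer notified once
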
- trace "Unexpected cancellation in gossipsub handler", conn - except CatchableError as exc: - trace "GossipSub handler leaks an error", exc = exc.msg, conn - - g.handler = handler - g.codec = GossipSubCodec - -proc replenishFanout(g: GossipSub, topic: string) = - ## get fanout peers for a topic - logScope: topic - trace "about to replenish fanout" - - if g.fanout.peers(topic) < GossipSubDLo: - trace "replenishing fanout", peers = g.fanout.peers(topic) - if topic in g.gossipsub: - for peer in g.gossipsub[topic]: - if g.fanout.addPeer(topic, peer): - if g.fanout.peers(topic) == GossipSubD: - break - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - - trace "fanout replenished with peers", peers = g.fanout.peers(topic) - -method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} = - case event.kind - of PubSubPeerEventKind.Connected: - discard - of PubSubPeerEventKind.Disconnected: - # If a send connection is lost, it's better to remove peer from the mesh - - # if it gets reestablished, the peer will be readded to the mesh, and if it - # doesn't, well.. then we hope the peer is going away! - for _, peers in p.mesh.mpairs(): - peers.excl(peer) - for _, peers in p.fanout.mpairs(): - peers.excl(peer) - - procCall FloodSub(p).onPubSubPeerEvent(peer, event) - - -proc rebalanceMesh(g: GossipSub, topic: string) = - logScope: - topic - mesh = g.mesh.peers(topic) - gossipsub = g.gossipsub.peers(topic) - - trace "rebalancing mesh" - - # create a mesh topic that we're subscribing to - - var - grafts, prunes: seq[PubSubPeer] - - if g.mesh.peers(topic) < GossipSubDlo: - trace "replenishing mesh", peers = g.mesh.peers(topic) - # replenish the mesh if we're below Dlo - grafts = toSeq( - g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) - ).filterIt(it.connected) - - shuffle(grafts) - - # Graft peers so we reach a count of D - grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic))) - - trace "grafting", grafts = grafts.len - - for peer in grafts: - if g.mesh.addPeer(topic, peer): - g.fanout.removePeer(topic, peer) - - if g.mesh.peers(topic) > GossipSubDhi: - # prune peers if we've gone over Dhi - prunes = toSeq(g.mesh[topic]) - shuffle(prunes) - prunes.setLen(prunes.len - GossipSubD) # .. 
down to D peers - - trace "pruning", prunes = prunes.len - for peer in prunes: - g.mesh.removePeer(topic, peer) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) - - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(topic).int64, labelValues = [topic]) - - trace "mesh balanced" - - # Send changes to peers after table updates to avoid stale state - if grafts.len > 0: - let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) - g.broadcast(grafts, graft) - if prunes.len > 0: - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) - g.broadcast(prunes, prune) - -proc dropFanoutPeers(g: GossipSub) = - # drop peers that we haven't published to in - # GossipSubFanoutTTL seconds - let now = Moment.now() - for topic in toSeq(g.lastFanoutPubSub.keys): - let val = g.lastFanoutPubSub[topic] - if now > val: - g.fanout.del(topic) - g.lastFanoutPubSub.del(topic) - trace "dropping fanout topic", topic - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - -proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} = - ## gossip iHave messages to peers - ## - - trace "getting gossip peers (iHave)" - let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) - let controlMsg = ControlMessage() - for topic in topics: - var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) - shuffle(allPeers) - - let mesh = g.mesh.getOrDefault(topic) - let fanout = g.fanout.getOrDefault(topic) - - let gossipPeers = mesh + fanout - let mids = g.mcache.window(topic) - if not mids.len > 0: - continue - - if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topic - continue - - let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids)) - for peer in allPeers: - if result.len >= GossipSubD: - trace "got gossip peers", peers = result.len - break - - if peer in gossipPeers: - continue - - if peer notin result: - result[peer] = controlMsg - - result[peer].ihave.add(ihave) - -proc heartbeat(g: GossipSub) {.async.} = - while g.heartbeatRunning: - try: - trace "running heartbeat" - - for t in toSeq(g.topics.keys): - g.rebalanceMesh(t) - - g.dropFanoutPeers() - - # replenish known topics to the fanout - for t in toSeq(g.fanout.keys): - g.replenishFanout(t) - - let peers = g.getGossipPeers() - for peer, control in peers: - g.peers.withValue(peer.peerId, pubsubPeer) do: - g.send( - pubsubPeer[], - RPCMsg(control: some(control))) - - g.mcache.shift() # shift the cache - except CancelledError as exc: - raise exc - except CatchableError as exc: - warn "exception ocurred in gossipsub heartbeat", exc = exc.msg - - for trigger in g.heartbeatEvents: - trace "firing heartbeat event", instance = cast[int](g) - trigger.fire() - - await sleepAsync(GossipSubHeartbeatInterval) - -method unsubscribePeer*(g: GossipSub, peer: PeerID) = - ## handle peer disconnects - ## - - trace "unsubscribing gossipsub peer", peer - let pubSubPeer = g.peers.getOrDefault(peer) - if pubSubPeer.isNil: - trace "no peer to unsubscribe", peer - return - - for t in toSeq(g.gossipsub.keys): - g.gossipsub.removePeer(t, pubSubPeer) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - 
.set(g.gossipsub.peers(t).int64, labelValues = [t]) - - for t in toSeq(g.mesh.keys): - g.mesh.removePeer(t, pubSubPeer) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(t).int64, labelValues = [t]) - - for t in toSeq(g.fanout.keys): - g.fanout.removePeer(t, pubSubPeer) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(t).int64, labelValues = [t]) - - procCall FloodSub(g).unsubscribePeer(peer) - -method subscribeTopic*(g: GossipSub, - topic: string, - subscribe: bool, - peer: PubSubPeer) {.gcsafe.} = - # Skip floodsub - we don't want it to add the peer to `g.floodsub` - procCall PubSub(g).subscribeTopic(topic, subscribe, peer) - - logScope: - peer - topic - - if subscribe: - trace "peer subscribed to topic" - # subscribe remote peer to the topic - discard g.gossipsub.addPeer(topic, peer) - else: - trace "peer unsubscribed from topic" - # unsubscribe remote peer from the topic - g.gossipsub.removePeer(topic, peer) - g.mesh.removePeer(topic, peer) - g.fanout.removePeer(topic, peer) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(topic).int64, labelValues = [topic]) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) - - trace "gossip peers", peers = g.gossipsub.peers(topic), topic - -proc handleGraft(g: GossipSub, - peer: PubSubPeer, - grafts: seq[ControlGraft]): seq[ControlPrune] = - for graft in grafts: - let topic = graft.topicID - logScope: - peer - topic - - trace "peer grafted topic" - - # If they send us a graft before they send us a subscribe, what should - # we do? For now, we add them to mesh but don't add them to gossipsub. 
- - if topic in g.topics: - if g.mesh.peers(topic) < GossipSubDHi: - # In the spec, there's no mention of DHi here, but implicitly, a - # peer will be removed from the mesh on next rebalance, so we don't want - # this peer to push someone else out - if g.mesh.addPeer(topic, peer): - g.fanout.removePeer(topic, peer) - else: - trace "peer already in mesh" - else: - result.add(ControlPrune(topicID: topic)) - else: - debug "peer grafting topic we're not interested in" - result.add(ControlPrune(topicID: topic)) - - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(topic).int64, labelValues = [topic]) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.peers(topic).int64, labelValues = [topic]) - -proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = - for prune in prunes: - trace "peer pruned topic", peer, topic = prune.topicID - - g.mesh.removePeer(prune.topicID, peer) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) - -proc handleIHave(g: GossipSub, - peer: PubSubPeer, - ihaves: seq[ControlIHave]): ControlIWant = - for ihave in ihaves: - trace "peer sent ihave", - peer, topic = ihave.topicID, msgs = ihave.messageIDs - - if ihave.topicID in g.mesh: - for m in ihave.messageIDs: - if m notin g.seen: - result.messageIDs.add(m) - -proc handleIWant(g: GossipSub, - peer: PubSubPeer, - iwants: seq[ControlIWant]): seq[Message] = - for iwant in iwants: - for mid in iwant.messageIDs: - trace "peer sent iwant", peer, messageID = mid - let msg = g.mcache.get(mid) - if msg.isSome: - result.add(msg.get()) - -method rpcHandler*(g: GossipSub, - peer: PubSubPeer, - rpcMsg: RPCMsg) {.async.} = - await procCall PubSub(g).rpcHandler(peer, rpcMsg) - - for msg in rpcMsg.messages: # for every message - let msgId = g.msgIdProvider(msg) - - if g.seen.put(msgId): - trace "Dropping already-seen message", msgId, peer - continue - - g.mcache.put(msgId, msg) - - if (msg.signature.len > 0 or g.verifySignature) and not msg.verify(): - # always validate if signature is present or required - debug "Dropping message due to failed signature verification", msgId, peer - continue - - if msg.seqno.len > 0 and msg.seqno.len != 8: - # if we have seqno should be 8 bytes long - debug "Dropping message due to invalid seqno length", msgId, peer - continue - - # g.anonymize needs no evaluation when receiving messages - # as we have a "lax" policy and allow signed messages - - let validation = await g.validate(msg) - case validation - of ValidationResult.Reject: - debug "Dropping message after validation, reason: reject", msgId, peer - continue - of ValidationResult.Ignore: - debug "Dropping message after validation, reason: ignore", msgId, peer - continue - of ValidationResult.Accept: - discard - - var toSendPeers = initHashSet[PubSubPeer]() - for t in msg.topicIDs: # for every topic in the message - g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) - g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - - await handleData(g, t, msg.data) - - # In theory, if topics are the same in all messages, we could batch - we'd - # also have to be careful to only include validated messages - g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = toSendPeers.len, msgId, peer - - if rpcMsg.control.isSome: - let control = rpcMsg.control.get() - g.handlePrune(peer, control.prune) - - var respControl: ControlMessage 
- respControl.iwant.add(g.handleIHave(peer, control.ihave)) - respControl.prune.add(g.handleGraft(peer, control.graft)) - let messages = g.handleIWant(peer, control.iwant) - - if respControl.graft.len > 0 or respControl.prune.len > 0 or - respControl.ihave.len > 0 or messages.len > 0: - - trace "sending control message", msg = shortLog(respControl), peer - g.send( - peer, - RPCMsg(control: some(respControl), messages: messages)) - -method subscribe*(g: GossipSub, - topic: string, - handler: TopicHandler) {.async.} = - await procCall PubSub(g).subscribe(topic, handler) - g.rebalanceMesh(topic) - -method unsubscribe*(g: GossipSub, - topics: seq[TopicPair]) {.async.} = - await procCall PubSub(g).unsubscribe(topics) - - for (topic, handler) in topics: - # delete from mesh only if no handlers are left - if topic notin g.topics: - if topic in g.mesh: - let peers = g.mesh[topic] - g.mesh.del(topic) - - let prune = RPCMsg( - control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) - g.broadcast(toSeq(peers), prune) - -method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = - await procCall PubSub(g).unsubscribeAll(topic) - - if topic in g.mesh: - let peers = g.mesh.getOrDefault(topic) - g.mesh.del(topic) - - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) - g.broadcast(toSeq(peers), prune) - -method publish*(g: GossipSub, - topic: string, - data: seq[byte]): Future[int] {.async.} = - # base returns always 0 - discard await procCall PubSub(g).publish(topic, data) - - logScope: topic - trace "Publishing message on topic", data = data.shortLog - - if topic.len <= 0: # data could be 0/empty - debug "Empty topic, skipping publish" - return 0 - - var peers: HashSet[PubSubPeer] - if topic in g.topics: # if we're subscribed use the mesh - peers = g.mesh.getOrDefault(topic) - else: # not subscribed, send to fanout peers - # try optimistically - peers = g.fanout.getOrDefault(topic) - if peers.len == 0: - # ok we had nothing.. 
let's try replenish inline - g.replenishFanout(topic) - peers = g.fanout.getOrDefault(topic) - - # even if we couldn't publish, - # we still attempted to publish - # on the topic, so it makes sense - # to update the last topic publish - # time - g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) - - if peers.len == 0: - debug "No peers for topic, skipping publish" - return 0 - - inc g.msgSeqno - let - msg = - if g.anonymize: - Message.init(none(PeerInfo), data, topic, none(uint64), false) - else: - Message.init(some(g.peerInfo), data, topic, some(g.msgSeqno), g.sign) - msgId = g.msgIdProvider(msg) - - logScope: msgId - - trace "Created new message", msg = shortLog(msg), peers = peers.len - - if g.seen.put(msgId): - # custom msgid providers might cause this - trace "Dropping already-seen message" - return 0 - - g.mcache.put(msgId, msg) - - g.broadcast(toSeq(peers), RPCMsg(messages: @[msg])) - when defined(libp2p_expensive_metrics): - if peers.len > 0: - libp2p_pubsub_messages_published.inc(labelValues = [topic]) - - trace "Published message to peers" - - return peers.len - -method start*(g: GossipSub) {.async.} = - trace "gossipsub start" - - if not g.heartbeatFut.isNil: - warn "Starting gossipsub twice" - return - - g.heartbeatRunning = true - g.heartbeatFut = g.heartbeat() - -method stop*(g: GossipSub) {.async.} = - trace "gossipsub stop" - if g.heartbeatFut.isNil: - warn "Stopping gossipsub without starting it" - return - - # stop heartbeat interval - g.heartbeatRunning = false - if not g.heartbeatFut.finished: - trace "awaiting last heartbeat" - await g.heartbeatFut - trace "heartbeat stopped" - g.heartbeatFut = nil - - -method initPubSub*(g: GossipSub) = - procCall FloodSub(g).initPubSub() - - randomize() - g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength) - g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer - g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer - g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers - g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics - g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip - g.control = initTable[string, ControlMessage]() # pending control messages diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index e7c0844a4..030b67990 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -231,7 +231,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = peer.outbound = true # flag as outbound method unsubscribe*(p: PubSub, - topics: seq[TopicPair]) {.base, async.} = + topics: seq[TopicPair]) {.base.} = ## unsubscribe from a list of ``topic`` strings for t in topics: let @@ -250,19 +250,18 @@ method unsubscribe*(p: PubSub, proc unsubscribe*(p: PubSub, topic: string, - handler: TopicHandler): Future[void] = + handler: TopicHandler) = ## unsubscribe from a ``topic`` string ## - p.unsubscribe(@[(topic, handler)]) -method unsubscribeAll*(p: PubSub, topic: string) {.base, async.} = +method unsubscribeAll*(p: PubSub, topic: string) {.base.} = p.topics.del(topic) libp2p_pubsub_topics.set(p.topics.len.int64) method subscribe*(p: PubSub, topic: string, - handler: TopicHandler) {.base, async.} = + handler: TopicHandler) {.base.} = ## subscribe to a topic ## ## ``topic`` - a string topic to subscribe to diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 389483eb3..4fd8b75f5 
100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -62,7 +62,7 @@ suite "FloodSub": await subscribeNodes(nodes) - await nodes[1].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 @@ -104,7 +104,7 @@ suite "FloodSub": await subscribeNodes(nodes) - await nodes[0].subscribe("foobar", handler) + nodes[0].subscribe("foobar", handler) await waitSub(nodes[1], nodes[0], "foobar") check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0 @@ -147,7 +147,7 @@ suite "FloodSub": await subscribeNodes(nodes) - await nodes[1].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() @@ -195,7 +195,7 @@ suite "FloodSub": )) await subscribeNodes(nodes) - await nodes[1].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") var validatorFut = newFuture[bool]() @@ -243,9 +243,9 @@ suite "FloodSub": )) await subscribeNodes(nodes) - await nodes[1].subscribe("foo", handler) + nodes[1].subscribe("foo", handler) await waitSub(nodes[0], nodes[1], "foo") - await nodes[1].subscribe("bar", handler) + nodes[1].subscribe("bar", handler) await waitSub(nodes[0], nodes[1], "bar") proc validator(topic: string, @@ -299,7 +299,7 @@ suite "FloodSub": await subscribeNodes(nodes) for i in 0..= runs: seenFut.complete() - await dialer.subscribe("foobar", handler) + dialer.subscribe("foobar", handler) await waitSub(nodes[0], dialer, "foobar") tryPublish await wait(nodes[0].publish("foobar", @@ -640,7 +635,7 @@ suite "GossipSub": if not seenFut.finished() and seen.len >= runs: seenFut.complete() - await dialer.subscribe("foobar", handler) + dialer.subscribe("foobar", handler) await waitSub(nodes[0], dialer, "foobar") tryPublish await wait(nodes[0].publish("foobar", diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 70e913ab0..254fff365 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -4,18 +4,14 @@ const libp2p_pubsub_verify {.booldefine.} = true libp2p_pubsub_anonymize {.booldefine.} = false -import random +import random, tables import chronos import ../../libp2p/[standard_setup, protocols/pubsub/pubsub, + protocols/pubsub/gossipsub, protocols/pubsub/floodsub, protocols/secure/secure] -when defined(fallback_gossipsub_10): - import ../../libp2p/protocols/pubsub/gossipsub10 -else: - import ../../libp2p/protocols/pubsub/gossipsub - export standard_setup randomize() @@ -35,14 +31,19 @@ proc generateNodes*( for i in 0..
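
The pubsub.nim and test changes above reflect that subscribe, unsubscribe and unsubscribeAll no longer return futures, so call sites drop their await while publish (and node start/stop) stays async. A stub model of the new synchronous topic-handler bookkeeping, with simplified types rather than the real library API:

import std/tables

type
  TopicHandler = proc(topic: string, data: seq[byte]) {.gcsafe.}
  PubSub = ref object
    topics: Table[string, seq[TopicHandler]]   # handlers registered per topic

proc subscribe(p: PubSub, topic: string, handler: TopicHandler) =
  # plain proc now: no Future, no await at the call site
  if topic notin p.topics:
    p.topics[topic] = @[]
  p.topics[topic].add(handler)

proc unsubscribeAll(p: PubSub, topic: string) =
  p.topics.del(topic)

when isMainModule:
  proc handler(topic: string, data: seq[byte]) {.gcsafe.} =
    discard
  let node = PubSub()
  node.subscribe("foobar", handler)     # was: await node.subscribe(...)
  assert "foobar" in node.topics
  node.unsubscribeAll("foobar")
  assert "foobar" notin node.topics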