diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0fa25d25e..9f56ee03a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -45,9 +45,9 @@ const GossipSubFanoutTTL* = 1.minutes type GossipSub* = ref object of FloodSub - mesh*: Table[string, HashSet[string]] # meshes - topic to peer - fanout*: Table[string, HashSet[string]] # fanout - topic to peer - gossipsub*: Table[string, HashSet[string]] # topic to peer map of all gossipsub peers + mesh*: Table[string, HashSet[string]] # peers that we send messages to when we are subscribed to the topic + fanout*: Table[string, HashSet[string]] # peers that we send messages to when we're not subscribed to the topic + gossipsub*: Table[string, HashSet[string]] # peers that are subscribed to a topic lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: Table[string, ControlMessage] # pending control messages @@ -68,6 +68,31 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) +func addPeer( + table: var Table[string, HashSet[string]], topic: string, + peerId: string): bool = + # returns true if the peer was added, false if it was already in the collection + not table.mgetOrPut(topic, initHashSet[string]()).containsOrIncl(peerId) + +func removePeer( + table: var Table[string, HashSet[string]], topic, peerId: string) = + table.withValue(topic, peers): + peers[].excl(peerId) + if peers[].len == 0: + table.del(topic) + +func hasPeer(table: Table[string, HashSet[string]], topic, peerId: string): bool = + (topic in table) and (peerId in table[topic]) + +func peers(table: Table[string, HashSet[string]], topic: string): int = + if topic in table: + table[topic].len + else: + 0 + +func getPeers(table: Table[string, HashSet[string]], topic: string): HashSet[string] = + table.getOrDefault(topic, initHashSet[string]()) + method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -83,119 +108,102 @@ method init*(g: GossipSub) = proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic trace "about to replenish fanout" - if topic notin g.fanout: - g.fanout[topic] = initHashSet[string]() - if g.fanout.getOrDefault(topic).len < GossipSubDLo: - trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len - if topic in toSeq(g.gossipsub.keys): - for p in g.gossipsub.getOrDefault(topic): - if not g.fanout[topic].containsOrIncl(p): - if g.fanout.getOrDefault(topic).len == GossipSubD: + if g.fanout.peers(topic) < GossipSubDLo: + trace "replenishing fanout", peers = g.fanout.peers(topic) + if topic in g.gossipsub: + for peerId in g.gossipsub[topic]: + if g.fanout.addPeer(topic, peerId): + if g.fanout.peers(topic) == GossipSubD: break libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, - labelValues = [topic]) + .set(g.fanout.peers(topic).int64, labelValues = [topic]) - trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len - -template moveToMeshHelper(g: GossipSub, - topic: string, - table: Table[string, HashSet[string]]) = - ## move peers from `table` into `mesh` - ## - var peerIds = toSeq(table.getOrDefault(topic)) - - logScope: - topic = topic - meshPeers = g.mesh.getOrDefault(topic).len - peers = peerIds.len - - shuffle(peerIds) - for id in peerIds: - if g.mesh.getOrDefault(topic).len > GossipSubD: - break - - trace "gathering peers for mesh" - if topic notin table: - continue - - trace "getting peers", topic, - peers = peerIds.len - - table[topic].excl(id) # always exclude - if id in g.mesh[topic]: - continue # we already have this peer in the mesh, try again - - if id in g.peers: - let p = g.peers[id] - if p.connected: - # send a graft message to the peer - await p.sendGraft(@[topic]) - g.mesh[topic].incl(id) - trace "got peer", peer = id + trace "fanout replenished with peers", peers = g.fanout.peers(topic) proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = - try: - trace "about to rebalance mesh" - # create a mesh topic that we're subscribing to - if topic notin g.mesh: - g.mesh[topic] = initHashSet[string]() + trace "about to rebalance mesh" + # create a mesh topic that we're subscribing to - if g.mesh.getOrDefault(topic).len < GossipSubDlo: - trace "replenishing mesh", topic - # replenish the mesh if we're below GossipSubDlo + var + grafts, prunes: seq[PubSubPeer] - # move fanout nodes first - g.moveToMeshHelper(topic, g.fanout) + if g.mesh.peers(topic) < GossipSubDlo: + trace "replenishing mesh", topic, peers = g.mesh.peers(topic) + # replenish the mesh if we're below GossipSubDlo + var newPeers = toSeq( + g.gossipsub.getOrDefault(topic, initHashSet[string]()) - + g.mesh.getOrDefault(topic, initHashSet[string]()) + ) - # move gossipsub nodes second - g.moveToMeshHelper(topic, g.gossipsub) + logScope: + topic = topic + meshPeers = g.mesh.peers(topic) + newPeers = newPeers.len - if g.mesh.getOrDefault(topic).len > GossipSubDhi: - # prune peers if we've gone over - var mesh = toSeq(g.mesh.getOrDefault(topic)) - shuffle(mesh) + shuffle(newPeers) - trace "about to prune mesh", mesh = mesh.len - for id in mesh: - if g.mesh.getOrDefault(topic).len <= GossipSubD: - break + trace "getting peers", topic, peers = peerIds.len - trace "pruning peers", peers = g.mesh[topic].len - let p = g.peers[id] + for id in newPeers: + if g.mesh.peers(topic) >= GossipSubD: + break + + let p = g.peers.getOrDefault(id) + if p != nil: # send a graft message to the peer - await p.sendPrune(@[topic]) - g.mesh[topic].excl(id) - if topic in g.gossipsub: - g.gossipsub[topic].incl(id) + grafts.add p + discard g.mesh.addPeer(topic, id) + trace "got peer", peer = id + else: + # Peer should have been removed from mesh also! + warn "Unknown peer in mesh", peer = id - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(topic).len.int64, - labelValues = [topic]) + if g.mesh.peers(topic) > GossipSubDhi: + # prune peers if we've gone over + var mesh = toSeq(g.mesh[topic]) + shuffle(mesh) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, - labelValues = [topic]) + trace "about to prune mesh", mesh = mesh.len + for id in mesh: + if g.mesh.peers(topic) <= GossipSubD: + break - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.getOrDefault(topic).len.int64, - labelValues = [topic]) + trace "pruning peers", peers = g.mesh.peers(topic) + # send a graft message to the peer + g.mesh.removePeer(topic, id) - trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len, - topicId = topic - except CancelledError as exc: - raise exc - except CatchableError as exc: - warn "exception occurred re-balancing mesh", exc = exc.msg + let p = g.peers.getOrDefault(id) + if p != nil: + prunes.add(p) -proc dropFanoutPeers(g: GossipSub) {.async.} = + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) + + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) + + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(topic).int64, labelValues = [topic]) + + # Send changes to peers after table updates to avoid stale state + for p in grafts: + await p.sendGraft(@[topic]) + for p in prunes: + await p.sendPrune(@[topic]) + + trace "mesh balanced, got peers", peers = g.mesh.peers(topic), + topicId = topic + +proc dropFanoutPeers(g: GossipSub) = # drop peers that we haven't published to in # GossipSubFanoutTTL seconds var dropping = newSeq[string]() + let now = Moment.now() + for topic, val in g.lastFanoutPubSub: - if Moment.now > val: + if now > val: dropping.add(topic) g.fanout.del(topic) trace "dropping fanout topic", topic @@ -204,7 +212,7 @@ proc dropFanoutPeers(g: GossipSub) {.async.} = g.lastFanoutPubSub.del(topic) libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.fanout.peers(topic).int64, labelValues = [topic]) proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = ## gossip iHave messages to peers @@ -257,7 +265,7 @@ proc heartbeat(g: GossipSub) {.async.} = for t in toSeq(g.topics.keys): await g.rebalanceMesh(t) - await g.dropFanoutPeers() + g.dropFanoutPeers() # replenish known topics to the fanout for t in toSeq(g.fanout.keys): @@ -281,27 +289,23 @@ proc heartbeat(g: GossipSub) {.async.} = method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects procCall FloodSub(g).handleDisconnect(peer) - for t in toSeq(g.gossipsub.keys): - if t in g.gossipsub: - g.gossipsub[t].excl(peer.id) + g.gossipsub.removePeer(t, peer.id) libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) + .set(g.gossipsub.peers(t).int64, labelValues = [t]) for t in toSeq(g.mesh.keys): - if t in g.mesh: - g.mesh[t].excl(peer.id) + g.mesh.removePeer(t, peer.id) libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh[t].len.int64, labelValues = [t]) + .set(g.mesh.peers(t).int64, labelValues = [t]) for t in toSeq(g.fanout.keys): - if t in g.fanout: - g.fanout[t].excl(peer.id) + g.fanout.removePeer(t, peer.id) libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout[t].len.int64, labelValues = [t]) + .set(g.fanout.peers(t).int64, labelValues = [t]) method subscribePeer*(p: GossipSub, conn: Connection) = @@ -314,26 +318,26 @@ method subscribeTopic*(g: GossipSub, peerId: string) {.gcsafe, async.} = await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) - if topic notin g.gossipsub: - g.gossipsub[topic] = initHashSet[string]() - if subscribe: trace "adding subscription for topic", peer = peerId, name = topic # subscribe remote peer to the topic - g.gossipsub[topic].incl(peerId) + discard g.gossipsub.addPeer(topic, peerId) else: trace "removing subscription for topic", peer = peerId, name = topic # unsubscribe remote peer from the topic - g.gossipsub[topic].excl(peerId) - if peerId in g.mesh.getOrDefault(topic): - g.mesh[topic].excl(peerId) - if peerId in g.fanout.getOrDefault(topic): - g.fanout[topic].excl(peerId) + g.gossipsub.removePeer(topic, peerId) + g.mesh.removePeer(topic, peerId) + g.fanout.removePeer(topic, peerId) + + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(topic).int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) - trace "gossip peers", peers = g.gossipsub[topic].len, topic + trace "gossip peers", peers = g.gossipsub.peers(topic), topic # also rebalance current topic if we are subbed to if topic in g.topics: @@ -343,34 +347,39 @@ proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft], respControl: var ControlMessage) = + let peerId = peer.id for graft in grafts: - trace "processing graft message", peer = peer.id, - topicID = graft.topicID + let topic = graft.topicID + trace "processing graft message", topic, peerId - if graft.topicID in g.topics: - if g.mesh.len < GossipSubD: - g.mesh[graft.topicID].incl(peer.id) + # If they send us a graft before they send us a subscribe, what should + # we do? For now, we add them to mesh but don't add them to gossipsub. + + if topic in g.topics: + if g.mesh.peers(topic) < GossipSubDHi: + # In the spec, there's no mention of DHi here, but implicitly, a + # peer will be removed from the mesh on next rebalance, so we don't want + # this peer to push someone else out + if g.mesh.addPeer(topic, peerId): + g.fanout.removePeer(topic, peer.id) + else: + trace "Peer already in mesh", topic, peerId else: - g.gossipsub[graft.topicID].incl(peer.id) + respControl.prune.add(ControlPrune(topicID: topic)) else: - respControl.prune.add(ControlPrune(topicID: graft.topicID)) + respControl.prune.add(ControlPrune(topicID: topic)) libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh[graft.topicID].len.int64, labelValues = [graft.topicID]) - - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[graft.topicID].len.int64, labelValues = [graft.topicID]) + .set(g.mesh.peers(topic).int64, labelValues = [topic]) proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: trace "processing prune message", peer = peer.id, topicID = prune.topicID - if prune.topicID in g.mesh: - g.mesh[prune.topicID].excl(peer.id) - g.gossipsub[prune.topicID].incl(peer.id) - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh[prune.topicID].len.int64, labelValues = [prune.topicID]) + g.mesh.removePeer(prune.topicID, peer.id) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) proc handleIHave(g: GossipSub, peer: PubSubPeer, @@ -485,9 +494,11 @@ method unsubscribe*(g: GossipSub, if topic in g.mesh: let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) + for id in peers: - let p = g.peers[id] - await p.sendPrune(@[topic]) + let p = g.peers.getOrDefault(id) + if p != nil: + await p.sendPrune(@[topic]) method publish*(g: GossipSub, topic: string, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 1bda8adde..212404dab 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -163,14 +163,24 @@ proc sendMsg*(p: PubSubPeer, p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, sign)])]) proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = - for topic in topics: - trace "sending graft msg to peer", peer = p.id, topicID = topic - await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) + try: + for topic in topics: + trace "sending graft msg to peer", peer = p.id, topicID = topic + await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))]) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "Could not send graft", msg = exc.msg proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} = - for topic in topics: - trace "sending prune msg to peer", peer = p.id, topicID = topic - await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) + try: + for topic in topics: + trace "sending prune msg to peer", peer = p.id, topicID = topic + await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))]) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "Could not send prune", msg = exc.msg proc `$`*(p: PubSubPeer): string = p.id diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index a6aa40e1e..c555d703c 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -137,7 +137,7 @@ suite "GossipSub internal": check gossipSub.fanout[topic].len == GossipSubD - await gossipSub.dropFanoutPeers() + gossipSub.dropFanoutPeers() check topic notin gossipSub.fanout await allFuturesThrowing(conns.mapIt(it.close())) @@ -176,7 +176,7 @@ suite "GossipSub internal": check gossipSub.fanout[topic1].len == GossipSubD check gossipSub.fanout[topic2].len == GossipSubD - await gossipSub.dropFanoutPeers() + gossipSub.dropFanoutPeers() check topic1 notin gossipSub.fanout check topic2 in gossipSub.fanout