diff --git a/libp2p/protocols/pubsub/gossipsub11.nim b/libp2p/protocols/pubsub/gossipsub11.nim index 13865e31f..a1bca8191 100644 --- a/libp2p/protocols/pubsub/gossipsub11.nim +++ b/libp2p/protocols/pubsub/gossipsub11.nim @@ -75,9 +75,17 @@ type heartbeatRunning: bool heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats -declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_mesh, + "gossipsub peers per topic in mesh", + labels = ["topic"]) + +declareGauge(libp2p_gossipsub_peers_per_topic_fanout, + "gossipsub peers per topic in fanout", + labels = ["topic"]) + +declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, + "gossipsub peers per topic in gossipsub", + labels = ["topic"]) proc init*(_: type[GossipSubParams]): GossipSubParams = GossipSubParams( @@ -113,16 +121,54 @@ proc replenishFanout(g: GossipSub, topic: string) = if g.fanout.getOrDefault(topic).len < GossipSubDLo: trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len - if topic in g.gossipsub: + if topic in toSeq(g.gossipsub.keys): for p in g.gossipsub.getOrDefault(topic): if not g.fanout[topic].containsOrIncl(p): if g.fanout.getOrDefault(topic).len == GossipSubD: break libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.fanout.getOrDefault(topic).len.int64, + labelValues = [topic]) + trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len +template moveToMeshHelper(g: GossipSub, + topic: string, + table: Table[string, HashSet[string]]) = + ## move peers from `table` into `mesh` + ## + var peerIds = toSeq(table.getOrDefault(topic)) + + logScope: + topic = topic + meshPeers = g.mesh.getOrDefault(topic).len + peers = peerIds.len + + shuffle(peerIds) + for id in peerIds: + if g.mesh.getOrDefault(topic).len > GossipSubD: + break + + trace "gathering peers for mesh" + if topic notin table: + continue + + trace "getting peers", topic, + peers = peerIds.len + + table[topic].excl(id) # always exclude + if id in g.mesh[topic]: + continue # we already have this peer in the mesh, try again + + if id in g.peers: + let p = g.peers[id] + if p.connected: + # send a graft message to the peer + await p.sendGraft(@[topic]) + g.mesh[topic].incl(id) + trace "got peer", peer = id + proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = try: trace "about to rebalance mesh" @@ -130,31 +176,19 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if topic notin g.mesh: g.mesh[topic] = initHashSet[string]() - # https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#mesh-maintenance - if g.mesh.getOrDefault(topic).len < GossipSubDlo and topic in g.topics: - var availPeers = toSeq(g.gossipsub.getOrDefault(topic)) - shuffle(availPeers) - if availPeers.len > GossipSubD: - availPeers = availPeers[0.. GossipSubDhi: - trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len - + # prune peers if we've gone over + # ATTN possible perf bottleneck here... score is a "red" function # and we call a lot of Table[] etc etc @@ -169,7 +203,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = elif peerx == peery: 0 else: 1) - while g.mesh.getOrDefault(topic).len > GossipSubD: + while g.mesh[topic].len > GossipSubD: trace "pruning peers", peers = g.mesh[topic].len # pop a low score peer @@ -184,13 +218,16 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = await p.sendPrune(@[topic]) libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.gossipsub.getOrDefault(topic).len.int64, + labelValues = [topic]) libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.fanout.getOrDefault(topic).len.int64, + labelValues = [topic]) libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.mesh.getOrDefault(topic).len.int64, + labelValues = [topic]) trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len, topicId = topic @@ -217,93 +254,104 @@ proc dropFanoutPeers(g: GossipSub) {.async.} = proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = ## gossip iHave messages to peers + ## + + trace "getting gossip peers (iHave)" let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + let controlMsg = ControlMessage() for topic in topics: - let mesh: HashSet[string] = g.mesh.getOrDefault(topic) - let fanout: HashSet[string] = g.fanout.getOrDefault(topic) + var allPeers = toSeq(g.gossipsub.getOrDefault(topic)) + shuffle(allPeers) + + let mesh = g.mesh.getOrDefault(topic) + let fanout = g.fanout.getOrDefault(topic) let gossipPeers = mesh + fanout let mids = g.mcache.window(topic) - if mids.len > 0: - let ihave = ControlIHave(topicID: topic, - messageIDs: toSeq(mids)) + if mids.len <= 0: + continue - if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topicID = topic + let ihave = ControlIHave(topicID: topic, + messageIDs: toSeq(mids)) + + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topicID = topic + continue + + for id in allPeers: + if result.len >= GossipSubD: + trace "got gossip peers", peers = result.len + break + + if allPeers.len == 0: + trace "no peers for topic, skipping", topicID = topic + break + + if id in gossipPeers: continue - - var extraPeers = toSeq(g.gossipsub[topic]) - shuffle(extraPeers) - for peer in extraPeers: - if result.len < GossipSubD and - peer notin gossipPeers and - peer notin result: - result[peer] = ControlMessage(ihave: @[ihave]) + + if id notin result: + result[id] = controlMsg + + result[id].ihave.add(ihave) proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: - withLock g.heartbeatLock: - try: - trace "running heartbeat" + try: + trace "running heartbeat" - for t in toSeq(g.topics.keys): - await g.rebalanceMesh(t) + for t in toSeq(g.topics.keys): + await g.rebalanceMesh(t) - await g.dropFanoutPeers() + await g.dropFanoutPeers() - # replenish known topics to the fanout - for t in toSeq(g.fanout.keys): - g.replenishFanout(t) + # replenish known topics to the fanout + for t in toSeq(g.fanout.keys): + g.replenishFanout(t) - let peers = g.getGossipPeers() - var sent: seq[Future[void]] - for peer in peers.keys: - if peer in g.peers: - sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) - checkFutures(await allFinished(sent)) + let peers = g.getGossipPeers() + var sent: seq[Future[void]] + for peer in peers.keys: + if peer in g.peers: + sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + checkFutures(await allFinished(sent)) - g.mcache.shift() # shift the cache - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + g.mcache.shift() # shift the cache + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception ocurred in gossipsub heartbeat", exc = exc.msg await sleepAsync(1.seconds) -method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = +method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects - trace "peer disconnected", peer=peer.id + procCall FloodSub(g).handleDisconnect(peer) - await procCall FloodSub(g).handleDisconnect(peer) + for t in toSeq(g.gossipsub.keys): + g.gossipsub[t].excl(peer.id) - withLock g.heartbeatLock: - for t in toSeq(g.gossipsub.keys): - if t in g.gossipsub: - g.gossipsub[t].excl(peer.id) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) + for t in toSeq(g.mesh.keys): + g.mesh[t].excl(peer.id) - # mostly for metrics - await procCall PubSub(g).subscribeTopic(t, false, peer.id) - - for t in toSeq(g.mesh.keys): - if t in g.mesh: - g.mesh[t].excl(peer.id) - - libp2p_gossipsub_peers_per_topic_mesh + libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh[t].len.int64, labelValues = [t]) - for t in toSeq(g.fanout.keys): - if t in g.fanout: - g.fanout[t].excl(peer.id) + for t in toSeq(g.fanout.keys): + g.fanout[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_fanout + libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout[t].len.int64, labelValues = [t]) -method subscribeToPeer*(p: GossipSub, - conn: Connection) {.async.} = - await procCall PubSub(p).subscribeToPeer(conn) + if peer.peerInfo.maintain: + g.explicitPeers.excl(peer.id) + +method subscribePeer*(p: GossipSub, + conn: Connection) = + procCall PubSub(p).subscribePeer(conn) asyncCheck p.handleConn(conn, GossipSubCodec_11) method subscribeTopic*(g: GossipSub, @@ -311,28 +359,27 @@ method subscribeTopic*(g: GossipSub, subscribe: bool, peerId: string) {.gcsafe, async.} = await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) + + if topic notin g.gossipsub: + g.gossipsub[topic] = initHashSet[string]() - withLock g.heartbeatLock: - if topic notin g.gossipsub: - g.gossipsub[topic] = initHashSet[string]() - - if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic - # subscribe remote peer to the topic - g.gossipsub[topic].incl(peerId) - if peerId in g.explicit: + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe remote peer to the topic + g.gossipsub[topic].incl(peerId) + if peerId in g.explicitPeers: g.explicit[topic].incl(peerId) - else: - trace "removing subscription for topic", peer = peerId, name = topic - # unsubscribe remote peer from the topic - g.gossipsub[topic].excl(peerId) - if peerId in g.explicit: + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe remote peer from the topic + g.gossipsub[topic].excl(peerId) + if peerId in g.explicitPeers: g.explicit[topic].excl(peerId) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub[topic].len.int64, labelValues = [topic]) - trace "gossip peers", peers = g.gossipsub[topic].len, topic + trace "gossip peers", peers = g.gossipsub[topic].len, topic # also rebalance current topic if we are subbed to if topic in g.topics: @@ -450,29 +497,19 @@ method rpcHandler*(g: GossipSub, trace "calling handler for message", topicId = t, localPeer = g.peerInfo.id, fromPeer = msg.fromPeer.pretty - await h(t, msg.data) # trigger user provided handler + try: + await h(t, msg.data) # trigger user provided handler + except CatchableError as exc: + trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - for p in toSendPeers: - if p in g.peers: - let id = g.peers[p].peerInfo.peerId - trace "about to forward message to peer", peerId = id, msgs = m.messages + let (published, failed) = await g.sendHelper(toSendPeers, m.messages) + for p in failed: + let peer = g.peers.getOrDefault(p) + if not(isNil(peer)): + g.handleDisconnect(peer) # cleanup failed peers - if id == peer.peerInfo.peerId: - trace "not forwarding message to originator", peerId = id - continue - - let msgs = m.messages.filterIt( - # don't forward to message originator - id != it.fromPeer - ) - - var sent: seq[Future[void]] - if msgs.len > 0: - trace "forwarding message to", peerId = id - sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) - sent = await allFinished(sent) - checkFutures(sent) + trace "forwared message to peers", peers = published.len var respControl: ControlMessage if m.control.isSome: @@ -527,53 +564,38 @@ method publish*(g: GossipSub, debug "publish: including flood/high score peer", peer = id peers.incl(id) - if topic in g.topics: # if we're subscribed use the mesh - peers = g.mesh.getOrDefault(topic) - else: # not subscribed, send to fanout peers - # try optimistically + if topic in g.topics: # if we're subscribed use the mesh + peers = g.mesh.getOrDefault(topic) + else: # not subscribed, send to fanout peers + # try optimistically + peers = g.fanout.getOrDefault(topic) + if peers.len == 0: + # ok we had nothing.. let's try replenish inline + g.replenishFanout(topic) peers = g.fanout.getOrDefault(topic) - if peers.len == 0: - # ok we had nothing.. let's try replenish inline - g.replenishFanout(topic) - peers = g.fanout.getOrDefault(topic) - let - msg = Message.init(g.peerInfo, data, topic, g.sign) - msgId = g.msgIdProvider(msg) + let + msg = Message.init(g.peerInfo, data, topic, g.sign) + msgId = g.msgIdProvider(msg) - trace "created new message", msg + trace "created new message", msg - trace "publishing on topic", name = topic, peers = peers - if msgId notin g.mcache: - g.mcache.put(msgId, msg) + trace "publishing on topic", name = topic, peers = peers + if msgId notin g.mcache: + g.mcache.put(msgId, msg) - var sent: seq[Future[void]] - for p in peers: - # avoid sending to self - if p == g.peerInfo.id: - continue - - let peer = g.peers.getOrDefault(p) - if not isNil(peer) and not isNil(peer.peerInfo): - trace "publish: sending message to peer", peer = p - sent.add(peer.send(@[RPCMsg(messages: @[msg])])) - else: - # this absolutely should not happen - # if it happens there is a bug that needs fixing asap - # this ain't no place to manage connections - fatal "publish: peer or peerInfo was nil", missing = p - doAssert(false, "publish: peer or peerInfo was nil") - - sent = await allFinished(sent) - checkFutures(sent) + let (published, failed) = await g.sendHelper(peers, @[msg]) + for p in failed: + let peer = g.peers.getOrDefault(p) + g.handleDisconnect(peer) # cleanup failed peers + if published.len > 0: libp2p_pubsub_messages_published.inc(labelValues = [topic]) - return sent.filterIt(not it.failed).len - else: - return 0 + trace "published message to peers", peers = published.len, + msg = msg.shortLog() + return published.len - method start*(g: GossipSub) {.async.} = trace "gossipsub start" diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index b714cc7c7..75cfbf5c1 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -107,7 +107,12 @@ method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, peerId: string) {.base, async.} = - discard + var peer = p.peers.getOrDefault(peerId) + if not isNil(peer): + if subscribe: + peer.topics.incl(topic) + else: + peer.topics.excl(topic) method rpcHandler*(p: PubSub, peer: PubSubPeer,