diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 05f7ab009..6e41e0eb1 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -99,49 +99,48 @@ proc replenishFanout(g: GossipSub, topic: string) = trace "fanout replenished with peers", peers = g.fanout.peers(topic) proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = + logScope: + topic + trace "about to rebalance mesh" + # create a mesh topic that we're subscribing to var grafts, prunes: seq[PubSubPeer] if g.mesh.peers(topic) < GossipSubDlo: - trace "replenishing mesh", topic, peers = g.mesh.peers(topic) - # replenish the mesh if we're below GossipSubDlo - var newPeers = toSeq( + trace "replenishing mesh", peers = g.mesh.peers(topic) + # replenish the mesh if we're below Dlo + grafts = toSeq( g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) ) logScope: - topic = topic meshPeers = g.mesh.peers(topic) - newPeers = newPeers.len + grafts = grafts.len - shuffle(newPeers) + shuffle(grafts) - trace "getting peers", topic, peers = newPeers.len + # Graft peers so we reach a count of D + grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic))) - for peer in newPeers: - # send a graft message to the peer - grafts.add peer - discard g.mesh.addPeer(topic, peer) - trace "got peer", peer = $peer + trace "getting peers", topic, peers = grafts.len + + for peer in grafts: + if g.mesh.addPeer(topic, peer): + g.fanout.removePeer(topic, peer) if g.mesh.peers(topic) > GossipSubDhi: - # prune peers if we've gone over - var mesh = toSeq(g.mesh[topic]) - shuffle(mesh) + # prune peers if we've gone over Dhi + prunes = toSeq(g.mesh[topic]) + shuffle(prunes) + prunes.setLen(prunes.len - GossipSubD) # .. down to D peers - trace "about to prune mesh", mesh = mesh.len - for peer in mesh: - if g.mesh.peers(topic) <= GossipSubD: - break - - trace "pruning peers", peers = g.mesh.peers(topic) - # send a graft message to the peer + trace "about to prune mesh", prunes = prunes.len + for peer in prunes: g.mesh.removePeer(topic, peer) - prunes.add(peer) libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.peers(topic).int64, labelValues = [topic]) @@ -158,8 +157,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = for p in prunes: await p.sendPrune(@[topic]) - trace "mesh balanced, got peers", peers = g.mesh.peers(topic), - topicId = topic + trace "mesh balanced, got peers", peers = g.mesh.peers(topic) proc dropFanoutPeers(g: GossipSub) = # drop peers that we haven't published to in @@ -275,17 +273,21 @@ method subscribeTopic*(g: GossipSub, peerId: string) {.gcsafe, async.} = await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) + logScope: + peer = peerId + topic + let peer = g.peers.getOrDefault(peerId) if peer == nil: debug "subscribeTopic on a nil peer!" return if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic + trace "peer subscribed to topic" # subscribe remote peer to the topic discard g.gossipsub.addPeer(topic, peer) else: - trace "removing subscription for topic", peer = peerId, name = topic + trace "peer unsubscribed from topic" # unsubscribe remote peer from the topic g.gossipsub.removePeer(topic, peer) g.mesh.removePeer(topic, peer) @@ -310,7 +312,11 @@ proc handleGraft(g: GossipSub, grafts: seq[ControlGraft]): seq[ControlPrune] = for graft in grafts: let topic = graft.topicID - trace "processing graft message", topic, peer = $peer + logScope: + peer = peer.id + topic + + trace "peer grafted topic" # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. @@ -323,19 +329,21 @@ proc handleGraft(g: GossipSub, if g.mesh.addPeer(topic, peer): g.fanout.removePeer(topic, peer) else: - trace "Peer already in mesh", topic, peer = $peer + trace "peer already in mesh" else: result.add(ControlPrune(topicID: topic)) else: + debug "peer grafting topic we're not interested in" result.add(ControlPrune(topicID: topic)) libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(topic).int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout.peers(topic).int64, labelValues = [topic]) proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: - trace "processing prune message", peer = $peer, - topicID = prune.topicID + trace "peer pruned topic", peer = peer.id, topic = prune.topicID g.mesh.removePeer(prune.topicID, peer) libp2p_gossipsub_peers_per_topic_mesh @@ -345,9 +353,8 @@ proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = for ihave in ihaves: - trace "processing ihave message", peer = $peer, - topicID = ihave.topicID, - msgs = ihave.messageIDs + trace "peer sent ihave", + peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs if ihave.topicID in g.mesh: for m in ihave.messageIDs: @@ -359,8 +366,7 @@ proc handleIWant(g: GossipSub, iwants: seq[ControlIWant]): seq[Message] = for iwant in iwants: for mid in iwant.messageIDs: - trace "processing iwant message", peer = $peer, - messageID = mid + trace "peer sent iwant", peer = peer.id, messageID = mid let msg = g.mcache.get(mid) if msg.isSome: result.add(msg.get()) @@ -462,8 +468,8 @@ method publish*(g: GossipSub, data: seq[byte]): Future[int] {.async.} = # base returns always 0 discard await procCall PubSub(g).publish(topic, data) - trace "about to publish message on topic", name = topic, - data = data.shortLog + trace "publishing message on topic", topic, data = data.shortLog + var peers: HashSet[PubSubPeer] if topic.len <= 0: # data could be 0/empty return 0 @@ -490,9 +496,8 @@ method publish*(g: GossipSub, msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) msgId = g.msgIdProvider(msg) - trace "created new message", msg + trace "created new message", msg, topic, peers = peers.len - trace "publishing on topic", topic, peers = peers.len if msgId notin g.mcache: g.mcache.put(msgId, msg) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 5f6888510..a850b68d1 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -210,9 +210,9 @@ method unsubscribe*(p: PubSub, if p.topics[t.topic].handler.len <= 0: p.topics.del(t.topic) -method unsubscribe*(p: PubSub, +proc unsubscribe*(p: PubSub, topic: string, - handler: TopicHandler): Future[void] {.base.} = + handler: TopicHandler): Future[void] = ## unsubscribe from a ``topic`` string p.unsubscribe(@[(topic, handler)]) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index c0f25870a..8458f1038 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -97,28 +97,30 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = obs.onSend(p, msg) proc handle*(p: PubSubPeer, conn: Connection) {.async.} = - trace "handling pubsub rpc", peer = p.id, closed = conn.closed + logScope: + peer = p.id + debug "starting pubsub read loop for peer", closed = conn.closed try: try: p.refs.inc() while not conn.closed: - trace "waiting for data", peer = p.id, closed = conn.closed + trace "waiting for data", closed = conn.closed let data = await conn.readLp(64 * 1024) let digest = $(sha256.digest(data)) - trace "read data from peer", peer = p.id, data = data.shortLog + trace "read data from peer", data = data.shortLog if digest in p.recvdRpcCache: libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id]) - trace "message already received, skipping", peer = p.id + trace "message already received, skipping" continue var rmsg = decodeRpcMsg(data) if rmsg.isErr(): - notice "failed to decode msg from peer", peer = p.id + notice "failed to decode msg from peer" break var msg = rmsg.get() - trace "decoded msg from peer", peer = p.id, msg = msg.shortLog + trace "decoded msg from peer", msg = msg.shortLog # trigger hooks p.recvObservers(msg) @@ -130,7 +132,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) finally: - trace "exiting pubsub peer read loop", peer = p.id + debug "exiting pubsub peer read loop" await conn.close() except CancelledError as exc: @@ -196,12 +198,12 @@ proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[v subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))) proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] = - trace "sending graft msg to peer", peer = p.id, topicIDs = topics + trace "sending graft to peer", peer = p.id, topicIDs = topics p.send(RPCMsg(control: some( ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it)))))) proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] = - trace "sending prune msg to peer", peer = p.id, topicIDs = topics + trace "sending prune to peer", peer = p.id, topicIDs = topics p.send(RPCMsg(control: some( ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))