diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 8ff1b8abe..f1abead1d 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -101,7 +101,7 @@ method rpcHandler*(f: FloodSub, trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout) + let published = await f.broadcast(toSendPeers, RPCMsg(messages: m.messages), DefaultSendTimeout) trace "forwared message to peers", peers = published @@ -137,7 +137,7 @@ method publish*(f: FloodSub, let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) # start the future but do not wait yet - let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout) + let published = await f.broadcast(f.floodsub.getOrDefault(topic), RPCMsg(messages: @[msg]), timeout) when defined(libp2p_expensive_metrics): libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 91ee2752c..8935598a2 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -155,10 +155,10 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = .set(g.mesh.peers(topic).int64, labelValues = [topic]) # Send changes to peers after table updates to avoid stale state - for p in grafts: - await p.sendGraft(@[topic]) - for p in prunes: - await p.sendPrune(@[topic]) + let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) + let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + discard g.broadcast(grafts, graft, DefaultSendTimeout) + discard g.broadcast(prunes, prune, DefaultSendTimeout) trace "mesh balanced, got peers", peers = g.mesh.peers(topic) @@ -229,10 +229,10 @@ proc heartbeat(g: GossipSub) {.async.} = g.replenishFanout(t) let peers = g.getGossipPeers() - var sent: seq[Future[void]] + var sent: seq[Future[int]] for peer, control in peers: g.peers.withValue(peer.peerId, pubsubPeer) do: - sent &= pubsubPeer[].send(RPCMsg(control: some(control))) + sent &= g.broadcast([pubsubPeer[]], RPCMsg(control: some(control)), DefaultSendTimeout) checkFutures(await allFinished(sent)) g.mcache.shift() # shift the cache @@ -436,7 +436,7 @@ method rpcHandler*(g: GossipSub, trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout) + let published = await g.broadcast(toSendPeers, RPCMsg(messages: m.messages), DefaultSendTimeout) trace "forwared message to peers", peers = published @@ -453,8 +453,7 @@ method rpcHandler*(g: GossipSub, respControl.ihave.len > 0: try: info "sending control message", msg = respControl - await peer.send( - RPCMsg(control: some(respControl), messages: messages)) + let _ = await g.broadcast([peer], RPCMsg(control: some(respControl), messages: messages), DefaultSendTimeout) except CancelledError as exc: raise exc except CatchableError as exc: @@ -477,10 +476,8 @@ method unsubscribe*(g: GossipSub, let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) - var pending = newSeq[Future[void]]() - for peer in peers: - pending.add(peer.sendPrune(@[topic])) - checkFutures(await allFinished(pending)) + let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + discard g.broadcast(peers, prune, DefaultSendTimeout) method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = await procCall PubSub(g).unsubscribeAll(topic) @@ -489,10 +486,8 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = let peers = g.mesh.getOrDefault(topic) g.mesh.del(topic) - var pending = newSeq[Future[void]]() - for peer in peers: - pending.add(peer.sendPrune(@[topic])) - checkFutures(await allFinished(pending)) + let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + discard g.broadcast(peers, prune, DefaultSendTimeout) method publish*(g: GossipSub, topic: string, @@ -533,7 +528,7 @@ method publish*(g: GossipSub, if msgId notin g.mcache: g.mcache.put(msgId, msg) - let published = await g.publishHelper(peers, @[msg], timeout) + let published = await g.broadcast(peers, RPCMsg(messages: @[msg]), timeout) when defined(libp2p_expensive_metrics): if published > 0: libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 7c0cc6369..72fc5138a 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -72,12 +72,45 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} = libp2p_pubsub_peers.set(p.peers.len.int64) +proc broadcast*(p: PubSub, + sendPeers: HashSet[PubSubPeer] | seq[PubSubPeer] | array[1, PubSubPeer], + msg: RPCMsg, + timeout: Duration): Future[int] {.async.} = + # send messages and cleanup failed peers + var sent: seq[tuple[id: PeerID, fut: Future[void]]] + for sendPeer in sendPeers: + if sendPeer.isNil: + continue + + # avoid sending to self + if sendPeer.peerId == p.peerInfo.peerId: + continue + + trace "broadcast to peer", peer = sendPeer.id, message = msg + sent.add((id: sendPeer.peerId, fut: sendPeer.send(msg, timeout))) + + var broadcasted: seq[PeerID] + var failed: seq[PeerID] + let futs = await allFinished(sent.mapIt(it.fut)) + for s in futs: + let f = sent.filterIt(it.fut == s) + if f.len > 0: + if s.failed: + trace "broadcast to peer failed", peer = f[0].id + p.unsubscribePeer(f[0].id) + failed.add(f[0].id) + else: + trace "broadcast to peer succeeded", peer = f[0].id + broadcasted.add(f[0].id) + + return broadcasted.len + proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} = ## send subscriptions to remote peer - await peer.sendSubOpts(topics, subscribe) + discard await p.broadcast([peer], RPCMsg(subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), DefaultSendTimeout) method subscribeTopic*(p: PubSub, topic: string, @@ -218,39 +251,6 @@ method subscribe*(p: PubSub, # metrics libp2p_pubsub_topics.set(p.topics.len.int64) -proc publishHelper*(p: PubSub, - sendPeers: HashSet[PubSubPeer], - msgs: seq[Message], - timeout: Duration): Future[int] {.async.} = - # send messages and cleanup failed peers - var sent: seq[tuple[id: PeerID, fut: Future[void]]] - for sendPeer in sendPeers: - if sendPeer.isNil: - continue - - # avoid sending to self - if sendPeer.peerId == p.peerInfo.peerId: - continue - - trace "sending messages to peer", peer = sendPeer.id, msgs - sent.add((id: sendPeer.peerId, fut: sendPeer.send(RPCMsg(messages: msgs), timeout))) - - var published: seq[PeerID] - var failed: seq[PeerID] - let futs = await allFinished(sent.mapIt(it.fut)) - for s in futs: - let f = sent.filterIt(it.fut == s) - if f.len > 0: - if s.failed: - trace "sending messages to peer failed", peer = f[0].id - p.unsubscribePeer(f[0].id) - failed.add(f[0].id) - else: - trace "sending messages to peer succeeded", peer = f[0].id - published.add(f[0].id) - - return published.len - method publish*(p: PubSub, topic: string, data: seq[byte], diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 5de6f4dbf..6843852ca 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -190,39 +190,6 @@ proc send*( raise exc -proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} = - trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics - - try: - await p.send(RPCMsg( - subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception sending subscriptions", exc = exc.msg - -proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = - trace "sending graft to peer", peer = p.id, topicIDs = topics - - try: - await p.send(RPCMsg(control: some( - ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it)))))) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception sending grafts", exc = exc.msg - -proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} = - trace "sending prune to peer", peer = p.id, topicIDs = topics - - try: - await p.send(RPCMsg(control: some( - ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it)))))) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception sending prunes", exc = exc.msg - proc `$`*(p: PubSubPeer): string = p.id