From d306c448fa75a310a7ebba5e3b8867e659fa1355 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 24 Jan 2023 10:55:51 +0100 Subject: [PATCH] GossipSub: cancel inflight msgs when receiving duplicate --- libp2p/protocols/pubsub/gossipsub.nim | 27 +++++++++++- .../protocols/pubsub/gossipsub/behavior.nim | 2 +- libp2p/protocols/pubsub/gossipsub/types.nim | 2 + libp2p/protocols/pubsub/pubsub.nim | 41 +++++++++++++++---- libp2p/protocols/pubsub/pubsubpeer.nim | 8 ++-- 5 files changed, 65 insertions(+), 15 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 1ee60dbaa..66d921102 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -43,6 +43,7 @@ logScope: declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened") declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation") +declareCounter(libp2p_gossipsub_duplicate_during_broadcast, "number of duplicates received during message broadcast") declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received") declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)") @@ -291,10 +292,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) trace "sending control message", msg = shortLog(respControl), peer - g.send( + asyncSpawn g.send( peer, RPCMsg(control: some(respControl), messages: messages)) +proc lazyBroadcast( + g: GossipSub, + peers: seq[PubSubPeer], + msgIdSalted: MessageId, + msg: RPCMsg) {.async.} = + let + futsTable = g.cancellableBroadcast(peers, msg) + futs = toSeq(futsTable.values) + g.sendingFutures[msgIdSalted] = futsTable + await allFutures(futs) + g.sendingFutures.del(msgIdSalted) + + # This isn't equal to the amount of saved bandwidth, + # because lower layer may send the data even after + # cancellation + let cancelledCount = futs.countIt(it.cancelled) + libp2p_gossipsub_duplicate_during_broadcast.inc(cancelledCount.int64) + proc validateAndRelay(g: GossipSub, msg: Message, msgId, msgIdSalted: MessageId, @@ -339,7 +358,7 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + asyncSpawn g.lazyBroadcast(toSeq(toSendPeers), msgIdSalted, RPCMsg(messages: @[msg])) trace "forwared message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue @@ -397,6 +416,10 @@ method rpcHandler*(g: GossipSub, let delay = Moment.now() - g.firstSeen(msgId) g.rewardDelivered(peer, msg.topicIds, false, delay) + g.sendingFutures.withValue(msgIdSalted, futs): + futs[].withValue(peer.peerId, fut): + fut[].cancel() + libp2p_gossipsub_duplicate.inc() # onto the next message diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 57ea160ee..8f2cd09a4 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -677,7 +677,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId]) else: libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"]) - g.send(peer, RPCMsg(control: some(control))) + asyncSpawn g.send(peer, RPCMsg(control: some(control))) g.mcache.shift() # shift the cache diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index fdc5314a8..088d5bcf1 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -145,6 +145,7 @@ type BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] + SendingFuturesTable* = Table[MessageId, Table[PeerId, Future[void]]] RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]] RoutingRecordsHandler* = @@ -164,6 +165,7 @@ type control*: Table[string, ControlMessage] # pending control messages mcache*: MCache # messages cache validationSeen*: ValidationSeenTable # peers who sent us message in validation + sendingFutures*: SendingFuturesTable # messages which are currently being sent heartbeatFut*: Future[void] # cancellation future for heartbeat interval scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval heartbeatRunning*: bool diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 1cc8d9810..786a9243d 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -139,20 +139,17 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = libp2p_pubsub_peers.set(p.peers.len.int64) -proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} = +proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg): Future[void] {.raises: [Defect].} = ## Attempt to send `msg` to remote peer ## trace "sending pubsub message to peer", peer, msg = shortLog(msg) peer.send(msg, p.anonymize) -proc broadcast*( +proc updateBroadcastMetrics( p: PubSub, - sendPeers: auto, # Iteratble[PubSubPeer] - msg: RPCMsg) {.raises: [Defect].} = - ## Attempt to send `msg` to the given peers - - let npeers = sendPeers.len.int64 + msg: RPCMsg, + npeers: int64) = for sub in msg.subscriptions: if sub.subscribe: if p.knownTopics.contains(sub.topic): @@ -192,24 +189,50 @@ proc broadcast*( else: libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"]) +proc broadcast*( + p: PubSub, + sendPeers: auto, # Iteratble[PubSubPeer] + msg: RPCMsg) {.raises: [Defect].} = + ## Attempt to send `msg` to the given peers + + p.updateBroadcastMetrics(msg, sendPeers.len.int64) trace "broadcasting messages to peers", peers = sendPeers.len, msg = shortLog(msg) if anyIt(sendPeers, it.hasObservers): for peer in sendPeers: - p.send(peer, msg) + asyncSpawn p.send(peer, msg) else: # Fast path that only encodes message once let encoded = encodeRpcMsg(msg, p.anonymize) for peer in sendPeers: asyncSpawn peer.sendEncoded(encoded) +proc cancellableBroadcast*( + p: PubSub, + sendPeers: auto, # Iteratble[PubSubPeer] + msg: RPCMsg): Table[PeerId, Future[void]] {.raises: [Defect].} = + ## Attempt to send `msg` to the given peers + + p.updateBroadcastMetrics(msg, sendPeers.len) + trace "broadcasting messages to peers", + peers = sendPeers.len, msg = shortLog(msg) + + if anyIt(sendPeers, it.hasObservers): + for peer in sendPeers: + result[peer.peerId] = p.send(peer, msg) + else: + # Fast path that only encodes message once + let encoded = encodeRpcMsg(msg, p.anonymize) + for peer in sendPeers: + result[peer.peerId] = peer.sendEncoded(encoded) + proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: openArray[string], subscribe: bool) = ## send subscriptions to remote peer - p.send(peer, RPCMsg.withSubs(topics, subscribe)) + asyncSpawn p.send(peer, RPCMsg.withSubs(topics, subscribe)) for topic in topics: if subscribe: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ebdbd4d20..25749ce48 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -248,7 +248,9 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} = try: await conn.writeLp(msg) trace "sent pubsub message to remote", conn - except CatchableError as exc: # never cancelled + except CancelledError as exc: + raise exc + except CatchableError as exc: # Because we detach the send call from the currently executing task using # asyncSpawn, no exceptions may leak out of it trace "Unable to send to remote", conn, msg = exc.msg @@ -257,7 +259,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} = await conn.close() # This will clean up the send connection -proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool): Future[void] {.raises: [Defect].} = trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) # When sending messages, we take care to re-encode them with the right @@ -277,7 +279,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} = sendMetrics(msg) encodeRpcMsg(msg, anonymize) - asyncSpawn p.sendEncoded(encoded) + return p.sendEncoded(encoded) proc new*( T: typedesc[PubSubPeer],