From 8a4e8a00a231885e4e4244d9efdeaabbf170326b Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 3 Jun 2024 10:34:05 +0200 Subject: [PATCH] Send IDONTWANT before validating message (#1103) --- libp2p/protocols/pubsub/gossipsub.nim | 57 +++++++++++++++++---------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 1a8be68a3..3a50e05b9 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -363,6 +363,30 @@ proc validateAndRelay(g: GossipSub, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer) {.async.} = try: + template topic: string = msg.topic + + proc addToSendPeers(toSendPeers: var HashSet[PubSubPeer]) = + g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(topic, peers): toSendPeers.incl(peers[]) + g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[]) + toSendPeers.excl(peer) + + if msg.data.len > max(512, msgId.len * 10): + # If the message is "large enough", let the mesh know that we do not want + # any more copies of it, regardless if it is valid or not. + # + # In the case that it is not valid, this leads to some redundancy + # (since the other peer should not send us an invalid message regardless), + # but the expectation is that this is rare (due to such peers getting + # descored) and that the savings from honest peers are greater than the + # cost a dishonest peer can incur in short time (since the IDONTWANT is + # small). + var toSendPeers = HashSet[PubSubPeer]() + addToSendPeers(toSendPeers) + g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( + idontwant: @[ControlIWant(messageIDs: @[msgId])] + ))), isHighPriority = true) + let validation = await g.validate(msg) var seenPeers: HashSet[PubSubPeer] @@ -383,40 +407,30 @@ proc validateAndRelay(g: GossipSub, of ValidationResult.Accept: discard + if topic notin g.topics: + return # Topic was unsubscribed while validating + # store in cache only after validation g.mcache.put(msgId, msg) - let topic = msg.topic g.rewardDelivered(peer, topic, true) + # The send list typically matches the idontwant list from above, but + # might differ if validation takes time var toSendPeers = HashSet[PubSubPeer]() - if topic notin g.topics: - return - - g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[]) - g.mesh.withValue(topic, peers): toSendPeers.incl(peers[]) - g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[]) - - # Don't send it to source peer, or peers that - # sent it during validation - toSendPeers.excl(peer) + addToSendPeers(toSendPeers) + # Don't send it to peers that sent it during validation toSendPeers.excl(seenPeers) - # IDontWant is only worth it if the message is substantially - # bigger than the messageId - if msg.data.len > msgId.len * 10: - g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( - idontwant: @[ControlIWant(messageIDs: @[msgId])] - ))), isHighPriority = true) - + var peersWhoSentIdontwant = HashSet[PubSubPeer]() for peer in toSendPeers: for heDontWant in peer.heDontWants: if saltedId in heDontWant: - seenPeers.incl(peer) + peersWhoSentIdontwant.incl(peer) libp2p_gossipsub_idontwant_saved_messages.inc libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"]) break - toSendPeers.excl(seenPeers) + toSendPeers.excl(peersWhoSentIdontwant) # avoids len(s) == length` the length of the HashSet changed while iterating over it [AssertionDefect] # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages @@ -501,6 +515,8 @@ method rpcHandler*(g: GossipSub, for i in 0..