diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index bed5636cb..8bfcf3e14 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -383,6 +383,38 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending iwant reply messages", peer g.send(peer, RPCMsg(messages: messages), isHighPriority = false) +proc sendIDontWant( + g: GossipSub, + msg: Message, + msgId: MessageId, + peersToSendIDontWant: HashSet[PubSubPeer], +) = + # If the message is "large enough", let the mesh know that we do not want + # any more copies of it, regardless if it is valid or not. + # + # In the case that it is not valid, this leads to some redundancy + # (since the other peer should not send us an invalid message regardless), + # but the expectation is that this is rare (due to such peers getting + # descored) and that the savings from honest peers are greater than the + # cost a dishonest peer can incur in short time (since the IDONTWANT is + # small). + + # IDONTWANT is only supported by >= GossipSubCodec_12 + let peers = peersToSendIDontWant.filterIt( + it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11 + ) + + g.broadcast( + peers, + RPCMsg( + control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) + ), + isHighPriority = true, + ) + +proc isLargeMessage(msg: Message, msgId: MessageId): bool = + msg.data.len > max(512, msgId.len * 10) + proc validateAndRelay( g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer ) {.async.} = @@ -399,29 +431,10 @@ proc validateAndRelay( toSendPeers.incl(peers[]) toSendPeers.excl(peer) - if msg.data.len > max(512, msgId.len * 10): - # If the message is "large enough", let the mesh know that we do not want - # any more copies of it, regardless if it is valid or not. - # - # In the case that it is not valid, this leads to some redundancy - # (since the other peer should not send us an invalid message regardless), - # but the expectation is that this is rare (due to such peers getting - # descored) and that the savings from honest peers are greater than the - # cost a dishonest peer can incur in short time (since the IDONTWANT is - # small). + if isLargeMessage(msg, msgId): var peersToSendIDontWant = HashSet[PubSubPeer]() addToSendPeers(peersToSendIDontWant) - peersToSendIDontWant.exclIfIt( - it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11 - ) - g.broadcast( - peersToSendIDontWant, - RPCMsg( - control: - some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) - ), - isHighPriority = true, - ) + g.sendIDontWant(msg, msgId, peersToSendIDontWant) let validation = await g.validate(msg) @@ -784,6 +797,9 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) + if isLargeMessage(msg, msgId): + g.sendIDontWant(msg, msgId, peers) + if g.knownTopics.contains(topic): libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) else: diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 62fbe97bc..cdfd19ca5 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) = if condition: toExcl.incl(it) set.excl(toExcl) + +template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] = + var filtered = HashSet[T]() + if set.len != 0: + for it {.inject.} in set: + if condition: + filtered.incl(it) + filtered diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 7277c3625..8ea780f90 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -928,6 +928,39 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - iDontWant is broadcasted on publish": + func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] = + ok(newSeq[byte](10)) + let + nodes = generateNodes(2, gossip = true, msgIdProvider = dumbMsgIdProvider) + + nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) + + await nodes[0].switch.connect( + nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs + ) + + proc handlerA(topic: string, data: seq[byte]) {.async.} = + discard + + proc handlerB(topic: string, data: seq[byte]) {.async.} = + discard + + nodes[0].subscribe("foobar", handlerA) + nodes[1].subscribe("foobar", handlerB) + await waitSubGraph(nodes, "foobar") + + var gossip2: GossipSub = GossipSub(nodes[1]) + + tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1 + + checkUntilTimeout: + gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1) + + await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - iDontWant is sent only for 1.2": # 3 nodes: A <=> B <=> C # (A & C are NOT connected). We pre-emptively send a dontwant from C to B,