diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index e83e5d7a6..e9fb57ffb 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -223,7 +223,8 @@ method init*(g: GossipSub) = trace "GossipSub handler leaks an error", exc = exc.msg, conn g.handler = handler - g.codecs &= GossipSubCodec + g.codecs &= GossipSubCodec_12 + g.codecs &= GossipSubCodec_11 g.codecs &= GossipSubCodec_10 method onNewPeer*(g: GossipSub, peer: PubSubPeer) = @@ -408,10 +409,10 @@ proc validateAndRelay( # descored) and that the savings from honest peers are greater than the # cost a dishonest peer can incur in short time (since the IDONTWANT is # small). - var toSendPeers = HashSet[PubSubPeer]() - addToSendPeers(toSendPeers) + var peersToSendIDontWant = HashSet[PubSubPeer]() + addToSendPeers(peersToSendIDontWant) g.broadcast( - toSendPeers, + peersToSendIDontWant, RPCMsg( control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) @@ -456,18 +457,17 @@ proc validateAndRelay( # Don't send it to peers that sent it during validation toSendPeers.excl(seenPeers) - var peersWhoSentIdontwant = HashSet[PubSubPeer]() - for peer in toSendPeers: - for heDontWant in peer.heDontWants: - if saltedId in heDontWant: - peersWhoSentIdontwant.incl(peer) + proc isMsgInIdontWant(it: PubSubPeer): bool = + for iDontWant in it.iDontWants: + if saltedId in iDontWant: libp2p_gossipsub_idontwant_saved_messages.inc libp2p_gossipsub_saved_bytes.inc( msg.data.len.int64, labelValues = ["idontwant"] ) - break - toSendPeers.excl(peersWhoSentIdontwant) - # avoids len(s) == length` the length of the HashSet changed while iterating over it [AssertionDefect] + return true + return false + + toSendPeers.exclIfIt(isMsgInIdontWant(it)) # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 889b73ac3..bbcfa0c07 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -306,9 +306,9 @@ proc handleIHave*( proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) = for dontWant in iDontWants: for messageId in dontWant.messageIDs: - if peer.heDontWants[^1].len > 1000: + if peer.iDontWants[^1].len > 1000: break - peer.heDontWants[^1].incl(g.salt(messageId)) + peer.iDontWants[^1].incl(g.salt(messageId)) proc handleIWant*( g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant] @@ -705,9 +705,9 @@ proc onHeartbeat(g: GossipSub) = peer.sentIHaves.addFirst(default(HashSet[MessageId])) if peer.sentIHaves.len > g.parameters.historyLength: discard peer.sentIHaves.popLast() - peer.heDontWants.addFirst(default(HashSet[SaltedId])) - if peer.heDontWants.len > g.parameters.historyLength: - discard peer.heDontWants.popLast() + peer.iDontWants.addFirst(default(HashSet[SaltedId])) + if peer.iDontWants.len > g.parameters.historyLength: + discard peer.iDontWants.popLast() peer.iHaveBudget = IHavePeerBudget peer.pingBudget = PingsPeerBudget diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 3fb98749a..d50e09824 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -18,7 +18,8 @@ import "../../.."/[peerid, multiaddress, utility] export options, tables, sets const - GossipSubCodec* = "/meshsub/1.1.0" + GossipSubCodec_12* = "/meshsub/1.2.0" + GossipSubCodec_11* = "/meshsub/1.1.0" GossipSubCodec_10* = "/meshsub/1.0.0" # overlay parameters diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 2e40a6dc0..9dd00f66a 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -103,7 +103,7 @@ type score*: float64 sentIHaves*: Deque[HashSet[MessageId]] - heDontWants*: Deque[HashSet[SaltedId]] + iDontWants*: Deque[HashSet[SaltedId]] ## IDONTWANT contains unvalidated message id:s which may be long and/or ## expensive to look up, so we apply the same salting to them as during ## unvalidated message processing @@ -545,5 +545,5 @@ proc new*( maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue, ) result.sentIHaves.addFirst(default(HashSet[MessageId])) - result.heDontWants.addFirst(default(HashSet[SaltedId])) + result.iDontWants.addFirst(default(HashSet[SaltedId])) result.startSendNonPriorityTask() diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 8b0870a45..542eff026 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import std/options, std/macros +import std/[sets, options, macros] import stew/[byteutils, results] export results @@ -140,3 +140,11 @@ template toOpt*[T, E](self: Result[T, E]): Opt[T] = Opt.some(temp.unsafeGet()) else: Opt.none(type(T)) + +template exclIfIt*[T](set: var HashSet[T], condition: untyped) = + if set.len != 0: + var toExcl = HashSet[T]() + for it {.inject.} in set: + if condition: + toExcl.incl(it) + set.excl(toExcl) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index ac4eb9595..b6cc5828f 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -854,16 +854,16 @@ suite "GossipSub": isHighPriority = true, ) checkUntilTimeout: - gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1) + gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1) tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1 await bFinished checkUntilTimeout: - toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1) + toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 1) check: - toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 0) + toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0) await allFuturesThrowing( nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop() diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 0d009deb0..abe7aa2ed 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -28,9 +28,9 @@ type TestGossipSub* = ref object of GossipSub proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer = proc getConn(): Future[Connection] = - p.switch.dial(peerId, GossipSubCodec) + p.switch.dial(peerId, GossipSubCodec_12) - let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024) + let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec_12, 1024 * 1024) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer