feat(gossipsub): support version 1.2.0 (#1106)

This commit is contained in:
diegomrsantos 2024-06-12 15:46:47 +02:00 committed by GitHub
parent dc83a1e9b6
commit 96bfefc928
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 35 additions and 26 deletions

View File

@ -223,7 +223,8 @@ method init*(g: GossipSub) =
trace "GossipSub handler leaks an error", exc = exc.msg, conn
g.handler = handler
g.codecs &= GossipSubCodec
g.codecs &= GossipSubCodec_12
g.codecs &= GossipSubCodec_11
g.codecs &= GossipSubCodec_10
method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
@ -408,10 +409,10 @@ proc validateAndRelay(
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
var toSendPeers = HashSet[PubSubPeer]()
addToSendPeers(toSendPeers)
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
g.broadcast(
toSendPeers,
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
@ -456,18 +457,17 @@ proc validateAndRelay(
# Don't send it to peers that sent it during validation
toSendPeers.excl(seenPeers)
var peersWhoSentIdontwant = HashSet[PubSubPeer]()
for peer in toSendPeers:
for heDontWant in peer.heDontWants:
if saltedId in heDontWant:
peersWhoSentIdontwant.incl(peer)
proc isMsgInIdontWant(it: PubSubPeer): bool =
for iDontWant in it.iDontWants:
if saltedId in iDontWant:
libp2p_gossipsub_idontwant_saved_messages.inc
libp2p_gossipsub_saved_bytes.inc(
msg.data.len.int64, labelValues = ["idontwant"]
)
break
toSendPeers.excl(peersWhoSentIdontwant)
# avoids len(s) == length` the length of the HashSet changed while iterating over it [AssertionDefect]
return true
return false
toSendPeers.exclIfIt(isMsgInIdontWant(it))
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages

View File

@ -306,9 +306,9 @@ proc handleIHave*(
proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, iDontWants: seq[ControlIWant]) =
for dontWant in iDontWants:
for messageId in dontWant.messageIDs:
if peer.heDontWants[^1].len > 1000:
if peer.iDontWants[^1].len > 1000:
break
peer.heDontWants[^1].incl(g.salt(messageId))
peer.iDontWants[^1].incl(g.salt(messageId))
proc handleIWant*(
g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]
@ -705,9 +705,9 @@ proc onHeartbeat(g: GossipSub) =
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast()
peer.heDontWants.addFirst(default(HashSet[SaltedId]))
if peer.heDontWants.len > g.parameters.historyLength:
discard peer.heDontWants.popLast()
peer.iDontWants.addFirst(default(HashSet[SaltedId]))
if peer.iDontWants.len > g.parameters.historyLength:
discard peer.iDontWants.popLast()
peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget

View File

@ -18,7 +18,8 @@ import "../../.."/[peerid, multiaddress, utility]
export options, tables, sets
const
GossipSubCodec* = "/meshsub/1.1.0"
GossipSubCodec_12* = "/meshsub/1.2.0"
GossipSubCodec_11* = "/meshsub/1.1.0"
GossipSubCodec_10* = "/meshsub/1.0.0"
# overlay parameters

View File

@ -103,7 +103,7 @@ type
score*: float64
sentIHaves*: Deque[HashSet[MessageId]]
heDontWants*: Deque[HashSet[SaltedId]]
iDontWants*: Deque[HashSet[SaltedId]]
## IDONTWANT contains unvalidated message id:s which may be long and/or
## expensive to look up, so we apply the same salting to them as during
## unvalidated message processing
@ -545,5 +545,5 @@ proc new*(
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue,
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[SaltedId]))
result.iDontWants.addFirst(default(HashSet[SaltedId]))
result.startSendNonPriorityTask()

View File

@ -9,7 +9,7 @@
{.push raises: [].}
import std/options, std/macros
import std/[sets, options, macros]
import stew/[byteutils, results]
export results
@ -140,3 +140,11 @@ template toOpt*[T, E](self: Result[T, E]): Opt[T] =
Opt.some(temp.unsafeGet())
else:
Opt.none(type(T))
template exclIfIt*[T](set: var HashSet[T], condition: untyped) =
if set.len != 0:
var toExcl = HashSet[T]()
for it {.inject.} in set:
if condition:
toExcl.incl(it)
set.excl(toExcl)

View File

@ -854,16 +854,16 @@ suite "GossipSub":
isHighPriority = true,
)
checkUntilTimeout:
gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)
tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
await bFinished
checkUntilTimeout:
toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 1)
check:
toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 0)
toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.iDontWants[^1].len == 0)
await allFuturesThrowing(
nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop()

View File

@ -28,9 +28,9 @@ type TestGossipSub* = ref object of GossipSub
proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
proc getConn(): Future[Connection] =
p.switch.dial(peerId, GossipSubCodec)
p.switch.dial(peerId, GossipSubCodec_12)
let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec_12, 1024 * 1024)
debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer