feat: idontwant on publish

This commit is contained in:
Richard Ramos 2024-12-17 18:55:16 -04:00 committed by richΛrd
parent 4dc7a89f45
commit 4f9c21f699
3 changed files with 78 additions and 21 deletions

View File

@ -383,6 +383,38 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending iwant reply messages", peer
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)
proc sendIDontWant(
g: GossipSub,
msg: Message,
msgId: MessageId,
peersToSendIDontWant: HashSet[PubSubPeer],
) =
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
# IDONTWANT is only supported by >= GossipSubCodec_12
let peers = peersToSendIDontWant.filterIt(
it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11
)
g.broadcast(
peers,
RPCMsg(
control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
proc isLargeMessage(msg: Message, msgId: MessageId): bool =
msg.data.len > max(512, msgId.len * 10)
proc validateAndRelay(
g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer
) {.async.} =
@ -399,29 +431,10 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
toSendPeers.excl(peer)
if msg.data.len > max(512, msgId.len * 10):
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
if isLargeMessage(msg, msgId):
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)
let validation = await g.validate(msg)
@ -784,6 +797,9 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
if isLargeMessage(msg, msgId):
g.sendIDontWant(msg, msgId, peers)
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
else:

View File

@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) =
if condition:
toExcl.incl(it)
set.excl(toExcl)
template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] =
var filtered = HashSet[T]()
if set.len != 0:
for it {.inject.} in set:
if condition:
filtered.incl(it)
filtered

View File

@ -928,6 +928,39 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - iDontWant is broadcasted on publish":
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(2, gossip = true, msgIdProvider = dumbMsgIdProvider)
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
await nodes[0].switch.connect(
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
)
proc handlerA(topic: string, data: seq[byte]) {.async.} =
discard
proc handlerB(topic: string, data: seq[byte]) {.async.} =
discard
nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")
var gossip2: GossipSub = GossipSub(nodes[1])
tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
checkUntilTimeout:
gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)
await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())
await allFuturesThrowing(nodesFut.concat())
asyncTest "e2e - iDontWant is sent only for 1.2":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,