Send IDONTWANT before validating message (#1103)

This commit is contained in:
Jacek Sieka 2024-06-03 10:34:05 +02:00 committed by GitHub
parent 77d40c34f4
commit 8a4e8a00a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 36 additions and 21 deletions

View File

@ -363,6 +363,30 @@ proc validateAndRelay(g: GossipSub,
msgId: MessageId, saltedId: SaltedId, msgId: MessageId, saltedId: SaltedId,
peer: PubSubPeer) {.async.} = peer: PubSubPeer) {.async.} =
try: try:
template topic: string = msg.topic
proc addToSendPeers(toSendPeers: var HashSet[PubSubPeer]) =
g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
toSendPeers.excl(peer)
if msg.data.len > max(512, msgId.len * 10):
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
var toSendPeers = HashSet[PubSubPeer]()
addToSendPeers(toSendPeers)
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIDs: @[msgId])]
))), isHighPriority = true)
let validation = await g.validate(msg) let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer] var seenPeers: HashSet[PubSubPeer]
@ -383,40 +407,30 @@ proc validateAndRelay(g: GossipSub,
of ValidationResult.Accept: of ValidationResult.Accept:
discard discard
if topic notin g.topics:
return # Topic was unsubscribed while validating
# store in cache only after validation # store in cache only after validation
g.mcache.put(msgId, msg) g.mcache.put(msgId, msg)
let topic = msg.topic
g.rewardDelivered(peer, topic, true) g.rewardDelivered(peer, topic, true)
# The send list typically matches the idontwant list from above, but
# might differ if validation takes time
var toSendPeers = HashSet[PubSubPeer]() var toSendPeers = HashSet[PubSubPeer]()
if topic notin g.topics: addToSendPeers(toSendPeers)
return # Don't send it to peers that sent it during validation
g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers) toSendPeers.excl(seenPeers)
# IDontWant is only worth it if the message is substantially var peersWhoSentIdontwant = HashSet[PubSubPeer]()
# bigger than the messageId
if msg.data.len > msgId.len * 10:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIDs: @[msgId])]
))), isHighPriority = true)
for peer in toSendPeers: for peer in toSendPeers:
for heDontWant in peer.heDontWants: for heDontWant in peer.heDontWants:
if saltedId in heDontWant: if saltedId in heDontWant:
seenPeers.incl(peer) peersWhoSentIdontwant.incl(peer)
libp2p_gossipsub_idontwant_saved_messages.inc libp2p_gossipsub_idontwant_saved_messages.inc
libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"]) libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
break break
toSendPeers.excl(seenPeers) toSendPeers.excl(peersWhoSentIdontwant) # avoids len(s) == length` the length of the HashSet changed while iterating over it [AssertionDefect]
# In theory, if topics are the same in all messages, we could batch - we'd # In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages # also have to be careful to only include validated messages
@ -501,6 +515,8 @@ method rpcHandler*(g: GossipSub,
for i in 0..<rpcMsg.messages.len(): # for every message for i in 0..<rpcMsg.messages.len(): # for every message
template msg: untyped = rpcMsg.messages[i] template msg: untyped = rpcMsg.messages[i]
template topic: string = msg.topic
let msgIdResult = g.msgIdProvider(msg) let msgIdResult = g.msgIdProvider(msg)
if msgIdResult.isErr: if msgIdResult.isErr:
@ -512,7 +528,6 @@ method rpcHandler*(g: GossipSub,
let let
msgId = msgIdResult.get msgId = msgIdResult.get
msgIdSalted = g.salt(msgId) msgIdSalted = g.salt(msgId)
topic = msg.topic
if g.addSeen(msgIdSalted): if g.addSeen(msgIdSalted):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer trace "Dropping already-seen message", msgId = shortLog(msgId), peer