Gossipsub: process messages concurrently (#680)

* Gossip sub: process messages concurrently

* Retries for flaky test
This commit is contained in:
Tanguy 2021-12-27 11:17:00 +01:00 committed by GitHub
parent 58f383e661
commit fb0d10b6fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 55 deletions

View File

@ -282,6 +282,64 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
peer, peer,
RPCMsg(control: some(respControl), messages: messages)) RPCMsg(control: some(respControl), messages: messages))
proc validateAndRelay(g: GossipSub,
msg: Message,
msgId, msgIdSalted: MessageId,
peer: PubSubPeer) {.async.} =
try:
let validation = await g.validate(msg)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(msgIdSalted, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
case validation
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg.topicIDs)
return
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
msgId = shortLog(msgId), peer
return
of ValidationResult.Accept:
discard
# store in cache only after validation
g.mcache.put(msgId, msg)
g.rewardDelivered(peer, msg.topicIDs, true)
var toSendPeers = HashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t notin g.topics:
continue
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIDs:
if topic notin g.topics: continue
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
await handleData(g, topic, msg.data)
except CatchableError as exc:
info "validateAndRelay failed", msg=exc.msg
method rpcHandler*(g: GossipSub, method rpcHandler*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} = rpcMsg: RPCMsg) {.async.} =
@ -343,54 +401,7 @@ method rpcHandler*(g: GossipSub,
# (eg, pop everything you put in it) # (eg, pop everything you put in it)
g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]() g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]()
let validation = await g.validate(msg) asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, peer)
var seenPeers: HashSet[PubSubPeer]
discard g.validationSeen.pop(msgIdSalted, seenPeers)
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
case validation
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg.topicIDs)
continue
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
msgId = shortLog(msgId), peer
continue
of ValidationResult.Accept:
discard
# store in cache only after validation
g.mcache.put(msgId, msg)
g.rewardDelivered(peer, msg.topicIDs, true)
var toSendPeers = HashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t notin g.topics:
continue
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
await handleData(g, t, msg.data)
# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
if rpcMsg.control.isSome(): if rpcMsg.control.isSome():
g.handleControl(peer, rpcMsg.control.unsafeGet()) g.handleControl(peer, rpcMsg.control.unsafeGet())

View File

@ -22,3 +22,25 @@ template asyncTest*(name: string, body: untyped): untyped =
proc() {.async, gcsafe.} = proc() {.async, gcsafe.} =
body body
)()) )())
template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped =
test name:
var attemptNumber = 0
while attemptNumber < attempts:
let isLastAttempt = attemptNumber == attempts - 1
inc attemptNumber
try:
waitFor((
proc() {.async, gcsafe.} =
body
)())
except Exception as e:
if isLastAttempt: raise e
else: testStatusIMPL = TestStatus.FAILED
finally:
if not isLastAttempt:
if testStatusIMPL == TestStatus.FAILED:
# Retry
testStatusIMPL = TestStatus.OK
else:
break

View File

@ -1,4 +1,4 @@
import chronos, unittest2 import chronos, unittest2, helpers
import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec, import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec,
../libp2p/cid, ../libp2p/multihash, ../libp2p/peerid ../libp2p/cid, ../libp2p/multihash, ../libp2p/peerid
@ -140,9 +140,7 @@ when isMainModule:
# test "Provide CID test": # test "Provide CID test":
# check: # check:
# waitFor(provideCidTest()) == true # waitFor(provideCidTest()) == true
test "GossipSub test": flakyAsyncTest "GossipSub test", attempts=4:
check: check (await pubsubTest({PSGossipSub})) == true
waitFor(pubsubTest({PSGossipSub})) == true flakyAsyncTest "FloodSub test", attempts=4:
test "FloodSub test": check (await pubsubTest({PSFloodSub})) == true
check:
waitFor(pubsubTest({PSFloodSub})) == true