diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f46260ecc..2691180dd 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -282,6 +282,64 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = peer, RPCMsg(control: some(respControl), messages: messages)) +proc validateAndRelay(g: GossipSub, + msg: Message, + msgId, msgIdSalted: MessageId, + peer: PubSubPeer) {.async.} = + try: + let validation = await g.validate(msg) + + var seenPeers: HashSet[PubSubPeer] + discard g.validationSeen.pop(msgIdSalted, seenPeers) + libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) + + case validation + of ValidationResult.Reject: + debug "Dropping message after validation, reason: reject", + msgId = shortLog(msgId), peer + g.punishInvalidMessage(peer, msg.topicIDs) + return + of ValidationResult.Ignore: + debug "Dropping message after validation, reason: ignore", + msgId = shortLog(msgId), peer + return + of ValidationResult.Accept: + discard + + # store in cache only after validation + g.mcache.put(msgId, msg) + + g.rewardDelivered(peer, msg.topicIDs, true) + + var toSendPeers = HashSet[PubSubPeer]() + for t in msg.topicIDs: # for every topic in the message + if t notin g.topics: + continue + + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) + + # Don't send it to source peer, or peers that + # sent it during validation + toSendPeers.excl(peer) + toSendPeers.excl(seenPeers) + + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer + for topic in msg.topicIDs: + if topic notin g.topics: continue + + if g.knownTopics.contains(topic): + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) + else: + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) + + await handleData(g, topic, msg.data) + except CatchableError as exc: + info "validateAndRelay failed", msg=exc.msg + method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = @@ -343,54 +401,7 @@ method rpcHandler*(g: GossipSub, # (eg, pop everything you put in it) g.validationSeen[msgIdSalted] = initHashSet[PubSubPeer]() - let validation = await g.validate(msg) - - var seenPeers: HashSet[PubSubPeer] - discard g.validationSeen.pop(msgIdSalted, seenPeers) - libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64) - - case validation - of ValidationResult.Reject: - debug "Dropping message after validation, reason: reject", - msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg.topicIDs) - continue - of ValidationResult.Ignore: - debug "Dropping message after validation, reason: ignore", - msgId = shortLog(msgId), peer - continue - of ValidationResult.Accept: - discard - - # store in cache only after validation - g.mcache.put(msgId, msg) - - g.rewardDelivered(peer, msg.topicIDs, true) - - var toSendPeers = HashSet[PubSubPeer]() - for t in msg.topicIDs: # for every topic in the message - if t notin g.topics: - continue - - g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) - g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - - await handleData(g, t, msg.data) - - # Don't send it to source peer, or peers that - # sent it during validation - toSendPeers.excl(peer) - toSendPeers.excl(seenPeers) - - # In theory, if topics are the same in all messages, we could batch - we'd - # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = toSendPeers.len, msgId, peer - for topic in msg.topicIDs: - if g.knownTopics.contains(topic): - libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) - else: - libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) + asyncSpawn g.validateAndRelay(msg, msgId, msgIdSalted, peer) if rpcMsg.control.isSome(): g.handleControl(peer, rpcMsg.control.unsafeGet()) diff --git a/tests/asyncunit.nim b/tests/asyncunit.nim index 9da6838a0..fa10c9eb2 100644 --- a/tests/asyncunit.nim +++ b/tests/asyncunit.nim @@ -22,3 +22,25 @@ template asyncTest*(name: string, body: untyped): untyped = proc() {.async, gcsafe.} = body )()) + +template flakyAsyncTest*(name: string, attempts: int, body: untyped): untyped = + test name: + var attemptNumber = 0 + while attemptNumber < attempts: + let isLastAttempt = attemptNumber == attempts - 1 + inc attemptNumber + try: + waitFor(( + proc() {.async, gcsafe.} = + body + )()) + except Exception as e: + if isLastAttempt: raise e + else: testStatusIMPL = TestStatus.FAILED + finally: + if not isLastAttempt: + if testStatusIMPL == TestStatus.FAILED: + # Retry + testStatusIMPL = TestStatus.OK + else: + break diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index 885cb1c5d..5c1da3d8b 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -1,4 +1,4 @@ -import chronos, unittest2 +import chronos, unittest2, helpers import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec, ../libp2p/cid, ../libp2p/multihash, ../libp2p/peerid @@ -140,9 +140,7 @@ when isMainModule: # test "Provide CID test": # check: # waitFor(provideCidTest()) == true - test "GossipSub test": - check: - waitFor(pubsubTest({PSGossipSub})) == true - test "FloodSub test": - check: - waitFor(pubsubTest({PSFloodSub})) == true + flakyAsyncTest "GossipSub test", attempts=4: + check (await pubsubTest({PSGossipSub})) == true + flakyAsyncTest "FloodSub test", attempts=4: + check (await pubsubTest({PSFloodSub})) == true