diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index de4656708..62d973d6e 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -85,7 +85,7 @@ method rpcHandler*(f: FloodSub, # g.anonymize needs no evaluation when receiving messages # as we have a "lax" policy and allow signed messages - if not (await f.validate(msg)): + if (await f.validate(msg)) == ValidationResult.Reject: trace "Dropping message due to failed validation", msgId, peer continue diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9d7bf6b51..9468cb7ab 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -1062,7 +1062,7 @@ method rpcHandler*(g: GossipSub, # g.anonymize needs no evaluation when receiving messages # as we have a "lax" policy and allow signed messages - if not (await g.validate(msg)): + if (await g.validate(msg)) == ValidationResult.Reject: debug "Dropping message due to failed validation", msgId, peer g.punishPeer(peer, msg) continue diff --git a/libp2p/protocols/pubsub/gossipsub10.nim b/libp2p/protocols/pubsub/gossipsub10.nim index 01afcfebf..cb97c1025 100644 --- a/libp2p/protocols/pubsub/gossipsub10.nim +++ b/libp2p/protocols/pubsub/gossipsub10.nim @@ -464,7 +464,7 @@ method rpcHandler*(g: GossipSub, # g.anonymize needs no evaluation when receiving messages # as we have a "lax" policy and allow signed messages - if not (await g.validate(msg)): + if (await g.validate(msg)) == ValidationResult.Reject: trace "Dropping message due to failed validation", msgId, peer continue diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 0cbca3210..c06736fd2 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -33,6 +33,7 @@ declareGauge(libp2p_pubsub_peers, "pubsub peer instances") declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics") declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages") +declareCounter(libp2p_pubsub_validation_ignore, "pubsub ignore validated messages") when defined(libp2p_expensive_metrics): declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) @@ -40,8 +41,11 @@ type TopicHandler* = proc(topic: string, data: seq[byte]): Future[void] {.gcsafe.} + ValidationResult* {.pure.} = enum + Accept, Reject, Ignore + ValidatorHandler* = proc(topic: string, - message: Message): Future[bool] {.gcsafe, closure.} + message: Message): Future[ValidationResult] {.gcsafe, closure.} TopicPair* = tuple[topic: string, handler: TopicHandler] @@ -309,8 +313,8 @@ method removeValidator*(p: PubSub, if t in p.validators: p.validators[t].excl(hook) -method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = - var pending: seq[Future[bool]] +method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} = + var pending: seq[Future[ValidationResult]] trace "about to validate message" for topic in message.topicIDs: trace "looking for validators on topic", topicID = topic, @@ -320,12 +324,24 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = # TODO: add timeout to validator pending.add(p.validators[topic].mapIt(it(topic, message))) + result = ValidationResult.Accept let futs = await allFinished(pending) - result = futs.allIt(not it.failed and it.read()) - if result: + for fut in futs: + if fut.failed: + result = ValidationResult.Reject + break + let res = fut.read() + if res != ValidationResult.Accept: + result = res + break + + case result + of ValidationResult.Accept: libp2p_pubsub_validation_success.inc() - else: + of ValidationResult.Reject: libp2p_pubsub_validation_failure.inc() + of ValidationResult.Ignore: + libp2p_pubsub_validation_ignore.inc() proc init*[PubParams: object | bool]( P: typedesc[PubSub], diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index dcd8f0253..367ad25d6 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -159,10 +159,10 @@ suite "FloodSub": var validatorFut = newFuture[bool]() proc validator(topic: string, - message: Message): Future[bool] {.async.} = + message: Message): Future[ValidationResult] {.async.} = check topic == "foobar" validatorFut.complete(true) - result = true + result = ValidationResult.Accept nodes[1].addValidator("foobar", validator) @@ -210,9 +210,9 @@ suite "FloodSub": var validatorFut = newFuture[bool]() proc validator(topic: string, - message: Message): Future[bool] {.async.} = + message: Message): Future[ValidationResult] {.async.} = validatorFut.complete(true) - result = false + result = ValidationResult.Reject nodes[1].addValidator("foobar", validator) @@ -262,11 +262,11 @@ suite "FloodSub": await waitSub(nodes[0], nodes[1], "bar") proc validator(topic: string, - message: Message): Future[bool] {.async.} = + message: Message): Future[ValidationResult] {.async.} = if topic == "foo": - result = true + result = ValidationResult.Accept else: - result = false + result = ValidationResult.Reject nodes[1].addValidator("foo", "bar", validator) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index e50182e3c..77046bed7 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -112,10 +112,10 @@ suite "GossipSub": var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): - Future[bool] {.async.} = + Future[ValidationResult] {.async.} = check topic == "foobar" validatorFut.complete(true) - result = true + result = ValidationResult.Accept nodes[1].addValidator("foobar", validator) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 @@ -178,8 +178,8 @@ suite "GossipSub": var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): - Future[bool] {.async.} = - result = false + Future[ValidationResult] {.async.} = + result = ValidationResult.Reject validatorFut.complete(true) nodes[1].addValidator("foobar", validator) @@ -232,13 +232,13 @@ suite "GossipSub": var passed, failed: Future[bool] = newFuture[bool]() proc validator(topic: string, message: Message): - Future[bool] {.async.} = + Future[ValidationResult] {.async.} = result = if topic == "foo": passed.complete(true) - true + ValidationResult.Accept else: failed.complete(true) - false + ValidationResult.Reject nodes[1].addValidator("foo", "bar", validator) tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1