Extended validators (#395)

* gossip extended validation

* fix flood tests

* fix gossip 1.0 tests

* synthax consistency
This commit is contained in:
Giovanni Petrantoni 2020-10-12 16:56:00 +09:00 committed by GitHub
parent c81b665b0d
commit 556213abf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 39 additions and 23 deletions

View File

@ -85,7 +85,7 @@ method rpcHandler*(f: FloodSub,
# g.anonymize needs no evaluation when receiving messages # g.anonymize needs no evaluation when receiving messages
# as we have a "lax" policy and allow signed messages # as we have a "lax" policy and allow signed messages
if not (await f.validate(msg)): if (await f.validate(msg)) == ValidationResult.Reject:
trace "Dropping message due to failed validation", msgId, peer trace "Dropping message due to failed validation", msgId, peer
continue continue

View File

@ -1062,7 +1062,7 @@ method rpcHandler*(g: GossipSub,
# g.anonymize needs no evaluation when receiving messages # g.anonymize needs no evaluation when receiving messages
# as we have a "lax" policy and allow signed messages # as we have a "lax" policy and allow signed messages
if not (await g.validate(msg)): if (await g.validate(msg)) == ValidationResult.Reject:
debug "Dropping message due to failed validation", msgId, peer debug "Dropping message due to failed validation", msgId, peer
g.punishPeer(peer, msg) g.punishPeer(peer, msg)
continue continue

View File

@ -464,7 +464,7 @@ method rpcHandler*(g: GossipSub,
# g.anonymize needs no evaluation when receiving messages # g.anonymize needs no evaluation when receiving messages
# as we have a "lax" policy and allow signed messages # as we have a "lax" policy and allow signed messages
if not (await g.validate(msg)): if (await g.validate(msg)) == ValidationResult.Reject:
trace "Dropping message due to failed validation", msgId, peer trace "Dropping message due to failed validation", msgId, peer
continue continue

View File

@ -33,6 +33,7 @@ declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics") declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages") declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
declareCounter(libp2p_pubsub_validation_ignore, "pubsub ignore validated messages")
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
@ -40,8 +41,11 @@ type
TopicHandler* = proc(topic: string, TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.} data: seq[byte]): Future[void] {.gcsafe.}
ValidationResult* {.pure.} = enum
Accept, Reject, Ignore
ValidatorHandler* = proc(topic: string, ValidatorHandler* = proc(topic: string,
message: Message): Future[bool] {.gcsafe, closure.} message: Message): Future[ValidationResult] {.gcsafe, closure.}
TopicPair* = tuple[topic: string, handler: TopicHandler] TopicPair* = tuple[topic: string, handler: TopicHandler]
@ -309,8 +313,8 @@ method removeValidator*(p: PubSub,
if t in p.validators: if t in p.validators:
p.validators[t].excl(hook) p.validators[t].excl(hook)
method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
var pending: seq[Future[bool]] var pending: seq[Future[ValidationResult]]
trace "about to validate message" trace "about to validate message"
for topic in message.topicIDs: for topic in message.topicIDs:
trace "looking for validators on topic", topicID = topic, trace "looking for validators on topic", topicID = topic,
@ -320,12 +324,24 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
# TODO: add timeout to validator # TODO: add timeout to validator
pending.add(p.validators[topic].mapIt(it(topic, message))) pending.add(p.validators[topic].mapIt(it(topic, message)))
result = ValidationResult.Accept
let futs = await allFinished(pending) let futs = await allFinished(pending)
result = futs.allIt(not it.failed and it.read()) for fut in futs:
if result: if fut.failed:
result = ValidationResult.Reject
break
let res = fut.read()
if res != ValidationResult.Accept:
result = res
break
case result
of ValidationResult.Accept:
libp2p_pubsub_validation_success.inc() libp2p_pubsub_validation_success.inc()
else: of ValidationResult.Reject:
libp2p_pubsub_validation_failure.inc() libp2p_pubsub_validation_failure.inc()
of ValidationResult.Ignore:
libp2p_pubsub_validation_ignore.inc()
proc init*[PubParams: object | bool]( proc init*[PubParams: object | bool](
P: typedesc[PubSub], P: typedesc[PubSub],

View File

@ -159,10 +159,10 @@ suite "FloodSub":
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): Future[bool] {.async.} = message: Message): Future[ValidationResult] {.async.} =
check topic == "foobar" check topic == "foobar"
validatorFut.complete(true) validatorFut.complete(true)
result = true result = ValidationResult.Accept
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
@ -210,9 +210,9 @@ suite "FloodSub":
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): Future[bool] {.async.} = message: Message): Future[ValidationResult] {.async.} =
validatorFut.complete(true) validatorFut.complete(true)
result = false result = ValidationResult.Reject
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
@ -262,11 +262,11 @@ suite "FloodSub":
await waitSub(nodes[0], nodes[1], "bar") await waitSub(nodes[0], nodes[1], "bar")
proc validator(topic: string, proc validator(topic: string,
message: Message): Future[bool] {.async.} = message: Message): Future[ValidationResult] {.async.} =
if topic == "foo": if topic == "foo":
result = true result = ValidationResult.Accept
else: else:
result = false result = ValidationResult.Reject
nodes[1].addValidator("foo", "bar", validator) nodes[1].addValidator("foo", "bar", validator)

View File

@ -112,10 +112,10 @@ suite "GossipSub":
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): message: Message):
Future[bool] {.async.} = Future[ValidationResult] {.async.} =
check topic == "foobar" check topic == "foobar"
validatorFut.complete(true) validatorFut.complete(true)
result = true result = ValidationResult.Accept
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
@ -178,8 +178,8 @@ suite "GossipSub":
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): message: Message):
Future[bool] {.async.} = Future[ValidationResult] {.async.} =
result = false result = ValidationResult.Reject
validatorFut.complete(true) validatorFut.complete(true)
nodes[1].addValidator("foobar", validator) nodes[1].addValidator("foobar", validator)
@ -232,13 +232,13 @@ suite "GossipSub":
var passed, failed: Future[bool] = newFuture[bool]() var passed, failed: Future[bool] = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): message: Message):
Future[bool] {.async.} = Future[ValidationResult] {.async.} =
result = if topic == "foo": result = if topic == "foo":
passed.complete(true) passed.complete(true)
true ValidationResult.Accept
else: else:
failed.complete(true) failed.complete(true)
false ValidationResult.Reject
nodes[1].addValidator("foo", "bar", validator) nodes[1].addValidator("foo", "bar", validator)
tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1