mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-01 16:40:32 +00:00
fix(pubsub): revert async: raises: []
annotation for TopicHandler
and ValidatorHandler
(#1237)
This commit is contained in:
parent
483e1d91ba
commit
d6e5094095
@ -75,9 +75,7 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
# This procedure will handle one of the node of the network
|
# This procedure will handle one of the node of the network
|
||||||
node.gossip.addValidator(
|
node.gossip.addValidator(
|
||||||
["metrics"],
|
["metrics"],
|
||||||
proc(
|
proc(topic: string, message: Message): Future[ValidationResult] {.async.} =
|
||||||
topic: string, message: Message
|
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
|
||||||
let decoded = MetricList.decode(message.data)
|
let decoded = MetricList.decode(message.data)
|
||||||
if decoded.isErr:
|
if decoded.isErr:
|
||||||
return ValidationResult.Reject
|
return ValidationResult.Reject
|
||||||
@ -94,7 +92,7 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
if node.hostname == "John":
|
if node.hostname == "John":
|
||||||
node.gossip.subscribe(
|
node.gossip.subscribe(
|
||||||
"metrics",
|
"metrics",
|
||||||
proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc(topic: string, data: seq[byte]) {.async.} =
|
||||||
let m = MetricList.decode(data).expect("metric can be decoded")
|
let m = MetricList.decode(data).expect("metric can be decoded")
|
||||||
echo m
|
echo m
|
||||||
,
|
,
|
||||||
|
@ -194,7 +194,7 @@ proc networking(g: Game) {.async.} =
|
|||||||
|
|
||||||
gossip.subscribe(
|
gossip.subscribe(
|
||||||
"/tron/matchmaking",
|
"/tron/matchmaking",
|
||||||
proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc(topic: string, data: seq[byte]) {.async.} =
|
||||||
# If we are still looking for an opponent,
|
# If we are still looking for an opponent,
|
||||||
# try to match anyone broadcasting its address
|
# try to match anyone broadcasting its address
|
||||||
if g.peerFound.finished or g.hasCandidate:
|
if g.peerFound.finished or g.hasCandidate:
|
||||||
|
@ -128,11 +128,11 @@ type
|
|||||||
PeerMessageDecodeError* = object of CatchableError
|
PeerMessageDecodeError* = object of CatchableError
|
||||||
|
|
||||||
TopicHandler* {.public.} =
|
TopicHandler* {.public.} =
|
||||||
proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, async: (raises: []).}
|
proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].}
|
||||||
|
|
||||||
ValidatorHandler* {.public.} = proc(
|
ValidatorHandler* {.public.} = proc(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.gcsafe, async: (raises: []).}
|
): Future[ValidationResult] {.gcsafe, raises: [].}
|
||||||
|
|
||||||
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
TopicPair* = tuple[topic: string, handler: TopicHandler]
|
||||||
|
|
||||||
|
@ -69,7 +69,7 @@ proc testPubSubDaemonPublish(
|
|||||||
|
|
||||||
var finished = false
|
var finished = false
|
||||||
var times = 0
|
var times = 0
|
||||||
proc nativeHandler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||||
let smsg = string.fromBytes(data)
|
let smsg = string.fromBytes(data)
|
||||||
check smsg == pubsubData
|
check smsg == pubsubData
|
||||||
times.inc()
|
times.inc()
|
||||||
@ -146,7 +146,7 @@ proc testPubSubNodePublish(
|
|||||||
result = true # don't cancel subscription
|
result = true # don't cancel subscription
|
||||||
|
|
||||||
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
|
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
|
||||||
proc nativeHandler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
pubsub.subscribe(testTopic, nativeHandler)
|
pubsub.subscribe(testTopic, nativeHandler)
|
||||||
|
@ -45,7 +45,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
asyncTest "FloodSub basic publish/subscribe A -> B":
|
asyncTest "FloodSub basic publish/subscribe A -> B":
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
completionFut.complete(true)
|
completionFut.complete(true)
|
||||||
|
|
||||||
@ -77,7 +77,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
asyncTest "FloodSub basic publish/subscribe B -> A":
|
asyncTest "FloodSub basic publish/subscribe B -> A":
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
completionFut.complete(true)
|
completionFut.complete(true)
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
asyncTest "FloodSub validation should succeed":
|
asyncTest "FloodSub validation should succeed":
|
||||||
var handlerFut = newFuture[bool]()
|
var handlerFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete(true)
|
handlerFut.complete(true)
|
||||||
|
|
||||||
@ -120,7 +120,7 @@ suite "FloodSub":
|
|||||||
var validatorFut = newFuture[bool]()
|
var validatorFut = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
validatorFut.complete(true)
|
validatorFut.complete(true)
|
||||||
result = ValidationResult.Accept
|
result = ValidationResult.Accept
|
||||||
@ -135,7 +135,7 @@ suite "FloodSub":
|
|||||||
await allFuturesThrowing(nodesFut)
|
await allFuturesThrowing(nodesFut)
|
||||||
|
|
||||||
asyncTest "FloodSub validation should fail":
|
asyncTest "FloodSub validation should fail":
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check false # if we get here, it should fail
|
check false # if we get here, it should fail
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -151,7 +151,7 @@ suite "FloodSub":
|
|||||||
var validatorFut = newFuture[bool]()
|
var validatorFut = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
validatorFut.complete(true)
|
validatorFut.complete(true)
|
||||||
result = ValidationResult.Reject
|
result = ValidationResult.Reject
|
||||||
|
|
||||||
@ -165,7 +165,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
asyncTest "FloodSub validation one fails and one succeeds":
|
asyncTest "FloodSub validation one fails and one succeeds":
|
||||||
var handlerFut = newFuture[bool]()
|
var handlerFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foo"
|
check topic == "foo"
|
||||||
handlerFut.complete(true)
|
handlerFut.complete(true)
|
||||||
|
|
||||||
@ -183,7 +183,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
if topic == "foo":
|
if topic == "foo":
|
||||||
result = ValidationResult.Accept
|
result = ValidationResult.Accept
|
||||||
else:
|
else:
|
||||||
@ -210,7 +210,7 @@ suite "FloodSub":
|
|||||||
futs[i] = (
|
futs[i] = (
|
||||||
fut,
|
fut,
|
||||||
(
|
(
|
||||||
proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
inc counter[]
|
inc counter[]
|
||||||
if counter[] == runs - 1:
|
if counter[] == runs - 1:
|
||||||
@ -257,7 +257,7 @@ suite "FloodSub":
|
|||||||
futs[i] = (
|
futs[i] = (
|
||||||
fut,
|
fut,
|
||||||
(
|
(
|
||||||
proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
inc counter[]
|
inc counter[]
|
||||||
if counter[] == runs - 1:
|
if counter[] == runs - 1:
|
||||||
@ -305,7 +305,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
asyncTest "FloodSub message size validation":
|
asyncTest "FloodSub message size validation":
|
||||||
var messageReceived = 0
|
var messageReceived = 0
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check data.len < 50
|
check data.len < 50
|
||||||
inc(messageReceived)
|
inc(messageReceived)
|
||||||
|
|
||||||
@ -343,7 +343,7 @@ suite "FloodSub":
|
|||||||
|
|
||||||
asyncTest "FloodSub message size validation 2":
|
asyncTest "FloodSub message size validation 2":
|
||||||
var messageReceived = 0
|
var messageReceived = 0
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
inc(messageReceived)
|
inc(messageReceived)
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -36,9 +36,7 @@ suite "GossipSub internal":
|
|||||||
asyncTest "subscribe/unsubscribeAll":
|
asyncTest "subscribe/unsubscribeAll":
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||||
|
|
||||||
proc handler(
|
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
|
||||||
topic: string, data: seq[byte]
|
|
||||||
): Future[void] {.gcsafe, async: (raises: []).} =
|
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let topic = "foobar"
|
let topic = "foobar"
|
||||||
@ -670,7 +668,7 @@ suite "GossipSub internal":
|
|||||||
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} =
|
||||||
check false
|
check false
|
||||||
|
|
||||||
proc handler2(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler2(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let topic = "foobar"
|
let topic = "foobar"
|
||||||
@ -771,10 +769,10 @@ suite "GossipSub internal":
|
|||||||
|
|
||||||
var receivedMessages = new(HashSet[seq[byte]])
|
var receivedMessages = new(HashSet[seq[byte]])
|
||||||
|
|
||||||
proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerA(topic: string, data: seq[byte]) {.async.} =
|
||||||
receivedMessages[].incl(data)
|
receivedMessages[].incl(data)
|
||||||
|
|
||||||
proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerB(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
nodes[0].subscribe("foobar", handlerA)
|
nodes[0].subscribe("foobar", handlerA)
|
||||||
|
@ -54,7 +54,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "GossipSub validation should succeed":
|
asyncTest "GossipSub validation should succeed":
|
||||||
var handlerFut = newFuture[bool]()
|
var handlerFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete(true)
|
handlerFut.complete(true)
|
||||||
|
|
||||||
@ -78,7 +78,7 @@ suite "GossipSub":
|
|||||||
var validatorFut = newFuture[bool]()
|
var validatorFut = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
validatorFut.complete(true)
|
validatorFut.complete(true)
|
||||||
result = ValidationResult.Accept
|
result = ValidationResult.Accept
|
||||||
@ -93,7 +93,7 @@ suite "GossipSub":
|
|||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
asyncTest "GossipSub validation should fail (reject)":
|
asyncTest "GossipSub validation should fail (reject)":
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check false # if we get here, it should fail
|
check false # if we get here, it should fail
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -119,7 +119,7 @@ suite "GossipSub":
|
|||||||
var validatorFut = newFuture[bool]()
|
var validatorFut = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
result = ValidationResult.Reject
|
result = ValidationResult.Reject
|
||||||
validatorFut.complete(true)
|
validatorFut.complete(true)
|
||||||
|
|
||||||
@ -133,7 +133,7 @@ suite "GossipSub":
|
|||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
asyncTest "GossipSub validation should fail (ignore)":
|
asyncTest "GossipSub validation should fail (ignore)":
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check false # if we get here, it should fail
|
check false # if we get here, it should fail
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -159,7 +159,7 @@ suite "GossipSub":
|
|||||||
var validatorFut = newFuture[bool]()
|
var validatorFut = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
result = ValidationResult.Ignore
|
result = ValidationResult.Ignore
|
||||||
validatorFut.complete(true)
|
validatorFut.complete(true)
|
||||||
|
|
||||||
@ -174,7 +174,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "GossipSub validation one fails and one succeeds":
|
asyncTest "GossipSub validation one fails and one succeeds":
|
||||||
var handlerFut = newFuture[bool]()
|
var handlerFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foo"
|
check topic == "foo"
|
||||||
handlerFut.complete(true)
|
handlerFut.complete(true)
|
||||||
|
|
||||||
@ -192,7 +192,7 @@ suite "GossipSub":
|
|||||||
var passed, failed: Future[bool] = newFuture[bool]()
|
var passed, failed: Future[bool] = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
result =
|
result =
|
||||||
if topic == "foo":
|
if topic == "foo":
|
||||||
passed.complete(true)
|
passed.complete(true)
|
||||||
@ -226,7 +226,7 @@ suite "GossipSub":
|
|||||||
sendCounter = 0
|
sendCounter = 0
|
||||||
validatedCounter = 0
|
validatedCounter = 0
|
||||||
|
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||||
@ -254,7 +254,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
result = if topic == "foo": ValidationResult.Accept else: ValidationResult.Reject
|
result = if topic == "foo": ValidationResult.Accept else: ValidationResult.Reject
|
||||||
|
|
||||||
nodes[1].addValidator("foo", "bar", validator)
|
nodes[1].addValidator("foo", "bar", validator)
|
||||||
@ -279,7 +279,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "GossipSub unsub - resub faster than backoff":
|
asyncTest "GossipSub unsub - resub faster than backoff":
|
||||||
var handlerFut = newFuture[bool]()
|
var handlerFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete(true)
|
handlerFut.complete(true)
|
||||||
|
|
||||||
@ -311,7 +311,7 @@ suite "GossipSub":
|
|||||||
var validatorFut = newFuture[bool]()
|
var validatorFut = newFuture[bool]()
|
||||||
proc validator(
|
proc validator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
validatorFut.complete(true)
|
validatorFut.complete(true)
|
||||||
result = ValidationResult.Accept
|
result = ValidationResult.Accept
|
||||||
@ -326,7 +326,7 @@ suite "GossipSub":
|
|||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
asyncTest "e2e - GossipSub should add remote peer topic subscriptions":
|
asyncTest "e2e - GossipSub should add remote peer topic subscriptions":
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -352,7 +352,7 @@ suite "GossipSub":
|
|||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
|
asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -395,7 +395,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "e2e - GossipSub send over fanout A -> B":
|
asyncTest "e2e - GossipSub send over fanout A -> B":
|
||||||
var passed = newFuture[void]()
|
var passed = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
passed.complete()
|
passed.complete()
|
||||||
|
|
||||||
@ -443,7 +443,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
|
asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic":
|
||||||
var passed = newFuture[void]()
|
var passed = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
passed.complete()
|
passed.complete()
|
||||||
|
|
||||||
@ -487,7 +487,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "e2e - GossipSub send over mesh A -> B":
|
asyncTest "e2e - GossipSub send over mesh A -> B":
|
||||||
var passed: Future[bool] = newFuture[bool]()
|
var passed: Future[bool] = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
passed.complete(true)
|
passed.complete(true)
|
||||||
|
|
||||||
@ -542,14 +542,14 @@ suite "GossipSub":
|
|||||||
var
|
var
|
||||||
aReceived = 0
|
aReceived = 0
|
||||||
cReceived = 0
|
cReceived = 0
|
||||||
proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerA(topic: string, data: seq[byte]) {.async.} =
|
||||||
inc aReceived
|
inc aReceived
|
||||||
check aReceived < 2
|
check aReceived < 2
|
||||||
|
|
||||||
proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerB(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc handlerC(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerC(topic: string, data: seq[byte]) {.async.} =
|
||||||
inc cReceived
|
inc cReceived
|
||||||
check cReceived < 2
|
check cReceived < 2
|
||||||
cRelayed.complete()
|
cRelayed.complete()
|
||||||
@ -565,7 +565,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
proc slowValidator(
|
proc slowValidator(
|
||||||
topic: string, message: Message
|
topic: string, message: Message
|
||||||
): Future[ValidationResult] {.async: (raises: []).} =
|
): Future[ValidationResult] {.async.} =
|
||||||
try:
|
try:
|
||||||
await cRelayed
|
await cRelayed
|
||||||
# Empty A & C caches to detect duplicates
|
# Empty A & C caches to detect duplicates
|
||||||
@ -601,7 +601,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "e2e - GossipSub send over floodPublish A -> B":
|
asyncTest "e2e - GossipSub send over floodPublish A -> B":
|
||||||
var passed: Future[bool] = newFuture[bool]()
|
var passed: Future[bool] = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
passed.complete(true)
|
passed.complete(true)
|
||||||
|
|
||||||
@ -646,7 +646,7 @@ suite "GossipSub":
|
|||||||
await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
|
await allFuturesThrowing(nodes.mapIt(it.switch.stop()))
|
||||||
|
|
||||||
proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} =
|
proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} =
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
|
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
@ -659,7 +659,7 @@ suite "GossipSub":
|
|||||||
numPeersFirstMsg: int,
|
numPeersFirstMsg: int,
|
||||||
numPeersSecondMsg: int,
|
numPeersSecondMsg: int,
|
||||||
) {.async.} =
|
) {.async.} =
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
|
|
||||||
block setup:
|
block setup:
|
||||||
@ -724,9 +724,7 @@ suite "GossipSub":
|
|||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
closureScope:
|
closureScope:
|
||||||
var peerName = $dialer.peerInfo.peerId
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(
|
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
|
||||||
topic: string, data: seq[byte]
|
|
||||||
) {.async: (raises: []), closure.} =
|
|
||||||
seen.mgetOrPut(peerName, 0).inc()
|
seen.mgetOrPut(peerName, 0).inc()
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
if not seenFut.finished() and seen.len >= runs:
|
if not seenFut.finished() and seen.len >= runs:
|
||||||
@ -774,9 +772,7 @@ suite "GossipSub":
|
|||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
capture dialer, i:
|
capture dialer, i:
|
||||||
var peerName = $dialer.peerInfo.peerId
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(
|
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
|
||||||
topic: string, data: seq[byte]
|
|
||||||
) {.async: (raises: []), closure.} =
|
|
||||||
try:
|
try:
|
||||||
if peerName notin seen:
|
if peerName notin seen:
|
||||||
seen[peerName] = 0
|
seen[peerName] = 0
|
||||||
@ -819,7 +815,7 @@ suite "GossipSub":
|
|||||||
# PX to A & C
|
# PX to A & C
|
||||||
#
|
#
|
||||||
# C sent his SPR, not A
|
# C sent his SPR, not A
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard # not used in this test
|
discard # not used in this test
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -888,13 +884,13 @@ suite "GossipSub":
|
|||||||
)
|
)
|
||||||
|
|
||||||
let bFinished = newFuture[void]()
|
let bFinished = newFuture[void]()
|
||||||
proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerA(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerB(topic: string, data: seq[byte]) {.async.} =
|
||||||
bFinished.complete()
|
bFinished.complete()
|
||||||
|
|
||||||
proc handlerC(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerC(topic: string, data: seq[byte]) {.async.} =
|
||||||
doAssert false
|
doAssert false
|
||||||
|
|
||||||
nodes[0].subscribe("foobar", handlerA)
|
nodes[0].subscribe("foobar", handlerA)
|
||||||
@ -953,10 +949,10 @@ suite "GossipSub":
|
|||||||
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
|
nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
|
||||||
)
|
)
|
||||||
|
|
||||||
proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerA(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerB(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
nodes[0].subscribe("foobar", handlerA)
|
nodes[0].subscribe("foobar", handlerA)
|
||||||
@ -1003,10 +999,10 @@ suite "GossipSub":
|
|||||||
)
|
)
|
||||||
|
|
||||||
let bFinished = newFuture[void]()
|
let bFinished = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handlerB(topic: string, data: seq[byte]) {.async.} =
|
||||||
bFinished.complete()
|
bFinished.complete()
|
||||||
|
|
||||||
nodeA.subscribe("foobar", handler)
|
nodeA.subscribe("foobar", handler)
|
||||||
@ -1046,7 +1042,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
await subscribeNodes(nodes)
|
await subscribeNodes(nodes)
|
||||||
|
|
||||||
proc handle(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handle(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let gossip0 = GossipSub(nodes[0])
|
let gossip0 = GossipSub(nodes[0])
|
||||||
@ -1177,7 +1173,7 @@ suite "GossipSub":
|
|||||||
let topic = "foobar"
|
let topic = "foobar"
|
||||||
proc execValidator(
|
proc execValidator(
|
||||||
topic: string, message: messages.Message
|
topic: string, message: messages.Message
|
||||||
): Future[ValidationResult] {.async: (raises: [], raw: true).} =
|
): Future[ValidationResult] {.async: (raw: true).} =
|
||||||
let res = newFuture[ValidationResult]()
|
let res = newFuture[ValidationResult]()
|
||||||
res.complete(ValidationResult.Reject)
|
res.complete(ValidationResult.Reject)
|
||||||
res
|
res
|
||||||
@ -1223,7 +1219,7 @@ suite "GossipSub":
|
|||||||
node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs
|
node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs
|
||||||
)
|
)
|
||||||
|
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
node0.subscribe("foobar", handler)
|
node0.subscribe("foobar", handler)
|
||||||
|
@ -65,7 +65,7 @@ suite "GossipSub":
|
|||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
closureScope:
|
closureScope:
|
||||||
var peerName = $dialer.peerInfo.peerId
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
|
||||||
seen.mgetOrPut(peerName, 0).inc()
|
seen.mgetOrPut(peerName, 0).inc()
|
||||||
info "seen up", count = seen.len
|
info "seen up", count = seen.len
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
@ -96,7 +96,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
asyncTest "GossipSub invalid topic subscription":
|
asyncTest "GossipSub invalid topic subscription":
|
||||||
var handlerFut = newFuture[bool]()
|
var handlerFut = newFuture[bool]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete(true)
|
handlerFut.complete(true)
|
||||||
|
|
||||||
@ -152,7 +152,7 @@ suite "GossipSub":
|
|||||||
# DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN
|
# DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN
|
||||||
### await subscribeNodes(nodes)
|
### await subscribeNodes(nodes)
|
||||||
|
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
nodes[1].subscribe("foobar", handler)
|
nodes[1].subscribe("foobar", handler)
|
||||||
@ -184,11 +184,11 @@ suite "GossipSub":
|
|||||||
)
|
)
|
||||||
|
|
||||||
var handlerFut = newFuture[void]()
|
var handlerFut = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete()
|
handlerFut.complete()
|
||||||
|
|
||||||
proc noop(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc noop(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
|
|
||||||
nodes[0].subscribe("foobar", noop)
|
nodes[0].subscribe("foobar", noop)
|
||||||
@ -228,7 +228,7 @@ suite "GossipSub":
|
|||||||
GossipSub(nodes[1]).parameters.graylistThreshold = 100000
|
GossipSub(nodes[1]).parameters.graylistThreshold = 100000
|
||||||
|
|
||||||
var handlerFut = newFuture[void]()
|
var handlerFut = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
handlerFut.complete()
|
handlerFut.complete()
|
||||||
|
|
||||||
@ -272,9 +272,7 @@ suite "GossipSub":
|
|||||||
var handler: TopicHandler
|
var handler: TopicHandler
|
||||||
closureScope:
|
closureScope:
|
||||||
var peerName = $dialer.peerInfo.peerId
|
var peerName = $dialer.peerInfo.peerId
|
||||||
handler = proc(
|
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
|
||||||
topic: string, data: seq[byte]
|
|
||||||
) {.async: (raises: []), closure.} =
|
|
||||||
seen.mgetOrPut(peerName, 0).inc()
|
seen.mgetOrPut(peerName, 0).inc()
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
if not seenFut.finished() and seen.len >= runs:
|
if not seenFut.finished() and seen.len >= runs:
|
||||||
@ -326,7 +324,7 @@ suite "GossipSub":
|
|||||||
|
|
||||||
# Adding again subscriptions
|
# Adding again subscriptions
|
||||||
|
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
check topic == "foobar"
|
check topic == "foobar"
|
||||||
|
|
||||||
for i in 0 ..< runs:
|
for i in 0 ..< runs:
|
||||||
@ -362,7 +360,7 @@ suite "GossipSub":
|
|||||||
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
|
nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())
|
||||||
|
|
||||||
var handlerFut = newFuture[void]()
|
var handlerFut = newFuture[void]()
|
||||||
proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} =
|
proc handler(topic: string, data: seq[byte]) {.async.} =
|
||||||
handlerFut.complete()
|
handlerFut.complete()
|
||||||
|
|
||||||
await subscribeNodes(nodes)
|
await subscribeNodes(nodes)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user