diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 50b5e0b20..73324e1e6 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -79,8 +79,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = var msg = decodeRpcMsg(data) trace "decoded msg from peer", peer = p.id, msg = msg.shortLog # trigger hooks - for obs in p.observers[]: - obs.onRecv(p, msg) + p.recvObservers(msg) + await p.handler(p, @[msg]) p.recvdRpcCache.put(digest) finally: diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 1a315fc58..5d0281b94 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -224,7 +224,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "ending multistream", err = exc.msg + trace "error in multistream", err = exc.msg proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 82312f8fb..169917a6d 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -50,7 +50,7 @@ suite "FloodSub": let nodes = generateNodes(2) - nodesFut = await all( + nodesFut = await allFinished( nodes[0].start(), nodes[1].start() ) @@ -64,12 +64,12 @@ suite "FloodSub": result = await completionFut.wait(5.seconds) - await all( + await allFuturesThrowing( nodes[0].stop(), nodes[1].stop() ) - await all(nodesFut.concat()) + await allFuturesThrowing(nodesFut.concat()) check: waitFor(runTests()) == true @@ -95,8 +95,8 @@ suite "FloodSub": result = await completionFut.wait(5.seconds) - await all(nodes[0].stop(), nodes[1].stop()) - await all(awaiters) + await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) + await allFuturesThrowing(awaiters) check: waitFor(runTests()) == true @@ -129,8 +129,10 @@ suite "FloodSub": await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) check (await handlerFut) == true - await all(nodes[0].stop(), nodes[1].stop()) - await all(awaiters) + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: @@ -160,8 +162,10 @@ suite "FloodSub": await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await all(nodes[0].stop(), nodes[1].stop()) - await all(awaiters) + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: @@ -197,8 +201,10 @@ suite "FloodSub": await nodes[0].publish("foo", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) - await all(nodes[0].stop(), nodes[1].stop()) - await all(awaiters) + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop()) + await allFuturesThrowing(awaiters) result = true check: @@ -242,16 +248,16 @@ suite "FloodSub": for y in 0..= runs: + seenFut.complete() + + subs &= dialer.subscribe("foobar", handler) + + await allFuturesThrowing(subs) + await wait(nodes[0].publish("foobar", + cast[seq[byte]]("from node " & + nodes[1].peerInfo.id)), + 1.minutes) + + await wait(seenFut, 5.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v == 1 + + await allFuturesThrowing(nodes.mapIt(it.stop())) + await allFuturesThrowing(awaitters) result = true check: diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 4b2e3be2b..337364cc8 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -17,6 +17,20 @@ proc subscribeNodes*(nodes: seq[Switch]) {.async.} = dials.add(dialer.connect(node.peerInfo)) await allFutures(dials) +proc subscribeSparseNodes*(nodes: seq[Switch], degree: int = 2) {.async.} = + if nodes.len < degree: + raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!") + + var dials: seq[Future[void]] + for i, dialer in nodes: + if (i mod degree) != 0: + continue + + for node in nodes: + if dialer.peerInfo.peerId != node.peerInfo.peerId: + dials.add(dialer.connect(node.peerInfo)) + await allFutures(dials) + proc subscribeRandom*(nodes: seq[Switch]) {.async.} = var dials: seq[Future[void]] for dialer in nodes: diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index 2a57c7bc3..e8d6e3821 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -250,7 +250,7 @@ suite "BufferStream": await buf1.pushTo(cast[seq[byte]]("Hello2!")) await buf2.pushTo(cast[seq[byte]]("Hello1!")) - await all(readFut1, readFut2) + await allFuturesThrowing(readFut1, readFut2) check: res1 == cast[seq[byte]]("Hello2!") @@ -300,7 +300,7 @@ suite "BufferStream": await buf1.write(cast[seq[byte]]("Hello1!")) await buf2.write(cast[seq[byte]]("Hello2!")) - await all(readFut1, readFut2) + await allFuturesThrowing(readFut1, readFut2) check: res1 == cast[seq[byte]]("Hello2!") @@ -376,7 +376,7 @@ suite "BufferStream": await buf1.write(cast[seq[byte]]("Hello1!")) await buf2.write(cast[seq[byte]]("Hello2!")) - await all(readFut1, readFut2) + await allFuturesThrowing(readFut1, readFut2) check: res1 == cast[seq[byte]]("Hello2!") @@ -437,7 +437,7 @@ suite "BufferStream": var writerFut = writer() var readerFut = reader() - await all(readerFut, writerFut) + await allFuturesThrowing(readerFut, writerFut) result = true await buf1.close() diff --git a/tests/testmplex.nim b/tests/testmplex.nim index de6c3922c..36633d80c 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -245,7 +245,9 @@ suite "Mplex": await done.wait(1.seconds) await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(testNewStream()) @@ -284,7 +286,9 @@ suite "Mplex": await done.wait(1.seconds) await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(testNewStream()) @@ -331,7 +335,9 @@ suite "Mplex": await stream.close() await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(testNewStream()) @@ -368,7 +374,9 @@ suite "Mplex": await done.wait(1.seconds) await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(testNewStream()) @@ -410,7 +418,9 @@ suite "Mplex": await done.wait(10.seconds) await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(testNewStream()) @@ -454,7 +464,9 @@ suite "Mplex": await done.wait(5.seconds) await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(testNewStream()) @@ -522,7 +534,9 @@ suite "Mplex": await complete.wait(1.seconds) await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(test()) @@ -579,7 +593,9 @@ suite "Mplex": await stream.close() await conn.close() await mplexDialFut - await all(transport1.close(), transport2.close()) + await allFuturesThrowing( + transport1.close(), + transport2.close()) await listenFut waitFor(test()) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index ba7a6c46f..ecb64f58c 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -277,7 +277,9 @@ suite "Multistream select": await transport2.close() await transport1.close() - await all(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#) + await allFuturesThrowing( + handlerWait1.wait(5000.millis), + handlerWait2.wait(5000.millis)) check: waitFor(endToEnd()) == true diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 5565d25c5..778328564 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -222,8 +222,10 @@ suite "Noise": check "Hello!" == msg await conn.close() - await all(switch1.stop(), switch2.stop()) - await all(awaiters) + await allFuturesThrowing( + switch1.stop(), + switch2.stop()) + await allFuturesThrowing(awaiters) result = true check: diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 81bde522d..725433a2a 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -67,14 +67,13 @@ suite "Switch": check "Hello!" == msg await conn.close() - await all( - done.wait(5.seconds), #[if OK won't happen!!]# + await allFuturesThrowing( + done.wait(5.seconds), switch1.stop(), - switch2.stop(), - ) + switch2.stop()) # this needs to go at end - await all(awaiters) + await allFuturesThrowing(awaiters) waitFor(testSwitch()) @@ -125,14 +124,14 @@ suite "Switch": check (ConnectionTracker(connTracker).opened == (ConnectionTracker(connTracker).closed + 8.uint64)) - await all( - done.wait(5.seconds), #[if OK won't happen!!]# + await allFuturesThrowing( + done.wait(5.seconds), switch1.stop(), switch2.stop(), ) # this needs to go at end - await all(awaiters) + await allFuturesThrowing(awaiters) waitFor(testSwitch()) @@ -172,12 +171,12 @@ suite "Switch": except LPStreamError: result = false - await all( + await allFuturesThrowing( conn.close(), switch1.stop(), switch2.stop() ) - await all(awaiters) + await allFuturesThrowing(awaiters) check: waitFor(testSwitch()) == true @@ -210,11 +209,10 @@ suite "Switch": check switch1.connections.len == 0 check switch2.connections.len == 0 - await all( + await allFuturesThrowing( switch1.stop(), - switch2.stop() - ) - await all(awaiters) + switch2.stop()) + await allFuturesThrowing(awaiters) waitFor(testSwitch())