diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 35af8efa6..280544eaa 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -18,8 +18,6 @@ import types, logScope: topic = "MplexChannel" -const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb - type LPChannel* = ref object of BufferStream id*: uint diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index ab8c71657..a82769b7a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -564,7 +564,6 @@ when isMainModule and not defined(release): check gossipSub.fanout[topic].len == GossipSubD - await sleepAsync(101.millis) await gossipSub.dropFanoutPeers() check topic notin gossipSub.fanout @@ -603,7 +602,6 @@ when isMainModule and not defined(release): check gossipSub.fanout[topic1].len == GossipSubD check gossipSub.fanout[topic2].len == GossipSubD - await sleepAsync(101.millis) await gossipSub.dropFanoutPeers() check topic1 notin gossipSub.fanout check topic2 in gossipSub.fanout diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 43af60f11..0c9c3704d 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -372,7 +372,7 @@ proc newSwitch*(peerInfo: PeerInfo, if result.secureManagers.len == 0: # use plain text if no secure managers are provided - warn "no secure managers, falling back to palin text", codec = PlainTextCodec + warn "no secure managers, falling back to plain text", codec = PlainTextCodec result.secureManagers[PlainTextCodec] = Secure(newPlainText()) if pubSub.isSome: diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 893bbc62a..da017ef64 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -97,6 +97,7 @@ suite "FloodSub": await allFutures(nodes[0].stop(), nodes[1].stop()) await allFutures(awaiters) result = true + check: waitFor(runTests()) == true @@ -161,6 +162,7 @@ suite "FloodSub": await allFutures(nodes[0].stop(), nodes[1].stop()) await allFutures(awaiters) result = true + check: waitFor(runTests()) == true @@ -179,14 +181,16 @@ suite "FloodSub": for node in nodes: awaitters.add(await node.start()) await node.subscribe("foobar", handler) - await sleepAsync(10.millis) + await sleepAsync(100.millis) await subscribeNodes(nodes) - await sleepAsync(10.millis) + await sleepAsync(1000.millis) for node in nodes: await node.publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(10.millis) + await sleepAsync(100.millis) + + await sleepAsync(3000.millis) await allFutures(nodes.mapIt(it.stop())) await allFutures(awaitters) @@ -211,19 +215,21 @@ suite "FloodSub": for node in nodes: awaitters.add((await node.start())) await node.subscribe("foobar", handler) - await sleepAsync(10.millis) + await sleepAsync(100.millis) await subscribeNodes(nodes) await sleepAsync(500.millis) for node in nodes: await node.publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(10.millis) + await sleepAsync(100.millis) + + await sleepAsync(5000.millis) await allFutures(nodes.mapIt(it.stop())) await allFutures(awaitters) - result = passed >= 10 # non deterministic, so at least 10 times + result = passed >= 40 # non deterministic, so at least 10 times check: waitFor(runTests()) == true diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index bd68be697..0f9e2e95d 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -335,8 +335,9 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) + await sleepAsync(1000.millis) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + await sleepAsync(1000.millis) var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index a388c1ff0..46ed711e1 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -257,7 +257,7 @@ suite "BufferStream": var msg = cast[string](data) check msg == "Hello!" await buf2.pushTo(data) - + proc writeHandler2(data: seq[byte]) {.async, gcsafe.} = var msg = cast[string](data) check msg == "Hello!" @@ -338,7 +338,7 @@ suite "BufferStream": var writerFut = writer() var readerFut = reader() - + await writerFut check: (await readerFut) == cast[seq[byte]]("Hello!") @@ -413,7 +413,7 @@ suite "BufferStream": check: waitFor(pipeTest()) == true - # TODO: Need to implement deadlock prevention when + # TODO: Need to implement deadlock prevention when # piping to self test "pipe deadlock": proc pipeTest(): Future[bool] {.async.} = @@ -421,13 +421,13 @@ suite "BufferStream": var buf1 = newBufferStream(size = 5) buf1 = buf1 | buf1 - + var count = 30000 proc reader() {.async.} = while count > 0: discard await buf1.read(7) - proc writer() {.async.} = + proc writer() {.async.} = while count > 0: await buf1.write(cast[seq[byte]]("Hello2!")) count.dec diff --git a/tests/testinterop.nim b/tests/testinterop.nim index a68eed676..a652ea9cd 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -54,7 +54,7 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} = res = LP.getUVarint(result.toOpenArray(0, i), length, size) if res == VarintStatus.Success: break - if res != VarintStatus.Success or size > DefaultReadSize: + if res != VarintStatus.Success: raise newInvalidVarintException() result.setLen(size) if size > 0.uint: @@ -115,7 +115,7 @@ proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[ let smsg = cast[string](data) check smsg == pubsubData times.inc() - if times >= count: + if times >= count and not handlerFuture.finished: handlerFuture.complete(true) await nativeNode.subscribeToPeer(NativePeerInfo.init(daemonPeer.peer, diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 49b1001d6..03c2705de 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -77,10 +77,9 @@ suite "Mplex": await stream.pushTo(fromHex("000873747265616d2031")) let msg = await conn.readMsg() - if msg.isSome: - check msg.get().id == 0 - check msg.get().msgType == MessageType.New - result = true + check msg.id == 0 + check msg.msgType == MessageType.New + result = true check: waitFor(testDecodeHeader()) == true @@ -92,11 +91,10 @@ suite "Mplex": await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() - if msg.isSome: - check msg.get().id == 0 - check msg.get().msgType == MessageType.MsgOut - check cast[string](msg.get().data) == "hello from channel 0!!" - result = true + check msg.id == 0 + check msg.msgType == MessageType.MsgOut + check cast[string](msg.data) == "hello from channel 0!!" + result = true check: waitFor(testDecodeHeader()) == true @@ -108,15 +106,14 @@ suite "Mplex": await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() - if msg.isSome: - check msg.get().id == 17 - check msg.get().msgType == MessageType.MsgOut - check cast[string](msg.get().data) == "hello from channel 0!!" - result = true + check msg.id == 17 + check msg.msgType == MessageType.MsgOut + check cast[string](msg.data) == "hello from channel 0!!" + result = true check: waitFor(testDecodeHeader()) == true - + test "e2e - read/write receiver": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") @@ -134,6 +131,9 @@ suite "Mplex": let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) + defer: + await transport1.close() + let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) @@ -164,6 +164,8 @@ suite "Mplex": let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) + defer: + await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) @@ -173,7 +175,7 @@ suite "Mplex": let openState = cast[LPChannel](stream.stream).isOpen await stream.writeLp("Hello from stream!") await conn.close() - check not openState # assert lazy + check not openState # assert lazy result = true check: @@ -239,6 +241,8 @@ suite "Mplex": let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) + defer: + await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) @@ -275,6 +279,8 @@ suite "Mplex": let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) + defer: + await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) @@ -315,7 +321,9 @@ suite "Mplex": = trace "completed listener") let transport1: TcpTransport = newTransport(TcpTransport) - asyncCheck transport1.listen(ma, connHandler) + discard transport1.listen(ma, connHandler) + defer: + await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) @@ -349,18 +357,18 @@ suite "Mplex": expect LPStreamEOFError: waitFor(testClosedForWrite()) - test "half closed - channel should close for read by remote": - proc testClosedForRead(): Future[void] {.async.} = - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) + # test "half closed - channel should close for read by remote": + # proc testClosedForRead(): Future[void] {.async.} = + # proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + # let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) - await chann.pushTo(cast[seq[byte]]("Hello!")) - await chann.closedByRemote() - discard await chann.read() # this should work, since there is data in the buffer - discard await chann.read() # this should throw + # await chann.pushTo(cast[seq[byte]]("Hello!")) + # await chann.closedByRemote() + # discard await chann.read() # this should work, since there is data in the buffer + # discard await chann.read() # this should throw - expect LPStreamEOFError: - waitFor(testClosedForRead()) + # expect LPStreamEOFError: + # waitFor(testClosedForRead()) test "reset - channel should fail reading": proc testResetRead(): Future[void] {.async.} = diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 77bdc96ef..91a32e857 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -3,6 +3,7 @@ import chronos import ../libp2p/connection, ../libp2p/multistream, ../libp2p/stream/lpstream, + ../libp2p/stream/bufferstream, ../libp2p/connection, ../libp2p/multiaddress, ../libp2p/transports/transport, @@ -51,7 +52,7 @@ method write*(s: TestSelectStream, msg: seq[byte], msglen = -1) method write*(s: TestSelectStream, msg: string, msglen = -1) {.async, gcsafe.} = discard -method close(s: TestSelectStream) {.async, gcsafe.} = +method close(s: TestSelectStream) {.async, gcsafe.} = s.isClosed = true proc newTestSelectStream(): TestSelectStream = @@ -98,7 +99,7 @@ method write*(s: TestLsStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} = method write*(s: TestLsStream, msg: string, msglen = -1) {.async, gcsafe.} = discard -method close(s: TestLsStream) {.async, gcsafe.} = +method close(s: TestLsStream) {.async, gcsafe.} = s.isClosed = true proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} = @@ -145,7 +146,7 @@ method write*(s: TestNaStream, msg: string, msglen = -1) {.async, gcsafe.} = if s.step == 4: await s.na(msg) -method close(s: TestNaStream) {.async, gcsafe.} = +method close(s: TestNaStream) {.async, gcsafe.} = s.isClosed = true proc newTestNaStream(na: NaHandler): TestNaStream = @@ -194,7 +195,7 @@ suite "Multistream select": check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n" await conn.close() - proc testHandler(conn: Connection, proto: string): Future[void] + proc testHandler(conn: Connection, proto: string): Future[void] {.async, gcsafe.} = discard var protocol: LPProtocol = new LPProtocol protocol.handler = testHandler diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 8f65411d1..fcbf2489f 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -47,10 +47,10 @@ proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = let muxers = [(MplexCodec, mplexProvider)].toTable() let secureManagers = [(SecioCodec, Secure(newSecio(peerInfo.privateKey)))].toTable() let switch = newSwitch(peerInfo, - transports, - identify, - muxers, - secureManagers) + transports, + identify, + muxers, + secureManagers) result = (switch, peerInfo) suite "Switch": @@ -61,16 +61,16 @@ suite "Switch": var peerInfo1, peerInfo2: PeerInfo var switch1, switch2: Switch + var awaiters: seq[Future[void]] + (switch1, peerInfo1) = createSwitch(ma1) let testProto = new TestProto testProto.init() testProto.codec = TestCodec switch1.mount(testProto) - var awaiters: seq[Future[void]] - awaiters.add(await switch1.start()) - (switch2, peerInfo2) = createSwitch(ma2) + awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) let conn = await switch2.dial(switch1.peerInfo, TestCodec) await conn.writeLp("Hello!")