diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 31ca5ac..55ccaf6 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -2,11 +2,12 @@ import unittest, tables import chronos -import chronicles +import stew/byteutils import nimcrypto/sysrand import ../libp2p/[errors, switch, multistream, + standard_setup, stream/bufferstream, protocols/identify, connection, @@ -30,25 +31,6 @@ const type TestProto = ref object of LPProtocol -proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = - var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(ECDSA).tryGet()) - peerInfo.addrs.add(ma) - let identify = newIdentify(peerInfo) - - proc createMplex(conn: Connection): Muxer = - result = newMplex(conn) - - let mplexProvider = newMuxerProvider(createMplex, MplexCodec) - let transports = @[Transport(TcpTransport.init())] - let muxers = [(MplexCodec, mplexProvider)].toTable() - let secureManagers = [Secure(newSecio(peerInfo.privateKey))] - let switch = newSwitch(peerInfo, - transports, - identify, - muxers, - secureManagers) - result = (switch, peerInfo) - suite "Switch": teardown: for tracker in testTrackers(): @@ -57,42 +39,53 @@ suite "Switch": test "e2e use switch dial proto string": proc testSwitch() {.async, gcsafe.} = - let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - - var peerInfo1, peerInfo2: PeerInfo - var switch1, switch2: Switch - var awaiters: seq[Future[void]] - - (switch1, peerInfo1) = createSwitch(ma1) - let done = newFuture[void]() - proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - let msg = cast[string](await conn.readLp(1024)) - check "Hello!" == msg - await conn.writeLp("Hello!") - await conn.close() - done.complete() + try: + let msg = string.fromBytes(await conn.readLp(1024)) + check "Hello!" == msg + await conn.writeLp("Hello!") + finally: + await conn.close() + done.complete() let testProto = new TestProto testProto.codec = TestCodec testProto.handler = handle + + let switch1 = newStandardSwitch() switch1.mount(testProto) - (switch2, peerInfo2) = createSwitch(ma2) + let switch2 = newStandardSwitch() + var awaiters: seq[Future[void]] awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) let conn = await switch2.dial(switch1.peerInfo, TestCodec) - await conn.writeLp("Hello!") - let msg = cast[string](await conn.readLp(1024)) + let msg = string.fromBytes(await conn.readLp(1024)) check "Hello!" == msg + await conn.close() + + await sleepAsync(2.seconds) # wait a little for cleanup to happen + var bufferTracker = getTracker(BufferStreamTrackerName) + # echo bufferTracker.dump() + + # plus 4 for the pubsub streams + check (BufferStreamTracker(bufferTracker).opened == + (BufferStreamTracker(bufferTracker).closed + 4.uint64)) + + var connTracker = getTracker(ConnectionTrackerName) + # echo connTracker.dump() + + # plus 8 is for the secured connection and the socket + # and the pubsub streams that won't clean up until + # `disconnect()` or `stop()` + check (ConnectionTracker(connTracker).opened == + (ConnectionTracker(connTracker).closed + 8.uint64)) await all( - done.wait(5.seconds) #[if OK won't happen!!]#, - conn.close(), + done.wait(5.seconds), #[if OK won't happen!!]# switch1.stop(), switch2.stop(), ) @@ -102,37 +95,37 @@ suite "Switch": waitFor(testSwitch()) - test "e2e use switch no proto string": + test "e2e use connect then dial": proc testSwitch(): Future[bool] {.async, gcsafe.} = - let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - - var peerInfo1, peerInfo2: PeerInfo - var switch1, switch2: Switch var awaiters: seq[Future[void]] - (switch1, peerInfo1) = createSwitch(ma1) - proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - let msg = cast[string](await conn.readLp(1024)) - check "Hello!" == msg - await conn.writeLp("Hello!") - await conn.close() + try: + let msg = string.fromBytes(await conn.readLp(1024)) + check "Hello!" == msg + finally: + await conn.writeLp("Hello!") + await conn.close() let testProto = new TestProto testProto.codec = TestCodec testProto.handler = handle + + let switch1 = newStandardSwitch() switch1.mount(testProto) - (switch2, peerInfo2) = createSwitch(ma2) + let switch2 = newStandardSwitch() awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) + await switch2.connect(switch1.peerInfo) + check switch1.peerInfo.id in switch2.connections + let conn = await switch2.dial(switch1.peerInfo, TestCodec) try: await conn.writeLp("Hello!") - let msg = cast[string](await conn.readLp(1024)) + let msg = string.fromBytes(await conn.readLp(1024)) check "Hello!" == msg result = true except LPStreamError: @@ -148,6 +141,36 @@ suite "Switch": check: waitFor(testSwitch()) == true + test "e2e should not leak on peer disconnect": + proc testSwitch() {.async, gcsafe.} = + var awaiters: seq[Future[void]] + + let switch1 = newStandardSwitch() + let switch2 = newStandardSwitch() + awaiters.add(await switch1.start()) + awaiters.add(await switch2.start()) + await switch2.connect(switch1.peerInfo) + + await sleepAsync(100.millis) + await switch2.disconnect(switch1.peerInfo) + + await sleepAsync(2.seconds) + var bufferTracker = getTracker(BufferStreamTrackerName) + # echo bufferTracker.dump() + check bufferTracker.isLeaked() == false + + var connTracker = getTracker(ConnectionTrackerName) + # echo connTracker.dump() + check connTracker.isLeaked() == false + + await all( + switch1.stop(), + switch2.stop() + ) + await all(awaiters) + + waitFor(testSwitch()) + # test "e2e: handle read + secio fragmented": # proc testListenerDialer(): Future[bool] {.async.} = # let