import std/[sequtils,tables] import stew/results import chronos import ../libp2p/[connmanager, stream/connection, crypto/crypto, muxers/muxer, peerinfo, errors] import helpers proc getMuxer(peerId: PeerId, dir: Direction = Direction.In): Muxer = return Muxer(connection: Connection.new(peerId, dir, Opt.none(MultiAddress))) type TestMuxer = ref object of Muxer peerId: PeerId method newStream*( m: TestMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = result = Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress)) suite "Connection Manager": teardown: checkTrackers() asyncTest "add and retrieve a muxer": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let mux = getMuxer(peerId) connMngr.storeMuxer(mux) check mux in connMngr let peerMux = connMngr.selectMuxer(peerId) check peerMux == mux check peerMux.connection.dir == Direction.In await connMngr.close() asyncTest "get all connections": let connMngr = ConnManager.new() let peers = toSeq(0..<2).mapIt(PeerId.random.tryGet()) let muxs = toSeq(0..<2).mapIt(getMuxer(peers[it])) for mux in muxs: connMngr.storeMuxer(mux) let conns = connMngr.getConnections() let connsMux = toSeq(conns.values).mapIt(it[0]) check unorderedCompare(connsMux, muxs) await connMngr.close() asyncTest "shouldn't allow a closed connection": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let mux = getMuxer(peerId) await mux.connection.close() expect CatchableError: connMngr.storeMuxer(mux) await connMngr.close() asyncTest "shouldn't allow an EOFed connection": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let mux = getMuxer(peerId) mux.connection.isEof = true expect CatchableError: connMngr.storeMuxer(mux) await mux.close() await connMngr.close() asyncTest "shouldn't allow a muxer with no connection": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let muxer = getMuxer(peerId) let conn = muxer.connection muxer.connection = nil expect CatchableError: connMngr.storeMuxer(muxer) await conn.close() await muxer.close() await connMngr.close() asyncTest "get conn with direction": # This would work with 1 as well cause of a bug in connmanager that will get fixed soon let connMngr = ConnManager.new(maxConnsPerPeer = 2) let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let mux1 = getMuxer(peerId, Direction.Out) let mux2 = getMuxer(peerId) connMngr.storeMuxer(mux1) connMngr.storeMuxer(mux2) check mux1 in connMngr check mux2 in connMngr let outMux = connMngr.selectMuxer(peerId, Direction.Out) let inMux = connMngr.selectMuxer(peerId, Direction.In) check outMux != inMux check outMux == mux1 check inMux == mux2 check outMux.connection.dir == Direction.Out check inMux.connection.dir == Direction.In await connMngr.close() asyncTest "get muxed stream for peer": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let muxer = new TestMuxer let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress)) muxer.peerId = peerId muxer.connection = connection connMngr.storeMuxer(muxer) check muxer in connMngr let stream = await connMngr.getStream(peerId) check not(isNil(stream)) check stream.peerId == peerId await connMngr.close() await connection.close() await stream.close() asyncTest "get stream from directed connection": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let muxer = new TestMuxer let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress)) muxer.peerId = peerId muxer.connection = connection connMngr.storeMuxer(muxer) check muxer in connMngr let stream1 = await connMngr.getStream(peerId, Direction.In) check not(isNil(stream1)) let stream2 = await connMngr.getStream(peerId, Direction.Out) check isNil(stream2) await connMngr.close() await stream1.close() await connection.close() asyncTest "should raise on too many connections": let connMngr = ConnManager.new(maxConnsPerPeer = 0) let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() connMngr.storeMuxer(getMuxer(peerId)) let muxs = @[getMuxer(peerId)] expect TooManyConnectionsError: connMngr.storeMuxer(muxs[0]) await connMngr.close() await allFuturesThrowing( allFutures(muxs.mapIt( it.close() ))) asyncTest "expect connection from peer": # FIXME This should be 1 instead of 0, it will get fixed soon let connMngr = ConnManager.new(maxConnsPerPeer = 0) let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() connMngr.storeMuxer(getMuxer(peerId)) let muxs = @[ getMuxer(peerId), getMuxer(peerId)] expect TooManyConnectionsError: connMngr.storeMuxer(muxs[0]) let waitedConn1 = connMngr.expectConnection(peerId, In) expect AlreadyExpectingConnectionError: discard await connMngr.expectConnection(peerId, In) await waitedConn1.cancelAndWait() let waitedConn2 = connMngr.expectConnection(peerId, In) waitedConn3 = connMngr.expectConnection(PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), In) conn = getMuxer(peerId) connMngr.storeMuxer(conn) check (await waitedConn2) == conn expect TooManyConnectionsError: connMngr.storeMuxer(muxs[1]) await connMngr.close() checkExpiring: waitedConn3.cancelled() await allFuturesThrowing( allFutures(muxs.mapIt( it.close() ))) asyncTest "cleanup on connection close": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let muxer = getMuxer(peerId) connMngr.storeMuxer(muxer) check muxer in connMngr await muxer.close() checkExpiring: muxer notin connMngr await connMngr.close() asyncTest "drop connections for peer": let connMngr = ConnManager.new() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() for i in 0..<2: let dir = if i mod 2 == 0: Direction.In else: Direction.Out let muxer = getMuxer(peerId, dir) connMngr.storeMuxer(muxer) check muxer in connMngr check not(isNil(connMngr.selectMuxer(peerId, dir))) check peerId in connMngr await connMngr.dropPeer(peerId) checkExpiring: peerId notin connMngr check isNil(connMngr.selectMuxer(peerId, Direction.In)) check isNil(connMngr.selectMuxer(peerId, Direction.Out)) await connMngr.close() asyncTest "track total incoming connection limits": let connMngr = ConnManager.new(maxConnections = 3) for i in 0..<3: check await connMngr.getIncomingSlot().withTimeout(10.millis) # should timeout adding a connection over the limit check not(await connMngr.getIncomingSlot().withTimeout(10.millis)) await connMngr.close() asyncTest "track total outgoing connection limits": let connMngr = ConnManager.new(maxConnections = 3) for i in 0..<3: discard connMngr.getOutgoingSlot() # should throw adding a connection over the limit expect TooManyConnectionsError: discard connMngr.getOutgoingSlot() await connMngr.close() asyncTest "track both incoming and outgoing total connections limits - fail on incoming": let connMngr = ConnManager.new(maxConnections = 3) for i in 0..<3: discard connMngr.getOutgoingSlot() # should timeout adding a connection over the limit check not(await connMngr.getIncomingSlot().withTimeout(10.millis)) await connMngr.close() asyncTest "track both incoming and outgoing total connections limits - fail on outgoing": let connMngr = ConnManager.new(maxConnections = 3) for i in 0..<3: check await connMngr.getIncomingSlot().withTimeout(10.millis) # should throw adding a connection over the limit expect TooManyConnectionsError: discard connMngr.getOutgoingSlot() await connMngr.close() asyncTest "track max incoming connection limits": let connMngr = ConnManager.new(maxIn = 3) for i in 0..<3: check await connMngr.getIncomingSlot().withTimeout(10.millis) check not(await connMngr.getIncomingSlot().withTimeout(10.millis)) await connMngr.close() asyncTest "track max outgoing connection limits": let connMngr = ConnManager.new(maxOut = 3) for i in 0..<3: discard connMngr.getOutgoingSlot() # should throw adding a connection over the limit expect TooManyConnectionsError: discard connMngr.getOutgoingSlot() await connMngr.close() asyncTest "track incoming max connections limits - fail on incoming": let connMngr = ConnManager.new(maxOut = 3) for i in 0..<3: discard connMngr.getOutgoingSlot() # should timeout adding a connection over the limit check not(await connMngr.getIncomingSlot().withTimeout(10.millis)) await connMngr.close() asyncTest "track incoming max connections limits - fail on outgoing": let connMngr = ConnManager.new(maxIn = 3) for i in 0..<3: check await connMngr.getIncomingSlot().withTimeout(10.millis) # should throw adding a connection over the limit expect TooManyConnectionsError: discard connMngr.getOutgoingSlot() await connMngr.close() asyncTest "allow force dial": let connMngr = ConnManager.new(maxConnections = 2) for i in 0..<3: discard connMngr.getOutgoingSlot(true) # should throw adding a connection over the limit expect TooManyConnectionsError: discard connMngr.getOutgoingSlot(false) await connMngr.close() asyncTest "release slot on connection end": let connMngr = ConnManager.new(maxConnections = 3) var muxs: seq[Muxer] for i in 0..<3: let slot = connMngr.getOutgoingSlot() let muxer = getMuxer( PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), Direction.In) slot.trackMuxer(muxer) muxs.add(muxer) # should be full now let incomingSlot = connMngr.getIncomingSlot() check (await incomingSlot.withTimeout(10.millis)) == false await allFuturesThrowing( allFutures(muxs.mapIt( it.close() ))) check await incomingSlot.withTimeout(10.millis) await connMngr.close()