nim-libp2p/tests/testconnmngr.nim

392 lines
11 KiB
Nim

{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/[sequtils,tables]
import stew/results
import chronos
import ../libp2p/[connmanager,
stream/connection,
crypto/crypto,
muxers/muxer,
peerinfo,
errors]
import helpers
proc getMuxer(peerId: PeerId, dir: Direction = Direction.In): Muxer =
return Muxer(connection: Connection.new(peerId, dir, Opt.none(MultiAddress)))
type
TestMuxer = ref object of Muxer
peerId: PeerId
method newStream*(
m: TestMuxer,
name: string = "",
lazy: bool = false):
Future[Connection] {.async, gcsafe.} =
result = Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress))
suite "Connection Manager":
teardown:
checkTrackers()
asyncTest "add and retrieve a muxer":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let mux = getMuxer(peerId)
connMngr.storeMuxer(mux)
check mux in connMngr
let peerMux = connMngr.selectMuxer(peerId)
check peerMux == mux
check peerMux.connection.dir == Direction.In
await connMngr.close()
asyncTest "get all connections":
let connMngr = ConnManager.new()
let peers = toSeq(0..<2).mapIt(PeerId.random.tryGet())
let muxs = toSeq(0..<2).mapIt(getMuxer(peers[it]))
for mux in muxs: connMngr.storeMuxer(mux)
let conns = connMngr.getConnections()
let connsMux = toSeq(conns.values).mapIt(it[0])
check unorderedCompare(connsMux, muxs)
await connMngr.close()
asyncTest "shouldn't allow a closed connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let mux = getMuxer(peerId)
await mux.connection.close()
expect CatchableError:
connMngr.storeMuxer(mux)
await connMngr.close()
asyncTest "shouldn't allow an EOFed connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let mux = getMuxer(peerId)
mux.connection.isEof = true
expect CatchableError:
connMngr.storeMuxer(mux)
await mux.close()
await connMngr.close()
asyncTest "shouldn't allow a muxer with no connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let muxer = getMuxer(peerId)
let conn = muxer.connection
muxer.connection = nil
expect CatchableError:
connMngr.storeMuxer(muxer)
await conn.close()
await muxer.close()
await connMngr.close()
asyncTest "get conn with direction":
# This would work with 1 as well cause of a bug in connmanager that will get fixed soon
let connMngr = ConnManager.new(maxConnsPerPeer = 2)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let mux1 = getMuxer(peerId, Direction.Out)
let mux2 = getMuxer(peerId)
connMngr.storeMuxer(mux1)
connMngr.storeMuxer(mux2)
check mux1 in connMngr
check mux2 in connMngr
let outMux = connMngr.selectMuxer(peerId, Direction.Out)
let inMux = connMngr.selectMuxer(peerId, Direction.In)
check outMux != inMux
check outMux == mux1
check inMux == mux2
check outMux.connection.dir == Direction.Out
check inMux.connection.dir == Direction.In
await connMngr.close()
asyncTest "get muxed stream for peer":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let muxer = new TestMuxer
let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress))
muxer.peerId = peerId
muxer.connection = connection
connMngr.storeMuxer(muxer)
check muxer in connMngr
let stream = await connMngr.getStream(peerId)
check not(isNil(stream))
check stream.peerId == peerId
await connMngr.close()
await connection.close()
await stream.close()
asyncTest "get stream from directed connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let muxer = new TestMuxer
let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress))
muxer.peerId = peerId
muxer.connection = connection
connMngr.storeMuxer(muxer)
check muxer in connMngr
let stream1 = await connMngr.getStream(peerId, Direction.In)
check not(isNil(stream1))
let stream2 = await connMngr.getStream(peerId, Direction.Out)
check isNil(stream2)
await connMngr.close()
await stream1.close()
await connection.close()
asyncTest "should raise on too many connections":
let connMngr = ConnManager.new(maxConnsPerPeer = 0)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
connMngr.storeMuxer(getMuxer(peerId))
let muxs = @[getMuxer(peerId)]
expect TooManyConnectionsError:
connMngr.storeMuxer(muxs[0])
await connMngr.close()
await allFuturesThrowing(
allFutures(muxs.mapIt( it.close() )))
asyncTest "expect connection from peer":
# FIXME This should be 1 instead of 0, it will get fixed soon
let connMngr = ConnManager.new(maxConnsPerPeer = 0)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
connMngr.storeMuxer(getMuxer(peerId))
let muxs = @[
getMuxer(peerId),
getMuxer(peerId)]
expect TooManyConnectionsError:
connMngr.storeMuxer(muxs[0])
let waitedConn1 = connMngr.expectConnection(peerId, In)
expect AlreadyExpectingConnectionError:
discard await connMngr.expectConnection(peerId, In)
await waitedConn1.cancelAndWait()
let
waitedConn2 = connMngr.expectConnection(peerId, In)
waitedConn3 = connMngr.expectConnection(PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), In)
conn = getMuxer(peerId)
connMngr.storeMuxer(conn)
check (await waitedConn2) == conn
expect TooManyConnectionsError:
connMngr.storeMuxer(muxs[1])
await connMngr.close()
checkExpiring: waitedConn3.cancelled()
await allFuturesThrowing(
allFutures(muxs.mapIt( it.close() )))
asyncTest "cleanup on connection close":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let muxer = getMuxer(peerId)
connMngr.storeMuxer(muxer)
check muxer in connMngr
await muxer.close()
checkExpiring: muxer notin connMngr
await connMngr.close()
asyncTest "drop connections for peer":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
for i in 0..<2:
let dir = if i mod 2 == 0:
Direction.In else:
Direction.Out
let muxer = getMuxer(peerId, dir)
connMngr.storeMuxer(muxer)
check muxer in connMngr
check not(isNil(connMngr.selectMuxer(peerId, dir)))
check peerId in connMngr
await connMngr.dropPeer(peerId)
checkExpiring: peerId notin connMngr
check isNil(connMngr.selectMuxer(peerId, Direction.In))
check isNil(connMngr.selectMuxer(peerId, Direction.Out))
await connMngr.close()
asyncTest "track total incoming connection limits":
let connMngr = ConnManager.new(maxConnections = 3)
for i in 0..<3:
check await connMngr.getIncomingSlot().withTimeout(10.millis)
# should timeout adding a connection over the limit
check not(await connMngr.getIncomingSlot().withTimeout(10.millis))
await connMngr.close()
asyncTest "track total outgoing connection limits":
let connMngr = ConnManager.new(maxConnections = 3)
for i in 0..<3:
discard connMngr.getOutgoingSlot()
# should throw adding a connection over the limit
expect TooManyConnectionsError:
discard connMngr.getOutgoingSlot()
await connMngr.close()
asyncTest "track both incoming and outgoing total connections limits - fail on incoming":
let connMngr = ConnManager.new(maxConnections = 3)
for i in 0..<3:
discard connMngr.getOutgoingSlot()
# should timeout adding a connection over the limit
check not(await connMngr.getIncomingSlot().withTimeout(10.millis))
await connMngr.close()
asyncTest "track both incoming and outgoing total connections limits - fail on outgoing":
let connMngr = ConnManager.new(maxConnections = 3)
for i in 0..<3:
check await connMngr.getIncomingSlot().withTimeout(10.millis)
# should throw adding a connection over the limit
expect TooManyConnectionsError:
discard connMngr.getOutgoingSlot()
await connMngr.close()
asyncTest "track max incoming connection limits":
let connMngr = ConnManager.new(maxIn = 3)
for i in 0..<3:
check await connMngr.getIncomingSlot().withTimeout(10.millis)
check not(await connMngr.getIncomingSlot().withTimeout(10.millis))
await connMngr.close()
asyncTest "track max outgoing connection limits":
let connMngr = ConnManager.new(maxOut = 3)
for i in 0..<3:
discard connMngr.getOutgoingSlot()
# should throw adding a connection over the limit
expect TooManyConnectionsError:
discard connMngr.getOutgoingSlot()
await connMngr.close()
asyncTest "track incoming max connections limits - fail on incoming":
let connMngr = ConnManager.new(maxOut = 3)
for i in 0..<3:
discard connMngr.getOutgoingSlot()
# should timeout adding a connection over the limit
check not(await connMngr.getIncomingSlot().withTimeout(10.millis))
await connMngr.close()
asyncTest "track incoming max connections limits - fail on outgoing":
let connMngr = ConnManager.new(maxIn = 3)
for i in 0..<3:
check await connMngr.getIncomingSlot().withTimeout(10.millis)
# should throw adding a connection over the limit
expect TooManyConnectionsError:
discard connMngr.getOutgoingSlot()
await connMngr.close()
asyncTest "allow force dial":
let connMngr = ConnManager.new(maxConnections = 2)
for i in 0..<3:
discard connMngr.getOutgoingSlot(true)
# should throw adding a connection over the limit
expect TooManyConnectionsError:
discard connMngr.getOutgoingSlot(false)
await connMngr.close()
asyncTest "release slot on connection end":
let connMngr = ConnManager.new(maxConnections = 3)
var muxs: seq[Muxer]
for i in 0..<3:
let slot = connMngr.getOutgoingSlot()
let muxer =
getMuxer(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In)
slot.trackMuxer(muxer)
muxs.add(muxer)
# should be full now
let incomingSlot = connMngr.getIncomingSlot()
check (await incomingSlot.withTimeout(10.millis)) == false
await allFuturesThrowing(
allFutures(muxs.mapIt( it.close() )))
check await incomingSlot.withTimeout(10.millis)
await connMngr.close()