Make observedAddr optional (#772)

Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
diegomrsantos 2022-09-22 21:55:59 +02:00 committed by GitHub
parent 5e7e009445
commit a56c3bc296
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 83 additions and 52 deletions

View File

@ -13,10 +13,13 @@ else:
{.push raises: [].} {.push raises: [].}
import chronos import chronos
import stew/results
import peerid, import peerid,
stream/connection, stream/connection,
transports/transport transports/transport
export results
type type
Dial* = ref object of RootObj Dial* = ref object of RootObj
@ -69,5 +72,5 @@ method addTransport*(
method tryDial*( method tryDial*(
self: Dial, self: Dial,
peerId: PeerId, peerId: PeerId,
addrs: seq[MultiAddress]): Future[MultiAddress] {.async, base.} = addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async, base.} =
doAssert(false, "Not implemented!") doAssert(false, "Not implemented!")

View File

@ -9,6 +9,7 @@
import std/[sugar, tables] import std/[sugar, tables]
import stew/results
import pkg/[chronos, import pkg/[chronos,
chronicles, chronicles,
metrics] metrics]
@ -24,7 +25,7 @@ import dial,
upgrademngrs/upgrade, upgrademngrs/upgrade,
errors errors
export dial, errors export dial, errors, results
logScope: logScope:
topics = "libp2p dialer" topics = "libp2p dialer"
@ -189,7 +190,7 @@ proc negotiateStream(
method tryDial*( method tryDial*(
self: Dialer, self: Dialer,
peerId: PeerId, peerId: PeerId,
addrs: seq[MultiAddress]): Future[MultiAddress] {.async.} = addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async.} =
## Create a protocol stream in order to check ## Create a protocol stream in order to check
## if a connection is possible. ## if a connection is possible.
## Doesn't use the Connection Manager to save it. ## Doesn't use the Connection Manager to save it.

View File

@ -13,6 +13,7 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[options, sets, sequtils] import std/[options, sets, sequtils]
import stew/results
import chronos, chronicles, stew/objects import chronos, chronicles, stew/objects
import ../protocol, import ../protocol,
../../switch, ../../switch,
@ -226,7 +227,10 @@ proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
try: try:
await a.sem.acquire() await a.sem.acquire()
let ma = await a.switch.dialer.tryDial(conn.peerId, addrs) let ma = await a.switch.dialer.tryDial(conn.peerId, addrs)
await conn.sendResponseOk(ma) if ma.isSome:
await conn.sendResponseOk(ma.get())
else:
await conn.sendResponseError(DialError, "Missing observed address")
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
@ -241,15 +245,19 @@ proc handleDial(a: Autonat, conn: Connection, msg: AutonatMsg): Future[void] =
if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId: if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId:
return conn.sendResponseError(BadRequest, "PeerId mismatch") return conn.sendResponseError(BadRequest, "PeerId mismatch")
var isRelayed = conn.observedAddr.contains(multiCodec("p2p-circuit")) if conn.observedAddr.isNone:
return conn.sendResponseError(BadRequest, "Missing observed address")
let observedAddr = conn.observedAddr.get()
var isRelayed = observedAddr.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get(): if isRelayed.isErr() or isRelayed.get():
return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address") return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address")
let hostIp = conn.observedAddr[0] let hostIp = observedAddr[0]
if hostIp.isErr() or not IP.match(hostIp.get()): if hostIp.isErr() or not IP.match(hostIp.get()):
trace "wrong observed address", address=conn.observedAddr trace "wrong observed address", address=observedAddr
return conn.sendResponseError(InternalError, "Expected an IP address") return conn.sendResponseError(InternalError, "Expected an IP address")
var addrs = initHashSet[MultiAddress]() var addrs = initHashSet[MultiAddress]()
addrs.incl(conn.observedAddr) addrs.incl(observedAddr)
for ma in peerInfo.addrs: for ma in peerInfo.addrs:
isRelayed = ma.contains(multiCodec("p2p-circuit")) isRelayed = ma.contains(multiCodec("p2p-circuit"))
if isRelayed.isErr() or isRelayed.get(): if isRelayed.isErr() or isRelayed.get():

View File

@ -16,6 +16,7 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[sequtils, options, strutils, sugar] import std/[sequtils, options, strutils, sugar]
import stew/results
import chronos, chronicles import chronos, chronicles
import ../protobuf/minprotobuf, import ../protobuf/minprotobuf,
../peerinfo, ../peerinfo,
@ -80,7 +81,7 @@ chronicles.expandIt(IdentifyInfo):
if iinfo.signedPeerRecord.isSome(): "Some" if iinfo.signedPeerRecord.isSome(): "Some"
else: "None" else: "None"
proc encodeMsg(peerInfo: PeerInfo, observedAddr: MultiAddress, sendSpr: bool): ProtoBuffer proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: bool): ProtoBuffer
{.raises: [Defect].} = {.raises: [Defect].} =
result = initProtoBuffer() result = initProtoBuffer()
@ -91,7 +92,8 @@ proc encodeMsg(peerInfo: PeerInfo, observedAddr: MultiAddress, sendSpr: bool): P
result.write(2, ma.data.buffer) result.write(2, ma.data.buffer)
for proto in peerInfo.protocols: for proto in peerInfo.protocols:
result.write(3, proto) result.write(3, proto)
result.write(4, observedAddr.data.buffer) if observedAddr.isSome:
result.write(4, observedAddr.get().data.buffer)
let protoVersion = ProtoVersion let protoVersion = ProtoVersion
result.write(5, protoVersion) result.write(5, protoVersion)
let agentVersion = if peerInfo.agentVersion.len <= 0: let agentVersion = if peerInfo.agentVersion.len <= 0:

View File

@ -12,7 +12,8 @@ when (NimMajor, NimMinor) < (1, 4):
else: else:
{.push raises: [].} {.push raises: [].}
import std/[sequtils, strutils, tables, hashes] import std/[sequtils, strutils, tables, hashes, options]
import stew/results
import chronos, chronicles, nimcrypto/sha2, metrics import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf], import rpc/[messages, message, protobuf],
../../peerid, ../../peerid,
@ -174,7 +175,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
trace "Get new send connection", p, newConn trace "Get new send connection", p, newConn
p.sendConn = newConn p.sendConn = newConn
p.address = some(p.sendConn.observedAddr) p.address = if p.sendConn.observedAddr.isSome: some(p.sendConn.observedAddr.get) else: none(MultiAddress)
if p.onEvent != nil: if p.onEvent != nil:
p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Connected)) p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Connected))

View File

@ -13,6 +13,7 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[strformat] import std/[strformat]
import stew/results
import chronos, chronicles import chronos, chronicles
import ../protocol, import ../protocol,
../../stream/streamseq, ../../stream/streamseq,
@ -21,7 +22,7 @@ import ../protocol,
../../peerinfo, ../../peerinfo,
../../errors ../../errors
export protocol export protocol, results
logScope: logScope:
topics = "libp2p secure" topics = "libp2p secure"
@ -48,7 +49,7 @@ chronicles.formatIt(SecureConn): shortLog(it)
proc new*(T: type SecureConn, proc new*(T: type SecureConn,
conn: Connection, conn: Connection,
peerId: PeerId, peerId: PeerId,
observedAddr: MultiAddress, observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout): T = timeout: Duration = DefaultConnectionTimeout): T =
result = T(stream: conn, result = T(stream: conn,
peerId: peerId, peerId: peerId,

View File

@ -13,10 +13,13 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[oids, strformat] import std/[oids, strformat]
import stew/results
import chronos, chronicles, metrics import chronos, chronicles, metrics
import connection import connection
import ../utility import ../utility
export results
logScope: logScope:
topics = "libp2p chronosstream" topics = "libp2p chronosstream"
@ -60,7 +63,7 @@ proc init*(C: type ChronosStream,
client: StreamTransport, client: StreamTransport,
dir: Direction, dir: Direction,
timeout = DefaultChronosStreamTimeout, timeout = DefaultChronosStreamTimeout,
observedAddr: MultiAddress = MultiAddress()): ChronosStream = observedAddr: Opt[MultiAddress]): ChronosStream =
result = C(client: client, result = C(client: client,
timeout: timeout, timeout: timeout,
dir: dir, dir: dir,

View File

@ -13,13 +13,14 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[hashes, oids, strformat] import std/[hashes, oids, strformat]
import stew/results
import chronicles, chronos, metrics import chronicles, chronos, metrics
import lpstream, import lpstream,
../multiaddress, ../multiaddress,
../peerinfo, ../peerinfo,
../errors ../errors
export lpstream, peerinfo, errors export lpstream, peerinfo, errors, results
logScope: logScope:
topics = "libp2p connection" topics = "libp2p connection"
@ -37,7 +38,7 @@ type
timerTaskFut: Future[void] # the current timer instance timerTaskFut: Future[void] # the current timer instance
timeoutHandler*: TimeoutHandler # timeout handler timeoutHandler*: TimeoutHandler # timeout handler
peerId*: PeerId peerId*: PeerId
observedAddr*: MultiAddress observedAddr*: Opt[MultiAddress]
upgraded*: Future[void] upgraded*: Future[void]
protocol*: string # protocol used by the connection, used as tag for metrics protocol*: string # protocol used by the connection, used as tag for metrics
transportDir*: Direction # The bottom level transport (generally the socket) direction transportDir*: Direction # The bottom level transport (generally the socket) direction
@ -160,9 +161,9 @@ method getWrapped*(s: Connection): Connection {.base.} =
proc new*(C: type Connection, proc new*(C: type Connection,
peerId: PeerId, peerId: PeerId,
dir: Direction, dir: Direction,
observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout, timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil, timeoutHandler: TimeoutHandler = nil): Connection =
observedAddr: MultiAddress = MultiAddress()): Connection =
result = C(peerId: peerId, result = C(peerId: peerId,
dir: dir, dir: dir,
timeout: timeout, timeout: timeout,

View File

@ -15,6 +15,7 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[oids, sequtils] import std/[oids, sequtils]
import stew/results
import chronos, chronicles import chronos, chronicles
import transport, import transport,
../errors, ../errors,
@ -31,7 +32,7 @@ import transport,
logScope: logScope:
topics = "libp2p tcptransport" topics = "libp2p tcptransport"
export transport export transport, results
const const
TcpTransportTrackerName* = "libp2p.tcptransport" TcpTransportTrackerName* = "libp2p.tcptransport"
@ -71,18 +72,20 @@ proc setupTcpTransportTracker(): TcpTransportTracker =
result.isLeaked = leakTransport result.isLeaked = leakTransport
addTracker(TcpTransportTrackerName, result) addTracker(TcpTransportTrackerName, result)
proc connHandler*(self: TcpTransport, proc getObservedAddr(client: StreamTransport): Future[MultiAddress] {.async.} =
client: StreamTransport,
dir: Direction): Future[Connection] {.async.} =
var observedAddr: MultiAddress = MultiAddress()
try: try:
observedAddr = MultiAddress.init(client.remoteAddress).tryGet() return MultiAddress.init(client.remoteAddress).tryGet()
except CatchableError as exc: except CatchableError as exc:
trace "Failed to create observedAddr", exc = exc.msg trace "Failed to create observedAddr", exc = exc.msg
if not(isNil(client) and client.closed): if not(isNil(client) and client.closed):
await client.closeWait() await client.closeWait()
raise exc raise exc
proc connHandler*(self: TcpTransport,
client: StreamTransport,
observedAddr: Opt[MultiAddress],
dir: Direction): Future[Connection] {.async.} =
trace "Handling tcp connection", address = $observedAddr, trace "Handling tcp connection", address = $observedAddr,
dir = $dir, dir = $dir,
clients = self.clients[Direction.In].len + clients = self.clients[Direction.In].len +
@ -222,7 +225,8 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
self.acceptFuts[index] = self.servers[index].accept() self.acceptFuts[index] = self.servers[index].accept()
let transp = await finished let transp = await finished
return await self.connHandler(transp, Direction.In) let observedAddr = await getObservedAddr(transp)
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
except TransportOsError as exc: except TransportOsError as exc:
# TODO: it doesn't sound like all OS errors # TODO: it doesn't sound like all OS errors
# can be ignored, we should re-raise those # can be ignored, we should re-raise those
@ -250,7 +254,8 @@ method dial*(
let transp = await connect(address) let transp = await connect(address)
try: try:
return await self.connHandler(transp, Direction.Out) let observedAddr = await getObservedAddr(transp)
return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out)
except CatchableError as err: except CatchableError as err:
await transp.closeWait() await transp.closeWait()
raise err raise err

View File

@ -15,6 +15,7 @@ else:
{.push raises: [].} {.push raises: [].}
import std/[sequtils] import std/[sequtils]
import stew/results
import chronos, chronicles import chronos, chronicles
import transport, import transport,
../errors, ../errors,
@ -31,7 +32,7 @@ import transport,
logScope: logScope:
topics = "libp2p wstransport" topics = "libp2p wstransport"
export transport, websock export transport, websock, results
const const
WsTransportTrackerName* = "libp2p.wstransport" WsTransportTrackerName* = "libp2p.wstransport"
@ -45,8 +46,8 @@ type
proc new*(T: type WsStream, proc new*(T: type WsStream,
session: WSSession, session: WSSession,
dir: Direction, dir: Direction,
timeout = 10.minutes, observedAddr: Opt[MultiAddress],
observedAddr: MultiAddress = MultiAddress()): T = timeout = 10.minutes): T =
let stream = T( let stream = T(
session: session, session: session,
@ -221,8 +222,7 @@ proc connHandler(self: WsTransport,
await stream.close() await stream.close()
raise exc raise exc
let conn = WsStream.new(stream, dir) let conn = WsStream.new(stream, dir, Opt.some(observedAddr))
conn.observedAddr = observedAddr
self.connections[dir].add(conn) self.connections[dir].add(conn)
proc onClose() {.async.} = proc onClose() {.async.} =

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import sequtils import sequtils
import chronos, stew/byteutils import chronos, stew/[byteutils, results]
import ../libp2p/[stream/connection, import ../libp2p/[stream/connection,
transports/transport, transports/transport,
upgrademngrs/upgrade, upgrademngrs/upgrade,
@ -35,14 +35,16 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept() let conn = await transport1.accept()
check transport1.handles(conn.observedAddr) if conn.observedAddr.isSome():
check transport1.handles(conn.observedAddr.get())
await conn.close() await conn.close()
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let conn = await transport2.dial(transport1.addrs[0]) let conn = await transport2.dial(transport1.addrs[0])
check transport2.handles(conn.observedAddr) if conn.observedAddr.isSome():
check transport2.handles(conn.observedAddr.get())
await conn.close() #for some protocols, closing requires actively reading, so we must close here await conn.close() #for some protocols, closing requires actively reading, so we must close here

View File

@ -1,4 +1,5 @@
import sequtils import sequtils
import stew/results
import chronos import chronos
import ../libp2p/[connmanager, import ../libp2p/[connmanager,
stream/connection, stream/connection,
@ -9,6 +10,9 @@ import ../libp2p/[connmanager,
import helpers import helpers
proc getConnection(peerId: PeerId, dir: Direction = Direction.In): Connection =
return Connection.new(peerId, dir, Opt.none(MultiAddress))
type type
TestMuxer = ref object of Muxer TestMuxer = ref object of Muxer
peerId: PeerId peerId: PeerId
@ -18,7 +22,7 @@ method newStream*(
name: string = "", name: string = "",
lazy: bool = false): lazy: bool = false):
Future[Connection] {.async, gcsafe.} = Future[Connection] {.async, gcsafe.} =
result = Connection.new(m.peerId, Direction.Out) result = getConnection(m.peerId, Direction.Out)
suite "Connection Manager": suite "Connection Manager":
teardown: teardown:
@ -27,7 +31,7 @@ suite "Connection Manager":
asyncTest "add and retrieve a connection": asyncTest "add and retrieve a connection":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
connMngr.storeConn(conn) connMngr.storeConn(conn)
check conn in connMngr check conn in connMngr
@ -41,7 +45,7 @@ suite "Connection Manager":
asyncTest "shouldn't allow a closed connection": asyncTest "shouldn't allow a closed connection":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
await conn.close() await conn.close()
expect CatchableError: expect CatchableError:
@ -52,7 +56,7 @@ suite "Connection Manager":
asyncTest "shouldn't allow an EOFed connection": asyncTest "shouldn't allow an EOFed connection":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
conn.isEof = true conn.isEof = true
expect CatchableError: expect CatchableError:
@ -64,7 +68,7 @@ suite "Connection Manager":
asyncTest "add and retrieve a muxer": asyncTest "add and retrieve a muxer":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
let muxer = new Muxer let muxer = new Muxer
muxer.connection = conn muxer.connection = conn
@ -80,7 +84,7 @@ suite "Connection Manager":
asyncTest "shouldn't allow a muxer for an untracked connection": asyncTest "shouldn't allow a muxer for an untracked connection":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
let muxer = new Muxer let muxer = new Muxer
muxer.connection = conn muxer.connection = conn
@ -94,8 +98,8 @@ suite "Connection Manager":
asyncTest "get conn with direction": asyncTest "get conn with direction":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn1 = Connection.new(peerId, Direction.Out) let conn1 = getConnection(peerId, Direction.Out)
let conn2 = Connection.new(peerId, Direction.In) let conn2 = getConnection(peerId)
connMngr.storeConn(conn1) connMngr.storeConn(conn1)
connMngr.storeConn(conn2) connMngr.storeConn(conn2)
@ -114,7 +118,7 @@ suite "Connection Manager":
asyncTest "get muxed stream for peer": asyncTest "get muxed stream for peer":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
let muxer = new TestMuxer let muxer = new TestMuxer
muxer.peerId = peerId muxer.peerId = peerId
@ -134,7 +138,7 @@ suite "Connection Manager":
asyncTest "get stream from directed connection": asyncTest "get stream from directed connection":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
let muxer = new TestMuxer let muxer = new TestMuxer
muxer.peerId = peerId muxer.peerId = peerId
@ -155,7 +159,7 @@ suite "Connection Manager":
asyncTest "get stream from any connection": asyncTest "get stream from any connection":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
let muxer = new TestMuxer let muxer = new TestMuxer
muxer.peerId = peerId muxer.peerId = peerId
@ -175,11 +179,11 @@ suite "Connection Manager":
let connMngr = ConnManager.new(maxConnsPerPeer = 1) let connMngr = ConnManager.new(maxConnsPerPeer = 1)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
connMngr.storeConn(Connection.new(peerId, Direction.In)) connMngr.storeConn(getConnection(peerId))
let conns = @[ let conns = @[
Connection.new(peerId, Direction.In), getConnection(peerId),
Connection.new(peerId, Direction.In)] getConnection(peerId)]
expect TooManyConnectionsError: expect TooManyConnectionsError:
connMngr.storeConn(conns[0]) connMngr.storeConn(conns[0])
@ -193,7 +197,7 @@ suite "Connection Manager":
asyncTest "cleanup on connection close": asyncTest "cleanup on connection close":
let connMngr = ConnManager.new() let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet() let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = Connection.new(peerId, Direction.In) let conn = getConnection(peerId)
let muxer = new Muxer let muxer = new Muxer
muxer.connection = conn muxer.connection = conn
@ -220,7 +224,7 @@ suite "Connection Manager":
Direction.In else: Direction.In else:
Direction.Out Direction.Out
let conn = Connection.new(peerId, dir) let conn = getConnection(peerId, dir)
let muxer = new Muxer let muxer = new Muxer
muxer.connection = conn muxer.connection = conn
@ -353,7 +357,7 @@ suite "Connection Manager":
let slot = await ((connMngr.getOutgoingSlot()).wait(10.millis)) let slot = await ((connMngr.getOutgoingSlot()).wait(10.millis))
let conn = let conn =
Connection.new( getConnection(
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
Direction.In) Direction.In)