From 73168b6eae0017dd7faa21b3895e676236d69067 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 24 Nov 2021 14:01:12 -0600 Subject: [PATCH] Add support for multiple addresses to transports (#598) * add test for multiple local addresses * allow transports to listen on multiple addrs * fix tcp transport accept * check switch addrs are correct * switch test to port 0 * close accepted peers on close * ignore CancelledError in transport accept * test ci * only accept in accept loop * avoid accept greedyness * close acceptedPeers * accept doesn't crash on cancelled fut * add common transport test * close conn on handling failure * close accepted peers in two steps * test for macos * revert accept greedyness * fix dialing cancel * test chronos fix * add ws * ws cancellation * small fix * remove chronos blocked test * fix testping * Fix transport's switch start (like #609) * bump chronos * Websocket: handle both ws & wss Co-authored-by: Tanguy Cizain Co-authored-by: Tanguy --- libp2p.nimble | 2 +- libp2p/builders.nim | 5 +- libp2p/switch.nim | 37 ++++++---- libp2p/transports/tcptransport.nim | 75 ++++++++++++++----- libp2p/transports/transport.nim | 13 ++-- libp2p/transports/wstransport.nim | 115 ++++++++++++++++++++--------- tests/commontransport.nim | 79 ++++++++++++++++---- tests/testidentify.nim | 23 +++--- tests/testmplex.nim | 60 +++++++-------- tests/testmultistream.nim | 16 ++-- tests/testnoise.nim | 40 +++++----- tests/testping.nim | 12 +-- tests/testswitch.nim | 79 ++++++++++++++++++-- tests/testtcptransport.nim | 8 +- tests/testwstransport.nim | 6 +- 15 files changed, 394 insertions(+), 176 deletions(-) diff --git a/libp2p.nimble b/libp2p.nimble index 0a16a32c1..ef0a4efa2 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -12,7 +12,7 @@ requires "nim >= 1.2.0", "https://github.com/ba0f3/dnsclient.nim == 0.1.0", "bearssl >= 0.1.4", "chronicles#ba2817f1", - "chronos >= 2.5.2", + "chronos >= 3.0.6", "metrics", "secp256k1", "stew#head", diff --git a/libp2p/builders.nim b/libp2p/builders.nim index f722c66af..13c23327b 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -197,7 +197,7 @@ proc build*(b: SwitchBuilder): Switch proc newStandardSwitch*( privKey = none(PrivateKey), - address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + addrs: MultiAddress | seq[MultiAddress] = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), secureManagers: openarray[SecureProtocol] = [ SecureProtocol.Noise, ], @@ -214,9 +214,10 @@ proc newStandardSwitch*( if SecureProtocol.Secio in secureManagers: quit("Secio is deprecated!") # use of secio is unsafe + let addrs = when addrs is MultiAddress: @[addrs] else: addrs var b = SwitchBuilder .new() - .withAddress(address) + .withAddresses(addrs) .withRng(rng) .withMaxConnections(maxConnections) .withMaxIn(maxIn) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 8f19c5f23..bc82d0961 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -11,6 +11,7 @@ import std/[tables, options, + sequtils, sets, oids, sugar, @@ -236,25 +237,33 @@ proc stop*(s: Switch) {.async.} = proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = trace "starting switch for peer", peerInfo = s.peerInfo var startFuts: seq[Future[void]] + for t in s.transports: + let addrs = s.peerInfo.addrs.filterIt( + t.handles(it) + ) + + s.peerInfo.addrs.keepItIf( + it notin addrs + ) + + if addrs.len > 0: + startFuts.add(t.start(addrs)) + + await allFutures(startFuts) + + for s in startFuts: + if s.failed: + info "Failed to start one transport", error=s.error.msg + for t in s.transports: # for each transport - for i, a in s.peerInfo.addrs: - if t.handles(a): # check if it handles the multiaddr - let transpStart = t.start(a) - startFuts.add(transpStart) - try: - await transpStart - s.peerInfo.addrs[i] = t.ma # update peer's address - s.acceptFuts.add(s.accept(t)) - except CancelledError as exc: - await s.stop() - raise exc - except CatchableError as exc: - debug "Failed to start one transport", address = $a, err = exc.msg - continue + if t.addrs.len > 0: + s.acceptFuts.add(s.accept(t)) + s.peerInfo.addrs &= t.addrs debug "Started libp2p node", peer = s.peerInfo return startFuts # listen for incoming connections + proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], identity: Identify, diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 9a19a6909..1c2fd3d40 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -32,9 +32,10 @@ const type TcpTransport* = ref object of Transport - server*: StreamServer + servers*: seq[StreamServer] clients: array[Direction, seq[StreamTransport]] flags: set[ServerFlags] + acceptFuts: seq[Future[StreamTransport]] TcpTransportTracker* = ref object of TrackerBase opened*: uint64 @@ -121,34 +122,42 @@ proc new*( let transport = T( flags: flags, - upgrader: upgrade - ) + upgrader: upgrade) inc getTcpTransportTracker().opened return transport method start*( self: TcpTransport, - ma: MultiAddress) {.async.} = + addrs: seq[MultiAddress]) {.async.} = ## listen on the transport ## if self.running: - trace "TCP transport already running" + warn "TCP transport already running" return - await procCall Transport(self).start(ma) + await procCall Transport(self).start(addrs) trace "Starting TCP transport" - self.server = createStreamServer( - ma = self.ma, - flags = self.flags, - udata = self) + for i, ma in addrs: + if not self.handles(ma): + trace "Invalid address detected, skipping!", address = ma + continue - # always get the resolved address in case we're bound to 0.0.0.0:0 - self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet() + let server = createStreamServer( + ma = ma, + flags = self.flags, + udata = self) - trace "Listening on", address = self.ma + # always get the resolved address in case we're bound to 0.0.0.0:0 + self.addrs[i] = MultiAddress.init( + server.sock.getLocalAddress() + ).tryGet() + + self.servers &= server + + trace "Listening on", address = ma method stop*(self: TcpTransport) {.async, gcsafe.} = ## stop the transport @@ -163,11 +172,21 @@ method stop*(self: TcpTransport) {.async, gcsafe.} = self.clients[Direction.In].mapIt(it.closeWait()) & self.clients[Direction.Out].mapIt(it.closeWait()))) - # server can be nil - if not isNil(self.server): - await self.server.closeWait() + var toWait: seq[Future[void]] + for fut in self.acceptFuts: + if not fut.finished: + toWait.add(fut.cancelAndWait()) + elif fut.done: + toWait.add(fut.read().closeWait()) + + for server in self.servers: + server.stop() + toWait.add(server.closeWait()) + + await allFutures(toWait) + + self.servers = @[] - self.server = nil trace "Transport stopped" inc getTcpTransportTracker().closed except CatchableError as exc: @@ -181,7 +200,19 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = raise newTransportClosedError() try: - let transp = await self.server.accept() + if self.acceptFuts.len <= 0: + self.acceptFuts = self.servers.mapIt(it.accept()) + + if self.acceptFuts.len <= 0: + return + + let + finished = await one(self.acceptFuts) + index = self.acceptFuts.find(finished) + + self.acceptFuts[index] = self.servers[index].accept() + + let transp = await finished return await self.connHandler(transp, Direction.In) except TransportOsError as exc: # TODO: it doesn't sound like all OS errors @@ -193,6 +224,8 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = except TransportUseClosedError as exc: debug "Server was closed", exc = exc.msg raise newTransportClosedError(exc) + except CancelledError as exc: + raise except CatchableError as exc: debug "Unexpected error accepting connection", exc = exc.msg raise exc @@ -207,7 +240,11 @@ method dial*( trace "Dialing remote peer", address = $address let transp = await connect(address) - return await self.connHandler(transp, Direction.Out) + try: + return await self.connHandler(transp, Direction.Out) + except CatchableError as err: + await transp.closeWait() + raise err method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 8ff1a6fd4..593a85fe6 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -22,10 +22,11 @@ logScope: type TransportError* = object of LPError + TransportInvalidAddrError* = object of TransportError TransportClosedError* = object of TransportError Transport* = ref object of RootObj - ma*: Multiaddress + addrs*: seq[Multiaddress] running*: bool upgrader*: Upgrade @@ -35,20 +36,20 @@ proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = method start*( self: Transport, - ma: MultiAddress): Future[void] {.base, async.} = + addrs: seq[MultiAddress]) {.base, async.} = ## start the transport ## - trace "starting transport", address = $ma - self.ma = ma + trace "starting transport on addrs", address = $addrs + self.addrs = addrs self.running = true -method stop*(self: Transport): Future[void] {.base, async.} = +method stop*(self: Transport) {.base, async.} = ## stop and cleanup the transport ## including all outstanding connections ## - trace "stopping transport", address = $self.ma + trace "stopping transport", address = $self.addrs self.running = false method accept*(self: Transport): Future[Connection] diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim index 7694e2f84..e68331c25 100644 --- a/libp2p/transports/wstransport.nim +++ b/libp2p/transports/wstransport.nim @@ -72,10 +72,12 @@ method closeImpl*(s: WsStream): Future[void] {.async.} = type WsTransport* = ref object of Transport - httpserver: HttpServer + httpservers: seq[HttpServer] wsserver: WSServer connections: array[Direction, seq[WsStream]] + acceptFuts: seq[Future[HttpRequest]] + tlsPrivateKey: TLSPrivateKey tlsCertificate: TLSCertificate tlsFlags: set[TLSFlags] @@ -88,42 +90,55 @@ proc secure*(self: WsTransport): bool = method start*( self: WsTransport, - ma: MultiAddress) {.async.} = + addrs: seq[MultiAddress]) {.async.} = ## listen on the transport ## if self.running: - trace "WS transport already running" + warn "WS transport already running" return - await procCall Transport(self).start(ma) + await procCall Transport(self).start(addrs) trace "Starting WS transport" - self.httpserver = - if self.secure: - TlsHttpServer.create( - address = self.ma.initTAddress().tryGet(), - tlsPrivateKey = self.tlsPrivateKey, - tlsCertificate = self.tlsCertificate, - flags = self.flags) - else: - HttpServer.create(self.ma.initTAddress().tryGet()) - self.wsserver = WSServer.new( factories = self.factories, rng = self.rng) - let codec = if self.secure: - MultiAddress.init("/wss") - else: - MultiAddress.init("/ws") + + for i, ma in addrs: + let isWss = + if WSS.match(ma): + if self.secure: true + else: + warn "Trying to listen on a WSS address without setting the certificate!" + false + else: false - # always get the resolved address in case we're bound to 0.0.0.0:0 - self.ma = MultiAddress.init( - self.httpserver.localAddress()).tryGet() & codec.tryGet() + let httpserver = + if isWss: + TlsHttpServer.create( + address = ma.initTAddress().tryGet(), + tlsPrivateKey = self.tlsPrivateKey, + tlsCertificate = self.tlsCertificate, + flags = self.flags) + else: + HttpServer.create(ma.initTAddress().tryGet()) + + self.httpservers &= httpserver + + let codec = if isWss: + MultiAddress.init("/wss") + else: + MultiAddress.init("/ws") + + # always get the resolved address in case we're bound to 0.0.0.0:0 + self.addrs[i] = MultiAddress.init( + httpserver.localAddress()).tryGet() & codec.tryGet() + + trace "Listening on", addresses = self.addrs self.running = true - trace "Listening on", address = self.ma method stop*(self: WsTransport) {.async, gcsafe.} = ## stop the transport @@ -140,24 +155,33 @@ method stop*(self: WsTransport) {.async, gcsafe.} = self.connections[Direction.In].mapIt(it.close()) & self.connections[Direction.Out].mapIt(it.close()))) - # server can be nil - if not isNil(self.httpserver): - self.httpserver.stop() - await self.httpserver.closeWait() + var toWait: seq[Future[void]] + for fut in self.acceptFuts: + if not fut.finished: + toWait.add(fut.cancelAndWait()) + elif fut.done: + toWait.add(fut.read().stream.closeWait()) - self.httpserver = nil + for server in self.httpservers: + server.stop() + toWait.add(server.closeWait()) + + await allFutures(toWait) + + self.httpservers = @[] trace "Transport stopped" except CatchableError as exc: trace "Error shutting down ws transport", exc = exc.msg proc connHandler(self: WsTransport, stream: WsSession, + secure: bool, dir: Direction): Future[Connection] {.async.} = let observedAddr = try: let codec = - if self.secure: + if secure: MultiAddress.init("/wss") else: MultiAddress.init("/ws") @@ -189,11 +213,29 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} = raise newTransportClosedError() try: - let - req = await self.httpserver.accept() - wstransp = await self.wsserver.handleRequest(req) + if self.acceptFuts.len <= 0: + self.acceptFuts = self.httpservers.mapIt(it.accept()) - return await self.connHandler(wstransp, Direction.In) + if self.acceptFuts.len <= 0: + return + + let + finished = await one(self.acceptFuts) + index = self.acceptFuts.find(finished) + + self.acceptFuts[index] = self.httpservers[index].accept() + + let req = await finished + + try: + let + wstransp = await self.wsserver.handleRequest(req) + isSecure = self.httpservers[index].secure + + return await self.connHandler(wstransp, isSecure, Direction.In) + except CatchableError as exc: + await req.stream.closeWait() + raise exc except TransportOsError as exc: debug "OS Error", exc = exc.msg except TransportTooManyError as exc: @@ -201,6 +243,9 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} = except TransportUseClosedError as exc: debug "Server was closed", exc = exc.msg raise newTransportClosedError(exc) + except CancelledError as exc: + # bubble up silently + raise exc except CatchableError as exc: warn "Unexpected error accepting connection", exc = exc.msg raise exc @@ -223,7 +268,11 @@ method dial*( hostName = hostname, flags = self.tlsFlags) - return await self.connHandler(transp, Direction.Out) + try: + return await self.connHandler(transp, secure, Direction.Out) + except CatchableError as exc: + await transp.close() + raise exc method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): diff --git a/tests/commontransport.nim b/tests/commontransport.nim index 70db2a211..2e94f4c7b 100644 --- a/tests/commontransport.nim +++ b/tests/commontransport.nim @@ -19,14 +19,14 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = checkTrackers() asyncTest "can handle local address": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) - check transport1.handles(transport1.ma) + check transport1.handles(transport1.addrs[0]) await transport1.stop() asyncTest "e2e: handle observedAddr": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) @@ -40,7 +40,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = let handlerWait = acceptHandler() - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) check transport2.handles(conn.observedAddr) @@ -54,7 +54,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = await handlerWait.wait(1.seconds) # when no issues will not wait that long! asyncTest "e2e: handle write": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) @@ -67,7 +67,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = let handlerWait = acceptHandler() let transport2 = prov() - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) var msg = newSeq[byte](6) await conn.readExactly(addr msg[0], 6) @@ -82,7 +82,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = await handlerWait.wait(1.seconds) # when no issues will not wait that long! asyncTest "e2e: handle read": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) @@ -96,7 +96,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = let handlerWait = acceptHandler() let transport2 = prov() - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) await conn.write("Hello!") await conn.close() #for some protocols, closing requires actively reading, so we must close here @@ -108,13 +108,13 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = transport2.stop())) asyncTest "e2e: handle dial cancellation": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) let transport2 = prov() - let cancellation = transport2.dial(transport1.ma) + let cancellation = transport2.dial(transport1.addrs[0]) await cancellation.cancelAndWait() check cancellation.cancelled @@ -125,7 +125,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = transport2.stop())) asyncTest "e2e: handle accept cancellation": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) @@ -136,8 +136,57 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = await transport1.stop() + asyncTest "e2e should allow multiple local addresses": + let addrs = @[MultiAddress.init(ma).tryGet(), + MultiAddress.init(ma).tryGet()] + + + let transport1 = prov() + await transport1.start(addrs) + + proc acceptHandler() {.async, gcsafe.} = + while true: + let conn = await transport1.accept() + await conn.write("Hello!") + await conn.close() + + let handlerWait = acceptHandler() + + check transport1.addrs.len == 2 + check transport1.addrs[0] != transport1.addrs[1] + + var msg = newSeq[byte](6) + + proc client(ma: MultiAddress) {.async.} = + let conn1 = await transport1.dial(ma) + await conn1.readExactly(addr msg[0], 6) + check string.fromBytes(msg) == "Hello!" + await conn1.close() + + #Dial the same server multiple time in a row + await client(transport1.addrs[0]) + await client(transport1.addrs[0]) + await client(transport1.addrs[0]) + + #Dial the same server on different addresses + await client(transport1.addrs[1]) + await client(transport1.addrs[0]) + await client(transport1.addrs[1]) + + #Cancel a dial + #TODO add back once chronos fixes cancellation + #let + # dial1 = transport1.dial(transport1.addrs[1]) + # dial2 = transport1.dial(transport1.addrs[0]) + #await dial1.cancelAndWait() + #await dial2.cancelAndWait() + + await handlerWait.cancelAndWait() + + await transport1.stop() + asyncTest "e2e: stopping transport kills connections": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) @@ -145,7 +194,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = let transport2 = prov() let acceptHandler = transport1.accept() - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let serverConn = await acceptHandler await allFuturesThrowing( @@ -157,7 +206,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = check conn.closed() asyncTest "read or write on closed connection": - let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let ma = @[Multiaddress.init(ma).tryGet()] let transport1 = prov() await transport1.start(ma) @@ -167,7 +216,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = let handlerWait = acceptHandler() - let conn = await transport1.dial(transport1.ma) + let conn = await transport1.dial(transport1.addrs[0]) var msg = newSeq[byte](6) try: diff --git a/tests/testidentify.nim b/tests/testidentify.nim index f4ab7aa55..36cd80a93 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -22,7 +22,7 @@ suite "Identify": suite "handle identify message": var - ma {.threadvar.}: MultiAddress + ma {.threadvar.}: seq[MultiAddress] remoteSecKey {.threadvar.}: PrivateKey remotePeerInfo {.threadvar.}: PeerInfo serverFut {.threadvar.}: Future[void] @@ -36,10 +36,12 @@ suite "Identify": conn {.threadvar.}: Connection asyncSetup: - ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() remotePeerInfo = PeerInfo.new( - remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"]) + remoteSecKey, + ma, + ["/test/proto1/1.0.0", "/test/proto2/1.0.0"]) transport1 = TcpTransport.new(upgrade = Upgrade()) transport2 = TcpTransport.new(upgrade = Upgrade()) @@ -65,13 +67,13 @@ suite "Identify": await msListen.handle(c) acceptFut = acceptHandler() - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) discard await msDial.select(conn, IdentifyCodec) let id = await identifyProto2.identify(conn, remotePeerInfo.peerId) check id.pubKey.get() == remoteSecKey.getPublicKey().get() - check id.addrs[0] == ma + check id.addrs == ma check id.protoVersion.get() == ProtoVersion check id.agentVersion.get() == AgentVersion check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] @@ -88,13 +90,13 @@ suite "Identify": await msListen.handle(c) acceptFut = acceptHandler() - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) discard await msDial.select(conn, IdentifyCodec) let id = await identifyProto2.identify(conn, remotePeerInfo.peerId) check id.pubKey.get() == remoteSecKey.getPublicKey().get() - check id.addrs[0] == ma + check id.addrs == ma check id.protoVersion.get() == ProtoVersion check id.agentVersion.get() == customAgentVersion check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] @@ -114,7 +116,7 @@ suite "Identify": await conn.close() acceptFut = acceptHandler() - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) expect IdentityNoMatchError: let pi2 = PeerInfo.new(PrivateKey.random(ECDSA, rng[]).get()) @@ -147,7 +149,10 @@ suite "Identify": awaiters.add(await switch1.start()) awaiters.add(await switch2.start()) - conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, IdentifyPushCodec) + conn = await switch2.dial( + switch1.peerInfo.peerId, + switch1.peerInfo.addrs, + IdentifyPushCodec) check: switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() diff --git a/tests/testmplex.nim b/tests/testmplex.nim index ed15df404..614101298 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -378,7 +378,7 @@ suite "Mplex": suite "mplex e2e": asyncTest "read/write receiver": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -397,7 +397,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -415,7 +415,7 @@ suite "Mplex": await listenFut asyncTest "read/write receiver lazy": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -434,7 +434,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let stream = await mplexDial.newStream(lazy = true) @@ -454,7 +454,7 @@ suite "Mplex": asyncTest "write fragmented": let - ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] listenJob = newFuture[void]() var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) @@ -486,7 +486,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -506,7 +506,7 @@ suite "Mplex": await listenFut asyncTest "read/write initiator": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -523,7 +523,7 @@ suite "Mplex": await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) @@ -542,7 +542,7 @@ suite "Mplex": await listenFut asyncTest "multiple streams": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -565,7 +565,7 @@ suite "Mplex": await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) @@ -586,7 +586,7 @@ suite "Mplex": await listenFut asyncTest "multiple read/write streams": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -610,7 +610,7 @@ suite "Mplex": await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) @@ -633,7 +633,7 @@ suite "Mplex": await listenFut asyncTest "channel closes listener with EOF": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenStreams: seq[Connection] @@ -658,7 +658,7 @@ suite "Mplex": await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -681,7 +681,7 @@ suite "Mplex": await acceptFut asyncTest "channel closes dialer with EOF": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var count = 0 @@ -706,7 +706,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -746,7 +746,7 @@ suite "Mplex": await acceptFut asyncTest "dialing mplex closes both ends": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenStreams: seq[Connection] @@ -765,7 +765,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -788,7 +788,7 @@ suite "Mplex": await acceptFut asyncTest "listening mplex closes both ends": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var mplexListen: Mplex @@ -808,7 +808,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -831,7 +831,7 @@ suite "Mplex": await acceptFut asyncTest "canceling mplex handler closes both ends": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var mplexHandle: Future[void] @@ -852,7 +852,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -874,7 +874,7 @@ suite "Mplex": transport2.stop()) asyncTest "closing dialing connection should close both ends": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenStreams: seq[Connection] @@ -893,7 +893,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -916,7 +916,7 @@ suite "Mplex": await acceptFut asyncTest "canceling listening connection should close both ends": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenConn: Connection @@ -936,7 +936,7 @@ suite "Mplex": let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() @@ -961,7 +961,7 @@ suite "Mplex": suite "jitter": asyncTest "channel should be able to handle erratic read/writes": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -985,7 +985,7 @@ suite "Mplex": await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) @@ -1033,7 +1033,7 @@ suite "Mplex": await listenFut asyncTest "channel should handle 1 byte read/write": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) @@ -1054,7 +1054,7 @@ suite "Mplex": await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 3f0b5ef78..c0be9f5b2 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -234,7 +234,7 @@ suite "Multistream select": await ms.handle(conn) asyncTest "e2e - handle": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, @@ -260,7 +260,7 @@ suite "Multistream select": let msDial = MultistreamSelect.new() let transport2 = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) check (await msDial.select(conn, "/test/proto/1.0.0")) == true @@ -274,7 +274,7 @@ suite "Multistream select": await handlerWait.wait(30.seconds) asyncTest "e2e - ls": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let handlerWait = newFuture[void]() @@ -312,7 +312,7 @@ suite "Multistream select": let acceptFut = acceptHandler() let msDial = MultistreamSelect.new() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) let ls = await msDial.list(conn) let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] @@ -326,7 +326,7 @@ suite "Multistream select": await listenFut.wait(5.seconds) asyncTest "e2e - select one from a list with unsupported protos": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, @@ -350,7 +350,7 @@ suite "Multistream select": let acceptFut = acceptHandler() let msDial = MultistreamSelect.new() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) check (await msDial.select(conn, @["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0" @@ -364,7 +364,7 @@ suite "Multistream select": await transport1.stop() asyncTest "e2e - select one with both valid": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] var protocol: LPProtocol = new LPProtocol proc testHandler(conn: Connection, @@ -388,7 +388,7 @@ suite "Multistream select": let acceptFut = acceptHandler() let msDial = MultistreamSelect.new() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) - let conn = await transport2.dial(transport1.ma) + let conn = await transport2.dial(transport1.addrs[0]) check (await msDial.select(conn, @[ diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 5d65c751c..5db1f5057 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -88,9 +88,9 @@ suite "Noise": asyncTest "e2e: handle write + noise": let - server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() - serverInfo = PeerInfo.new(serverPrivKey, [server]) + serverInfo = PeerInfo.new(serverPrivKey, server) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) @@ -109,9 +109,9 @@ suite "Noise": acceptFut = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() - clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) + clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) conn.peerId = serverInfo.peerId let sconn = await clientNoise.secure(conn, true) @@ -129,9 +129,9 @@ suite "Noise": asyncTest "e2e: handle write + noise (wrong prologue)": let - server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() - serverInfo = PeerInfo.new(serverPrivKey, [server]) + serverInfo = PeerInfo.new(serverPrivKey, server) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) let @@ -153,9 +153,9 @@ suite "Noise": handlerWait = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() - clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) + clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8]) - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) conn.peerId = serverInfo.peerId var sconn: Connection = nil @@ -169,9 +169,9 @@ suite "Noise": asyncTest "e2e: handle read + noise": let - server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() - serverInfo = PeerInfo.new(serverPrivKey, [server]) + serverInfo = PeerInfo.new(serverPrivKey, server) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) readTask = newFuture[void]() @@ -193,9 +193,9 @@ suite "Noise": acceptFut = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() - clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) + clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) conn.peerId = serverInfo.peerId let sconn = await clientNoise.secure(conn, true) @@ -208,9 +208,9 @@ suite "Noise": asyncTest "e2e: handle read + noise fragmented": let - server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() - serverInfo = PeerInfo.new(serverPrivKey, [server]) + serverInfo = PeerInfo.new(serverPrivKey, server) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) readTask = newFuture[void]() @@ -235,9 +235,9 @@ suite "Noise": acceptFut = acceptHandler() transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() - clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) + clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) conn.peerId = serverInfo.peerId let sconn = await clientNoise.secure(conn, true) @@ -252,8 +252,8 @@ suite "Noise": await listenFut asyncTest "e2e use switch dial proto string": - let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma1 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma2 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var peerInfo1, peerInfo2: PeerInfo var switch1, switch2: Switch @@ -280,8 +280,8 @@ suite "Noise": await allFuturesThrowing(awaiters) asyncTest "e2e test wrong secure negotiation": - let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma1 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma2 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var peerInfo1, peerInfo2: PeerInfo var switch1, switch2: Switch diff --git a/tests/testping.nim b/tests/testping.nim index 5ad36ed31..5c3a63c32 100644 --- a/tests/testping.nim +++ b/tests/testping.nim @@ -56,13 +56,13 @@ suite "Ping": asyncTest "simple ping": msListen.addHandler(PingCodec, pingProto1) - serverFut = transport1.start(ma) + serverFut = transport1.start(@[ma]) proc acceptHandler(): Future[void] {.async, gcsafe.} = let c = await transport1.accept() await msListen.handle(c) acceptFut = acceptHandler() - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) discard await msDial.select(conn, PingCodec) let time = await pingProto2.ping(conn) @@ -71,14 +71,14 @@ suite "Ping": asyncTest "ping callback": msDial.addHandler(PingCodec, pingProto2) - serverFut = transport1.start(ma) + serverFut = transport1.start(@[ma]) proc acceptHandler(): Future[void] {.async, gcsafe.} = let c = await transport1.accept() discard await msListen.select(c, PingCodec) discard await pingProto1.ping(c) acceptFut = acceptHandler() - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) await msDial.handle(conn) check pingReceivedCount == 1 @@ -96,13 +96,13 @@ suite "Ping": fakePingProto.handler = fakeHandle msListen.addHandler(PingCodec, fakePingProto) - serverFut = transport1.start(ma) + serverFut = transport1.start(@[ma]) proc acceptHandler(): Future[void] {.async, gcsafe.} = let c = await transport1.accept() await msListen.handle(c) acceptFut = acceptHandler() - conn = await transport2.dial(transport1.ma) + conn = await transport2.dial(transport1.addrs[0]) discard await msDial.select(conn, PingCodec) let p = pingProto2.ping(conn) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 5b1fb072a..9a21afdb1 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -625,7 +625,7 @@ suite "Switch": # for most of the steps in the upgrade flow - # this is just a basic test for dials asyncTest "e2e canceling dial should not leak": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport = TcpTransport.new(upgrade = Upgrade()) await transport.start(ma) @@ -645,7 +645,7 @@ suite "Switch": awaiters.add(await switch.start()) var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get() - let connectFut = switch.connect(peerId, @[transport.ma]) + let connectFut = switch.connect(peerId, transport.addrs) await sleepAsync(500.millis) connectFut.cancel() await handlerWait @@ -662,7 +662,7 @@ suite "Switch": await allFuturesThrowing(awaiters) asyncTest "e2e closing remote conn should not leak": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport = TcpTransport.new(upgrade = Upgrade()) await transport.start(ma) @@ -679,7 +679,7 @@ suite "Switch": var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get() expect LPStreamClosedError: - await switch.connect(peerId, @[transport.ma]) + await switch.connect(peerId, transport.addrs) await handlerWait @@ -905,6 +905,73 @@ suite "Switch": switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.protocols.toHashSet() switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet() + asyncTest "e2e should allow multiple local addresses": + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + try: + let msg = string.fromBytes(await conn.readLp(1024)) + check "Hello!" == msg + await conn.writeLp("Hello!") + finally: + await conn.close() + + let testProto = new TestProto + testProto.codec = TestCodec + testProto.handler = handle + + let addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + MultiAddress.init("/ip6/::1/tcp/0").tryGet()] + + let switch1 = newStandardSwitch( + addrs = addrs, + transportFlags = {ServerFlags.ReuseAddr, ServerFlags.ReusePort}) + + switch1.mount(testProto) + + let switch2 = newStandardSwitch() + let switch3 = newStandardSwitch( + addrs = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() + ) + + await allFuturesThrowing( + switch1.start(), + switch2.start(), + switch3.start()) + + check IP4.matchPartial(switch1.peerInfo.addrs[0]) + check IP6.matchPartial(switch1.peerInfo.addrs[1]) + + let conn = await switch2.dial( + switch1.peerInfo.peerId, + @[switch1.peerInfo.addrs[0]], + TestCodec) + + check switch1.isConnected(switch2.peerInfo.peerId) + check switch2.isConnected(switch1.peerInfo.peerId) + + await conn.writeLp("Hello!") + check "Hello!" == string.fromBytes(await conn.readLp(1024)) + await conn.close() + + let connv6 = await switch3.dial( + switch1.peerInfo.peerId, + @[switch1.peerInfo.addrs[1]], + TestCodec) + + check switch1.isConnected(switch3.peerInfo.peerId) + check switch3.isConnected(switch1.peerInfo.peerId) + + await connv6.writeLp("Hello!") + check "Hello!" == string.fromBytes(await connv6.readLp(1024)) + await connv6.close() + + await allFuturesThrowing( + switch1.stop(), + switch2.stop(), + switch3.stop()) + + check not switch1.isConnected(switch2.peerInfo.peerId) + check not switch2.isConnected(switch1.peerInfo.peerId) + asyncTest "e2e dial dns4 address": var awaiters: seq[Future[void]] let resolver = MockResolver.new() @@ -963,8 +1030,8 @@ suite "Switch": await allFuturesThrowing(awaiters) resolver.txtResponses["_dnsaddr.test.io"] = @[ - "dnsaddr=/ip4/127.0.0.1" & $destSwitch.peerInfo.addrs[1][1].tryGet() & "/ws", - "dnsaddr=/ip4/127.0.0.1" & $destSwitch.peerInfo.addrs[0][1].tryGet() + "dnsaddr=" & $destSwitch.peerInfo.addrs[0], + "dnsaddr=" & $destSwitch.peerInfo.addrs[1] ] let testAddr = MultiAddress.init("/dnsaddr/test.io/").tryGet() diff --git a/tests/testtcptransport.nim b/tests/testtcptransport.nim index fdd23d13f..09f1f8782 100644 --- a/tests/testtcptransport.nim +++ b/tests/testtcptransport.nim @@ -17,7 +17,7 @@ suite "TCP transport": checkTrackers() asyncTest "test listener: handle write": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade()) asyncSpawn transport.start(ma) @@ -28,7 +28,7 @@ suite "TCP transport": let handlerWait = acceptHandler() - let streamTransport = await connect(transport.ma) + let streamTransport = await connect(transport.addrs[0]) let msg = await streamTransport.read(6) @@ -38,7 +38,7 @@ suite "TCP transport": check string.fromBytes(msg) == "Hello!" asyncTest "test listener: handle read": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade()) asyncSpawn transport.start(ma) @@ -51,7 +51,7 @@ suite "TCP transport": await conn.close() let handlerWait = acceptHandler() - let streamTransport: StreamTransport = await connect(transport.ma) + let streamTransport: StreamTransport = await connect(transport.addrs[0]) let sent = await streamTransport.write("Hello!") await handlerWait.wait(1.seconds) # when no issues will not wait that long! diff --git a/tests/testwstransport.nim b/tests/testwstransport.nim index aaf95765a..ab3b9d78e 100644 --- a/tests/testwstransport.nim +++ b/tests/testwstransport.nim @@ -71,7 +71,7 @@ suite "WebSocket transport": "/ip4/0.0.0.0/tcp/0/wss") asyncTest "Hostname verification": - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0/wss").tryGet() + let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0/wss").tryGet()] let transport1 = WsTransport.new(Upgrade(), TLSPrivateKey.init(SecureKey), TLSCertificate.init(SecureCert), {TLSFlags.NoVerifyHost}) await transport1.start(ma) @@ -84,12 +84,12 @@ suite "WebSocket transport": let handlerWait = acceptHandler() # ws.test is in certificate - let conn = await transport1.dial("ws.test", transport1.ma) + let conn = await transport1.dial("ws.test", transport1.addrs[0]) await conn.close() try: - let conn = await transport1.dial("ws.wronghostname", transport1.ma) + let conn = await transport1.dial("ws.wronghostname", transport1.addrs[0]) check false except CatchableError as exc: check true