Add support for multiple addresses to transports (#598)

* add test for multiple local addresses

* allow transports to listen on multiple addrs

* fix tcp transport accept

* check switch addrs are correct

* switch test to port 0

* close accepted peers on close

* ignore CancelledError in transport accept

* test ci

* only accept in accept loop

* avoid accept greedyness

* close acceptedPeers

* accept doesn't crash on cancelled fut

* add common transport test

* close conn on handling failure

* close accepted peers in two steps

* test for macos

* revert accept greedyness

* fix dialing cancel

* test chronos fix

* add ws

* ws cancellation

* small fix

* remove chronos blocked test

* fix testping

* Fix transport's switch start (like #609)

* bump chronos

* Websocket: handle both ws & wss

Co-authored-by: Tanguy Cizain <tanguycizain@gmail.com>
Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
Dmitriy Ryajov 2021-11-24 14:01:12 -06:00 committed by GitHub
parent 98184d9dfd
commit 73168b6eae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 394 additions and 176 deletions

View File

@ -12,7 +12,7 @@ requires "nim >= 1.2.0",
"https://github.com/ba0f3/dnsclient.nim == 0.1.0", "https://github.com/ba0f3/dnsclient.nim == 0.1.0",
"bearssl >= 0.1.4", "bearssl >= 0.1.4",
"chronicles#ba2817f1", "chronicles#ba2817f1",
"chronos >= 2.5.2", "chronos >= 3.0.6",
"metrics", "metrics",
"secp256k1", "secp256k1",
"stew#head", "stew#head",

View File

@ -197,7 +197,7 @@ proc build*(b: SwitchBuilder): Switch
proc newStandardSwitch*( proc newStandardSwitch*(
privKey = none(PrivateKey), privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), addrs: MultiAddress | seq[MultiAddress] = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
secureManagers: openarray[SecureProtocol] = [ secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise, SecureProtocol.Noise,
], ],
@ -214,9 +214,10 @@ proc newStandardSwitch*(
if SecureProtocol.Secio in secureManagers: if SecureProtocol.Secio in secureManagers:
quit("Secio is deprecated!") # use of secio is unsafe quit("Secio is deprecated!") # use of secio is unsafe
let addrs = when addrs is MultiAddress: @[addrs] else: addrs
var b = SwitchBuilder var b = SwitchBuilder
.new() .new()
.withAddress(address) .withAddresses(addrs)
.withRng(rng) .withRng(rng)
.withMaxConnections(maxConnections) .withMaxConnections(maxConnections)
.withMaxIn(maxIn) .withMaxIn(maxIn)

View File

@ -11,6 +11,7 @@
import std/[tables, import std/[tables,
options, options,
sequtils,
sets, sets,
oids, oids,
sugar, sugar,
@ -236,25 +237,33 @@ proc stop*(s: Switch) {.async.} =
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
trace "starting switch for peer", peerInfo = s.peerInfo trace "starting switch for peer", peerInfo = s.peerInfo
var startFuts: seq[Future[void]] var startFuts: seq[Future[void]]
for t in s.transports:
let addrs = s.peerInfo.addrs.filterIt(
t.handles(it)
)
s.peerInfo.addrs.keepItIf(
it notin addrs
)
if addrs.len > 0:
startFuts.add(t.start(addrs))
await allFutures(startFuts)
for s in startFuts:
if s.failed:
info "Failed to start one transport", error=s.error.msg
for t in s.transports: # for each transport for t in s.transports: # for each transport
for i, a in s.peerInfo.addrs: if t.addrs.len > 0:
if t.handles(a): # check if it handles the multiaddr s.acceptFuts.add(s.accept(t))
let transpStart = t.start(a) s.peerInfo.addrs &= t.addrs
startFuts.add(transpStart)
try:
await transpStart
s.peerInfo.addrs[i] = t.ma # update peer's address
s.acceptFuts.add(s.accept(t))
except CancelledError as exc:
await s.stop()
raise exc
except CatchableError as exc:
debug "Failed to start one transport", address = $a, err = exc.msg
continue
debug "Started libp2p node", peer = s.peerInfo debug "Started libp2p node", peer = s.peerInfo
return startFuts # listen for incoming connections return startFuts # listen for incoming connections
proc newSwitch*(peerInfo: PeerInfo, proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport], transports: seq[Transport],
identity: Identify, identity: Identify,

View File

@ -32,9 +32,10 @@ const
type type
TcpTransport* = ref object of Transport TcpTransport* = ref object of Transport
server*: StreamServer servers*: seq[StreamServer]
clients: array[Direction, seq[StreamTransport]] clients: array[Direction, seq[StreamTransport]]
flags: set[ServerFlags] flags: set[ServerFlags]
acceptFuts: seq[Future[StreamTransport]]
TcpTransportTracker* = ref object of TrackerBase TcpTransportTracker* = ref object of TrackerBase
opened*: uint64 opened*: uint64
@ -121,34 +122,42 @@ proc new*(
let transport = T( let transport = T(
flags: flags, flags: flags,
upgrader: upgrade upgrader: upgrade)
)
inc getTcpTransportTracker().opened inc getTcpTransportTracker().opened
return transport return transport
method start*( method start*(
self: TcpTransport, self: TcpTransport,
ma: MultiAddress) {.async.} = addrs: seq[MultiAddress]) {.async.} =
## listen on the transport ## listen on the transport
## ##
if self.running: if self.running:
trace "TCP transport already running" warn "TCP transport already running"
return return
await procCall Transport(self).start(ma) await procCall Transport(self).start(addrs)
trace "Starting TCP transport" trace "Starting TCP transport"
self.server = createStreamServer( for i, ma in addrs:
ma = self.ma, if not self.handles(ma):
flags = self.flags, trace "Invalid address detected, skipping!", address = ma
udata = self) continue
# always get the resolved address in case we're bound to 0.0.0.0:0 let server = createStreamServer(
self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet() ma = ma,
flags = self.flags,
udata = self)
trace "Listening on", address = self.ma # always get the resolved address in case we're bound to 0.0.0.0:0
self.addrs[i] = MultiAddress.init(
server.sock.getLocalAddress()
).tryGet()
self.servers &= server
trace "Listening on", address = ma
method stop*(self: TcpTransport) {.async, gcsafe.} = method stop*(self: TcpTransport) {.async, gcsafe.} =
## stop the transport ## stop the transport
@ -163,11 +172,21 @@ method stop*(self: TcpTransport) {.async, gcsafe.} =
self.clients[Direction.In].mapIt(it.closeWait()) & self.clients[Direction.In].mapIt(it.closeWait()) &
self.clients[Direction.Out].mapIt(it.closeWait()))) self.clients[Direction.Out].mapIt(it.closeWait())))
# server can be nil var toWait: seq[Future[void]]
if not isNil(self.server): for fut in self.acceptFuts:
await self.server.closeWait() if not fut.finished:
toWait.add(fut.cancelAndWait())
elif fut.done:
toWait.add(fut.read().closeWait())
for server in self.servers:
server.stop()
toWait.add(server.closeWait())
await allFutures(toWait)
self.servers = @[]
self.server = nil
trace "Transport stopped" trace "Transport stopped"
inc getTcpTransportTracker().closed inc getTcpTransportTracker().closed
except CatchableError as exc: except CatchableError as exc:
@ -181,7 +200,19 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
raise newTransportClosedError() raise newTransportClosedError()
try: try:
let transp = await self.server.accept() if self.acceptFuts.len <= 0:
self.acceptFuts = self.servers.mapIt(it.accept())
if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.servers[index].accept()
let transp = await finished
return await self.connHandler(transp, Direction.In) return await self.connHandler(transp, Direction.In)
except TransportOsError as exc: except TransportOsError as exc:
# TODO: it doesn't sound like all OS errors # TODO: it doesn't sound like all OS errors
@ -193,6 +224,8 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
except TransportUseClosedError as exc: except TransportUseClosedError as exc:
debug "Server was closed", exc = exc.msg debug "Server was closed", exc = exc.msg
raise newTransportClosedError(exc) raise newTransportClosedError(exc)
except CancelledError as exc:
raise
except CatchableError as exc: except CatchableError as exc:
debug "Unexpected error accepting connection", exc = exc.msg debug "Unexpected error accepting connection", exc = exc.msg
raise exc raise exc
@ -207,7 +240,11 @@ method dial*(
trace "Dialing remote peer", address = $address trace "Dialing remote peer", address = $address
let transp = await connect(address) let transp = await connect(address)
return await self.connHandler(transp, Direction.Out) try:
return await self.connHandler(transp, Direction.Out)
except CatchableError as err:
await transp.closeWait()
raise err
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address): if procCall Transport(t).handles(address):

View File

@ -22,10 +22,11 @@ logScope:
type type
TransportError* = object of LPError TransportError* = object of LPError
TransportInvalidAddrError* = object of TransportError
TransportClosedError* = object of TransportError TransportClosedError* = object of TransportError
Transport* = ref object of RootObj Transport* = ref object of RootObj
ma*: Multiaddress addrs*: seq[Multiaddress]
running*: bool running*: bool
upgrader*: Upgrade upgrader*: Upgrade
@ -35,20 +36,20 @@ proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
method start*( method start*(
self: Transport, self: Transport,
ma: MultiAddress): Future[void] {.base, async.} = addrs: seq[MultiAddress]) {.base, async.} =
## start the transport ## start the transport
## ##
trace "starting transport", address = $ma trace "starting transport on addrs", address = $addrs
self.ma = ma self.addrs = addrs
self.running = true self.running = true
method stop*(self: Transport): Future[void] {.base, async.} = method stop*(self: Transport) {.base, async.} =
## stop and cleanup the transport ## stop and cleanup the transport
## including all outstanding connections ## including all outstanding connections
## ##
trace "stopping transport", address = $self.ma trace "stopping transport", address = $self.addrs
self.running = false self.running = false
method accept*(self: Transport): Future[Connection] method accept*(self: Transport): Future[Connection]

View File

@ -72,10 +72,12 @@ method closeImpl*(s: WsStream): Future[void] {.async.} =
type type
WsTransport* = ref object of Transport WsTransport* = ref object of Transport
httpserver: HttpServer httpservers: seq[HttpServer]
wsserver: WSServer wsserver: WSServer
connections: array[Direction, seq[WsStream]] connections: array[Direction, seq[WsStream]]
acceptFuts: seq[Future[HttpRequest]]
tlsPrivateKey: TLSPrivateKey tlsPrivateKey: TLSPrivateKey
tlsCertificate: TLSCertificate tlsCertificate: TLSCertificate
tlsFlags: set[TLSFlags] tlsFlags: set[TLSFlags]
@ -88,42 +90,55 @@ proc secure*(self: WsTransport): bool =
method start*( method start*(
self: WsTransport, self: WsTransport,
ma: MultiAddress) {.async.} = addrs: seq[MultiAddress]) {.async.} =
## listen on the transport ## listen on the transport
## ##
if self.running: if self.running:
trace "WS transport already running" warn "WS transport already running"
return return
await procCall Transport(self).start(ma) await procCall Transport(self).start(addrs)
trace "Starting WS transport" trace "Starting WS transport"
self.httpserver =
if self.secure:
TlsHttpServer.create(
address = self.ma.initTAddress().tryGet(),
tlsPrivateKey = self.tlsPrivateKey,
tlsCertificate = self.tlsCertificate,
flags = self.flags)
else:
HttpServer.create(self.ma.initTAddress().tryGet())
self.wsserver = WSServer.new( self.wsserver = WSServer.new(
factories = self.factories, factories = self.factories,
rng = self.rng) rng = self.rng)
let codec = if self.secure:
MultiAddress.init("/wss") for i, ma in addrs:
else: let isWss =
MultiAddress.init("/ws") if WSS.match(ma):
if self.secure: true
else:
warn "Trying to listen on a WSS address without setting the certificate!"
false
else: false
# always get the resolved address in case we're bound to 0.0.0.0:0 let httpserver =
self.ma = MultiAddress.init( if isWss:
self.httpserver.localAddress()).tryGet() & codec.tryGet() TlsHttpServer.create(
address = ma.initTAddress().tryGet(),
tlsPrivateKey = self.tlsPrivateKey,
tlsCertificate = self.tlsCertificate,
flags = self.flags)
else:
HttpServer.create(ma.initTAddress().tryGet())
self.httpservers &= httpserver
let codec = if isWss:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
# always get the resolved address in case we're bound to 0.0.0.0:0
self.addrs[i] = MultiAddress.init(
httpserver.localAddress()).tryGet() & codec.tryGet()
trace "Listening on", addresses = self.addrs
self.running = true self.running = true
trace "Listening on", address = self.ma
method stop*(self: WsTransport) {.async, gcsafe.} = method stop*(self: WsTransport) {.async, gcsafe.} =
## stop the transport ## stop the transport
@ -140,24 +155,33 @@ method stop*(self: WsTransport) {.async, gcsafe.} =
self.connections[Direction.In].mapIt(it.close()) & self.connections[Direction.In].mapIt(it.close()) &
self.connections[Direction.Out].mapIt(it.close()))) self.connections[Direction.Out].mapIt(it.close())))
# server can be nil var toWait: seq[Future[void]]
if not isNil(self.httpserver): for fut in self.acceptFuts:
self.httpserver.stop() if not fut.finished:
await self.httpserver.closeWait() toWait.add(fut.cancelAndWait())
elif fut.done:
toWait.add(fut.read().stream.closeWait())
self.httpserver = nil for server in self.httpservers:
server.stop()
toWait.add(server.closeWait())
await allFutures(toWait)
self.httpservers = @[]
trace "Transport stopped" trace "Transport stopped"
except CatchableError as exc: except CatchableError as exc:
trace "Error shutting down ws transport", exc = exc.msg trace "Error shutting down ws transport", exc = exc.msg
proc connHandler(self: WsTransport, proc connHandler(self: WsTransport,
stream: WsSession, stream: WsSession,
secure: bool,
dir: Direction): Future[Connection] {.async.} = dir: Direction): Future[Connection] {.async.} =
let observedAddr = let observedAddr =
try: try:
let let
codec = codec =
if self.secure: if secure:
MultiAddress.init("/wss") MultiAddress.init("/wss")
else: else:
MultiAddress.init("/ws") MultiAddress.init("/ws")
@ -189,11 +213,29 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
raise newTransportClosedError() raise newTransportClosedError()
try: try:
let if self.acceptFuts.len <= 0:
req = await self.httpserver.accept() self.acceptFuts = self.httpservers.mapIt(it.accept())
wstransp = await self.wsserver.handleRequest(req)
return await self.connHandler(wstransp, Direction.In) if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.httpservers[index].accept()
let req = await finished
try:
let
wstransp = await self.wsserver.handleRequest(req)
isSecure = self.httpservers[index].secure
return await self.connHandler(wstransp, isSecure, Direction.In)
except CatchableError as exc:
await req.stream.closeWait()
raise exc
except TransportOsError as exc: except TransportOsError as exc:
debug "OS Error", exc = exc.msg debug "OS Error", exc = exc.msg
except TransportTooManyError as exc: except TransportTooManyError as exc:
@ -201,6 +243,9 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
except TransportUseClosedError as exc: except TransportUseClosedError as exc:
debug "Server was closed", exc = exc.msg debug "Server was closed", exc = exc.msg
raise newTransportClosedError(exc) raise newTransportClosedError(exc)
except CancelledError as exc:
# bubble up silently
raise exc
except CatchableError as exc: except CatchableError as exc:
warn "Unexpected error accepting connection", exc = exc.msg warn "Unexpected error accepting connection", exc = exc.msg
raise exc raise exc
@ -223,7 +268,11 @@ method dial*(
hostName = hostname, hostName = hostname,
flags = self.tlsFlags) flags = self.tlsFlags)
return await self.connHandler(transp, Direction.Out) try:
return await self.connHandler(transp, secure, Direction.Out)
except CatchableError as exc:
await transp.close()
raise exc
method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} = method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address): if procCall Transport(t).handles(address):

View File

@ -19,14 +19,14 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
checkTrackers() checkTrackers()
asyncTest "can handle local address": asyncTest "can handle local address":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
check transport1.handles(transport1.ma) check transport1.handles(transport1.addrs[0])
await transport1.stop() await transport1.stop()
asyncTest "e2e: handle observedAddr": asyncTest "e2e: handle observedAddr":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
@ -40,7 +40,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
check transport2.handles(conn.observedAddr) check transport2.handles(conn.observedAddr)
@ -54,7 +54,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
await handlerWait.wait(1.seconds) # when no issues will not wait that long! await handlerWait.wait(1.seconds) # when no issues will not wait that long!
asyncTest "e2e: handle write": asyncTest "e2e: handle write":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
@ -67,7 +67,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let transport2 = prov() let transport2 = prov()
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
var msg = newSeq[byte](6) var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6) await conn.readExactly(addr msg[0], 6)
@ -82,7 +82,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
await handlerWait.wait(1.seconds) # when no issues will not wait that long! await handlerWait.wait(1.seconds) # when no issues will not wait that long!
asyncTest "e2e: handle read": asyncTest "e2e: handle read":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
@ -96,7 +96,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let transport2 = prov() let transport2 = prov()
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
await conn.write("Hello!") await conn.write("Hello!")
await conn.close() #for some protocols, closing requires actively reading, so we must close here await conn.close() #for some protocols, closing requires actively reading, so we must close here
@ -108,13 +108,13 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
transport2.stop())) transport2.stop()))
asyncTest "e2e: handle dial cancellation": asyncTest "e2e: handle dial cancellation":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
let transport2 = prov() let transport2 = prov()
let cancellation = transport2.dial(transport1.ma) let cancellation = transport2.dial(transport1.addrs[0])
await cancellation.cancelAndWait() await cancellation.cancelAndWait()
check cancellation.cancelled check cancellation.cancelled
@ -125,7 +125,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
transport2.stop())) transport2.stop()))
asyncTest "e2e: handle accept cancellation": asyncTest "e2e: handle accept cancellation":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
@ -136,8 +136,57 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
await transport1.stop() await transport1.stop()
asyncTest "e2e should allow multiple local addresses":
let addrs = @[MultiAddress.init(ma).tryGet(),
MultiAddress.init(ma).tryGet()]
let transport1 = prov()
await transport1.start(addrs)
proc acceptHandler() {.async, gcsafe.} =
while true:
let conn = await transport1.accept()
await conn.write("Hello!")
await conn.close()
let handlerWait = acceptHandler()
check transport1.addrs.len == 2
check transport1.addrs[0] != transport1.addrs[1]
var msg = newSeq[byte](6)
proc client(ma: MultiAddress) {.async.} =
let conn1 = await transport1.dial(ma)
await conn1.readExactly(addr msg[0], 6)
check string.fromBytes(msg) == "Hello!"
await conn1.close()
#Dial the same server multiple time in a row
await client(transport1.addrs[0])
await client(transport1.addrs[0])
await client(transport1.addrs[0])
#Dial the same server on different addresses
await client(transport1.addrs[1])
await client(transport1.addrs[0])
await client(transport1.addrs[1])
#Cancel a dial
#TODO add back once chronos fixes cancellation
#let
# dial1 = transport1.dial(transport1.addrs[1])
# dial2 = transport1.dial(transport1.addrs[0])
#await dial1.cancelAndWait()
#await dial2.cancelAndWait()
await handlerWait.cancelAndWait()
await transport1.stop()
asyncTest "e2e: stopping transport kills connections": asyncTest "e2e: stopping transport kills connections":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
@ -145,7 +194,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
let transport2 = prov() let transport2 = prov()
let acceptHandler = transport1.accept() let acceptHandler = transport1.accept()
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let serverConn = await acceptHandler let serverConn = await acceptHandler
await allFuturesThrowing( await allFuturesThrowing(
@ -157,7 +206,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
check conn.closed() check conn.closed()
asyncTest "read or write on closed connection": asyncTest "read or write on closed connection":
let ma: MultiAddress = Multiaddress.init(ma).tryGet() let ma = @[Multiaddress.init(ma).tryGet()]
let transport1 = prov() let transport1 = prov()
await transport1.start(ma) await transport1.start(ma)
@ -167,7 +216,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let conn = await transport1.dial(transport1.ma) let conn = await transport1.dial(transport1.addrs[0])
var msg = newSeq[byte](6) var msg = newSeq[byte](6)
try: try:

View File

@ -22,7 +22,7 @@ suite "Identify":
suite "handle identify message": suite "handle identify message":
var var
ma {.threadvar.}: MultiAddress ma {.threadvar.}: seq[MultiAddress]
remoteSecKey {.threadvar.}: PrivateKey remoteSecKey {.threadvar.}: PrivateKey
remotePeerInfo {.threadvar.}: PeerInfo remotePeerInfo {.threadvar.}: PeerInfo
serverFut {.threadvar.}: Future[void] serverFut {.threadvar.}: Future[void]
@ -36,10 +36,12 @@ suite "Identify":
conn {.threadvar.}: Connection conn {.threadvar.}: Connection
asyncSetup: asyncSetup:
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
remotePeerInfo = PeerInfo.new( remotePeerInfo = PeerInfo.new(
remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"]) remoteSecKey,
ma,
["/test/proto1/1.0.0", "/test/proto2/1.0.0"])
transport1 = TcpTransport.new(upgrade = Upgrade()) transport1 = TcpTransport.new(upgrade = Upgrade())
transport2 = TcpTransport.new(upgrade = Upgrade()) transport2 = TcpTransport.new(upgrade = Upgrade())
@ -65,13 +67,13 @@ suite "Identify":
await msListen.handle(c) await msListen.handle(c)
acceptFut = acceptHandler() acceptFut = acceptHandler()
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, remotePeerInfo.peerId) let id = await identifyProto2.identify(conn, remotePeerInfo.peerId)
check id.pubKey.get() == remoteSecKey.getPublicKey().get() check id.pubKey.get() == remoteSecKey.getPublicKey().get()
check id.addrs[0] == ma check id.addrs == ma
check id.protoVersion.get() == ProtoVersion check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == AgentVersion check id.agentVersion.get() == AgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
@ -88,13 +90,13 @@ suite "Identify":
await msListen.handle(c) await msListen.handle(c)
acceptFut = acceptHandler() acceptFut = acceptHandler()
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
discard await msDial.select(conn, IdentifyCodec) discard await msDial.select(conn, IdentifyCodec)
let id = await identifyProto2.identify(conn, remotePeerInfo.peerId) let id = await identifyProto2.identify(conn, remotePeerInfo.peerId)
check id.pubKey.get() == remoteSecKey.getPublicKey().get() check id.pubKey.get() == remoteSecKey.getPublicKey().get()
check id.addrs[0] == ma check id.addrs == ma
check id.protoVersion.get() == ProtoVersion check id.protoVersion.get() == ProtoVersion
check id.agentVersion.get() == customAgentVersion check id.agentVersion.get() == customAgentVersion
check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] check id.protos == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
@ -114,7 +116,7 @@ suite "Identify":
await conn.close() await conn.close()
acceptFut = acceptHandler() acceptFut = acceptHandler()
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
expect IdentityNoMatchError: expect IdentityNoMatchError:
let pi2 = PeerInfo.new(PrivateKey.random(ECDSA, rng[]).get()) let pi2 = PeerInfo.new(PrivateKey.random(ECDSA, rng[]).get())
@ -147,7 +149,10 @@ suite "Identify":
awaiters.add(await switch1.start()) awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, IdentifyPushCodec) conn = await switch2.dial(
switch1.peerInfo.peerId,
switch1.peerInfo.addrs,
IdentifyPushCodec)
check: check:
switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet() switch1.peerStore.addressBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.addrs.toHashSet()

View File

@ -378,7 +378,7 @@ suite "Mplex":
suite "mplex e2e": suite "mplex e2e":
asyncTest "read/write receiver": asyncTest "read/write receiver":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -397,7 +397,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -415,7 +415,7 @@ suite "Mplex":
await listenFut await listenFut
asyncTest "read/write receiver lazy": asyncTest "read/write receiver lazy":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -434,7 +434,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let stream = await mplexDial.newStream(lazy = true) let stream = await mplexDial.newStream(lazy = true)
@ -454,7 +454,7 @@ suite "Mplex":
asyncTest "write fragmented": asyncTest "write fragmented":
let let
ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
listenJob = newFuture[void]() listenJob = newFuture[void]()
var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
@ -486,7 +486,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -506,7 +506,7 @@ suite "Mplex":
await listenFut await listenFut
asyncTest "read/write initiator": asyncTest "read/write initiator":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -523,7 +523,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
@ -542,7 +542,7 @@ suite "Mplex":
await listenFut await listenFut
asyncTest "multiple streams": asyncTest "multiple streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -565,7 +565,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
@ -586,7 +586,7 @@ suite "Mplex":
await listenFut await listenFut
asyncTest "multiple read/write streams": asyncTest "multiple read/write streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -610,7 +610,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
@ -633,7 +633,7 @@ suite "Mplex":
await listenFut await listenFut
asyncTest "channel closes listener with EOF": asyncTest "channel closes listener with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
@ -658,7 +658,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -681,7 +681,7 @@ suite "Mplex":
await acceptFut await acceptFut
asyncTest "channel closes dialer with EOF": asyncTest "channel closes dialer with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var count = 0 var count = 0
@ -706,7 +706,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -746,7 +746,7 @@ suite "Mplex":
await acceptFut await acceptFut
asyncTest "dialing mplex closes both ends": asyncTest "dialing mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
@ -765,7 +765,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -788,7 +788,7 @@ suite "Mplex":
await acceptFut await acceptFut
asyncTest "listening mplex closes both ends": asyncTest "listening mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var mplexListen: Mplex var mplexListen: Mplex
@ -808,7 +808,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -831,7 +831,7 @@ suite "Mplex":
await acceptFut await acceptFut
asyncTest "canceling mplex handler closes both ends": asyncTest "canceling mplex handler closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var mplexHandle: Future[void] var mplexHandle: Future[void]
@ -852,7 +852,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -874,7 +874,7 @@ suite "Mplex":
transport2.stop()) transport2.stop())
asyncTest "closing dialing connection should close both ends": asyncTest "closing dialing connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
@ -893,7 +893,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -916,7 +916,7 @@ suite "Mplex":
await acceptFut await acceptFut
asyncTest "canceling listening connection should close both ends": asyncTest "canceling listening connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenConn: Connection var listenConn: Connection
@ -936,7 +936,7 @@ suite "Mplex":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle() let mplexDialFut = mplexDial.handle()
@ -961,7 +961,7 @@ suite "Mplex":
suite "jitter": suite "jitter":
asyncTest "channel should be able to handle erratic read/writes": asyncTest "channel should be able to handle erratic read/writes":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -985,7 +985,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)
@ -1033,7 +1033,7 @@ suite "Mplex":
await listenFut await listenFut
asyncTest "channel should handle 1 byte read/write": asyncTest "channel should handle 1 byte read/write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
@ -1054,7 +1054,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn) let mplexDial = Mplex.new(conn)

View File

@ -234,7 +234,7 @@ suite "Multistream select":
await ms.handle(conn) await ms.handle(conn)
asyncTest "e2e - handle": asyncTest "e2e - handle":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
@ -260,7 +260,7 @@ suite "Multistream select":
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2 = TcpTransport.new(upgrade = Upgrade()) let transport2 = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
check (await msDial.select(conn, "/test/proto/1.0.0")) == true check (await msDial.select(conn, "/test/proto/1.0.0")) == true
@ -274,7 +274,7 @@ suite "Multistream select":
await handlerWait.wait(30.seconds) await handlerWait.wait(30.seconds)
asyncTest "e2e - ls": asyncTest "e2e - ls":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let let
handlerWait = newFuture[void]() handlerWait = newFuture[void]()
@ -312,7 +312,7 @@ suite "Multistream select":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
let ls = await msDial.list(conn) let ls = await msDial.list(conn)
let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"] let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
@ -326,7 +326,7 @@ suite "Multistream select":
await listenFut.wait(5.seconds) await listenFut.wait(5.seconds)
asyncTest "e2e - select one from a list with unsupported protos": asyncTest "e2e - select one from a list with unsupported protos":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
@ -350,7 +350,7 @@ suite "Multistream select":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
check (await msDial.select(conn, check (await msDial.select(conn,
@["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0" @["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0"
@ -364,7 +364,7 @@ suite "Multistream select":
await transport1.stop() await transport1.stop()
asyncTest "e2e - select one with both valid": asyncTest "e2e - select one with both valid":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
var protocol: LPProtocol = new LPProtocol var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection, proc testHandler(conn: Connection,
@ -388,7 +388,7 @@ suite "Multistream select":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.addrs[0])
check (await msDial.select(conn, check (await msDial.select(conn,
@[ @[

View File

@ -88,9 +88,9 @@ suite "Noise":
asyncTest "e2e: handle write + noise": asyncTest "e2e: handle write + noise":
let let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.new(serverPrivKey, [server]) serverInfo = PeerInfo.new(serverPrivKey, server)
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
@ -109,9 +109,9 @@ suite "Noise":
acceptFut = acceptHandler() acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs)
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true)
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
conn.peerId = serverInfo.peerId conn.peerId = serverInfo.peerId
let sconn = await clientNoise.secure(conn, true) let sconn = await clientNoise.secure(conn, true)
@ -129,9 +129,9 @@ suite "Noise":
asyncTest "e2e: handle write + noise (wrong prologue)": asyncTest "e2e: handle write + noise (wrong prologue)":
let let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.new(serverPrivKey, [server]) serverInfo = PeerInfo.new(serverPrivKey, server)
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
let let
@ -153,9 +153,9 @@ suite "Noise":
handlerWait = acceptHandler() handlerWait = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs)
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8]) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8])
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
conn.peerId = serverInfo.peerId conn.peerId = serverInfo.peerId
var sconn: Connection = nil var sconn: Connection = nil
@ -169,9 +169,9 @@ suite "Noise":
asyncTest "e2e: handle read + noise": asyncTest "e2e: handle read + noise":
let let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.new(serverPrivKey, [server]) serverInfo = PeerInfo.new(serverPrivKey, server)
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
readTask = newFuture[void]() readTask = newFuture[void]()
@ -193,9 +193,9 @@ suite "Noise":
acceptFut = acceptHandler() acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs)
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true)
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
conn.peerId = serverInfo.peerId conn.peerId = serverInfo.peerId
let sconn = await clientNoise.secure(conn, true) let sconn = await clientNoise.secure(conn, true)
@ -208,9 +208,9 @@ suite "Noise":
asyncTest "e2e: handle read + noise fragmented": asyncTest "e2e: handle read + noise fragmented":
let let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() server = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
serverPrivKey = PrivateKey.random(ECDSA, rng[]).get() serverPrivKey = PrivateKey.random(ECDSA, rng[]).get()
serverInfo = PeerInfo.new(serverPrivKey, [server]) serverInfo = PeerInfo.new(serverPrivKey, server)
serverNoise = Noise.new(rng, serverPrivKey, outgoing = false) serverNoise = Noise.new(rng, serverPrivKey, outgoing = false)
readTask = newFuture[void]() readTask = newFuture[void]()
@ -235,9 +235,9 @@ suite "Noise":
acceptFut = acceptHandler() acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientPrivKey = PrivateKey.random(ECDSA, rng[]).get() clientPrivKey = PrivateKey.random(ECDSA, rng[]).get()
clientInfo = PeerInfo.new(clientPrivKey, [transport1.ma]) clientInfo = PeerInfo.new(clientPrivKey, transport1.addrs)
clientNoise = Noise.new(rng, clientPrivKey, outgoing = true) clientNoise = Noise.new(rng, clientPrivKey, outgoing = true)
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
conn.peerId = serverInfo.peerId conn.peerId = serverInfo.peerId
let sconn = await clientNoise.secure(conn, true) let sconn = await clientNoise.secure(conn, true)
@ -252,8 +252,8 @@ suite "Noise":
await listenFut await listenFut
asyncTest "e2e use switch dial proto string": asyncTest "e2e use switch dial proto string":
let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma1 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma2 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var peerInfo1, peerInfo2: PeerInfo var peerInfo1, peerInfo2: PeerInfo
var switch1, switch2: Switch var switch1, switch2: Switch
@ -280,8 +280,8 @@ suite "Noise":
await allFuturesThrowing(awaiters) await allFuturesThrowing(awaiters)
asyncTest "e2e test wrong secure negotiation": asyncTest "e2e test wrong secure negotiation":
let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma1 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma2 = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var peerInfo1, peerInfo2: PeerInfo var peerInfo1, peerInfo2: PeerInfo
var switch1, switch2: Switch var switch1, switch2: Switch

View File

@ -56,13 +56,13 @@ suite "Ping":
asyncTest "simple ping": asyncTest "simple ping":
msListen.addHandler(PingCodec, pingProto1) msListen.addHandler(PingCodec, pingProto1)
serverFut = transport1.start(ma) serverFut = transport1.start(@[ma])
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept() let c = await transport1.accept()
await msListen.handle(c) await msListen.handle(c)
acceptFut = acceptHandler() acceptFut = acceptHandler()
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
discard await msDial.select(conn, PingCodec) discard await msDial.select(conn, PingCodec)
let time = await pingProto2.ping(conn) let time = await pingProto2.ping(conn)
@ -71,14 +71,14 @@ suite "Ping":
asyncTest "ping callback": asyncTest "ping callback":
msDial.addHandler(PingCodec, pingProto2) msDial.addHandler(PingCodec, pingProto2)
serverFut = transport1.start(ma) serverFut = transport1.start(@[ma])
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept() let c = await transport1.accept()
discard await msListen.select(c, PingCodec) discard await msListen.select(c, PingCodec)
discard await pingProto1.ping(c) discard await pingProto1.ping(c)
acceptFut = acceptHandler() acceptFut = acceptHandler()
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
await msDial.handle(conn) await msDial.handle(conn)
check pingReceivedCount == 1 check pingReceivedCount == 1
@ -96,13 +96,13 @@ suite "Ping":
fakePingProto.handler = fakeHandle fakePingProto.handler = fakeHandle
msListen.addHandler(PingCodec, fakePingProto) msListen.addHandler(PingCodec, fakePingProto)
serverFut = transport1.start(ma) serverFut = transport1.start(@[ma])
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept() let c = await transport1.accept()
await msListen.handle(c) await msListen.handle(c)
acceptFut = acceptHandler() acceptFut = acceptHandler()
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.addrs[0])
discard await msDial.select(conn, PingCodec) discard await msDial.select(conn, PingCodec)
let p = pingProto2.ping(conn) let p = pingProto2.ping(conn)

View File

@ -625,7 +625,7 @@ suite "Switch":
# for most of the steps in the upgrade flow - # for most of the steps in the upgrade flow -
# this is just a basic test for dials # this is just a basic test for dials
asyncTest "e2e canceling dial should not leak": asyncTest "e2e canceling dial should not leak":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport = TcpTransport.new(upgrade = Upgrade()) let transport = TcpTransport.new(upgrade = Upgrade())
await transport.start(ma) await transport.start(ma)
@ -645,7 +645,7 @@ suite "Switch":
awaiters.add(await switch.start()) awaiters.add(await switch.start())
var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get() var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get()
let connectFut = switch.connect(peerId, @[transport.ma]) let connectFut = switch.connect(peerId, transport.addrs)
await sleepAsync(500.millis) await sleepAsync(500.millis)
connectFut.cancel() connectFut.cancel()
await handlerWait await handlerWait
@ -662,7 +662,7 @@ suite "Switch":
await allFuturesThrowing(awaiters) await allFuturesThrowing(awaiters)
asyncTest "e2e closing remote conn should not leak": asyncTest "e2e closing remote conn should not leak":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport = TcpTransport.new(upgrade = Upgrade()) let transport = TcpTransport.new(upgrade = Upgrade())
await transport.start(ma) await transport.start(ma)
@ -679,7 +679,7 @@ suite "Switch":
var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get() var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get()
expect LPStreamClosedError: expect LPStreamClosedError:
await switch.connect(peerId, @[transport.ma]) await switch.connect(peerId, transport.addrs)
await handlerWait await handlerWait
@ -905,6 +905,73 @@ suite "Switch":
switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.protocols.toHashSet() switch1.peerStore.protoBook.get(switch2.peerInfo.peerId) == switch2.peerInfo.protocols.toHashSet()
switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet() switch2.peerStore.protoBook.get(switch1.peerInfo.peerId) == switch1.peerInfo.protocols.toHashSet()
asyncTest "e2e should allow multiple local addresses":
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let msg = string.fromBytes(await conn.readLp(1024))
check "Hello!" == msg
await conn.writeLp("Hello!")
finally:
await conn.close()
let testProto = new TestProto
testProto.codec = TestCodec
testProto.handler = handle
let addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
MultiAddress.init("/ip6/::1/tcp/0").tryGet()]
let switch1 = newStandardSwitch(
addrs = addrs,
transportFlags = {ServerFlags.ReuseAddr, ServerFlags.ReusePort})
switch1.mount(testProto)
let switch2 = newStandardSwitch()
let switch3 = newStandardSwitch(
addrs = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
)
await allFuturesThrowing(
switch1.start(),
switch2.start(),
switch3.start())
check IP4.matchPartial(switch1.peerInfo.addrs[0])
check IP6.matchPartial(switch1.peerInfo.addrs[1])
let conn = await switch2.dial(
switch1.peerInfo.peerId,
@[switch1.peerInfo.addrs[0]],
TestCodec)
check switch1.isConnected(switch2.peerInfo.peerId)
check switch2.isConnected(switch1.peerInfo.peerId)
await conn.writeLp("Hello!")
check "Hello!" == string.fromBytes(await conn.readLp(1024))
await conn.close()
let connv6 = await switch3.dial(
switch1.peerInfo.peerId,
@[switch1.peerInfo.addrs[1]],
TestCodec)
check switch1.isConnected(switch3.peerInfo.peerId)
check switch3.isConnected(switch1.peerInfo.peerId)
await connv6.writeLp("Hello!")
check "Hello!" == string.fromBytes(await connv6.readLp(1024))
await connv6.close()
await allFuturesThrowing(
switch1.stop(),
switch2.stop(),
switch3.stop())
check not switch1.isConnected(switch2.peerInfo.peerId)
check not switch2.isConnected(switch1.peerInfo.peerId)
asyncTest "e2e dial dns4 address": asyncTest "e2e dial dns4 address":
var awaiters: seq[Future[void]] var awaiters: seq[Future[void]]
let resolver = MockResolver.new() let resolver = MockResolver.new()
@ -963,8 +1030,8 @@ suite "Switch":
await allFuturesThrowing(awaiters) await allFuturesThrowing(awaiters)
resolver.txtResponses["_dnsaddr.test.io"] = @[ resolver.txtResponses["_dnsaddr.test.io"] = @[
"dnsaddr=/ip4/127.0.0.1" & $destSwitch.peerInfo.addrs[1][1].tryGet() & "/ws", "dnsaddr=" & $destSwitch.peerInfo.addrs[0],
"dnsaddr=/ip4/127.0.0.1" & $destSwitch.peerInfo.addrs[0][1].tryGet() "dnsaddr=" & $destSwitch.peerInfo.addrs[1]
] ]
let testAddr = MultiAddress.init("/dnsaddr/test.io/").tryGet() let testAddr = MultiAddress.init("/dnsaddr/test.io/").tryGet()

View File

@ -17,7 +17,7 @@ suite "TCP transport":
checkTrackers() checkTrackers()
asyncTest "test listener: handle write": asyncTest "test listener: handle write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport.start(ma) asyncSpawn transport.start(ma)
@ -28,7 +28,7 @@ suite "TCP transport":
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let streamTransport = await connect(transport.ma) let streamTransport = await connect(transport.addrs[0])
let msg = await streamTransport.read(6) let msg = await streamTransport.read(6)
@ -38,7 +38,7 @@ suite "TCP transport":
check string.fromBytes(msg) == "Hello!" check string.fromBytes(msg) == "Hello!"
asyncTest "test listener: handle read": asyncTest "test listener: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport.start(ma) asyncSpawn transport.start(ma)
@ -51,7 +51,7 @@ suite "TCP transport":
await conn.close() await conn.close()
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let streamTransport: StreamTransport = await connect(transport.ma) let streamTransport: StreamTransport = await connect(transport.addrs[0])
let sent = await streamTransport.write("Hello!") let sent = await streamTransport.write("Hello!")
await handlerWait.wait(1.seconds) # when no issues will not wait that long! await handlerWait.wait(1.seconds) # when no issues will not wait that long!

View File

@ -71,7 +71,7 @@ suite "WebSocket transport":
"/ip4/0.0.0.0/tcp/0/wss") "/ip4/0.0.0.0/tcp/0/wss")
asyncTest "Hostname verification": asyncTest "Hostname verification":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0/wss").tryGet() let ma = @[Multiaddress.init("/ip4/0.0.0.0/tcp/0/wss").tryGet()]
let transport1 = WsTransport.new(Upgrade(), TLSPrivateKey.init(SecureKey), TLSCertificate.init(SecureCert), {TLSFlags.NoVerifyHost}) let transport1 = WsTransport.new(Upgrade(), TLSPrivateKey.init(SecureKey), TLSCertificate.init(SecureCert), {TLSFlags.NoVerifyHost})
await transport1.start(ma) await transport1.start(ma)
@ -84,12 +84,12 @@ suite "WebSocket transport":
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
# ws.test is in certificate # ws.test is in certificate
let conn = await transport1.dial("ws.test", transport1.ma) let conn = await transport1.dial("ws.test", transport1.addrs[0])
await conn.close() await conn.close()
try: try:
let conn = await transport1.dial("ws.wronghostname", transport1.ma) let conn = await transport1.dial("ws.wronghostname", transport1.addrs[0])
check false check false
except CatchableError as exc: except CatchableError as exc:
check true check true