From af3be7966b302be7a5c07c8a8ad6c5217f1e69c8 Mon Sep 17 00:00:00 2001 From: Tanguy Cizain Date: Tue, 3 Aug 2021 15:48:03 +0200 Subject: [PATCH] Websocket Transport (#593) * start of websocket transport * more ws tests * switch to common test * add close to wsstream * update ws & chronicles version * cleanup * removed multicodec * clean ws outgoing connections * renamed to websock * removed stream from logs * renamed ws to websock * add connection closing test to common transport * close incoming connection on ws stop * renamed testwebsocket.nim -> testwstransport.nim * removed raise todo * split out/in connections * add wss to tests * Fix tls (#608) * change log level * fixed issue related to stopping some cosmetic cleanup * use `allFutures` to stop/close things Prevent potential race conditions when stopping two or more transports * misc * point websock to server-case-object branch * interop test with go * removed websock version specification * add daemon -> native ws test * fix & test closed read/write * update readOnce, thanks jangko Co-authored-by: Dmitriy Ryajov --- libp2p.nimble | 5 +- libp2p/builders.nim | 21 ++- libp2p/multiaddress.nim | 13 +- libp2p/transports/tcptransport.nim | 8 +- libp2p/transports/transport.nim | 6 +- libp2p/transports/wstransport.nim | 244 +++++++++++++++++++++++++++++ libp2p/wire.nim | 15 +- tests/commontransport.nim | 106 ++++++++++--- tests/testinterop.nim | 95 ++++++++++- tests/testnative.nim | 1 + tests/testtcptransport.nim | 5 +- tests/testwstransport.nim | 88 +++++++++++ 12 files changed, 557 insertions(+), 50 deletions(-) create mode 100644 libp2p/transports/wstransport.nim create mode 100644 tests/testwstransport.nim diff --git a/libp2p.nimble b/libp2p.nimble index 9496a90..c565adf 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -10,11 +10,12 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"] requires "nim >= 1.2.0", "nimcrypto >= 0.4.1", "bearssl >= 0.1.4", - "chronicles >= 0.7.2", + "chronicles#ba2817f1", "chronos >= 2.5.2", "metrics", "secp256k1", - "stew#head" + "stew#head", + "https://github.com/status-im/nim-websock" proc runTest(filename: string, verify: bool = true, sign: bool = true, moreoptions: string = "") = diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 9d8a3bd..228db94 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -22,14 +22,12 @@ export switch, peerid, peerinfo, connection, multiaddress, crypto, errors type + TransportProvider* = proc(upgr: Upgrade): Transport {.gcsafe, raises: [Defect].} + SecureProtocol* {.pure.} = enum Noise, Secio {.deprecated.} - TcpTransportOpts = object - enable: bool - flags: set[ServerFlags] - MplexOpts = object enable: bool newMuxer: MuxerConstructor @@ -39,7 +37,7 @@ type addresses: seq[MultiAddress] secureManagers: seq[SecureProtocol] mplexOpts: MplexOpts - tcpTransportOpts: TcpTransportOpts + transports: seq[TransportProvider] rng: ref BrHmacDrbgContext maxConnections: int maxIn: int @@ -58,7 +56,6 @@ proc new*(T: type[SwitchBuilder]): T = privKey: none(PrivateKey), addresses: @[address], secureManagers: @[], - tcpTransportOpts: TcpTransportOpts(), maxConnections: MaxConnections, maxIn: -1, maxOut: -1, @@ -97,11 +94,13 @@ proc withNoise*(b: SwitchBuilder): SwitchBuilder = b.secureManagers.add(SecureProtocol.Noise) b -proc withTcpTransport*(b: SwitchBuilder, flags: set[ServerFlags] = {}): SwitchBuilder = - b.tcpTransportOpts.enable = true - b.tcpTransportOpts.flags = flags +proc withTransport*(b: SwitchBuilder, prov: TransportProvider): SwitchBuilder = + b.transports.add(prov) b +proc withTcpTransport*(b: SwitchBuilder, flags: set[ServerFlags] = {}): SwitchBuilder = + b.withTransport(proc(upgr: Upgrade): Transport = TcpTransport.new(flags, upgr)) + proc withRng*(b: SwitchBuilder, rng: ref BrHmacDrbgContext): SwitchBuilder = b.rng = rng b @@ -168,8 +167,8 @@ proc build*(b: SwitchBuilder): Switch let transports = block: var transports: seq[Transport] - if b.tcpTransportOpts.enable: - transports.add(Transport(TcpTransport.new(b.tcpTransportOpts.flags, muxedUpgrade))) + for tProvider in b.transports: + transports.add(tProvider(muxedUpgrade)) transports if b.secureManagers.len == 0: diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index cb6d42a..ddd4220 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -362,6 +362,9 @@ const MAProtocol( mcodec: multiCodec("ws"), kind: Marker, size: 0 ), + MAProtocol( + mcodec: multiCodec("wss"), kind: Marker, size: 0 + ), MAProtocol( mcodec: multiCodec("ipfs"), kind: Length, size: 0, coder: TranscoderP2P @@ -411,6 +414,9 @@ const UTP* = mapAnd(UDP, mapEq("utp")) QUIC* = mapAnd(UDP, mapEq("quic")) UNIX* = mapEq("unix") + WS* = mapAnd(TCP, mapEq("ws")) + WSS* = mapAnd(TCP, mapEq("wss")) + WebSockets* = mapOr(WS, WSS) Unreliable* = mapOr(UDP) @@ -978,11 +984,14 @@ proc matchPart(pat: MaPattern, protos: seq[MultiCodec]): MaPatResult = var empty: seq[MultiCodec] var pcs = protos if pat.operator == Or: + result = MaPatResult(flag: false, rem: empty) for a in pat.args: let res = a.matchPart(pcs) if res.flag: - return MaPatResult(flag: true, rem: res.rem) - result = MaPatResult(flag: false, rem: empty) + #Greedy Or + if result.flag == false or + result.rem.len > res.rem.len: + result = res elif pat.operator == And: if len(pcs) < len(pat.args): return MaPatResult(flag: false, rem: empty) diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 1b6edfc..7ed322f 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -201,7 +201,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = debug "Server was closed", exc = exc.msg raise newTransportClosedError(exc) except CatchableError as exc: - warn "Unexpected error creating connection", exc = exc.msg + debug "Unexpected error accepting connection", exc = exc.msg raise exc method dial*( @@ -218,8 +218,4 @@ method dial*( method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): if address.protocols.isOk: - return address.protocols - .get() - .filterIt( - it == multiCodec("tcp") - ).len > 0 + return TCP.match(address) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 699b605..b9a0f8e 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -93,4 +93,8 @@ method handles*( # by default we skip circuit addresses to avoid # having to repeat the check in every transport if address.protocols.isOk: - return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0 + return address.protocols + .get() + .filterIt( + it == multiCodec("p2p-circuit") + ).len == 0 diff --git a/libp2p/transports/wstransport.nim b/libp2p/transports/wstransport.nim new file mode 100644 index 0000000..ac922f2 --- /dev/null +++ b/libp2p/transports/wstransport.nim @@ -0,0 +1,244 @@ +## Nim-LibP2P +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import std/[sequtils] +import chronos, chronicles +import transport, + ../errors, + ../wire, + ../multicodec, + ../multistream, + ../connmanager, + ../multiaddress, + ../stream/connection, + ../upgrademngrs/upgrade, + websock/websock + +logScope: + topics = "libp2p wstransport" + +export transport, websock + +const + WsTransportTrackerName* = "libp2p.wstransport" + +type + WsStream = ref object of Connection + session: WSSession + +proc init*(T: type WsStream, + session: WSSession, + dir: Direction, + timeout = 10.minutes, + observedAddr: MultiAddress = MultiAddress()): T = + + let stream = T( + session: session, + timeout: timeout, + dir: dir, + observedAddr: observedAddr) + + stream.initStream() + return stream + +method readOnce*( + s: WsStream, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = + let res = await s.session.recv(pbytes, nbytes) + if res == 0 and s.session.readyState == ReadyState.Closed: + raise newLPStreamEOFError() + return res + +method write*( + s: WsStream, + msg: seq[byte]): Future[void] {.async.} = + try: + await s.session.send(msg, Opcode.Binary) + except WSClosedError: + raise newLPStreamEOFError() + +method closeImpl*(s: WsStream): Future[void] {.async.} = + await s.session.close() + await procCall Connection(s).closeImpl() + +type + WsTransport* = ref object of Transport + httpserver: HttpServer + wsserver: WSServer + connections: array[Direction, seq[WsStream]] + + tlsPrivateKey: TLSPrivateKey + tlsCertificate: TLSCertificate + tlsFlags: set[TLSFlags] + flags: set[ServerFlags] + factories: seq[ExtFactory] + rng: Rng + +proc secure*(self: WsTransport): bool = + not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate)) + +method start*( + self: WsTransport, + ma: MultiAddress) {.async.} = + ## listen on the transport + ## + + if self.running: + trace "WS transport already running" + return + + await procCall Transport(self).start(ma) + trace "Starting WS transport" + + self.httpserver = + if self.secure: + TlsHttpServer.create( + address = self.ma.initTAddress().tryGet(), + tlsPrivateKey = self.tlsPrivateKey, + tlsCertificate = self.tlsCertificate, + flags = self.flags) + else: + HttpServer.create(self.ma.initTAddress().tryGet()) + + self.wsserver = WSServer.new( + factories = self.factories, + rng = self.rng) + + let codec = if self.secure: + MultiAddress.init("/wss") + else: + MultiAddress.init("/ws") + + # always get the resolved address in case we're bound to 0.0.0.0:0 + self.ma = MultiAddress.init( + self.httpserver.localAddress()).tryGet() & codec.tryGet() + + self.running = true + trace "Listening on", address = self.ma + +method stop*(self: WsTransport) {.async, gcsafe.} = + ## stop the transport + ## + + self.running = false # mark stopped as soon as possible + + try: + trace "Stopping WS transport" + await procCall Transport(self).stop() # call base + + checkFutures( + await allFinished( + self.connections[Direction.In].mapIt(it.close()) & + self.connections[Direction.Out].mapIt(it.close()))) + + # server can be nil + if not isNil(self.httpserver): + self.httpserver.stop() + await self.httpserver.closeWait() + + self.httpserver = nil + trace "Transport stopped" + except CatchableError as exc: + trace "Error shutting down ws transport", exc = exc.msg + +proc trackConnection(self: WsTransport, conn: WsStream, dir: Direction) = + self.connections[dir].add(conn) + proc onClose() {.async.} = + await conn.session.stream.reader.join() + self.connections[dir].keepItIf(it != conn) + trace "Cleaned up client" + asyncSpawn onClose() + +method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} = + ## accept a new WS connection + ## + + if not self.running: + raise newTransportClosedError() + + try: + let + req = await self.httpserver.accept() + wstransp = await self.wsserver.handleRequest(req) + stream = WsStream.init(wstransp, Direction.In) + + self.trackConnection(stream, Direction.In) + return stream + except TransportOsError as exc: + debug "OS Error", exc = exc.msg + except TransportTooManyError as exc: + debug "Too many files opened", exc = exc.msg + except TransportUseClosedError as exc: + debug "Server was closed", exc = exc.msg + raise newTransportClosedError(exc) + except CatchableError as exc: + warn "Unexpected error accepting connection", exc = exc.msg + raise exc + +method dial*( + self: WsTransport, + address: MultiAddress): Future[Connection] {.async, gcsafe.} = + ## dial a peer + ## + + trace "Dialing remote peer", address = $address + + let + secure = WSS.match(address) + transp = await WebSocket.connect( + address.initTAddress().tryGet(), + "", + secure = secure, + flags = self.tlsFlags) + stream = WsStream.init(transp, Direction.Out) + + self.trackConnection(stream, Direction.Out) + return stream + +method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} = + if procCall Transport(t).handles(address): + if address.protocols.isOk: + return WebSockets.match(address) + +proc new*( + T: typedesc[WsTransport], + upgrade: Upgrade, + tlsPrivateKey: TLSPrivateKey, + tlsCertificate: TLSCertificate, + tlsFlags: set[TLSFlags] = {}, + flags: set[ServerFlags] = {}, + factories: openArray[ExtFactory] = [], + rng: Rng = nil): T = + + T( + upgrader: upgrade, + tlsPrivateKey: tlsPrivateKey, + tlsCertificate: tlsCertificate, + tlsFlags: tlsFlags, + flags: flags, + factories: @factories, + rng: rng) + +proc new*( + T: typedesc[WsTransport], + upgrade: Upgrade, + flags: set[ServerFlags] = {}, + factories: openArray[ExtFactory] = [], + rng: Rng = nil): T = + + T.new( + upgrade = upgrade, + tlsPrivateKey = nil, + tlsCertificate = nil, + flags = flags, + factories = @factories, + rng = rng) diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 845e9d7..761a6fd 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -19,17 +19,18 @@ else: import posix const - TRANSPMA* = mapOr( - mapAnd(IP, mapEq("udp")), - mapAnd(IP, mapEq("tcp")), - mapAnd(mapEq("unix")) + RTRANSPMA* = mapOr( + TCP, + WebSockets, + UNIX ) - RTRANSPMA* = mapOr( - mapAnd(IP, mapEq("tcp")), - mapAnd(mapEq("unix")) + TRANSPMA* = mapOr( + RTRANSPMA, + UDP ) + proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = ## Initialize ``TransportAddress`` with MultiAddress ``ma``. ## diff --git a/tests/commontransport.nim b/tests/commontransport.nim index caaa75b..e8f5839 100644 --- a/tests/commontransport.nim +++ b/tests/commontransport.nim @@ -11,14 +11,24 @@ import ../libp2p/[stream/connection, import ./helpers -proc commonTransportTest*(transportType: typedesc[Transport], ma: string) = - suite $transportType & " common": +type TransportProvider* = proc(): Transport {.gcsafe.} + +proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) = + suite name & " common tests": teardown: checkTrackers() + + asyncTest "can handle local address": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let transport1 = prov() + await transport1.start(ma) + check transport1.handles(transport1.ma) + await transport1.stop() + asyncTest "e2e: handle write": let ma: MultiAddress = Multiaddress.init(ma).tryGet() - let transport1: transportType = transportType.new(upgrade = Upgrade()) + let transport1 = prov() await transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = @@ -28,23 +38,25 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) = let handlerWait = acceptHandler() - let transport2: transportType = transportType.new(upgrade = Upgrade()) + let transport2 = prov() let conn = await transport2.dial(transport1.ma) var msg = newSeq[byte](6) await conn.readExactly(addr msg[0], 6) - await conn.close() #for some protocols, closing requires actively, so we must close here - await handlerWait.wait(1.seconds) # when no issues will not wait that long! + await conn.close() #for some protocols, closing requires actively reading, so we must close here - await transport2.stop() - await transport1.stop() + await allFuturesThrowing( + allFinished( + transport1.stop(), + transport2.stop())) check string.fromBytes(msg) == "Hello!" + await handlerWait.wait(1.seconds) # when no issues will not wait that long! asyncTest "e2e: handle read": let ma: MultiAddress = Multiaddress.init(ma).tryGet() - let transport1: transportType = transportType.new(upgrade = Upgrade()) - asyncSpawn transport1.start(ma) + let transport1 = prov() + await transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = let conn = await transport1.accept() @@ -55,35 +67,39 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) = let handlerWait = acceptHandler() - let transport2: transportType = transportType.new(upgrade = Upgrade()) + let transport2 = prov() let conn = await transport2.dial(transport1.ma) await conn.write("Hello!") - await conn.close() #for some protocols, closing requires actively, so we must close here + await conn.close() #for some protocols, closing requires actively reading, so we must close here await handlerWait.wait(1.seconds) # when no issues will not wait that long! - await transport2.stop() - await transport1.stop() + await allFuturesThrowing( + allFinished( + transport1.stop(), + transport2.stop())) asyncTest "e2e: handle dial cancellation": let ma: MultiAddress = Multiaddress.init(ma).tryGet() - let transport1: transportType = transportType.new(upgrade = Upgrade()) + let transport1 = prov() await transport1.start(ma) - let transport2: transportType = transportType.new(upgrade = Upgrade()) + let transport2 = prov() let cancellation = transport2.dial(transport1.ma) await cancellation.cancelAndWait() check cancellation.cancelled - await transport2.stop() - await transport1.stop() + await allFuturesThrowing( + allFinished( + transport1.stop(), + transport2.stop())) asyncTest "e2e: handle accept cancellation": let ma: MultiAddress = Multiaddress.init(ma).tryGet() - let transport1: transportType = transportType.new(upgrade = Upgrade()) + let transport1 = prov() await transport1.start(ma) let acceptHandler = transport1.accept() @@ -91,3 +107,55 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) = check acceptHandler.cancelled await transport1.stop() + + asyncTest "e2e: stopping transport kills connections": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + + let transport1 = prov() + await transport1.start(ma) + + let transport2 = prov() + + let acceptHandler = transport1.accept() + let conn = await transport2.dial(transport1.ma) + let serverConn = await acceptHandler + + await allFuturesThrowing( + allFinished( + transport1.stop(), + transport2.stop())) + + check serverConn.closed() + check conn.closed() + + asyncTest "read or write on closed connection": + let ma: MultiAddress = Multiaddress.init(ma).tryGet() + let transport1 = prov() + await transport1.start(ma) + + proc acceptHandler() {.async, gcsafe.} = + let conn = await transport1.accept() + await conn.close() + + let handlerWait = acceptHandler() + + let conn = await transport1.dial(transport1.ma) + + var msg = newSeq[byte](6) + try: + await conn.readExactly(addr msg[0], 6) + check false + except CatchableError as exc: + check true + + # we don't HAVE to throw on write on EOF + # (at least TCP doesn't) + try: + await conn.write(msg) + except CatchableError as exc: + check true + + await conn.close() #for some protocols, closing requires actively reading, so we must close here + await handlerWait.wait(1.seconds) # when no issues will not wait that long! + + await transport1.stop() diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 833a4c7..d754b9a 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -2,7 +2,7 @@ import options, tables import chronos, chronicles, stew/byteutils import helpers import ../libp2p -import ../libp2p/[daemon/daemonapi, varint] +import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto] type # TODO: Unify both PeerInfo structs @@ -288,6 +288,99 @@ suite "Interop": await daemonNode.close() await sleepAsync(1.seconds) + asyncTest "native -> daemon websocket connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + + var testFuture = newFuture[string]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + var line = string.fromBytes(await conn.readLp(1024)) + check line == test + testFuture.complete(line) + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() + + let nativeNode = SwitchBuilder + .new() + .withAddress(wsAddress) + .withRng(crypto.newRng()) + .withMplex() + .withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr)) + .withNoise() + .build() + + nativeNode.mount(proto) + + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + discard await stream.transp.writeLp(test) + + check test == (await wait(testFuture, 10.secs)) + + await stream.close() + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + await sleepAsync(1.seconds) + + asyncTest "daemon -> native websocket connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + # We are preparing expect string, which should be prefixed with varint + # length and do not have `\r\n` suffix, because we going to use + # readLine(). + var buffer = initVBuffer() + buffer.writeSeq(test & "\r\n") + buffer.finish() + var expect = newString(len(buffer) - 2) + copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) + + let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() + let nativeNode = SwitchBuilder + .new() + .withAddress(wsAddress) + .withRng(crypto.newRng()) + .withMplex() + .withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr)) + .withNoise() + .build() + + let awaiters = await nativeNode.start() + + let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) + let daemonPeer = await daemonNode.identity() + + var testFuture = newFuture[string]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + # We should perform `readLp()` instead of `readLine()`. `readLine()` + # here reads actually length prefixed string. + var line = await stream.transp.readLine() + check line == expect + testFuture.complete(line) + await stream.close() + + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses), + protos[0]) + await conn.writeLp(test & "\r\n") + check expect == (await wait(testFuture, 10.secs)) + + await conn.close() + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + asyncTest "daemon -> multiple reads and writes": var protos = @["/test-stream"] diff --git a/tests/testnative.nim b/tests/testnative.nim index 081af88..f3a682c 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -17,6 +17,7 @@ import testmultibase, testpeerid import testtcptransport, + testwstransport, testmultistream, testbufferstream, testidentify, diff --git a/tests/testtcptransport.nim b/tests/testtcptransport.nim index d46c84c..fdd23d1 100644 --- a/tests/testtcptransport.nim +++ b/tests/testtcptransport.nim @@ -125,4 +125,7 @@ suite "TCP transport": server.close() await server.join() - TcpTransport.commonTransportTest("/ip4/0.0.0.0/tcp/0") + commonTransportTest( + "TcpTransport", + proc (): Transport = TcpTransport.new(upgrade = Upgrade()), + "/ip4/0.0.0.0/tcp/0") diff --git a/tests/testwstransport.nim b/tests/testwstransport.nim new file mode 100644 index 0000000..af3d971 --- /dev/null +++ b/tests/testwstransport.nim @@ -0,0 +1,88 @@ +{.used.} + +import sequtils +import chronos, stew/byteutils +import ../libp2p/[stream/connection, + transports/transport, + transports/wstransport, + upgrademngrs/upgrade, + multiaddress, + errors, + wire] + +import ./helpers, ./commontransport + +const + SecureKey* = """ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCdNv0SX02aeZ4/ +Yc+p/Kwd5UVOHlpmK7/TVC/kcjFbdoUuKNn8pnX/fyhgSKpUYut+te7YRiZhqlaL +EZKjfy8GBZwXZnJCevFkTvGTTebXXExLIsLGfJqKeLAdFCQkX8wV3jV1DT5JLV+D +5+HWaiiBr38gsl4ZbfyedTF40JvzokCmcdlx9bpzX1j/b84L/zSwUyyEcgp5G28F +Jh5TnxAeDHJpOVjr8XMb/xoNqiDF6NwF96hvOZC14mZ1TxxW5bUzXprsy0l52pmh +dN3Crz11+t2h519hRKHxT6/l5pTx/+dApXiP6hMV04CQJNnas3NyRxTDR9dNel+3 ++wD7/PRTAgMBAAECggEBAJuXPEbegxMKog7gYoE9S6oaqchySc0sJyCjBPL2ANsg +JRZV38cnh0hhNDh2MfxqGd7Bd6wbYQjvZ88iiRm+WW+ARcby4MnimtxHNNYwFvG0 +qt0BffqqftfkMYfV0x8coAJUdFtvy+DoQstsxhlJ3uTaJtrZLD/GlmjMWzXSX0Vy +FXiLDO7/LoSjsjaf4e4aLofIyLJS3H1T+5cr/d2mdpRzkeWkxShODsK4cRLOlZ5I +pz4Wm2770DTbiYph8ixl/CnmYn6T7V0F5VYujALknipUBeQY4e/A9vrQ/pvqJV+W +JjFUne6Rxg/lJjh8vNJp2bK1ZbzpwmZLaZIoEz8t/qECgYEAzvCCA48uQPaurSQ3 +cvHDhcVwYmEaH8MW8aIW/5l8XJK60GsUHPFhEsfD/ObI5PJJ9aOqgabpRHkvD4ZY +a8QJBxCy6UeogUeKvGks8VQ34SZXLimmgrL9Mlljv0v9PloEkVYbztYyX4GVO0ov +3oH+hKO+/MclzNDyeXZx3Vv4K+UCgYEAwnyb7tqp7fRqm/8EymIZV5pa0p6h609p +EhCBi9ii6d/ewEjsBhs7bPDBO4PO9ylvOvryYZH1hVbQja2anOCBjO8dAHRHWM86 +964TFriywBQkYxp6dsB8nUjLBDza2xAM3m+OGi9/ATuhEAe5sXp/fZL3tkfSaOXI +A7Gzro+kS9cCgYEAtKScSfEeBlWQa9H2mV9UN5z/mtF61YkeqTW+b8cTGVh4vWEL +wKww+gzqGAV6Duk2CLijKeSDMmO64gl7fC83VjSMiTklbhz+jbQeKFhFI0Sty71N +/j+y6NXBTgdOfLRl0lzhj2/JrzdWBtie6tR9UloCaXSKmb04PTFY+kvDWsUCgYBR +krJUnKJpi/qrM2tu93Zpp/QwIxkG+We4i/PKFDNApQVo4S0d4o4qQ1DJBZ/pSxe8 +RUUkZ3PzWVZgFlCjPAcadbBUYHEMbt7sw7Z98ToIFmqspo53AIVD8yQzwtKIz1KW +eXPAx+sdOUV008ivCBIxOVNswPMfzED4S7Bxpw3iQQKBgGJhct2nBsgu0l2/wzh9 +tpKbalW1RllgptNQzjuBEZMTvPF0L+7BE09/exKtt4N9s3yAzi8o6Qo7RHX5djVc +SNgafV4jj7jt2Ilh6KOy9dshtLoEkS1NmiqfVe2go2auXZdyGm+I2yzKWdKGDO0J +diTtYf1sA0PgNXdSyDC03TZl +-----END PRIVATE KEY----- +""" + + SecureCert* = """ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUe9fr78Dz9PedQ5Sq0uluMWQhX9wwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCSU4xEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMTAzMTcwOTMzMzZaFw0zMTAz +MTUwOTMzMzZaMEUxCzAJBgNVBAYTAklOMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQCdNv0SX02aeZ4/Yc+p/Kwd5UVOHlpmK7/TVC/kcjFb +doUuKNn8pnX/fyhgSKpUYut+te7YRiZhqlaLEZKjfy8GBZwXZnJCevFkTvGTTebX +XExLIsLGfJqKeLAdFCQkX8wV3jV1DT5JLV+D5+HWaiiBr38gsl4ZbfyedTF40Jvz +okCmcdlx9bpzX1j/b84L/zSwUyyEcgp5G28FJh5TnxAeDHJpOVjr8XMb/xoNqiDF +6NwF96hvOZC14mZ1TxxW5bUzXprsy0l52pmhdN3Crz11+t2h519hRKHxT6/l5pTx +/+dApXiP6hMV04CQJNnas3NyRxTDR9dNel+3+wD7/PRTAgMBAAGjUzBRMB0GA1Ud +DgQWBBRkSY1AkGUpVNxG5fYocfgFODtQmTAfBgNVHSMEGDAWgBRkSY1AkGUpVNxG +5fYocfgFODtQmTAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBt +D71VH7F8GOQXITFXCrHwEq1Fx3ScuSnL04NJrXw/e9huzLVQOchAYp/EIn4x2utN +S31dt94wvi/IysOVbR1LatYNF5kKgGj2Wc6DH0PswBMk8R1G8QMeCz+hCjf1VDHe +AAW1x2q20rJAvUrT6cRBQqeiMzQj0OaJbvfnd2hu0/d0DFkcuGVgBa2zlbG5rbdU +Jnq7MQfSaZHd0uBgiKkS+Zw6XaYfWfByCAGSnUqRdOChiJ2stFVLvu+9oQ+PJjJt +Er1u9bKTUyeuYpqXr2BP9dqphwu8R4NFVUg6DIRpMFMsybaL7KAd4hD22RXCvc0m +uLu7KODi+eW62MHqs4N2 +-----END CERTIFICATE----- +""" + +suite "WebSocket transport": + teardown: + checkTrackers() + + commonTransportTest( + "WebSocket", + proc (): Transport = WsTransport.new(Upgrade()), + "/ip4/0.0.0.0/tcp/0/ws") + + commonTransportTest( + "WebSocket Secure", + proc (): Transport = + WsTransport.new( + Upgrade(), + TLSPrivateKey.init(SecureKey), + TLSCertificate.init(SecureCert), + {TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}), + "/ip4/0.0.0.0/tcp/0/wss")