Websocket Transport (#593)
* start of websocket transport * more ws tests * switch to common test * add close to wsstream * update ws & chronicles version * cleanup * removed multicodec * clean ws outgoing connections * renamed to websock * removed stream from logs * renamed ws to websock * add connection closing test to common transport * close incoming connection on ws stop * renamed testwebsocket.nim -> testwstransport.nim * removed raise todo * split out/in connections * add wss to tests * Fix tls (#608) * change log level * fixed issue related to stopping some cosmetic cleanup * use `allFutures` to stop/close things Prevent potential race conditions when stopping two or more transports * misc * point websock to server-case-object branch * interop test with go * removed websock version specification * add daemon -> native ws test * fix & test closed read/write * update readOnce, thanks jangko Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
parent
8b438142ce
commit
af3be7966b
|
@ -10,11 +10,12 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
|
|||
requires "nim >= 1.2.0",
|
||||
"nimcrypto >= 0.4.1",
|
||||
"bearssl >= 0.1.4",
|
||||
"chronicles >= 0.7.2",
|
||||
"chronicles#ba2817f1",
|
||||
"chronos >= 2.5.2",
|
||||
"metrics",
|
||||
"secp256k1",
|
||||
"stew#head"
|
||||
"stew#head",
|
||||
"https://github.com/status-im/nim-websock"
|
||||
|
||||
proc runTest(filename: string, verify: bool = true, sign: bool = true,
|
||||
moreoptions: string = "") =
|
||||
|
|
|
@ -22,14 +22,12 @@ export
|
|||
switch, peerid, peerinfo, connection, multiaddress, crypto, errors
|
||||
|
||||
type
|
||||
TransportProvider* = proc(upgr: Upgrade): Transport {.gcsafe, raises: [Defect].}
|
||||
|
||||
SecureProtocol* {.pure.} = enum
|
||||
Noise,
|
||||
Secio {.deprecated.}
|
||||
|
||||
TcpTransportOpts = object
|
||||
enable: bool
|
||||
flags: set[ServerFlags]
|
||||
|
||||
MplexOpts = object
|
||||
enable: bool
|
||||
newMuxer: MuxerConstructor
|
||||
|
@ -39,7 +37,7 @@ type
|
|||
addresses: seq[MultiAddress]
|
||||
secureManagers: seq[SecureProtocol]
|
||||
mplexOpts: MplexOpts
|
||||
tcpTransportOpts: TcpTransportOpts
|
||||
transports: seq[TransportProvider]
|
||||
rng: ref BrHmacDrbgContext
|
||||
maxConnections: int
|
||||
maxIn: int
|
||||
|
@ -58,7 +56,6 @@ proc new*(T: type[SwitchBuilder]): T =
|
|||
privKey: none(PrivateKey),
|
||||
addresses: @[address],
|
||||
secureManagers: @[],
|
||||
tcpTransportOpts: TcpTransportOpts(),
|
||||
maxConnections: MaxConnections,
|
||||
maxIn: -1,
|
||||
maxOut: -1,
|
||||
|
@ -97,11 +94,13 @@ proc withNoise*(b: SwitchBuilder): SwitchBuilder =
|
|||
b.secureManagers.add(SecureProtocol.Noise)
|
||||
b
|
||||
|
||||
proc withTcpTransport*(b: SwitchBuilder, flags: set[ServerFlags] = {}): SwitchBuilder =
|
||||
b.tcpTransportOpts.enable = true
|
||||
b.tcpTransportOpts.flags = flags
|
||||
proc withTransport*(b: SwitchBuilder, prov: TransportProvider): SwitchBuilder =
|
||||
b.transports.add(prov)
|
||||
b
|
||||
|
||||
proc withTcpTransport*(b: SwitchBuilder, flags: set[ServerFlags] = {}): SwitchBuilder =
|
||||
b.withTransport(proc(upgr: Upgrade): Transport = TcpTransport.new(flags, upgr))
|
||||
|
||||
proc withRng*(b: SwitchBuilder, rng: ref BrHmacDrbgContext): SwitchBuilder =
|
||||
b.rng = rng
|
||||
b
|
||||
|
@ -168,8 +167,8 @@ proc build*(b: SwitchBuilder): Switch
|
|||
let
|
||||
transports = block:
|
||||
var transports: seq[Transport]
|
||||
if b.tcpTransportOpts.enable:
|
||||
transports.add(Transport(TcpTransport.new(b.tcpTransportOpts.flags, muxedUpgrade)))
|
||||
for tProvider in b.transports:
|
||||
transports.add(tProvider(muxedUpgrade))
|
||||
transports
|
||||
|
||||
if b.secureManagers.len == 0:
|
||||
|
|
|
@ -362,6 +362,9 @@ const
|
|||
MAProtocol(
|
||||
mcodec: multiCodec("ws"), kind: Marker, size: 0
|
||||
),
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("wss"), kind: Marker, size: 0
|
||||
),
|
||||
MAProtocol(
|
||||
mcodec: multiCodec("ipfs"), kind: Length, size: 0,
|
||||
coder: TranscoderP2P
|
||||
|
@ -411,6 +414,9 @@ const
|
|||
UTP* = mapAnd(UDP, mapEq("utp"))
|
||||
QUIC* = mapAnd(UDP, mapEq("quic"))
|
||||
UNIX* = mapEq("unix")
|
||||
WS* = mapAnd(TCP, mapEq("ws"))
|
||||
WSS* = mapAnd(TCP, mapEq("wss"))
|
||||
WebSockets* = mapOr(WS, WSS)
|
||||
|
||||
Unreliable* = mapOr(UDP)
|
||||
|
||||
|
@ -978,11 +984,14 @@ proc matchPart(pat: MaPattern, protos: seq[MultiCodec]): MaPatResult =
|
|||
var empty: seq[MultiCodec]
|
||||
var pcs = protos
|
||||
if pat.operator == Or:
|
||||
result = MaPatResult(flag: false, rem: empty)
|
||||
for a in pat.args:
|
||||
let res = a.matchPart(pcs)
|
||||
if res.flag:
|
||||
return MaPatResult(flag: true, rem: res.rem)
|
||||
result = MaPatResult(flag: false, rem: empty)
|
||||
#Greedy Or
|
||||
if result.flag == false or
|
||||
result.rem.len > res.rem.len:
|
||||
result = res
|
||||
elif pat.operator == And:
|
||||
if len(pcs) < len(pat.args):
|
||||
return MaPatResult(flag: false, rem: empty)
|
||||
|
|
|
@ -201,7 +201,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
|
|||
debug "Server was closed", exc = exc.msg
|
||||
raise newTransportClosedError(exc)
|
||||
except CatchableError as exc:
|
||||
warn "Unexpected error creating connection", exc = exc.msg
|
||||
debug "Unexpected error accepting connection", exc = exc.msg
|
||||
raise exc
|
||||
|
||||
method dial*(
|
||||
|
@ -218,8 +218,4 @@ method dial*(
|
|||
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
if procCall Transport(t).handles(address):
|
||||
if address.protocols.isOk:
|
||||
return address.protocols
|
||||
.get()
|
||||
.filterIt(
|
||||
it == multiCodec("tcp")
|
||||
).len > 0
|
||||
return TCP.match(address)
|
||||
|
|
|
@ -93,4 +93,8 @@ method handles*(
|
|||
# by default we skip circuit addresses to avoid
|
||||
# having to repeat the check in every transport
|
||||
if address.protocols.isOk:
|
||||
return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0
|
||||
return address.protocols
|
||||
.get()
|
||||
.filterIt(
|
||||
it == multiCodec("p2p-circuit")
|
||||
).len == 0
|
||||
|
|
|
@ -0,0 +1,244 @@
|
|||
## Nim-LibP2P
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import std/[sequtils]
|
||||
import chronos, chronicles
|
||||
import transport,
|
||||
../errors,
|
||||
../wire,
|
||||
../multicodec,
|
||||
../multistream,
|
||||
../connmanager,
|
||||
../multiaddress,
|
||||
../stream/connection,
|
||||
../upgrademngrs/upgrade,
|
||||
websock/websock
|
||||
|
||||
logScope:
|
||||
topics = "libp2p wstransport"
|
||||
|
||||
export transport, websock
|
||||
|
||||
const
|
||||
WsTransportTrackerName* = "libp2p.wstransport"
|
||||
|
||||
type
|
||||
WsStream = ref object of Connection
|
||||
session: WSSession
|
||||
|
||||
proc init*(T: type WsStream,
|
||||
session: WSSession,
|
||||
dir: Direction,
|
||||
timeout = 10.minutes,
|
||||
observedAddr: MultiAddress = MultiAddress()): T =
|
||||
|
||||
let stream = T(
|
||||
session: session,
|
||||
timeout: timeout,
|
||||
dir: dir,
|
||||
observedAddr: observedAddr)
|
||||
|
||||
stream.initStream()
|
||||
return stream
|
||||
|
||||
method readOnce*(
|
||||
s: WsStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[int] {.async.} =
|
||||
let res = await s.session.recv(pbytes, nbytes)
|
||||
if res == 0 and s.session.readyState == ReadyState.Closed:
|
||||
raise newLPStreamEOFError()
|
||||
return res
|
||||
|
||||
method write*(
|
||||
s: WsStream,
|
||||
msg: seq[byte]): Future[void] {.async.} =
|
||||
try:
|
||||
await s.session.send(msg, Opcode.Binary)
|
||||
except WSClosedError:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method closeImpl*(s: WsStream): Future[void] {.async.} =
|
||||
await s.session.close()
|
||||
await procCall Connection(s).closeImpl()
|
||||
|
||||
type
|
||||
WsTransport* = ref object of Transport
|
||||
httpserver: HttpServer
|
||||
wsserver: WSServer
|
||||
connections: array[Direction, seq[WsStream]]
|
||||
|
||||
tlsPrivateKey: TLSPrivateKey
|
||||
tlsCertificate: TLSCertificate
|
||||
tlsFlags: set[TLSFlags]
|
||||
flags: set[ServerFlags]
|
||||
factories: seq[ExtFactory]
|
||||
rng: Rng
|
||||
|
||||
proc secure*(self: WsTransport): bool =
|
||||
not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate))
|
||||
|
||||
method start*(
|
||||
self: WsTransport,
|
||||
ma: MultiAddress) {.async.} =
|
||||
## listen on the transport
|
||||
##
|
||||
|
||||
if self.running:
|
||||
trace "WS transport already running"
|
||||
return
|
||||
|
||||
await procCall Transport(self).start(ma)
|
||||
trace "Starting WS transport"
|
||||
|
||||
self.httpserver =
|
||||
if self.secure:
|
||||
TlsHttpServer.create(
|
||||
address = self.ma.initTAddress().tryGet(),
|
||||
tlsPrivateKey = self.tlsPrivateKey,
|
||||
tlsCertificate = self.tlsCertificate,
|
||||
flags = self.flags)
|
||||
else:
|
||||
HttpServer.create(self.ma.initTAddress().tryGet())
|
||||
|
||||
self.wsserver = WSServer.new(
|
||||
factories = self.factories,
|
||||
rng = self.rng)
|
||||
|
||||
let codec = if self.secure:
|
||||
MultiAddress.init("/wss")
|
||||
else:
|
||||
MultiAddress.init("/ws")
|
||||
|
||||
# always get the resolved address in case we're bound to 0.0.0.0:0
|
||||
self.ma = MultiAddress.init(
|
||||
self.httpserver.localAddress()).tryGet() & codec.tryGet()
|
||||
|
||||
self.running = true
|
||||
trace "Listening on", address = self.ma
|
||||
|
||||
method stop*(self: WsTransport) {.async, gcsafe.} =
|
||||
## stop the transport
|
||||
##
|
||||
|
||||
self.running = false # mark stopped as soon as possible
|
||||
|
||||
try:
|
||||
trace "Stopping WS transport"
|
||||
await procCall Transport(self).stop() # call base
|
||||
|
||||
checkFutures(
|
||||
await allFinished(
|
||||
self.connections[Direction.In].mapIt(it.close()) &
|
||||
self.connections[Direction.Out].mapIt(it.close())))
|
||||
|
||||
# server can be nil
|
||||
if not isNil(self.httpserver):
|
||||
self.httpserver.stop()
|
||||
await self.httpserver.closeWait()
|
||||
|
||||
self.httpserver = nil
|
||||
trace "Transport stopped"
|
||||
except CatchableError as exc:
|
||||
trace "Error shutting down ws transport", exc = exc.msg
|
||||
|
||||
proc trackConnection(self: WsTransport, conn: WsStream, dir: Direction) =
|
||||
self.connections[dir].add(conn)
|
||||
proc onClose() {.async.} =
|
||||
await conn.session.stream.reader.join()
|
||||
self.connections[dir].keepItIf(it != conn)
|
||||
trace "Cleaned up client"
|
||||
asyncSpawn onClose()
|
||||
|
||||
method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
|
||||
## accept a new WS connection
|
||||
##
|
||||
|
||||
if not self.running:
|
||||
raise newTransportClosedError()
|
||||
|
||||
try:
|
||||
let
|
||||
req = await self.httpserver.accept()
|
||||
wstransp = await self.wsserver.handleRequest(req)
|
||||
stream = WsStream.init(wstransp, Direction.In)
|
||||
|
||||
self.trackConnection(stream, Direction.In)
|
||||
return stream
|
||||
except TransportOsError as exc:
|
||||
debug "OS Error", exc = exc.msg
|
||||
except TransportTooManyError as exc:
|
||||
debug "Too many files opened", exc = exc.msg
|
||||
except TransportUseClosedError as exc:
|
||||
debug "Server was closed", exc = exc.msg
|
||||
raise newTransportClosedError(exc)
|
||||
except CatchableError as exc:
|
||||
warn "Unexpected error accepting connection", exc = exc.msg
|
||||
raise exc
|
||||
|
||||
method dial*(
|
||||
self: WsTransport,
|
||||
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
|
||||
## dial a peer
|
||||
##
|
||||
|
||||
trace "Dialing remote peer", address = $address
|
||||
|
||||
let
|
||||
secure = WSS.match(address)
|
||||
transp = await WebSocket.connect(
|
||||
address.initTAddress().tryGet(),
|
||||
"",
|
||||
secure = secure,
|
||||
flags = self.tlsFlags)
|
||||
stream = WsStream.init(transp, Direction.Out)
|
||||
|
||||
self.trackConnection(stream, Direction.Out)
|
||||
return stream
|
||||
|
||||
method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||
if procCall Transport(t).handles(address):
|
||||
if address.protocols.isOk:
|
||||
return WebSockets.match(address)
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WsTransport],
|
||||
upgrade: Upgrade,
|
||||
tlsPrivateKey: TLSPrivateKey,
|
||||
tlsCertificate: TLSCertificate,
|
||||
tlsFlags: set[TLSFlags] = {},
|
||||
flags: set[ServerFlags] = {},
|
||||
factories: openArray[ExtFactory] = [],
|
||||
rng: Rng = nil): T =
|
||||
|
||||
T(
|
||||
upgrader: upgrade,
|
||||
tlsPrivateKey: tlsPrivateKey,
|
||||
tlsCertificate: tlsCertificate,
|
||||
tlsFlags: tlsFlags,
|
||||
flags: flags,
|
||||
factories: @factories,
|
||||
rng: rng)
|
||||
|
||||
proc new*(
|
||||
T: typedesc[WsTransport],
|
||||
upgrade: Upgrade,
|
||||
flags: set[ServerFlags] = {},
|
||||
factories: openArray[ExtFactory] = [],
|
||||
rng: Rng = nil): T =
|
||||
|
||||
T.new(
|
||||
upgrade = upgrade,
|
||||
tlsPrivateKey = nil,
|
||||
tlsCertificate = nil,
|
||||
flags = flags,
|
||||
factories = @factories,
|
||||
rng = rng)
|
|
@ -19,17 +19,18 @@ else:
|
|||
import posix
|
||||
|
||||
const
|
||||
TRANSPMA* = mapOr(
|
||||
mapAnd(IP, mapEq("udp")),
|
||||
mapAnd(IP, mapEq("tcp")),
|
||||
mapAnd(mapEq("unix"))
|
||||
RTRANSPMA* = mapOr(
|
||||
TCP,
|
||||
WebSockets,
|
||||
UNIX
|
||||
)
|
||||
|
||||
RTRANSPMA* = mapOr(
|
||||
mapAnd(IP, mapEq("tcp")),
|
||||
mapAnd(mapEq("unix"))
|
||||
TRANSPMA* = mapOr(
|
||||
RTRANSPMA,
|
||||
UDP
|
||||
)
|
||||
|
||||
|
||||
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
|
||||
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
|
||||
##
|
||||
|
|
|
@ -11,14 +11,24 @@ import ../libp2p/[stream/connection,
|
|||
|
||||
import ./helpers
|
||||
|
||||
proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
|
||||
suite $transportType & " common":
|
||||
type TransportProvider* = proc(): Transport {.gcsafe.}
|
||||
|
||||
proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||
suite name & " common tests":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
|
||||
asyncTest "can handle local address":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
check transport1.handles(transport1.ma)
|
||||
await transport1.stop()
|
||||
|
||||
asyncTest "e2e: handle write":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
|
||||
let transport1: transportType = transportType.new(upgrade = Upgrade())
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
|
@ -28,23 +38,25 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
|
|||
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let transport2: transportType = transportType.new(upgrade = Upgrade())
|
||||
let transport2 = prov()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
var msg = newSeq[byte](6)
|
||||
await conn.readExactly(addr msg[0], 6)
|
||||
|
||||
await conn.close() #for some protocols, closing requires actively, so we must close here
|
||||
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
||||
await conn.close() #for some protocols, closing requires actively reading, so we must close here
|
||||
|
||||
await transport2.stop()
|
||||
await transport1.stop()
|
||||
await allFuturesThrowing(
|
||||
allFinished(
|
||||
transport1.stop(),
|
||||
transport2.stop()))
|
||||
|
||||
check string.fromBytes(msg) == "Hello!"
|
||||
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
||||
|
||||
asyncTest "e2e: handle read":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
let transport1: transportType = transportType.new(upgrade = Upgrade())
|
||||
asyncSpawn transport1.start(ma)
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
let conn = await transport1.accept()
|
||||
|
@ -55,35 +67,39 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
|
|||
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let transport2: transportType = transportType.new(upgrade = Upgrade())
|
||||
let transport2 = prov()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
await conn.write("Hello!")
|
||||
|
||||
await conn.close() #for some protocols, closing requires actively, so we must close here
|
||||
await conn.close() #for some protocols, closing requires actively reading, so we must close here
|
||||
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
||||
|
||||
await transport2.stop()
|
||||
await transport1.stop()
|
||||
await allFuturesThrowing(
|
||||
allFinished(
|
||||
transport1.stop(),
|
||||
transport2.stop()))
|
||||
|
||||
asyncTest "e2e: handle dial cancellation":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
|
||||
let transport1: transportType = transportType.new(upgrade = Upgrade())
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
|
||||
let transport2: transportType = transportType.new(upgrade = Upgrade())
|
||||
let transport2 = prov()
|
||||
let cancellation = transport2.dial(transport1.ma)
|
||||
|
||||
await cancellation.cancelAndWait()
|
||||
check cancellation.cancelled
|
||||
|
||||
await transport2.stop()
|
||||
await transport1.stop()
|
||||
await allFuturesThrowing(
|
||||
allFinished(
|
||||
transport1.stop(),
|
||||
transport2.stop()))
|
||||
|
||||
asyncTest "e2e: handle accept cancellation":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
|
||||
let transport1: transportType = transportType.new(upgrade = Upgrade())
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
|
||||
let acceptHandler = transport1.accept()
|
||||
|
@ -91,3 +107,55 @@ proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
|
|||
check acceptHandler.cancelled
|
||||
|
||||
await transport1.stop()
|
||||
|
||||
asyncTest "e2e: stopping transport kills connections":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
|
||||
let transport2 = prov()
|
||||
|
||||
let acceptHandler = transport1.accept()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
let serverConn = await acceptHandler
|
||||
|
||||
await allFuturesThrowing(
|
||||
allFinished(
|
||||
transport1.stop(),
|
||||
transport2.stop()))
|
||||
|
||||
check serverConn.closed()
|
||||
check conn.closed()
|
||||
|
||||
asyncTest "read or write on closed connection":
|
||||
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
|
||||
let transport1 = prov()
|
||||
await transport1.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
let conn = await transport1.accept()
|
||||
await conn.close()
|
||||
|
||||
let handlerWait = acceptHandler()
|
||||
|
||||
let conn = await transport1.dial(transport1.ma)
|
||||
|
||||
var msg = newSeq[byte](6)
|
||||
try:
|
||||
await conn.readExactly(addr msg[0], 6)
|
||||
check false
|
||||
except CatchableError as exc:
|
||||
check true
|
||||
|
||||
# we don't HAVE to throw on write on EOF
|
||||
# (at least TCP doesn't)
|
||||
try:
|
||||
await conn.write(msg)
|
||||
except CatchableError as exc:
|
||||
check true
|
||||
|
||||
await conn.close() #for some protocols, closing requires actively reading, so we must close here
|
||||
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
||||
|
||||
await transport1.stop()
|
||||
|
|
|
@ -2,7 +2,7 @@ import options, tables
|
|||
import chronos, chronicles, stew/byteutils
|
||||
import helpers
|
||||
import ../libp2p
|
||||
import ../libp2p/[daemon/daemonapi, varint]
|
||||
import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto]
|
||||
|
||||
type
|
||||
# TODO: Unify both PeerInfo structs
|
||||
|
@ -288,6 +288,99 @@ suite "Interop":
|
|||
await daemonNode.close()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
asyncTest "native -> daemon websocket connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc nativeHandler(conn: Connection, proto: string) {.async.} =
|
||||
var line = string.fromBytes(await conn.readLp(1024))
|
||||
check line == test
|
||||
testFuture.complete(line)
|
||||
await conn.close()
|
||||
|
||||
# custom proto
|
||||
var proto = new LPProtocol
|
||||
proto.handler = nativeHandler
|
||||
proto.codec = protos[0] # codec
|
||||
|
||||
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
|
||||
|
||||
let nativeNode = SwitchBuilder
|
||||
.new()
|
||||
.withAddress(wsAddress)
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr))
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
nativeNode.mount(proto)
|
||||
|
||||
let awaiters = await nativeNode.start()
|
||||
let nativePeer = nativeNode.peerInfo
|
||||
|
||||
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
|
||||
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
|
||||
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
|
||||
discard await stream.transp.writeLp(test)
|
||||
|
||||
check test == (await wait(testFuture, 10.secs))
|
||||
|
||||
await stream.close()
|
||||
await nativeNode.stop()
|
||||
await allFutures(awaiters)
|
||||
await daemonNode.close()
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
asyncTest "daemon -> native websocket connection":
|
||||
var protos = @["/test-stream"]
|
||||
var test = "TEST STRING"
|
||||
# We are preparing expect string, which should be prefixed with varint
|
||||
# length and do not have `\r\n` suffix, because we going to use
|
||||
# readLine().
|
||||
var buffer = initVBuffer()
|
||||
buffer.writeSeq(test & "\r\n")
|
||||
buffer.finish()
|
||||
var expect = newString(len(buffer) - 2)
|
||||
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
|
||||
|
||||
let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet()
|
||||
let nativeNode = SwitchBuilder
|
||||
.new()
|
||||
.withAddress(wsAddress)
|
||||
.withRng(crypto.newRng())
|
||||
.withMplex()
|
||||
.withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr))
|
||||
.withNoise()
|
||||
.build()
|
||||
|
||||
let awaiters = await nativeNode.start()
|
||||
|
||||
let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress])
|
||||
let daemonPeer = await daemonNode.identity()
|
||||
|
||||
var testFuture = newFuture[string]("test.future")
|
||||
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
|
||||
# We should perform `readLp()` instead of `readLine()`. `readLine()`
|
||||
# here reads actually length prefixed string.
|
||||
var line = await stream.transp.readLine()
|
||||
check line == expect
|
||||
testFuture.complete(line)
|
||||
await stream.close()
|
||||
|
||||
await daemonNode.addHandler(protos, daemonHandler)
|
||||
let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
|
||||
daemonPeer.addresses),
|
||||
protos[0])
|
||||
await conn.writeLp(test & "\r\n")
|
||||
check expect == (await wait(testFuture, 10.secs))
|
||||
|
||||
await conn.close()
|
||||
await nativeNode.stop()
|
||||
await allFutures(awaiters)
|
||||
await daemonNode.close()
|
||||
|
||||
asyncTest "daemon -> multiple reads and writes":
|
||||
var protos = @["/test-stream"]
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import testmultibase,
|
|||
testpeerid
|
||||
|
||||
import testtcptransport,
|
||||
testwstransport,
|
||||
testmultistream,
|
||||
testbufferstream,
|
||||
testidentify,
|
||||
|
|
|
@ -125,4 +125,7 @@ suite "TCP transport":
|
|||
server.close()
|
||||
await server.join()
|
||||
|
||||
TcpTransport.commonTransportTest("/ip4/0.0.0.0/tcp/0")
|
||||
commonTransportTest(
|
||||
"TcpTransport",
|
||||
proc (): Transport = TcpTransport.new(upgrade = Upgrade()),
|
||||
"/ip4/0.0.0.0/tcp/0")
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
{.used.}
|
||||
|
||||
import sequtils
|
||||
import chronos, stew/byteutils
|
||||
import ../libp2p/[stream/connection,
|
||||
transports/transport,
|
||||
transports/wstransport,
|
||||
upgrademngrs/upgrade,
|
||||
multiaddress,
|
||||
errors,
|
||||
wire]
|
||||
|
||||
import ./helpers, ./commontransport
|
||||
|
||||
const
|
||||
SecureKey* = """
|
||||
-----BEGIN PRIVATE KEY-----
|
||||
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCdNv0SX02aeZ4/
|
||||
Yc+p/Kwd5UVOHlpmK7/TVC/kcjFbdoUuKNn8pnX/fyhgSKpUYut+te7YRiZhqlaL
|
||||
EZKjfy8GBZwXZnJCevFkTvGTTebXXExLIsLGfJqKeLAdFCQkX8wV3jV1DT5JLV+D
|
||||
5+HWaiiBr38gsl4ZbfyedTF40JvzokCmcdlx9bpzX1j/b84L/zSwUyyEcgp5G28F
|
||||
Jh5TnxAeDHJpOVjr8XMb/xoNqiDF6NwF96hvOZC14mZ1TxxW5bUzXprsy0l52pmh
|
||||
dN3Crz11+t2h519hRKHxT6/l5pTx/+dApXiP6hMV04CQJNnas3NyRxTDR9dNel+3
|
||||
+wD7/PRTAgMBAAECggEBAJuXPEbegxMKog7gYoE9S6oaqchySc0sJyCjBPL2ANsg
|
||||
JRZV38cnh0hhNDh2MfxqGd7Bd6wbYQjvZ88iiRm+WW+ARcby4MnimtxHNNYwFvG0
|
||||
qt0BffqqftfkMYfV0x8coAJUdFtvy+DoQstsxhlJ3uTaJtrZLD/GlmjMWzXSX0Vy
|
||||
FXiLDO7/LoSjsjaf4e4aLofIyLJS3H1T+5cr/d2mdpRzkeWkxShODsK4cRLOlZ5I
|
||||
pz4Wm2770DTbiYph8ixl/CnmYn6T7V0F5VYujALknipUBeQY4e/A9vrQ/pvqJV+W
|
||||
JjFUne6Rxg/lJjh8vNJp2bK1ZbzpwmZLaZIoEz8t/qECgYEAzvCCA48uQPaurSQ3
|
||||
cvHDhcVwYmEaH8MW8aIW/5l8XJK60GsUHPFhEsfD/ObI5PJJ9aOqgabpRHkvD4ZY
|
||||
a8QJBxCy6UeogUeKvGks8VQ34SZXLimmgrL9Mlljv0v9PloEkVYbztYyX4GVO0ov
|
||||
3oH+hKO+/MclzNDyeXZx3Vv4K+UCgYEAwnyb7tqp7fRqm/8EymIZV5pa0p6h609p
|
||||
EhCBi9ii6d/ewEjsBhs7bPDBO4PO9ylvOvryYZH1hVbQja2anOCBjO8dAHRHWM86
|
||||
964TFriywBQkYxp6dsB8nUjLBDza2xAM3m+OGi9/ATuhEAe5sXp/fZL3tkfSaOXI
|
||||
A7Gzro+kS9cCgYEAtKScSfEeBlWQa9H2mV9UN5z/mtF61YkeqTW+b8cTGVh4vWEL
|
||||
wKww+gzqGAV6Duk2CLijKeSDMmO64gl7fC83VjSMiTklbhz+jbQeKFhFI0Sty71N
|
||||
/j+y6NXBTgdOfLRl0lzhj2/JrzdWBtie6tR9UloCaXSKmb04PTFY+kvDWsUCgYBR
|
||||
krJUnKJpi/qrM2tu93Zpp/QwIxkG+We4i/PKFDNApQVo4S0d4o4qQ1DJBZ/pSxe8
|
||||
RUUkZ3PzWVZgFlCjPAcadbBUYHEMbt7sw7Z98ToIFmqspo53AIVD8yQzwtKIz1KW
|
||||
eXPAx+sdOUV008ivCBIxOVNswPMfzED4S7Bxpw3iQQKBgGJhct2nBsgu0l2/wzh9
|
||||
tpKbalW1RllgptNQzjuBEZMTvPF0L+7BE09/exKtt4N9s3yAzi8o6Qo7RHX5djVc
|
||||
SNgafV4jj7jt2Ilh6KOy9dshtLoEkS1NmiqfVe2go2auXZdyGm+I2yzKWdKGDO0J
|
||||
diTtYf1sA0PgNXdSyDC03TZl
|
||||
-----END PRIVATE KEY-----
|
||||
"""
|
||||
|
||||
SecureCert* = """
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIDazCCAlOgAwIBAgIUe9fr78Dz9PedQ5Sq0uluMWQhX9wwDQYJKoZIhvcNAQEL
|
||||
BQAwRTELMAkGA1UEBhMCSU4xEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
|
||||
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMTAzMTcwOTMzMzZaFw0zMTAz
|
||||
MTUwOTMzMzZaMEUxCzAJBgNVBAYTAklOMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
|
||||
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
|
||||
AQUAA4IBDwAwggEKAoIBAQCdNv0SX02aeZ4/Yc+p/Kwd5UVOHlpmK7/TVC/kcjFb
|
||||
doUuKNn8pnX/fyhgSKpUYut+te7YRiZhqlaLEZKjfy8GBZwXZnJCevFkTvGTTebX
|
||||
XExLIsLGfJqKeLAdFCQkX8wV3jV1DT5JLV+D5+HWaiiBr38gsl4ZbfyedTF40Jvz
|
||||
okCmcdlx9bpzX1j/b84L/zSwUyyEcgp5G28FJh5TnxAeDHJpOVjr8XMb/xoNqiDF
|
||||
6NwF96hvOZC14mZ1TxxW5bUzXprsy0l52pmhdN3Crz11+t2h519hRKHxT6/l5pTx
|
||||
/+dApXiP6hMV04CQJNnas3NyRxTDR9dNel+3+wD7/PRTAgMBAAGjUzBRMB0GA1Ud
|
||||
DgQWBBRkSY1AkGUpVNxG5fYocfgFODtQmTAfBgNVHSMEGDAWgBRkSY1AkGUpVNxG
|
||||
5fYocfgFODtQmTAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBt
|
||||
D71VH7F8GOQXITFXCrHwEq1Fx3ScuSnL04NJrXw/e9huzLVQOchAYp/EIn4x2utN
|
||||
S31dt94wvi/IysOVbR1LatYNF5kKgGj2Wc6DH0PswBMk8R1G8QMeCz+hCjf1VDHe
|
||||
AAW1x2q20rJAvUrT6cRBQqeiMzQj0OaJbvfnd2hu0/d0DFkcuGVgBa2zlbG5rbdU
|
||||
Jnq7MQfSaZHd0uBgiKkS+Zw6XaYfWfByCAGSnUqRdOChiJ2stFVLvu+9oQ+PJjJt
|
||||
Er1u9bKTUyeuYpqXr2BP9dqphwu8R4NFVUg6DIRpMFMsybaL7KAd4hD22RXCvc0m
|
||||
uLu7KODi+eW62MHqs4N2
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
|
||||
suite "WebSocket transport":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
|
||||
commonTransportTest(
|
||||
"WebSocket",
|
||||
proc (): Transport = WsTransport.new(Upgrade()),
|
||||
"/ip4/0.0.0.0/tcp/0/ws")
|
||||
|
||||
commonTransportTest(
|
||||
"WebSocket Secure",
|
||||
proc (): Transport =
|
||||
WsTransport.new(
|
||||
Upgrade(),
|
||||
TLSPrivateKey.init(SecureKey),
|
||||
TLSCertificate.init(SecureCert),
|
||||
{TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName}),
|
||||
"/ip4/0.0.0.0/tcp/0/wss")
|
Loading…
Reference in New Issue