nim-libp2p/libp2p/transports/wstransport.nim

371 lines
9.7 KiB
Nim

# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
## WebSocket & WebSocket Secure transport implementation
{.push raises: [].}
import std/[sequtils]
import stew/results
import chronos, chronicles
import
transport,
../errors,
../wire,
../multicodec,
../multistream,
../connmanager,
../multiaddress,
../utility,
../stream/connection,
../upgrademngrs/upgrade,
websock/websock
logScope:
topics = "libp2p wstransport"
export transport, websock, results
const DefaultHeadersTimeout = 3.seconds
type WsStream = ref object of Connection
session: WSSession
method initStream*(s: WsStream) =
if s.objName.len == 0:
s.objName = "WsStream"
procCall Connection(s).initStream()
proc new*(
T: type WsStream,
session: WSSession,
dir: Direction,
observedAddr: Opt[MultiAddress],
timeout = 10.minutes,
): T =
let stream =
T(session: session, timeout: timeout, dir: dir, observedAddr: observedAddr)
stream.initStream()
return stream
template mapExceptions(body: untyped): untyped =
try:
body
except AsyncStreamIncompleteError:
raise newLPStreamIncompleteError()
except AsyncStreamLimitError:
raise newLPStreamLimitError()
except AsyncStreamUseClosedError:
raise newLPStreamEOFError()
except WSClosedError:
raise newLPStreamEOFError()
except WebSocketError:
raise newLPStreamEOFError()
except CatchableError:
raise newLPStreamEOFError()
method readOnce*(
s: WsStream, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
let res = mapExceptions(await s.session.recv(pbytes, nbytes))
if res == 0 and s.session.readyState == ReadyState.Closed:
raise newLPStreamEOFError()
s.activity = true # reset activity flag
return res
method write*(
s: WsStream, msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
mapExceptions(await s.session.send(msg, Opcode.Binary))
s.activity = true # reset activity flag
method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} =
try:
await s.session.close()
except CatchableError:
discard
await procCall Connection(s).closeImpl()
method getWrapped*(s: WsStream): Connection =
nil
type WsTransport* = ref object of Transport
httpservers: seq[HttpServer]
wsserver: WSServer
connections: array[Direction, seq[WsStream]]
acceptFuts: seq[Future[HttpRequest]]
tlsPrivateKey: TLSPrivateKey
tlsCertificate: TLSCertificate
tlsFlags: set[TLSFlags]
flags: set[ServerFlags]
handshakeTimeout: Duration
factories: seq[ExtFactory]
rng: ref HmacDrbgContext
proc secure*(self: WsTransport): bool =
not (isNil(self.tlsPrivateKey) or isNil(self.tlsCertificate))
method start*(self: WsTransport, addrs: seq[MultiAddress]) {.async.} =
## listen on the transport
##
if self.running:
warn "WS transport already running"
return
await procCall Transport(self).start(addrs)
trace "Starting WS transport"
self.wsserver = WSServer.new(factories = self.factories, rng = self.rng)
for i, ma in addrs:
let isWss =
if WSS.match(ma):
if self.secure:
true
else:
warn "Trying to listen on a WSS address without setting certificate!"
false
else:
false
let httpserver =
if isWss:
TlsHttpServer.create(
address = ma.initTAddress().tryGet(),
tlsPrivateKey = self.tlsPrivateKey,
tlsCertificate = self.tlsCertificate,
flags = self.flags,
handshakeTimeout = self.handshakeTimeout,
)
else:
HttpServer.create(
ma.initTAddress().tryGet(), handshakeTimeout = self.handshakeTimeout
)
self.httpservers &= httpserver
let codec =
if isWss:
if ma.contains(multiCodec("tls")) == MaResult[bool].ok(true):
MultiAddress.init("/tls/ws")
else:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
# always get the resolved address in case we're bound to 0.0.0.0:0
self.addrs[i] =
MultiAddress.init(httpserver.localAddress()).tryGet() & codec.tryGet()
trace "Listening on", addresses = self.addrs
self.running = true
method stop*(self: WsTransport) {.async.} =
## stop the transport
##
self.running = false # mark stopped as soon as possible
try:
trace "Stopping WS transport"
await procCall Transport(self).stop() # call base
checkFutures(
await allFinished(
self.connections[Direction.In].mapIt(it.close()) &
self.connections[Direction.Out].mapIt(it.close())
)
)
var toWait: seq[Future[void]]
for fut in self.acceptFuts:
if not fut.finished:
toWait.add(fut.cancelAndWait())
elif fut.done:
toWait.add(fut.read().stream.closeWait())
for server in self.httpservers:
server.stop()
toWait.add(server.closeWait())
await allFutures(toWait)
self.httpservers = @[]
trace "Transport stopped"
except CatchableError as exc:
trace "Error shutting down ws transport", description = exc.msg
proc connHandler(
self: WsTransport, stream: WSSession, secure: bool, dir: Direction
): Future[Connection] {.async.} =
let observedAddr =
try:
let
codec =
if secure:
MultiAddress.init("/wss")
else:
MultiAddress.init("/ws")
remoteAddr = stream.stream.reader.tsource.remoteAddress
MultiAddress.init(remoteAddr).tryGet() & codec.tryGet()
except CatchableError as exc:
trace "Failed to create observedAddr", description = exc.msg
if not (isNil(stream) and stream.stream.reader.closed):
await stream.close()
raise exc
let conn = WsStream.new(stream, dir, Opt.some(observedAddr))
self.connections[dir].add(conn)
proc onClose() {.async.} =
await conn.session.stream.reader.join()
self.connections[dir].keepItIf(it != conn)
trace "Cleaned up client"
asyncSpawn onClose()
return conn
method accept*(self: WsTransport): Future[Connection] {.async.} =
## accept a new WS connection
##
if not self.running:
raise newTransportClosedError()
if self.acceptFuts.len <= 0:
self.acceptFuts = self.httpservers.mapIt(it.accept())
if self.acceptFuts.len <= 0:
return
let
finished = await one(self.acceptFuts)
index = self.acceptFuts.find(finished)
self.acceptFuts[index] = self.httpservers[index].accept()
try:
let req = await finished
try:
let
wstransp = await self.wsserver.handleRequest(req).wait(self.handshakeTimeout)
isSecure = self.httpservers[index].secure
return await self.connHandler(wstransp, isSecure, Direction.In)
except CatchableError as exc:
await req.stream.closeWait()
raise exc
except WebSocketError as exc:
debug "Websocket Error", description = exc.msg
except HttpError as exc:
debug "Http Error", description = exc.msg
except AsyncStreamError as exc:
debug "AsyncStream Error", description = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", description = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", description = exc.msg
except AsyncTimeoutError as exc:
debug "Timed out", description = exc.msg
except TransportUseClosedError as exc:
debug "Server was closed", description = exc.msg
raise newTransportClosedError(exc)
except CancelledError as exc:
raise exc
except TransportOsError as exc:
debug "OS Error", description = exc.msg
except CatchableError as exc:
info "Unexpected error accepting connection", description = exc.msg
raise exc
method dial*(
self: WsTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[Connection] {.async.} =
## dial a peer
##
trace "Dialing remote peer", address = $address
let
secure = WSS.match(address)
transp = await WebSocket.connect(
address.initTAddress().tryGet(),
"",
secure = secure,
hostName = hostname,
flags = self.tlsFlags,
)
try:
return await self.connHandler(transp, secure, Direction.Out)
except CatchableError as exc:
await transp.close()
raise exc
method handles*(t: WsTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return WebSockets.match(address)
proc new*(
T: typedesc[WsTransport],
upgrade: Upgrade,
tlsPrivateKey: TLSPrivateKey,
tlsCertificate: TLSCertificate,
tlsFlags: set[TLSFlags] = {},
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout,
): T {.public.} =
## Creates a secure WebSocket transport
T(
upgrader: upgrade,
tlsPrivateKey: tlsPrivateKey,
tlsCertificate: tlsCertificate,
tlsFlags: tlsFlags,
flags: flags,
factories: @factories,
rng: rng,
handshakeTimeout: handshakeTimeout,
)
proc new*(
T: typedesc[WsTransport],
upgrade: Upgrade,
flags: set[ServerFlags] = {},
factories: openArray[ExtFactory] = [],
rng: ref HmacDrbgContext = nil,
handshakeTimeout = DefaultHeadersTimeout,
): T {.public.} =
## Creates a clear-text WebSocket transport
T.new(
upgrade = upgrade,
tlsPrivateKey = nil,
tlsCertificate = nil,
flags = flags,
factories = @factories,
rng = rng,
handshakeTimeout = handshakeTimeout,
)