diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3c84f5c6f..b53a4f2ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,10 @@ jobs: key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }} - name: Make update - run: make update + run: | + make update + # TEMPORARY: apply nim-libp2p QUIC patches + bash temp/apply.sh - name: Build binaries run: make V=1 QUICK_AND_DIRTY_COMPILER=1 all @@ -118,7 +121,10 @@ jobs: key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }} - name: Make update - run: make update + run: | + make update + # TEMPORARY: apply nim-libp2p QUIC patches + bash temp/apply.sh - name: Run tests run: | diff --git a/.github/workflows/container-image.yml b/.github/workflows/container-image.yml index 2bc08be2f..f4309b289 100644 --- a/.github/workflows/container-image.yml +++ b/.github/workflows/container-image.yml @@ -66,6 +66,8 @@ jobs: if: ${{ steps.secrets.outcome == 'success' }} run: | make update + # TEMPORARY: apply nim-libp2p QUIC patches + bash temp/apply.sh make -j${NPROC} V=1 QUICK_AND_DIRTY_COMPILER=1 NIMFLAGS="-d:disableMarchNative -d:postgres -d:chronicles_colors:none" wakunode2 diff --git a/temp/apply.sh b/temp/apply.sh new file mode 100755 index 000000000..cc2705d5b --- /dev/null +++ b/temp/apply.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# Apply vendor patches for QUIC support. +# Run from the logos-delivery root directory: +# bash temp/apply.sh +# +# This copies patched files from temp/vendor/ on top of vendor/. +# Run this AFTER `make update` to re-apply patches over fresh vendor state. +# +# nim-libp2p patches: +# - quictransport.nim: getStreams, remote-close propagation, session.closed guard +# - muxer.nim: base getStreams returns @[] instead of raising +# - switch.nim: imports quictransport for vtable registration +# +# nim-lsquic patches: +# - stream.nim: doProcess() on immediate write path (fixes stalled sends) +# - context/context.nim: nil guard in makeStream +# - context/client.nim: nil lsquicConn on connection close (prevents dangling pointer) +# - context/server.nim: nil lsquicConn on connection close (prevents dangling pointer) + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" + +echo "Applying nim-libp2p QUIC patches from $SCRIPT_DIR/vendor/ to $ROOT_DIR/vendor/" + +find "$SCRIPT_DIR/vendor" -type f | while read src; do + dst="$ROOT_DIR/${src#$SCRIPT_DIR/}" + if [ -f "$dst" ]; then + cp "$src" "$dst" + echo " patched: ${dst#$ROOT_DIR/}" + else + echo " WARNING: target not found: ${dst#$ROOT_DIR/}" + fi +done + +echo "Done. Now rebuild: make wakunode2" diff --git a/temp/vendor/nim-libp2p/libp2p/muxers/muxer.nim b/temp/vendor/nim-libp2p/libp2p/muxers/muxer.nim new file mode 100644 index 000000000..da5dfee65 --- /dev/null +++ b/temp/vendor/nim-libp2p/libp2p/muxers/muxer.nim @@ -0,0 +1,69 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +{.push raises: [].} + +import chronos, chronicles +import ../stream/connection, ../errors + +logScope: + topics = "libp2p muxer" + +const DefaultChanTimeout* = 5.minutes + +type + MuxerError* = object of LPError + TooManyChannels* = object of MuxerError + + StreamHandler* = proc(conn: Connection): Future[void] {.async: (raises: []).} + MuxerHandler* = proc(muxer: Muxer): Future[void] {.async: (raises: []).} + + Muxer* = ref object of RootObj + streamHandler*: StreamHandler + handler*: Future[void].Raising([]) + connection*: Connection + + # user provider proc that returns a constructed Muxer + MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].} + + # this wraps a creator proc that knows how to make muxers + MuxerProvider* = object + newMuxer*: MuxerConstructor + codec*: string + +func shortLog*(m: Muxer): auto = + if m == nil: + "nil" + else: + shortLog(m.connection) + +chronicles.formatIt(Muxer): + shortLog(it) + +# muxer interface +method newStream*( + m: Muxer, name: string = "", lazy: bool = false +): Future[Connection] {. + base, async: (raises: [CancelledError, LPStreamError, MuxerError], raw: true) +.} = + raiseAssert("[Muxer.newStream] abstract method not implemented!") + +when defined(libp2p_agents_metrics): + method setShortAgent*(m: Muxer, shortAgent: string) {.base, gcsafe.} = + m.connection.shortAgent = shortAgent + +method close*(m: Muxer) {.base, async: (raises: []).} = + if m.connection != nil: + await m.connection.close() + +method handle*(m: Muxer): Future[void] {.base, async: (raises: []).} = + discard + +proc new*( + T: typedesc[MuxerProvider], creator: MuxerConstructor, codec: string +): T {.gcsafe.} = + let muxerProvider = T(newMuxer: creator, codec: codec) + muxerProvider + +method getStreams*(m: Muxer): seq[Connection] {.base, gcsafe.} = + @[] diff --git a/temp/vendor/nim-libp2p/libp2p/switch.nim b/temp/vendor/nim-libp2p/libp2p/switch.nim new file mode 100644 index 000000000..0a261d9f9 --- /dev/null +++ b/temp/vendor/nim-libp2p/libp2p/switch.nim @@ -0,0 +1,432 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +## The switch is the core of libp2p, which brings together the +## transports, the connection manager, the upgrader and other +## parts to allow programs to use libp2p + +{.push raises: [].} + +import std/[tables, options, sequtils, sets, oids] + +import chronos, chronicles, metrics + +import + stream/connection, + transports/transport, + transports/tcptransport, + transports/quictransport, + upgrademngrs/upgrade, + multistream, + multiaddress, + protocols/protocol, + protocols/secure/secure, + peerinfo, + utils/semaphore, + ./muxers/muxer, + connmanager, + nameresolving/nameresolver, + peerid, + peerstore, + errors, + utility, + dialer + +export connmanager, upgrade, dialer, peerstore + +logScope: + topics = "libp2p switch" + +#TODO: General note - use a finite state machine to manage the different +# steps of connections establishing and upgrading. This makes everything +# more robust and less prone to ordering attacks - i.e. muxing can come if +# and only if the channel has been secured (i.e. if a secure manager has been +# previously provided) + +const ConcurrentUpgrades* = 4 + +type + Switch* {.public.} = ref object of Dial + peerInfo*: PeerInfo + connManager*: ConnManager + transports*: seq[Transport] + ms*: MultistreamSelect + acceptFuts: seq[Future[void]] + dialer*: Dial + peerStore*: PeerStore + nameResolver*: NameResolver + started: bool + services*: seq[Service] + + UpgradeError* = object of LPError + + Service* = ref object of RootObj + inUse: bool + +method setup*( + self: Service, switch: Switch +): Future[bool] {.base, async: (raises: [CancelledError]).} = + if self.inUse: + warn "service setup has already been called" + return false + self.inUse = true + return true + +method run*(self: Service, switch: Switch) {.base, async: (raises: [CancelledError]).} = + doAssert(false, "[Service.run] abstract method not implemented!") + +method stop*( + self: Service, switch: Switch +): Future[bool] {.base, async: (raises: [CancelledError]).} = + if not self.inUse: + warn "service is already stopped" + return false + self.inUse = false + return true + +proc addConnEventHandler*( + s: Switch, handler: ConnEventHandler, kind: ConnEventKind +) {.public.} = + ## Adds a ConnEventHandler, which will be triggered when + ## a connection to a peer is created or dropped. + ## There may be multiple connections per peer. + ## + ## The handler should not raise. + s.connManager.addConnEventHandler(handler, kind) + +proc removeConnEventHandler*( + s: Switch, handler: ConnEventHandler, kind: ConnEventKind +) {.public.} = + s.connManager.removeConnEventHandler(handler, kind) + +proc addPeerEventHandler*( + s: Switch, handler: PeerEventHandler, kind: PeerEventKind +) {.public.} = + ## Adds a PeerEventHandler, which will be triggered when + ## a peer connects or disconnects from us. + ## + ## The handler should not raise. + s.connManager.addPeerEventHandler(handler, kind) + +proc removePeerEventHandler*( + s: Switch, handler: PeerEventHandler, kind: PeerEventKind +) {.public.} = + s.connManager.removePeerEventHandler(handler, kind) + +method addTransport*(s: Switch, t: Transport) = + s.transports &= t + s.dialer.addTransport(t) + +proc connectedPeers*(s: Switch, dir: Direction): seq[PeerId] = + s.connManager.connectedPeers(dir) + +proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} = + ## returns true if the peer has one or more + ## associated connections + ## + + peerId in s.connManager + +proc disconnect*( + s: Switch, peerId: PeerId +) {.public, async: (raises: [CancelledError]).} = + ## Disconnect from a peer, waiting for the connection(s) to be dropped + await s.connManager.dropPeer(peerId) + +method connect*( + s: Switch, + peerId: PeerId, + addrs: seq[MultiAddress], + forceDial = false, + reuseConnection = true, + dir = Direction.Out, +): Future[void] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = + ## Connects to a peer without opening a stream to it + + s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir) + +method connect*( + s: Switch, address: MultiAddress, allowUnknownPeerId = false +): Future[PeerId] {.async: (raises: [DialFailedError, CancelledError], raw: true).} = + ## Connects to a peer and retrieve its PeerId + ## + ## If the P2P part is missing from the MA and `allowUnknownPeerId` is set + ## to true, this will discover the PeerId while connecting. This exposes + ## you to MiTM attacks, so it shouldn't be used without care! + + s.dialer.connect(address, allowUnknownPeerId) + +method dial*( + s: Switch, peerId: PeerId, protos: seq[string] +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = + ## Open a stream to a connected peer with the specified `protos` + + s.dialer.dial(peerId, protos) + +proc dial*( + s: Switch, peerId: PeerId, proto: string +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = + ## Open a stream to a connected peer with the specified `proto` + + dial(s, peerId, @[proto]) + +method dial*( + s: Switch, + peerId: PeerId, + addrs: seq[MultiAddress], + protos: seq[string], + forceDial = false, +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = + ## Connected to a peer and open a stream + ## with the specified `protos` + + s.dialer.dial(peerId, addrs, protos, forceDial) + +proc dial*( + s: Switch, peerId: PeerId, addrs: seq[MultiAddress], proto: string +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = + ## Connected to a peer and open a stream + ## with the specified `proto` + + dial(s, peerId, addrs, @[proto]) + +proc mount*[T: LPProtocol]( + s: Switch, proto: T, matcher: Matcher = nil +) {.gcsafe, raises: [LPError], public.} = + ## mount a protocol to the switch + + if isNil(proto.handler): + raise newException(LPError, "Protocol has to define a handle method or proc") + + if proto.codec.len == 0: + raise newException(LPError, "Protocol has to define a codec string") + + if s.started and not proto.started: + raise newException(LPError, "Protocol not started") + + s.ms.addHandler(proto.codecs, proto, matcher) + s.peerInfo.protocols.add(proto.codec) + +proc upgrader( + switch: Switch, trans: Transport, conn: Connection +) {.async: (raises: [CancelledError, UpgradeError]).} = + try: + let muxed = await trans.upgrade(conn, Opt.none(PeerId)) + switch.connManager.storeMuxer(muxed) + await switch.peerStore.identify(muxed, conn.transportDir) + await switch.connManager.triggerPeerEvents( + muxed.connection.peerId, + PeerEvent(kind: PeerEventKind.Identified, initiator: false), + ) + except CancelledError as e: + raise e + except CatchableError as e: + raise newException(UpgradeError, "catchable error upgrader: " & e.msg, e) + +proc upgradeMonitor( + switch: Switch, trans: Transport, conn: Connection, upgrades: AsyncSemaphore +) {.async: (raises: []).} = + var upgradeSuccessful = false + try: + await switch.upgrader(trans, conn).wait(30.seconds) + trace "Connection upgrade succeeded" + upgradeSuccessful = true + except CancelledError: + trace "Connection upgrade cancelled", conn + except AsyncTimeoutError: + trace "Connection upgrade timeout", conn + libp2p_failed_upgrades_incoming.inc() + except UpgradeError as e: + trace "Connection upgrade failed", description = e.msg, conn + libp2p_failed_upgrades_incoming.inc() + finally: + if (not upgradeSuccessful) and (not isNil(conn)): + await conn.close() + try: + upgrades.release() + except AsyncSemaphoreError: + raiseAssert "semaphore released without acquire" + +proc accept(s: Switch, transport: Transport) {.async: (raises: []).} = + ## switch accept loop, ran for every transport + ## + let upgrades = newAsyncSemaphore(ConcurrentUpgrades) + + while transport.running: + try: + await upgrades.acquire() # first wait for an upgrade slot to become available + except CancelledError: + return + + var conn: Connection + try: + debug "About to accept incoming connection" + # remember to always release the slot when + # the upgrade succeeds or fails, this is + # currently done by the `upgradeMonitor` + let slot = await s.connManager.getIncomingSlot() + conn = + try: + await transport.accept() + except CancelledError as exc: + slot.release() + raise exc + except CatchableError as exc: + slot.release() + raise + newException(CatchableError, "failed to accept connection: " & exc.msg, exc) + if isNil(conn): + # A nil connection means that we might have hit a + # file-handle limit (or another non-fatal error), + # we can get one on the next try + debug "Unable to get a connection" + slot.release() + try: + upgrades.release() + except AsyncSemaphoreError: + raiseAssert "semaphore released without acquire" + continue + + slot.trackConnection(conn) + + # set the direction of this bottom level transport + # in order to be able to consume this information in gossipsub if required + # gossipsub gives priority to connections we make + conn.transportDir = Direction.In + + debug "Accepted an incoming connection", conn + asyncSpawn s.upgradeMonitor(transport, conn, upgrades) + except CancelledError: + try: + upgrades.release() + except AsyncSemaphoreError: + raiseAssert "semaphore released without acquire" + return + except CatchableError as exc: + error "Exception in accept loop, exiting", description = exc.msg + if not isNil(conn): + await conn.close() + try: + upgrades.release() + except AsyncSemaphoreError: + raiseAssert "semaphore released without acquire" + return + +proc stop*(s: Switch) {.public, async: (raises: [CancelledError]).} = + ## Stop listening on every transport, and + ## close every active connections + + trace "Stopping switch" + + s.started = false + + try: + # Stop accepting incoming connections + await allFutures(s.acceptFuts.mapIt(it.cancelAndWait())).wait(1.seconds) + except CatchableError as exc: + debug "Cannot cancel accepts", description = exc.msg + + for service in s.services: + discard await service.stop(s) + + # close and cleanup all connections + await s.connManager.close() + + for transp in s.transports: + try: + await transp.stop() + except CancelledError as exc: + raise exc + except CatchableError as exc: + warn "error cleaning up transports", description = exc.msg + + await s.ms.stop() + + trace "Switch stopped" + +proc start*(s: Switch) {.public, async: (raises: [CancelledError, LPError]).} = + ## Start listening on every transport + + if s.started: + warn "Switch has already been started" + return + + debug "starting switch for peer", peerInfo = s.peerInfo + var startFuts: seq[Future[void]] + for t in s.transports: + let addrs = s.peerInfo.listenAddrs.filterIt(t.handles(it)) + + s.peerInfo.listenAddrs.keepItIf(it notin addrs) + + if addrs.len > 0 or t.running: + let fut = t.start(addrs) + startFuts.add(fut) + if t of TcpTransport: + await fut + s.acceptFuts.add(s.accept(t)) + s.peerInfo.listenAddrs &= t.addrs + + # some transports require some services to be running + # in order to finish their startup process + for service in s.services: + discard await service.setup(s) + + await allFutures(startFuts) + + for fut in startFuts: + if fut.failed: + await s.stop() + raise newException( + LPError, "starting transports failed: " & $fut.error.msg, fut.error + ) + + for t in s.transports: # for each transport + if t.addrs.len > 0 or t.running: + if t of TcpTransport: + continue # already added previously + s.acceptFuts.add(s.accept(t)) + s.peerInfo.listenAddrs &= t.addrs + + await s.peerInfo.update() + await s.ms.start() + s.started = true + + debug "Started libp2p node", peer = s.peerInfo + +proc newSwitch*( + peerInfo: PeerInfo, + transports: seq[Transport], + secureManagers: openArray[Secure] = [], + connManager: ConnManager, + ms: MultistreamSelect, + peerStore: PeerStore, + nameResolver: NameResolver = nil, + services = newSeq[Service](), +): Switch {.raises: [LPError].} = + if secureManagers.len == 0: + raise newException(LPError, "Provide at least one secure manager") + + let switch = Switch( + peerInfo: peerInfo, + ms: ms, + transports: transports, + connManager: connManager, + peerStore: peerStore, + dialer: + Dialer.new(peerInfo.peerId, connManager, peerStore, transports, nameResolver), + nameResolver: nameResolver, + services: services, + ) + + switch.connManager.peerStore = peerStore + return switch diff --git a/temp/vendor/nim-libp2p/libp2p/transports/quictransport.nim b/temp/vendor/nim-libp2p/libp2p/transports/quictransport.nim new file mode 100644 index 000000000..0f9f6f66f --- /dev/null +++ b/temp/vendor/nim-libp2p/libp2p/transports/quictransport.nim @@ -0,0 +1,455 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +import std/[sequtils, sets] +import chronos, chronicles, metrics, results +import lsquic +import + ../wire, + ../multiaddress, + ../multicodec, + ../muxers/muxer, + ../stream/connection, + ../upgrademngrs/upgrade +import ./transport +import tls/certificate + +export multiaddress +export multicodec +export connection +export transport + +logScope: + topics = "libp2p quictransport" + +type + P2PConnection = connection.Connection + QuicConnection = lsquic.Connection + QuicTransportError* = object of transport.TransportError + QuicTransportDialError* = object of transport.TransportDialError + QuicTransportAcceptStopped* = object of QuicTransportError + + QuicStream* = ref object of P2PConnection + session: QuicSession + stream: Stream + + QuicSession* = ref object of P2PConnection + connection: QuicConnection + streams: seq[QuicStream] + +const alpn = "libp2p" + +initializeLsquic() + +proc new( + _: type QuicStream, + stream: Stream, + dir: Direction, + session: QuicSession, + oaddr: Opt[MultiAddress], + laddr: Opt[MultiAddress], + peerId: PeerId, +): QuicStream = + let quicstream = QuicStream( + session: session, + stream: stream, + observedAddr: oaddr, + localAddr: laddr, + peerId: peerId, + ) + quicstream.objName = "QuicStream" + quicstream.dir = dir + procCall P2PConnection(quicstream).initStream() + quicstream + +method getWrapped*(self: QuicStream): P2PConnection = + self + +method readOnce*( + stream: QuicStream, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = + if stream.atEof: + raise newLPStreamRemoteClosedError() + + let readLen = + try: + await stream.stream.readOnce(cast[ptr byte](pbytes), nbytes) + except StreamError as e: + raise (ref LPStreamError)(msg: "error in readOnce: " & e.msg, parent: e) + + if readLen == 0: + stream.isEof = true + return 0 + + stream.activity = true + libp2p_network_bytes.inc(readLen.int64, labelValues = ["in"]) + return readLen + +method write*( + stream: QuicStream, bytes: seq[byte] +) {.async: (raises: [CancelledError, LPStreamError]).} = + try: + await stream.stream.write(bytes) + libp2p_network_bytes.inc(bytes.len.int64, labelValues = ["out"]) + except StreamError: + raise newLPStreamRemoteClosedError() + +method closeWrite*(stream: QuicStream) {.async: (raises: []).} = + ## Close the write side of the QUIC stream + try: + await stream.stream.close() + except CancelledError, StreamError: + discard + +method closeImpl*(stream: QuicStream) {.async: (raises: []).} = + try: + await stream.stream.close() + except CancelledError, StreamError: + discard + await procCall P2PConnection(stream).closeImpl() + +# Session +method closed*(session: QuicSession): bool {.raises: [].} = + procCall P2PConnection(session).isClosed or session.connection.isClosed + +method close*(session: QuicSession) {.async: (raises: []).} = + await noCancel allFutures(session.streams.mapIt(it.close())) + session.connection.close() + await procCall P2PConnection(session).close() + +proc getStream( + session: QuicSession, direction = Direction.In +): Future[QuicStream] {.async: (raises: [CancelledError, ConnectionError]).} = + if session.closed: + raise newException(ConnectionClosedError, "session is closed") + + var stream: Stream + case direction + of Direction.In: + stream = await session.connection.incomingStream() + of Direction.Out: + stream = await session.connection.openStream() + + let qs = QuicStream.new( + stream, direction, session, session.observedAddr, session.localAddr, session.peerId + ) + when defined(libp2p_agents_metrics): + qs.shortAgent = session.shortAgent + + # Inherit transportDir from parent session for GossipSub outbound peer tracking + qs.transportDir = session.transportDir + + session.streams.add(qs) + return qs + +method getWrapped*(self: QuicSession): P2PConnection = + self + +# Muxer +type QuicMuxer* = ref object of Muxer + session: QuicSession + handleFut: Future[void] + +proc new*( + _: type QuicMuxer, conn: P2PConnection, peerId: Opt[PeerId] = Opt.none(PeerId) +): QuicMuxer {.raises: [CertificateParsingError, LPError].} = + let session = QuicSession(conn) + session.peerId = peerId.valueOr: + let certificates = session.connection.certificates() + if certificates.len != 1: + raise (ref QuicTransportError)(msg: "expected one certificate in connection") + let cert = parse(certificates[0]) + cert.peerId() + QuicMuxer(session: session, connection: conn) + +when defined(libp2p_agents_metrics): + method setShortAgent*(m: QuicMuxer, shortAgent: string) = + m.session.shortAgent = shortAgent + for s in m.session.streams: + s.shortAgent = shortAgent + m.connection.shortAgent = shortAgent + +method newStream*( + m: QuicMuxer, name: string = "", lazy: bool = false +): Future[P2PConnection] {. + async: (raises: [CancelledError, LPStreamError, MuxerError]) +.} = + try: + return await m.session.getStream(Direction.Out) + except ConnectionError as e: + raise newException(MuxerError, "error in newStream: " & e.msg, e) + +method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} = + proc handleStream(stream: QuicStream) {.async: (raises: []).} = + ## call the muxer stream handler for this channel + ## + await m.streamHandler(stream) + trace "finished handling stream" + doAssert(stream.closed, "connection not closed by handler!") + + while not (m.session.atEof or m.session.closed): + try: + let stream = await m.session.getStream(Direction.In) + asyncSpawn handleStream(stream) + except ConnectionClosedError: + break # stop handling, connection was closed + except CancelledError: + continue # keep handling, until connection is closed + except ConnectionError as e: + # keep handling, until connection is closed. + # this stream failed but we need to keep handling for other streams. + trace "QuicMuxer.handler got error while opening stream", msg = e.msg + + if not m.session.isClosed: + await m.session.close() + +method getStreams*(m: QuicMuxer): seq[P2PConnection] {.gcsafe.} = + for s in m.session.streams: + result.add(P2PConnection(s)) + +method close*(m: QuicMuxer) {.async: (raises: []).} = + try: + await m.session.close() + if not isNil(m.handleFut): + m.handleFut.cancelSoon() + except CatchableError: + discard + +# Transport +type QuicUpgrade = ref object of Upgrade + +type CertGenerator = + proc(kp: KeyPair): CertificateX509 {.gcsafe, raises: [TLSCertificateError].} + +type QuicTransport* = ref object of Transport + listener: Listener + client: Opt[QuicClient] + privateKey: PrivateKey + connections: seq[P2PConnection] + rng: ref HmacDrbgContext + certGenerator: CertGenerator + +proc makeCertificateVerifier(): CertificateVerifier = + proc certificateVerifier(serverName: string, certificatesDer: seq[seq[byte]]): bool = + if certificatesDer.len != 1: + trace "CertificateVerifier: expected one certificate in the chain", + cert_count = certificatesDer.len + return false + + let cert = + try: + parse(certificatesDer[0]) + except CertificateParsingError as e: + trace "CertificateVerifier: failed to parse certificate", msg = e.msg + return false + + return cert.verify() + + return CustomCertificateVerifier.init(certificateVerifier) + +proc defaultCertGenerator( + kp: KeyPair +): CertificateX509 {.gcsafe, raises: [TLSCertificateError].} = + return generateX509(kp, encodingFormat = EncodingFormat.PEM) + +proc new*(_: type QuicTransport, u: Upgrade, privateKey: PrivateKey): QuicTransport = + let self = QuicTransport( + upgrader: QuicUpgrade(ms: u.ms), + privateKey: privateKey, + certGenerator: defaultCertGenerator, + ) + procCall Transport(self).initialize() + self + +proc new*( + _: type QuicTransport, + u: Upgrade, + privateKey: PrivateKey, + certGenerator: CertGenerator, +): QuicTransport = + let self = QuicTransport( + upgrader: QuicUpgrade(ms: u.ms), + privateKey: privateKey, + certGenerator: certGenerator, + ) + procCall Transport(self).initialize() + self + +method handles*(transport: QuicTransport, address: MultiAddress): bool {.raises: [].} = + if not procCall Transport(transport).handles(address): + return false + QUIC_V1.match(address) + +proc makeConfig(self: QuicTransport): TLSConfig = + let pubkey = self.privateKey.getPublicKey().valueOr: + raiseAssert "could not obtain public key" + + let cert = self.certGenerator(KeyPair(seckey: self.privateKey, pubkey: pubkey)) + let certVerifier = makeCertificateVerifier() + let tlsConfig = TLSConfig.new( + cert.certificate, cert.privateKey, @[alpn].toHashSet(), Opt.some(certVerifier) + ) + return tlsConfig + +proc getRng(self: QuicTransport): ref HmacDrbgContext = + if self.rng.isNil: + self.rng = newRng() + + return self.rng + +proc toMultiAddress(ta: TransportAddress): MultiAddress {.raises: [MaError].} = + ## Returns quic MultiAddress from TransportAddress + MultiAddress.init(ta, IPPROTO_UDP).get() & MultiAddress.init("/quic-v1").get() + +method start*( + self: QuicTransport, addrs: seq[MultiAddress] +) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} = + doAssert self.listener.isNil, "start() already called" + # TODO(#1663): handle multiple addr + + try: + let server = QuicServer.new(self.makeConfig()) + self.listener = server.listen(initTAddress(addrs[0]).tryGet) + let listenMA = @[toMultiAddress(self.listener.localAddress())] + await procCall Transport(self).start(listenMA) + except QuicConfigError as exc: + raiseAssert "invalid quic setup: " & $exc.msg + except TLSCertificateError as exc: + raise (ref QuicTransportError)( + msg: "tlscert error in quic start: " & exc.msg, parent: exc + ) + except QuicError as exc: + raise + (ref QuicTransportError)(msg: "quicerror in quic start: " & exc.msg, parent: exc) + except TransportOsError as exc: + raise (ref QuicTransportError)( + msg: "transport error in quic start: " & exc.msg, parent: exc + ) + +method stop*(transport: QuicTransport) {.async: (raises: []).} = + let futs = transport.connections.mapIt(it.close()) + await noCancel allFutures(futs) + + if not transport.listener.isNil: + try: + await transport.listener.stop() + except CatchableError as exc: + trace "Error shutting down Quic transport", description = exc.msg + transport.listener = nil + + transport.client.withValue(client): + await noCancel client.stop() + + transport.client = Opt.none(QuicClient) + await procCall Transport(transport).stop() + +proc wrapConnection( + transport: QuicTransport, connection: QuicConnection, transportDir: Direction +): QuicSession {.raises: [TransportOsError].} = + var observedAddr: MultiAddress + var localAddr: MultiAddress + try: + observedAddr = toMultiAddress(connection.remoteAddress()) + localAddr = toMultiAddress(connection.localAddress()) + except MaError as e: + raiseAssert "Multiaddr Error" & e.msg + + let session = QuicSession( + dir: transportDir, + objName: "QuicSession", + connection: connection, + observedAddr: Opt.some(observedAddr), + localAddr: Opt.some(localAddr), + ) + session.initStream() + + # Set the transport direction for outbound peer tracking in GossipSub 1.1 + session.transportDir = transportDir + + transport.connections.add(session) + + proc onClose() {.async: (raises: []).} = + await noCancel session.join() + transport.connections.keepItIf(it != session) + trace "Cleaned up client" + + asyncSpawn onClose() + + return session + +method accept*( + self: QuicTransport +): Future[connection.Connection] {. + async: (raises: [transport.TransportError, CancelledError]) +.} = + if not self.running: + # stop accept only when transport is stopped (not when error occurs) + raise newException(QuicTransportAcceptStopped, "Quic transport stopped") + + doAssert not self.listener.isNil, "call start() before calling accept()" + + try: + let connection = await self.listener.accept() + return self.wrapConnection(connection, Direction.In) + except QuicError as exc: + debug "Quic Error", description = exc.msg + except common.TransportError as exc: + debug "Transport Error", description = exc.msg + except TransportOsError as exc: + debug "OS Error", description = exc.msg + +method dial*( + self: QuicTransport, + hostname: string, + address: MultiAddress, + peerId: Opt[PeerId] = Opt.none(PeerId), +): Future[connection.Connection] {. + async: (raises: [transport.TransportError, CancelledError]) +.} = + let taAddress = + try: + initTAddress(address).tryGet + except LPError as e: + raise newException( + QuicTransportDialError, "error in quic dial: invald address: " & e.msg, e + ) + + try: + if not self.client.isSome: + self.client = Opt.some(QuicClient.new(self.makeConfig())) + + let client = self.client.get() + let quicConnection = await client.dial(taAddress) + return self.wrapConnection(quicConnection, Direction.Out) + except QuicConfigError as e: + raise newException( + QuicTransportDialError, "error in quic dial: invalid tls config:" & e.msg, e + ) + except TLSCertificateError as e: + raise newException( + QuicTransportDialError, "error in quic dial: tls certificate error:" & e.msg, e + ) + except TransportOsError as e: + raise newException(QuicTransportDialError, "error in quic dial:" & e.msg, e) + except DialError as e: + raise newException(QuicTransportDialError, "error in quic dial:" & e.msg, e) + except QuicError as e: + raise newException(QuicTransportDialError, "error in quic dial:" & e.msg, e) + +method upgrade*( + self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId] +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = + let muxer = QuicMuxer.new(conn, peerId) + muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} = + trace "Starting stream handler" + try: + await self.upgrader.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + return + except CatchableError as exc: + trace "exception in stream handler", conn, msg = exc.msg + finally: + await conn.closeWithEOF() + trace "Stream handler done", conn + muxer.handleFut = muxer.handle() + return muxer diff --git a/temp/vendor/nim-lsquic/lsquic/context/client.nim b/temp/vendor/nim-lsquic/lsquic/context/client.nim new file mode 100644 index 000000000..b72f299de --- /dev/null +++ b/temp/vendor/nim-lsquic/lsquic/context/client.nim @@ -0,0 +1,164 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +import results +import chronicles +import chronos +import chronos/osdefs +import ./[context, io, stream] +import ../[lsquic_ffi, errors, tlsconfig, timeout, stream, certificates] +import ../helpers/sequninit + +proc onNewConn( + stream_if_ctx: pointer, conn: ptr lsquic_conn_t +): ptr lsquic_conn_ctx_t {.cdecl.} = + debug "New connection established: client" + let conn_ctx = lsquic_conn_get_ctx(conn) + cast[ptr lsquic_conn_ctx_t](conn_ctx) + +proc onHandshakeDone( + conn: ptr lsquic_conn_t, status: enum_lsquic_hsk_status +) {.cdecl.} = + debug "Handshake done", status + let conn_ctx = lsquic_conn_get_ctx(conn) + if conn_ctx.isNil: + debug "conn_ctx is nil in onHandshakeDone" + return + + let quicClientConn = cast[QuicConnection](conn_ctx) + if quicClientConn.connectedFut.finished: + return + + if status == LSQ_HSK_FAIL or status == LSQ_HSK_RESUMED_FAIL: + quicClientConn.connectedFut.fail( + newException(DialError, "could not connect to server. Handshake failed") + ) + else: + let x509chain = lsquic_conn_get_full_cert_chain(quicClientConn.lsquicConn) + let certChain = x509chain.getCertChain() + OPENSSL_sk_free(cast[ptr OPENSSL_STACK](x509chain)) + quicClientConn.certChain = certChain + + quicClientConn.connectedFut.complete() + +proc onConnClosed(conn: ptr lsquic_conn_t) {.cdecl.} = + debug "Connection closed: client" + let conn_ctx = lsquic_conn_get_ctx(conn) + if not conn_ctx.isNil: + let quicClientConn = cast[QuicConnection](conn_ctx) + if not quicClientConn.connectedFut.finished: + # Not connected yet + var buf: array[256, char] + let connStatus = + lsquic_conn_status(conn, cast[cstring](addr buf[0]), buf.len.csize_t) + let msg = $cast[cstring](addr buf[0]) + quicClientConn.connectedFut.fail( + newException( + DialError, "could not connect to server. Status: " & $connStatus & ". " & msg + ) + ) + quicClientConn.cancelPending() + quicClientConn.lsquicConn = nil + quicClientConn.onClose() + GC_unref(quicClientConn) + lsquic_conn_set_ctx(conn, nil) + +method dial*( + ctx: ClientContext, + local: TransportAddress, + remote: TransportAddress, + connectedFut: Future[void], + onClose: proc() {.gcsafe, raises: [].}, +): Result[QuicConnection, string] {.raises: [], gcsafe.} = + var + localAddress: Sockaddr_storage + localAddrLen: SockLen + remoteAddress: Sockaddr_storage + remoteAddrLen: SockLen + + local.toSAddr(localAddress, localAddrLen) + remote.toSAddr(remoteAddress, remoteAddrLen) + + # TODO: should use constructor + let quicClientConn = QuicConnection( + isOutgoing: true, + connectedFut: connectedFut, + local: local, + remote: remote, + incoming: newAsyncQueue[Stream](), + onClose: onClose, + ) + GC_ref(quicClientConn) # Keep it pinned until on_conn_closed is called + let conn = lsquic_engine_connect( + ctx.engine, + N_LSQVER, + cast[ptr SockAddr](addr localAddress), + cast[ptr SockAddr](addr remoteAddress), + cast[pointer](ctx), + cast[ptr lsquic_conn_ctx_t](quicClientConn), + nil, + 0, + nil, + 0, + nil, + 0, + ) + if conn.isNil: + return err("could not dial: " & $remote) + + quicClientConn.lsquicConn = conn + + ok(quicClientConn) + +const Cubic = 1 + +proc new*(T: typedesc[ClientContext], tlsConfig: TLSConfig): Result[T, string] = + var ctx = ClientContext() + ctx.tlsConfig = tlsConfig + ctx.setupSSLContext() + + lsquic_engine_init_settings(addr ctx.settings, 0) + ctx.settings.es_versions = 1.cuint shl LSQVER_I001.cuint #IETF QUIC v1 + ctx.settings.es_cc_algo = Cubic + ctx.settings.es_dplpmtud = 1 + ctx.settings.es_base_plpmtu = 1280 + ctx.settings.es_max_plpmtu = 0 + ctx.settings.es_pace_packets = 1 + + ctx.settings.es_cfcw = 1 * 1024 * 1024 + ctx.settings.es_max_cfcw = 2 * 1024 * 1024 + ctx.settings.es_sfcw = 256 * 1024 + ctx.settings.es_max_sfcw = 512 * 1024 + ctx.settings.es_init_max_stream_data_bidi_local = ctx.settings.es_sfcw + ctx.settings.es_init_max_stream_data_bidi_remote = ctx.settings.es_sfcw + ctx.settings.es_max_batch_size = 32 + + ctx.stream_if = struct_lsquic_stream_if( + on_new_conn: onNewConn, + on_hsk_done: onHandshakeDone, + on_conn_closed: onConnClosed, + on_new_stream: onNewStream, + on_read: onRead, + on_write: onWrite, + on_close: onClose, + ) + ctx.api = struct_lsquic_engine_api( + ea_settings: addr ctx.settings, + ea_stream_if_ctx: cast[pointer](ctx), + ea_packets_out_ctx: cast[pointer](ctx), + ea_stream_if: addr ctx.stream_if, + ea_get_ssl_ctx: getSSLCtx, + ea_packets_out: sendPacketsOut, + ) + + ctx.engine = lsquic_engine_new(0, addr ctx.api) + if ctx.engine.isNil: + return err("failed to create lsquic engine") + + ctx.tickTimeout = newTimeout( + proc() = + ctx.engine_process() + ) + ctx.tickTimeout.set(Moment.now()) + + return ok(ctx) diff --git a/temp/vendor/nim-lsquic/lsquic/context/context.nim b/temp/vendor/nim-lsquic/lsquic/context/context.nim new file mode 100644 index 000000000..260f5164a --- /dev/null +++ b/temp/vendor/nim-lsquic/lsquic/context/context.nim @@ -0,0 +1,269 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +import chronos +import chronos/osdefs +import chronicles +import + ../[lsquic_ffi, errors, tlsconfig, timeout, certificates, certificateverifier, stream] + +let SSL_CTX_ID = SSL_CTX_get_ex_new_index(0, nil, nil, nil, nil) # Yes, this is global +doAssert SSL_CTX_ID >= 0, "could not generate global ssl_ctx id" + +type QuicContext* = ref object of RootObj + settings*: struct_lsquic_engine_settings + api*: struct_lsquic_engine_api + engine*: ptr struct_lsquic_engine + stream_if*: struct_lsquic_stream_if + tlsConfig*: TLSConfig + tickTimeout*: Timeout + sslCtx*: ptr SSL_CTX + fd*: cint + +proc engine_process*(ctx: QuicContext) = + lsquic_engine_process_conns(ctx.engine) + + if lsquic_engine_has_unsent_packets(ctx.engine) != 0: + lsquic_engine_send_unsent_packets(ctx.engine) + + var diff: cint + if lsquic_engine_earliest_adv_tick(ctx.engine, addr diff) == 0: + return + + let delta = + if diff < 0: LSQUIC_DF_CLOCK_GRANULARITY.microseconds else: diff.microseconds + ctx.tickTimeout.set(delta) + +type PendingStream = object + stream: Stream + created: Future[void].Raising([CancelledError, ConnectionError]) + +type QuicConnection* = ref object of RootObj + isOutgoing*: bool + local*: TransportAddress + remote*: TransportAddress + lsquicConn*: ptr lsquic_conn_t + onClose*: proc() {.gcsafe, raises: [].} + closedLocal*: bool + closedRemote*: bool + incoming*: AsyncQueue[Stream] + connectedFut*: Future[void] + pendingStreams: seq[PendingStream] + certChain*: seq[seq[byte]] + +type ClientContext* = ref object of QuicContext + +type ServerContext* = ref object of QuicContext + incoming*: AsyncQueue[QuicConnection] + +proc processWhenReady*(quicContext: QuicContext) = + quicContext.tickTimeout.set(Moment.now()) + +proc incomingStream*( + quicConn: QuicConnection +): Future[Stream] {.async: (raises: [CancelledError]).} = + await quicConn.incoming.get() + +proc addPendingStream*( + quicConn: QuicConnection, s: Stream +): Future[void].Raising([CancelledError, ConnectionError]) {.raises: [], gcsafe.} = + let created = Future[void].Raising([CancelledError, ConnectionError]).init( + "QuicConnection.addPendingStream" + ) + quicConn.pendingStreams.add(PendingStream(stream: s, created: created)) + created + +proc popPendingStream*( + quicConn: QuicConnection, stream: ptr lsquic_stream_t +): Opt[Stream] {.raises: [], gcsafe.} = + if quicConn.pendingStreams.len == 0: + debug "no pending streams!" + return Opt.none(Stream) + + let pending = quicConn.pendingStreams.pop() + pending.stream.quicStream = stream + pending.created.complete() + Opt.some(pending.stream) + +proc cancelPending*(quicConn: QuicConnection) = + for pending in quicConn.pendingStreams: + pending.created.fail(newException(ConnectionError, "can't open new streams")) + +proc alpnSelectProtoCB( + ssl: ptr SSL, + outv: ptr ptr uint8, + outlen: ptr uint8, + inv: ptr uint8, + inlen: cuint, + userData: pointer, +): cint {.cdecl.} = + let serverCtx = cast[ServerContext](userData) + + if ( + SSL_select_next_proto( + outv, + outlen, + cast[ptr uint8](serverCtx.tlsConfig.alpnWire.cstring), + cast[cuint](serverCtx.tlsConfig.alpnWire.len), + inv, + inlen, + ) == OPENSSL_NPN_NEGOTIATED + ): + return SSL_TLSEXT_ERR_OK + + return SSL_TLSEXT_ERR_ALERT_FATAL + +proc verifyCertificate( + ssl: ptr SSL, out_alert: ptr uint8 +): enum_ssl_verify_result_t {.cdecl.} = + let sslCtx = SSL_get_SSL_CTX(ssl) + + let quicCtx = cast[QuicContext](SSL_CTX_get_ex_data(sslCtx, SSL_CTX_ID)) + if quicCtx.isNil: + raiseAssert "could not obtain context" + + let derCertificates = getFullCertChain(ssl) + + let serverName = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name) + doAssert quicCtx.tlsConfig.certVerifier.isSome, "no custom validator set" + if quicCtx.tlsConfig.certVerifier.get().verify($serverName, derCertificates): + return ssl_verify_ok + else: + out_alert[] = SSL_AD_CERTIFICATE_UNKNOWN + return ssl_verify_invalid + +proc setupSSLContext*(quicCtx: QuicContext) = + let sslCtx = SSL_CTX_new( + if quicCtx is ServerContext: + TLS_server_method() + else: + TLS_client_method() + ) + if sslCtx.isNil: + raiseAssert "failed to create sslCtx" + + if SSL_CTX_set_ex_data(sslCtx, SSL_CTX_ID, cast[pointer](quicCtx)) != 1: + raiseAssert "could not set data in sslCtx" + + var opts = + 0 or SSL_OP_NO_SSLv2 or SSL_OP_NO_SSLv3 or SSL_OP_NO_TLSv1 or SSL_OP_NO_TLSv1_1 or + SSL_OP_CIPHER_SERVER_PREFERENCE + discard SSL_CTX_set_options(sslCtx, opts.uint32) + + if quicCtx.tlsConfig.key.len != 0 and quicCtx.tlsConfig.certificate.len != 0: + let pkey = quicCtx.tlsConfig.key.toPKey().valueOr: + raiseAssert "could not convert certificate to pkey: " & error + + let cert = quicCtx.tlsConfig.certificate.toX509().valueOr: + raiseAssert "could not convert certificate to x509: " & error + + defer: + X509_free(cert) + EVP_PKEY_free(pkey) + + if SSL_CTX_use_certificate(sslCtx, cert) != 1: + raiseAssert "could not use certificate" + + if SSL_CTX_use_PrivateKey(sslCtx, pkey) != 1: + raiseAssert "could not use private key" + + if SSL_CTX_check_private_key(sslCtx) != 1: + raiseAssert "cant use private key with certificate" + + if (SSL_CTX_set1_sigalgs_list(sslCtx, "ed25519:ecdsa_secp256r1_sha256") != 1): + raiseAssert "could not set supported algorithm list" + + if quicCtx.tlsConfig.certVerifier.isSome: + SSL_CTX_set_custom_verify( + sslCtx, SSL_VERIFY_PEER or SSL_VERIFY_FAIL_IF_NO_PEER_CERT, verifyCertificate + ) + + if quicCtx of ServerContext: + SSL_CTX_set_alpn_select_cb(sslCtx, alpnSelectProtoCB, cast[pointer](quicCtx)) + else: + if SSL_CTX_set_alpn_protos( + sslCtx, + cast[ptr uint8](quicCtx.tlsConfig.alpnWire.cstring), + cast[cuint](quicCtx.tlsConfig.alpnWire.len), + ) != 0: + raiseAssert "can't set client alpn" + + discard SSL_CTX_set_min_proto_version(sslCtx, TLS1_3_VERSION) + discard SSL_CTX_set_max_proto_version(sslCtx, TLS1_3_VERSION) + + quicCtx.sslCtx = sslCtx + +proc getSSLCtx*(peer_ctx: pointer, sockaddr: ptr SockAddr): ptr SSL_CTX {.cdecl.} = + let quicCtx = cast[QuicContext](peer_ctx) + quicCtx.sslCtx + +proc stop*(ctx: QuicContext) {.raises: [].} = + ctx.tickTimeout.stop() + lsquic_engine_destroy(ctx.engine) + +proc close*(ctx: QuicContext, conn: QuicConnection) = + if conn != nil and conn.lsquicConn != nil: + lsquic_conn_close(conn.lsquicConn) + ctx.processWhenReady() + +proc abort*(ctx: QuicContext, conn: QuicConnection) = + if conn != nil and conn.lsquicConn != nil: + lsquic_conn_abort(conn.lsquicConn) + ctx.processWhenReady() + +method dial*( + ctx: QuicContext, + local: TransportAddress, + remote: TransportAddress, + connectedFut: Future[void], + onClose: proc() {.gcsafe, raises: [].}, +): Result[QuicConnection, string] {.base, gcsafe, raises: [].} = + raiseAssert "dial not implemented" + +proc makeStream*(ctx: QuicContext, quicConn: QuicConnection) {.raises: [].} = + debug "Creating stream" + if quicConn == nil or quicConn.lsquicConn == nil: + debug "Cannot create stream: connection is nil" + return + lsquic_conn_make_stream(quicConn.lsquicConn) + +proc onNewStream*( + stream_if_ctx: pointer, stream: ptr lsquic_stream_t +): ptr lsquic_stream_ctx_t {.cdecl.} = + debug "New stream created" + let conn = lsquic_stream_conn(stream) + let conn_ctx = lsquic_conn_get_ctx(conn) + if conn_ctx.isNil: + debug "conn_ctx is nil in onNewStream" + return nil + + let quicConn = cast[QuicConnection](conn_ctx) + let stream_id = lsquic_stream_id(stream).int + let isLocal = + if quicConn.isOutgoing: + (stream_id and 1) == 0 + else: + (stream_id and 1) == 1 + + let streamCtx = + if isLocal: + let s = quicConn.popPendingStream(stream).valueOr: + return + # Whoever opens the stream writes first + discard lsquic_stream_wantread(stream, 0) + discard lsquic_stream_wantwrite(stream, 1) + s + else: + let s = Stream.new(stream) + quicConn.incoming.putNoWait(s) + # Whoever opens the stream reads first + discard lsquic_stream_wantread(stream, 1) + discard lsquic_stream_wantwrite(stream, 0) + s + + return cast[ptr lsquic_stream_ctx_t](streamCtx) + +proc certificates*( + ctx: QuicContext, conn: QuicConnection +): seq[seq[byte]] {.raises: [].} = + conn.certChain diff --git a/temp/vendor/nim-lsquic/lsquic/context/server.nim b/temp/vendor/nim-lsquic/lsquic/context/server.nim new file mode 100644 index 000000000..da4a61f9e --- /dev/null +++ b/temp/vendor/nim-lsquic/lsquic/context/server.nim @@ -0,0 +1,102 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +import results +import chronicles +import chronos +import chronos/osdefs +import ./[context, io, stream] +import ../[lsquic_ffi, tlsconfig, timeout, stream, certificates] +import ../helpers/[sequninit, transportaddr] + +proc onNewConn( + stream_if_ctx: pointer, conn: ptr lsquic_conn_t +): ptr lsquic_conn_ctx_t {.cdecl.} = + debug "New connection established: server" + var local: ptr SockAddr + var remote: ptr SockAddr + discard lsquic_conn_get_sockaddr(conn, addr local, addr remote) + + let x509chain = lsquic_conn_get_full_cert_chain(conn) + let certChain = x509chain.getCertChain() + OPENSSL_sk_free(cast[ptr OPENSSL_STACK](x509chain)) + + # TODO: should use a constructor + let quicConn = QuicConnection( + isOutgoing: false, + incoming: newAsyncQueue[Stream](), + local: local.toTransportAddress(), + remote: remote.toTransportAddress(), + lsquicConn: conn, + certChain: certChain, + onClose: proc() = + discard, + ) + GC_ref(quicConn) # Keep it pinned until on_conn_closed is called + let serverCtx = cast[ServerContext](stream_if_ctx) + serverCtx.incoming.putNoWait(quicConn) + cast[ptr lsquic_conn_ctx_t](quicConn) + +proc onConnClosed(conn: ptr lsquic_conn_t) {.cdecl.} = + debug "Connection closed: server" + let conn_ctx = lsquic_conn_get_ctx(conn) + if not conn_ctx.isNil: + let quicConn = cast[QuicConnection](conn_ctx) + quicConn.lsquicConn = nil + quicConn.onClose() + GC_unref(quicConn) + lsquic_conn_set_ctx(conn, nil) + +const Cubic = 1 + +proc new*(T: typedesc[ServerContext], tlsConfig: TLSConfig): Result[T, string] = + var ctx = ServerContext() + ctx.tlsConfig = tlsConfig + ctx.incoming = newAsyncQueue[QuicConnection]() + ctx.setupSSLContext() + + lsquic_engine_init_settings(addr ctx.settings, LSENG_SERVER) + ctx.settings.es_versions = 1.cuint shl LSQVER_I001.cuint #IETF QUIC v1 + ctx.settings.es_cc_algo = Cubic + ctx.settings.es_dplpmtud = 1 + ctx.settings.es_base_plpmtu = 1280 + ctx.settings.es_max_plpmtu = 0 + ctx.settings.es_pace_packets = 1 + + ctx.settings.es_cfcw = 1 * 1024 * 1024 + ctx.settings.es_max_cfcw = 2 * 1024 * 1024 + ctx.settings.es_sfcw = 256 * 1024 + ctx.settings.es_max_sfcw = 512 * 1024 + ctx.settings.es_init_max_stream_data_bidi_local = ctx.settings.es_sfcw + ctx.settings.es_init_max_stream_data_bidi_remote = ctx.settings.es_sfcw + ctx.settings.es_max_batch_size = 32 + + ctx.stream_if = struct_lsquic_stream_if( + on_new_conn: onNewConn, + on_conn_closed: onConnClosed, + on_new_stream: onNewStream, + on_read: onRead, + on_write: onWrite, + on_close: onClose, + ) + + ctx.api = struct_lsquic_engine_api( + ea_settings: addr ctx.settings, + ea_stream_if_ctx: cast[pointer](ctx), + ea_packets_out_ctx: cast[pointer](ctx), + ea_stream_if: addr ctx.stream_if, + ea_get_ssl_ctx: getSSLCtx, + ea_packets_out: sendPacketsOut, + ) + + ctx.engine = lsquic_engine_new(LSENG_SERVER, addr ctx.api) + if ctx.engine.isNil: + return err("failed to create lsquic engine") + + ctx.tickTimeout = newTimeout( + proc() = + ctx.engine_process() + ) + ctx.tickTimeout.set(Moment.now()) + + return ok(ctx) diff --git a/temp/vendor/nim-lsquic/lsquic/stream.nim b/temp/vendor/nim-lsquic/lsquic/stream.nim new file mode 100644 index 000000000..91ad7c256 --- /dev/null +++ b/temp/vendor/nim-lsquic/lsquic/stream.nim @@ -0,0 +1,194 @@ +# SPDX-License-Identifier: Apache-2.0 OR MIT +# Copyright (c) Status Research & Development GmbH + +import std/[deques, posix] +import chronos +import chronicles +import ./[lsquic_ffi, errors] + +type WriteTask* = object + data*: ptr byte + dataLen*: int + offset*: int + doneFut*: Future[void].Raising([CancelledError, StreamError]) + +type ReadTask* = object + data*: ptr byte + dataLen*: int + doneFut*: Future[int].Raising([CancelledError, StreamError]) + +type Stream* = ref object + quicStream*: ptr lsquic_stream_t + closedByEngine*: bool + closeWrite*: bool + # This is called when on_close callback is executed + closed*: AsyncEvent + # Reuse a single closed-event waiter to minimize allocations on hot paths. + # (no per call allocation) + closedWaiter*: Future[void].Raising([CancelledError]) + writeLock*: AsyncLock + toWrite*: Opt[WriteTask] + readLock*: AsyncLock + isEof*: bool # Received a FIN from remote + toRead*: Opt[ReadTask] + doProcess*: proc() {.gcsafe, raises: [].} + +proc new*(T: typedesc[Stream], quicStream: ptr lsquic_stream_t = nil): T = + let closed = newAsyncEvent() + let closedWaiter = closed.wait() + let s = Stream( + quicStream: quicStream, + closed: closed, + closedWaiter: closedWaiter, + readLock: newAsyncLock(), + writeLock: newAsyncLock(), + ) + GC_ref(s) # Keep it pinned until stream_if.on_close is executed + s + +proc abortPendingWrites*(stream: Stream, reason: string = "") = + let task = stream.toWrite.valueOr: + return + task.doneFut.fail(newException(StreamError, reason)) + stream.toWrite = Opt.none(WriteTask) + +proc abort*(stream: Stream) = + if stream.closeWrite and stream.isEof: + if not stream.closed.isSet(): + stream.closed.fire() + stream.abortPendingWrites("stream aborted") + return + + if not stream.closedByEngine: + let ret = lsquic_stream_close(stream.quicStream) + if ret != 0: + trace "could not abort stream", streamId = lsquic_stream_id(stream.quicStream) + stream.doProcess() + + stream.closeWrite = true + stream.isEof = true + stream.abortPendingWrites("stream aborted") + stream.closed.fire() + +proc close*(stream: Stream) {.async: (raises: [StreamError, CancelledError]).} = + if stream.closeWrite or stream.closedByEngine: + return + + # Closing only the write side + let ret = lsquic_stream_shutdown(stream.quicStream, 1) + if ret == 0: + if stream.isEof: + if lsquic_stream_close(stream.quicStream) != 0: + stream.abort() + raise newException(StreamError, "could not close the stream") + stream.doProcess() + + stream.abortPendingWrites("steam closed") + stream.closeWrite = true + +proc readOnce*( + stream: Stream, dst: ptr byte, dstLen: int +): Future[int] {.async: (raises: [CancelledError, StreamError]).} = + if dstLen == 0 or dst.isNil: + raiseAssert "dst cannot be nil" + + if stream.isEof or stream.closedByEngine: + return 0 + + await stream.readLock.acquire() + + defer: + try: + stream.readLock.release() + except AsyncLockError: + discard # should not happen - lock acquired directly above + + # In case stream was closed while waiting for lock being acquired + if stream.closedByEngine: + return 0 + + let n = lsquic_stream_read(stream.quicStream, dst, dstLen.csize_t) + + if n == 0: + stream.isEof = true + return 0 + elif n > 0: + return n + + if n < 0 and errno != EWOULDBLOCK: + stream.abort() + raise newException(StreamError, "could not read: " & $errno) + + if lsquic_stream_wantread(stream.quicStream, 1) == -1: + stream.abort() + raise newException(StreamError, "could not set wantread") + + let doneFut = + Future[int].Raising([CancelledError, StreamError]).init("Stream.readOnce") + stream.toRead = Opt.some(ReadTask(data: dst, dataLen: dstLen, doneFut: doneFut)) + + stream.doProcess() + + let raceFut = await race(stream.closedWaiter, doneFut) + if raceFut == stream.closedWaiter: + await doneFut.cancelAndWait() + stream.isEof = true + stream.closeWrite = true + return 0 + + return await doneFut + +template readOnce*(stream: Stream, dst: var openArray[byte]): untyped = + ## Convenience helper that forwards an openArray/seq to the pointer-based API. + (if dst.len == 0: stream.readOnce(nil, 0) + else: stream.readOnce(dst[0].addr, dst.len)) + +proc write*( + stream: Stream, data: seq[byte] +) {.async: (raises: [CancelledError, StreamError]).} = + if data.len == 0: + return + + if stream.closeWrite or stream.closedByEngine: + raise newException(StreamError, "stream closed") + + await stream.writeLock.acquire() + + defer: + try: + stream.writeLock.release() + except AsyncLockError: + discard # should not happen - lock acquired directly above + + if stream.closedByEngine: + raise newException(StreamError, "stream closed") + + # Try to write immediately + let p = data[0].addr + let n = lsquic_stream_write(stream.quicStream, p, data.len.csize_t) + if n >= data.len: + if lsquic_stream_flush(stream.quicStream) != 0: + stream.abort() + stream.doProcess() + return + elif n < 0: + error "could not write to stream", streamId = lsquic_stream_id(stream.quicStream), n + raise newException(StreamError, "could not write") + + # Enqueue otherwise + let doneFut = Future[void].Raising([CancelledError, StreamError]).init("Stream.write") + stream.toWrite = Opt.some( + WriteTask(data: data[0].addr, dataLen: data.len, doneFut: doneFut, offset: n) + ) + + discard lsquic_stream_wantwrite(stream.quicStream, 1) + + stream.doProcess() + + let raceFut = await race(stream.closedWaiter, doneFut) + if raceFut == stream.closedWaiter: + if not doneFut.finished: + doneFut.fail(newException(StreamError, "stream closed")) + stream.closeWrite = true + + await doneFut diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index ed58db7fe..e49533c33 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -60,9 +60,9 @@ suite "Peer Manager": serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() - server = newTestWakuNode(serverKey, listenIp, Port(3000)) + server = newTestWakuNode(serverKey, listenIp, Port(3000), quicEnabled = false) serverPeerStore = server.peerManager.switch.peerStore - client = newTestWakuNode(clientKey, listenIp, Port(3001)) + client = newTestWakuNode(clientKey, listenIp, Port(3001), quicEnabled = false) clientPeerStore = client.peerManager.switch.peerStore await allFutures(server.start(), client.start()) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index f78c3831f..443df113b 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -270,9 +270,15 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] node1 = newTestWakuNode( - generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage + generateSecp256k1Key(), + getPrimaryIPAddr(), + Port(44048), + peerStorage = storage, + quicEnabled = false, + ) + node2 = newTestWakuNode( + generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023), quicEnabled = false ) - node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023)) node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata") node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata") @@ -311,6 +317,7 @@ procSuite "Peer Manager": parseIpAddress("127.0.0.1"), Port(56037), peerStorage = storage, + quicEnabled = false, ) node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata") @@ -343,9 +350,15 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] node1 = newTestWakuNode( - generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage + generateSecp256k1Key(), + getPrimaryIPAddr(), + Port(44048), + peerStorage = storage, + quicEnabled = false, + ) + node2 = newTestWakuNode( + generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023), quicEnabled = false ) - node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023)) node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata") node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata") @@ -384,6 +397,7 @@ procSuite "Peer Manager": parseIpAddress("127.0.0.1"), Port(56037), peerStorage = storage, + quicEnabled = false, ) node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata") diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index a7f1084fb..a908cb051 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -185,20 +185,21 @@ suite "WakuNode": bindPort = Port(61006) extIp = some(getPrimaryIPAddr()) extPort = some(Port(61008)) - node = newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort) + node = + newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort, quicEnabled = false) let bindEndpoint = MultiAddress.init(bindIp, tcpProtocol, bindPort) announcedEndpoint = MultiAddress.init(extIp.get(), tcpProtocol, extPort.get()) check: - # Check that underlying peer info contains only bindIp before starting - node.switch.peerInfo.listenAddrs.len == 1 + # Check that underlying peer info contains bindIp before starting + node.switch.peerInfo.listenAddrs.len >= 1 node.switch.peerInfo.listenAddrs.contains(bindEndpoint) # Underlying peer info has not updated addrs before starting node.switch.peerInfo.addrs.len == 0 - node.announcedAddresses.len == 1 + node.announcedAddresses.len >= 1 node.announcedAddresses.contains(announcedEndpoint) await node.start() @@ -206,14 +207,52 @@ suite "WakuNode": check: node.started # Underlying peer info listenAddrs has not changed - node.switch.peerInfo.listenAddrs.len == 1 + node.switch.peerInfo.listenAddrs.len >= 1 node.switch.peerInfo.listenAddrs.contains(bindEndpoint) # Check that underlying peer info is updated with announced address - node.switch.peerInfo.addrs.len == 1 + node.switch.peerInfo.addrs.len >= 1 node.switch.peerInfo.addrs.contains(announcedEndpoint) await node.stop() + asyncTest "Peer info updates with correct announced addresses (QUIC)": + let + nodeKey = generateSecp256k1Key() + bindIp = parseIpAddress("0.0.0.0") + bindPort = Port(61006) + quicPort = Port(0) + extIp = some(getPrimaryIPAddr()) + extPort = some(Port(61008)) + node = newTestWakuNode( + nodeKey, + bindIp, + bindPort, + extIp, + extPort, + quicEnabled = true, + quicBindPort = quicPort, + ) + + let tcpAnnounced = MultiAddress.init(extIp.get(), tcpProtocol, extPort.get()) + + check: + node.switch.peerInfo.listenAddrs.len >= 2 + node.switch.peerInfo.addrs.len == 0 + node.announcedAddresses.len >= 2 + node.announcedAddresses.contains(tcpAnnounced) + node.announcedAddresses.anyIt("/quic-v1" in $it) + + await node.start() + + check: + node.started + node.switch.peerInfo.listenAddrs.len >= 2 + node.switch.peerInfo.addrs.len >= 2 + node.switch.peerInfo.addrs.contains(tcpAnnounced) + node.switch.peerInfo.addrs.anyIt("/quic-v1" in $it) + + await node.stop() + asyncTest "Node can use dns4 in announced addresses": let nodeKey = generateSecp256k1Key() @@ -229,7 +268,7 @@ suite "WakuNode": ) check: - node.announcedAddresses.len == 1 + node.announcedAddresses.len >= 1 node.announcedAddresses.contains(expectedDns4Addr) asyncTest "Node uses dns4 resolved ip in announced addresses if no extIp is provided": diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index e904604ab..fade47444 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -60,6 +60,8 @@ proc newTestWakuNode*( wsBindPort: Port = (Port) 8000, wsEnabled: bool = false, wssEnabled: bool = false, + quicBindPort: Port = (Port) 0, + quicEnabled: bool = true, secureKey: string = "", secureCert: string = "", wakuFlags = none(CapabilitiesBitfield), @@ -74,6 +76,12 @@ proc newTestWakuNode*( ): WakuNode = logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + let bindIp = + if quicEnabled and $bindIp == "0.0.0.0": + parseIpAddress("127.0.0.1") + else: + bindIp + var resolvedExtIp = extIp # Update extPort to default value if it's missing and there's an extIp or a DNS domain @@ -105,6 +113,8 @@ proc newTestWakuNode*( wsBindPort = some(wsBindPort), wsEnabled = wsEnabled, wssEnabled = wssEnabled, + quicBindPort = some(quicBindPort), + quicEnabled = quicEnabled, dns4DomainName = dns4DomainName, discv5UdpPort = discv5UdpPort, wakuFlags = wakuFlags, diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index c57699d39..59890b3c8 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -2255,12 +2255,24 @@ suite "Waku Filter - End to End": contentTopic = DefaultContentTopic contentTopicSeq = @[contentTopic] - client = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23450)) - server = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23451)) - client2nd = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23452)) + client = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(23450), + quicEnabled = false, + ) + server = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(23451), + quicEnabled = false, + ) + client2nd = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(23452), + quicEnabled = false, + ) await allFutures(server.start(), client.start(), client2nd.start()) diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 74cdba110..567c3c78c 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -50,6 +50,7 @@ suite "Waku Peer Exchange": some(extIp), wakuFlags = some(flags), discv5UdpPort = some(nodeUdpPort1), + quicEnabled = false, ) nodeKey2 = generateSecp256k1Key() @@ -62,6 +63,7 @@ suite "Waku Peer Exchange": some(extIp), wakuFlags = some(flags), discv5UdpPort = some(nodeUdpPort2), + quicEnabled = false, ) nodeKey3 = generateSecp256k1Key() @@ -74,6 +76,7 @@ suite "Waku Peer Exchange": some(extIp), wakuFlags = some(flags), discv5UdpPort = some(nodeUdpPort3), + quicEnabled = false, ) # discv5 @@ -304,10 +307,18 @@ suite "Waku Peer Exchange": asyncTest "Request with invalid peer info": # Given two valid nodes with PeerExchange let - node1 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - node2 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node1 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + quicEnabled = false, + ) + node2 = newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + quicEnabled = false, + ) # Start and mount peer exchange await allFutures([node1.start(), node2.start()]) @@ -326,7 +337,12 @@ suite "Waku Peer Exchange": asyncTest "Connections are closed after response is sent": # Create 3 nodes let nodes = toSeq(0 ..< 3).mapIt( - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + newTestWakuNode( + generateSecp256k1Key(), + parseIpAddress("0.0.0.0"), + Port(0), + quicEnabled = false, + ) ) await allFutures(nodes.mapIt(it.start())) diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index a687119bd..e4d57ce51 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -400,7 +400,9 @@ suite "WakuNode - Relay": asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)": let nodeKey1 = generateSecp256k1Key() - node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), bindPort = Port(0)) + node1 = newTestWakuNode( + nodeKey1, parseIpAddress("0.0.0.0"), bindPort = Port(0), quicEnabled = false + ) nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode( nodeKey2, @@ -408,6 +410,7 @@ suite "WakuNode - Relay": bindPort = Port(0), wsBindPort = Port(0), wsEnabled = true, + quicEnabled = false, ) shard = DefaultRelayShard contentTopic = ContentTopic("/waku/2/default-content/proto") @@ -629,10 +632,14 @@ suite "WakuNode - Relay": for j in 0 ..< 50: discard await nodes[0].wakuRelay.publish(topic, urandom(1 * (10 ^ 3))) - # long wait, must be higher than the configured decayInterval (how often score is updated) - await sleepAsync(20.seconds) + # wait for decayInterval to pass so gossipsub scores update and bad peer is disconnected + let deadline = Moment.now() + 30.seconds + while Moment.now() < deadline: + if nodes[0].peerManager.switch.connManager.getConnections().len == 0: + break + await sleepAsync(200.millis) - # all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes) + # all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes) for i in 1 ..< 5: check: nodes[i].wakuRelay.peerStats[nodes[0].switch.peerInfo.peerId].score == -249999.9 diff --git a/tests/wakunode_rest/test_rest_debug.nim b/tests/wakunode_rest/test_rest_debug.nim index 4bd2e8c02..8a17a0a9a 100644 --- a/tests/wakunode_rest/test_rest_debug.nim +++ b/tests/wakunode_rest/test_rest_debug.nim @@ -55,8 +55,10 @@ suite "Waku v2 REST API - Debug": check: response.status == 200 $response.contentType == $MIMETYPE_JSON - response.data.listenAddresses == - @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId] + response.data.listenAddresses.len >= 1 + response.data.listenAddresses.contains( + $node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId + ) await restServer.stop() await restServer.closeWait() diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 1a4731d6a..5062de296 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -35,7 +35,8 @@ proc testWakuNode(): WakuNode = extIp = parseIpAddress("127.0.0.1") port = Port(0) - return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + return + newTestWakuNode(privkey, bindIp, port, some(extIp), some(port), quicEnabled = false) type RestFilterTest = object serviceNode: WakuNode diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index a99ba43ee..1889c864a 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -680,6 +680,19 @@ with the drawback of consuming some more bandwidth.""", name: "websocket-secure-cert-path" .}: string + ## QUIC transport config + quicSupport* {. + desc: "Enable QUIC transport: true|false", + defaultValue: false, + name: "quic-support" + .}: bool + + quicPort* {. + desc: "QUIC transport listening port (UDP).", + defaultValue: 9090, + name: "quic-port" + .}: Port + ## Rate limitation config, if not set, rate limit checks will not be performed rateLimits* {. desc: @@ -1111,6 +1124,9 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.webSocketConf.withKeyPath(n.websocketSecureKeyPath) b.webSocketConf.withCertPath(n.websocketSecureCertPath) + b.quicConf.withEnabled(n.quicSupport) + b.quicConf.withQuicPort(n.quicPort) + if n.rateLimits.len > 0: b.rateLimitConf.withRateLimits(n.rateLimits) diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 87b0db492..838269362 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -187,6 +187,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = privKey = builder.nodekey, address = builder.netConfig.get().hostAddress, wsAddress = builder.netConfig.get().wsHostAddress, + quicAddress = builder.netConfig.get().quicHostAddress, transportFlags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}, rng = rng, maxConnections = builder.switchMaxConnections.get(builders.MaxConnections), diff --git a/waku/factory/conf_builder/conf_builder.nim b/waku/factory/conf_builder/conf_builder.nim index b8d0316c3..1ac9fdef7 100644 --- a/waku/factory/conf_builder/conf_builder.nim +++ b/waku/factory/conf_builder/conf_builder.nim @@ -7,6 +7,7 @@ import ./dns_discovery_conf_builder, ./discv5_conf_builder, ./web_socket_conf_builder, + ./quic_conf_builder, ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, @@ -16,6 +17,6 @@ import export waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder, store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder, - discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder, - rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder, - kademlia_discovery_conf_builder + discv5_conf_builder, web_socket_conf_builder, quic_conf_builder, + metrics_server_conf_builder, rate_limit_conf_builder, rln_relay_conf_builder, + mix_conf_builder, kademlia_discovery_conf_builder diff --git a/waku/factory/conf_builder/quic_conf_builder.nim b/waku/factory/conf_builder/quic_conf_builder.nim new file mode 100644 index 000000000..9a3b3fd1a --- /dev/null +++ b/waku/factory/conf_builder/quic_conf_builder.nim @@ -0,0 +1,33 @@ +import chronicles, std/[net, options], results +import waku/factory/waku_conf + +logScope: + topics = "waku conf builder quic" + +######################### +## QUIC Config Builder ## +######################### +type QuicConfBuilder* = object + enabled*: Option[bool] + quicPort*: Option[Port] + +proc init*(T: type QuicConfBuilder): QuicConfBuilder = + QuicConfBuilder() + +proc withEnabled*(b: var QuicConfBuilder, enabled: bool) = + b.enabled = some(enabled) + +proc withQuicPort*(b: var QuicConfBuilder, quicPort: Port) = + b.quicPort = some(quicPort) + +proc withQuicPort*(b: var QuicConfBuilder, quicPort: uint16) = + b.quicPort = some(Port(quicPort)) + +proc build*(b: QuicConfBuilder): Result[Option[QuicConf], string] = + if not b.enabled.get(false): + return ok(none(QuicConf)) + + if b.quicPort.isNone(): + return err("quic.port is not specified") + + return ok(some(QuicConf(port: b.quicPort.get()))) diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index 956d733d3..a064005f2 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -23,6 +23,7 @@ import ./dns_discovery_conf_builder, ./discv5_conf_builder, ./web_socket_conf_builder, + ./quic_conf_builder, ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, @@ -81,6 +82,7 @@ type WakuConfBuilder* = object storeServiceConf*: StoreServiceConfBuilder mixConf*: MixConfBuilder webSocketConf*: WebSocketConfBuilder + quicConf*: QuicConfBuilder rateLimitConf*: RateLimitConfBuilder kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder # End conf builders @@ -142,6 +144,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = rlnRelayConf: RlnRelayConfBuilder.init(), storeServiceConf: StoreServiceConfBuilder.init(), webSocketConf: WebSocketConfBuilder.init(), + quicConf: QuicConfBuilder.init(), rateLimitConf: RateLimitConfBuilder.init(), kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(), ) @@ -546,6 +549,9 @@ proc build*( let webSocketConf = builder.webSocketConf.build().valueOr: return err("WebSocket Conf building failed: " & $error) + let quicConf = builder.quicConf.build().valueOr: + return err("QUIC Conf building failed: " & $error) + let rateLimit = builder.rateLimitConf.build().valueOr: return err("Rate limits Conf building failed: " & $error) @@ -707,6 +713,7 @@ proc build*( ), portsShift: portsShift, webSocketConf: webSocketConf, + quicConf: quicConf, dnsAddrsNameServers: dnsAddrsNameServers, peerPersistence: peerPersistence, peerStoreCapacity: builder.peerStoreCapacity, diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 7aad6e615..0515dc8d1 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -59,6 +59,7 @@ proc networkConfiguration*( conf: EndpointConf, discv5Conf: Option[Discv5Conf], webSocketConf: Option[WebSocketConf], + quicConf: Option[QuicConf], wakuFlags: CapabilitiesBitfield, dnsAddrsNameServers: seq[IpAddress], portsShift: uint16, @@ -109,6 +110,13 @@ proc networkConfiguration*( else: (false, none(Port), false) + let (quicEnabled, quicBindPort) = + if quicConf.isSome: + let qConf = quicConf.get() + (true, some(Port(qConf.port.uint16 + portsShift))) + else: + (false, none(Port)) + # Wrap in none because NetConfig does not have a default constructor # TODO: We could change bindIp in NetConfig to be something less restrictive # than IpAddress, which doesn't allow default construction @@ -123,6 +131,8 @@ proc networkConfiguration*( wsBindPort = wsBindPort, wsEnabled = wsEnabled, wssEnabled = wssEnabled, + quicBindPort = quicBindPort, + quicEnabled = quicEnabled, dns4DomainName = conf.dns4DomainName, discv5UdpPort = discv5UdpPort, wakuFlags = some(wakuFlags), diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 52b719b8f..676848294 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -463,8 +463,8 @@ proc setupNode*( let netConfig = ( await networkConfiguration( wakuConf.clusterId, wakuConf.endpointConf, wakuConf.discv5Conf, - wakuConf.webSocketConf, wakuConf.wakuFlags, wakuConf.dnsAddrsNameServers, - wakuConf.portsShift, clientId, + wakuConf.webSocketConf, wakuConf.quicConf, wakuConf.wakuFlags, + wakuConf.dnsAddrsNameServers, wakuConf.portsShift, clientId, ) ).valueOr: error "failed to create internal config", error = error diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 45e0edee0..79c9a7eac 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -263,7 +263,7 @@ proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.as let netConf = ( await networkConfiguration( conf.clusterId, conf.endpointConf, conf.discv5Conf, conf.webSocketConf, - conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId, + conf.quicConf, conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId, ) ).valueOr: return err("Could not update NetConfig: " & error) diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 4934faccc..4b6409932 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -32,6 +32,9 @@ type WebSocketConf* = object port*: Port secureConf*: Option[WebSocketSecureConf] +type QuicConf* = object + port*: Port + # TODO: should be defined in validator_signed.nim and imported here type ProtectedShard* {.requiresInit.} = object shard*: uint16 @@ -112,6 +115,7 @@ type WakuConf* {.requiresInit.} = ref object restServerConf*: Option[RestServerConf] metricsServerConf*: Option[MetricsServerConf] webSocketConf*: Option[WebSocketConf] + quicConf*: Option[QuicConf] mixConf*: Option[MixConf] kademliaDiscoveryConf*: Option[KademliaDiscoveryConf] diff --git a/waku/node/net_config.nim b/waku/node/net_config.nim index 4802694c4..ce25e9138 100644 --- a/waku/node/net_config.nim +++ b/waku/node/net_config.nim @@ -9,6 +9,7 @@ type NetConfig* = object hostAddress*: MultiAddress clusterId*: uint16 wsHostAddress*: Option[MultiAddress] + quicHostAddress*: Option[MultiAddress] hostExtAddress*: Option[MultiAddress] wsExtAddress*: Option[MultiAddress] wssEnabled*: bool @@ -74,6 +75,8 @@ proc init*( wsBindPort: Option[Port] = some(DefaultWsBindPort), wsEnabled: bool = false, wssEnabled: bool = false, + quicBindPort = none(Port), + quicEnabled: bool = false, dns4DomainName = none(string), discv5UdpPort = none(Port), clusterId: uint16 = 0, @@ -94,6 +97,17 @@ proc init*( except CatchableError: return err(getCurrentExceptionMsg()) + var quicHostAddress = none(MultiAddress) + if quicEnabled and quicBindPort.isSome(): + try: + quicHostAddress = some( + MultiAddress + .init("/ip4/" & $bindIp & "/udp/" & $quicBindPort.get() & "/quic-v1") + .tryGet() + ) + except CatchableError: + return err(getCurrentExceptionMsg()) + let enrIp = if extIp.isSome(): extIp @@ -106,7 +120,7 @@ proc init*( some(bindPort) # Setup external addresses, if available - var hostExtAddress, wsExtAddress = none(MultiAddress) + var hostExtAddress, wsExtAddress, quicExtAddress = none(MultiAddress) if dns4DomainName.isSome(): # Use dns4 for externally announced addresses @@ -123,6 +137,19 @@ proc init*( ) except CatchableError: return err(getCurrentExceptionMsg()) + + if quicHostAddress.isSome(): + try: + quicExtAddress = some( + MultiAddress + .init( + "/dns4/" & dns4DomainName.get() & "/udp/" & $quicBindPort.get() & + "/quic-v1" + ) + .tryGet() + ) + except CatchableError: + return err(getCurrentExceptionMsg()) else: # No public domain name, use ext IP if available if extIp.isSome() and extPort.isSome(): @@ -137,6 +164,18 @@ proc init*( except CatchableError: return err(getCurrentExceptionMsg()) + if quicHostAddress.isSome(): + try: + quicExtAddress = some( + MultiAddress + .init("/ip4/" & $extIp.get() & "/udp/" & $quicBindPort.get() & "/quic-v1") + .tryGet() + ) + except CatchableError: + return err( + "Failed to create external QUIC multiaddress: " & getCurrentExceptionMsg() + ) + var announcedAddresses = newSeq[MultiAddress]() if not extMultiAddrsOnly: @@ -152,6 +191,11 @@ proc init*( # Only publish wsHostAddress if a WS address is not set in extMultiAddrs announcedAddresses.add(wsHostAddress.get()) + if quicExtAddress.isSome(): + announcedAddresses.add(quicExtAddress.get()) + elif quicHostAddress.isSome(): + announcedAddresses.add(formatListenAddress(quicHostAddress.get())) + # External multiaddrs that the operator may have configured if extMultiAddrs.len > 0: announcedAddresses.add(extMultiAddrs) @@ -161,7 +205,7 @@ proc init*( # https://rfc.vac.dev/spec/31/#many-connection-types enrMultiaddrs = announcedAddresses.filterIt( it.hasProtocol("dns4") or it.hasProtocol("dns6") or it.hasProtocol("ws") or - it.hasProtocol("wss") + it.hasProtocol("wss") or it.hasProtocol("quic-v1") ) ok( @@ -169,6 +213,7 @@ proc init*( hostAddress: hostAddress, clusterId: clusterId, wsHostAddress: wsHostAddress, + quicHostAddress: quicHostAddress, hostExtAddress: hostExtAddress, wsExtAddress: wsExtAddress, extIp: extIp, diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index e3eb8d75b..05af13208 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -7,7 +7,13 @@ import chronos, chronicles, metrics, - libp2p/[multistream, muxers/muxer, nameresolving/nameresolver, peerstore], + libp2p/[ + multistream, + muxers/muxer, + nameresolving/nameresolver, + peerstore, + transports/quictransport, + ], waku/[ waku_core, waku_relay, diff --git a/waku/node/waku_switch.nim b/waku/node/waku_switch.nim index d1af77662..c8c91177f 100644 --- a/waku/node/waku_switch.nim +++ b/waku/node/waku_switch.nim @@ -57,6 +57,7 @@ proc newWakuSwitch*( privKey = none(crypto.PrivateKey), address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), wsAddress = none(MultiAddress), + quicAddress = none(MultiAddress), secureManagers: openarray[SecureProtocol] = [SecureProtocol.Noise], transportFlags: set[ServerFlags] = {}, rng: ref HmacDrbgContext, @@ -101,15 +102,26 @@ proc newWakuSwitch*( b = b.withAgentVersion(agentString.get()) if privKey.isSome(): b = b.withPrivateKey(privKey.get()) - if wsAddress.isSome(): - b = b.withAddresses(@[wsAddress.get(), address]) + # Collect all listen addresses + var addresses: seq[MultiAddress] + if wsAddress.isSome(): + addresses.add(wsAddress.get()) + addresses.add(address) + if quicAddress.isSome(): + addresses.add(quicAddress.get()) + + b = b.withAddresses(addresses) + + # Add WS transport if enabled + if wsAddress.isSome(): if wssEnabled: b = b.withWssTransport(secureKeyPath, secureCertPath) else: b = b.withWsTransport() - else: - b = b.withAddress(address) + + if quicAddress.isSome(): + b = b.withQuicTransport() if not rendezvous.isNil(): b = b.withRendezVous(rendezvous) diff --git a/waku/waku_core/multiaddrstr.nim b/waku/waku_core/multiaddrstr.nim index cd0caf2aa..f5999ad1c 100644 --- a/waku/waku_core/multiaddrstr.nim +++ b/waku/waku_core/multiaddrstr.nim @@ -1,5 +1,6 @@ {.push raises: [].} +import std/strutils import libp2p/[peerinfo, switch] import ./peers @@ -8,14 +9,18 @@ proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string = # Constructs a multiaddress with both wire address and p2p identity return $wireaddr & "/p2p/" & $peerId +proc firstAddr(addrs: seq[MultiAddress]): MultiAddress = + for a in addrs: + if "/quic-v1" notin $a: + return a + return addrs[0] + proc constructMultiaddrStr*(peerInfo: PeerInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity if peerInfo.listenAddrs.len == 0: return "" - return constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId) + return constructMultiaddrStr(firstAddr(peerInfo.listenAddrs), peerInfo.peerId) proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string = - # Constructs a multiaddress with both location (wire) address and p2p identity if remotePeerInfo.addrs.len == 0: return "" - return constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId) + return constructMultiaddrStr(firstAddr(remotePeerInfo.addrs), remotePeerInfo.peerId) diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index c4b8b593e..aed30e37a 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -330,7 +330,9 @@ converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo = ## Useful for testing or internal connections RemotePeerInfo( peerId: peerInfo.peerId, - addrs: peerInfo.listenAddrs, + addrs: + peerInfo.listenAddrs.filterIt("/quic-v1" in $it) & + peerInfo.listenAddrs.filterIt("/quic-v1" notin $it), enr: none(enr.Record), protocols: peerInfo.protocols, shards: @[],