Merge 70b3c2d7e2013dc0a135c741234c7b201945b5be into 9a344553e765775bb9859b58d0f5e284c51e9a98

This commit is contained in:
Fabiana Cecin 2026-04-07 14:33:25 -04:00 committed by GitHub
commit 6688adaeaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 2024 additions and 51 deletions

View File

@ -77,7 +77,10 @@ jobs:
key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }}
- name: Make update
run: make update
run: |
make update
# TEMPORARY: apply nim-libp2p QUIC patches
bash temp/apply.sh
- name: Build binaries
run: make V=1 QUICK_AND_DIRTY_COMPILER=1 all
@ -118,7 +121,10 @@ jobs:
key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }}
- name: Make update
run: make update
run: |
make update
# TEMPORARY: apply nim-libp2p QUIC patches
bash temp/apply.sh
- name: Run tests
run: |

View File

@ -66,6 +66,8 @@ jobs:
if: ${{ steps.secrets.outcome == 'success' }}
run: |
make update
# TEMPORARY: apply nim-libp2p QUIC patches
bash temp/apply.sh
make -j${NPROC} V=1 QUICK_AND_DIRTY_COMPILER=1 NIMFLAGS="-d:disableMarchNative -d:postgres -d:chronicles_colors:none" wakunode2

37
temp/apply.sh Executable file
View File

@ -0,0 +1,37 @@
#!/bin/bash
# Apply vendor patches for QUIC support.
# Run from the logos-delivery root directory:
#   bash temp/apply.sh
#
# This copies patched files from temp/vendor/ on top of vendor/.
# Run this AFTER `make update` to re-apply patches over fresh vendor state.
#
# nim-libp2p patches:
# - quictransport.nim: getStreams, remote-close propagation, session.closed guard
# - muxer.nim: base getStreams returns @[] instead of raising
# - switch.nim: imports quictransport for vtable registration
#
# nim-lsquic patches:
# - stream.nim: doProcess() on immediate write path (fixes stalled sends)
# - context/context.nim: nil guard in makeStream
# - context/client.nim: nil lsquicConn on connection close (prevents dangling pointer)
# - context/server.nim: nil lsquicConn on connection close (prevents dangling pointer)

# -e: abort on first error; -u: treat unset variables as errors;
# -o pipefail: don't let a failing `find` be masked by the `while` loop.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"

echo "Applying nim-libp2p QUIC patches from $SCRIPT_DIR/vendor/ to $ROOT_DIR/vendor/"

# -print0 with `read -d ''` is safe for any filename; IFS= and -r keep
# `read` from trimming whitespace or interpreting backslashes.
find "$SCRIPT_DIR/vendor" -type f -print0 | while IFS= read -r -d '' src; do
  # Map temp/vendor/<path> -> <repo root>/vendor/<path>.
  dst="$ROOT_DIR/${src#"$SCRIPT_DIR/"}"
  if [ -f "$dst" ]; then
    cp "$src" "$dst"
    echo "  patched: ${dst#"$ROOT_DIR/"}"
  else
    # Target missing from vendor/ (upstream moved/removed it): warn, keep going.
    echo "  WARNING: target not found: ${dst#"$ROOT_DIR/"}"
  fi
done

echo "Done. Now rebuild: make wakunode2"

View File

@ -0,0 +1,69 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
{.push raises: [].}
import chronos, chronicles
import ../stream/connection, ../errors
logScope:
topics = "libp2p muxer"
const DefaultChanTimeout* = 5.minutes
  ## Default idle timeout for muxed channels.

type
  MuxerError* = object of LPError
    ## Base error type for muxer failures.
  TooManyChannels* = object of MuxerError
    ## Raised when a muxer refuses to open further channels.
  StreamHandler* = proc(conn: Connection): Future[void] {.async: (raises: []).}
    ## Callback invoked for each incoming muxed stream; must not raise.
  MuxerHandler* = proc(muxer: Muxer): Future[void] {.async: (raises: []).}
    ## Callback invoked once per established muxer; must not raise.

  Muxer* = ref object of RootObj
    ## Abstract base for stream multiplexers over a single connection.
    streamHandler*: StreamHandler # handler called for each accepted stream
    handler*: Future[void].Raising([]) # the muxer's own accept/dispatch loop
    connection*: Connection # underlying transport connection

  # user provider proc that returns a constructed Muxer
  MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].}

  # this wraps a creator proc that knows how to make muxers
  MuxerProvider* = object
    newMuxer*: MuxerConstructor
    codec*: string # protocol codec string negotiated for this muxer
func shortLog*(m: Muxer): auto =
  ## Compact log representation of a muxer: the literal "nil" for a nil
  ## reference, otherwise the short form of its underlying connection.
  if m.isNil: "nil" else: shortLog(m.connection)

chronicles.formatIt(Muxer):
  shortLog(it)
# muxer interface
# muxer interface
method newStream*(
    m: Muxer, name: string = "", lazy: bool = false
): Future[Connection] {.
    base, async: (raises: [CancelledError, LPStreamError, MuxerError], raw: true)
.} =
  ## Abstract: open a new outbound stream on this muxer.
  ## Concrete muxers must override; calling the base is a programming error.
  raiseAssert("[Muxer.newStream] abstract method not implemented!")

when defined(libp2p_agents_metrics):
  method setShortAgent*(m: Muxer, shortAgent: string) {.base, gcsafe.} =
    ## Propagate the peer's short agent string to the underlying connection
    ## (metrics-only build).
    m.connection.shortAgent = shortAgent

method close*(m: Muxer) {.base, async: (raises: []).} =
  ## Close the muxer by closing its underlying connection, if any.
  if m.connection != nil:
    await m.connection.close()

method handle*(m: Muxer): Future[void] {.base, async: (raises: []).} =
  ## Abstract accept/dispatch loop; the base implementation is a no-op.
  discard

proc new*(
    T: typedesc[MuxerProvider], creator: MuxerConstructor, codec: string
): T {.gcsafe.} =
  ## Wrap a muxer constructor together with its protocol codec string.
  let muxerProvider = T(newMuxer: creator, codec: codec)
  muxerProvider

method getStreams*(m: Muxer): seq[Connection] {.base, gcsafe.} =
  ## Streams currently open on this muxer. Base returns an empty seq so
  ## muxers without stream tracking don't have to override (QUIC patch).
  @[]

432
temp/vendor/nim-libp2p/libp2p/switch.nim vendored Normal file
View File

@ -0,0 +1,432 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
## The switch is the core of libp2p, which brings together the
## transports, the connection manager, the upgrader and other
## parts to allow programs to use libp2p
{.push raises: [].}
import std/[tables, options, sequtils, sets, oids]
import chronos, chronicles, metrics
import
stream/connection,
transports/transport,
transports/tcptransport,
transports/quictransport,
upgrademngrs/upgrade,
multistream,
multiaddress,
protocols/protocol,
protocols/secure/secure,
peerinfo,
utils/semaphore,
./muxers/muxer,
connmanager,
nameresolving/nameresolver,
peerid,
peerstore,
errors,
utility,
dialer
export connmanager, upgrade, dialer, peerstore
logScope:
topics = "libp2p switch"
#TODO: General note - use a finite state machine to manage the different
# steps of connections establishing and upgrading. This makes everything
# more robust and less prone to ordering attacks - i.e. muxing can come if
# and only if the channel has been secured (i.e. if a secure manager has been
# previously provided)
const ConcurrentUpgrades* = 4
  ## Maximum number of inbound connections being upgraded concurrently.

type
  Switch* {.public.} = ref object of Dial
    ## Central libp2p object tying transports, connection manager,
    ## upgrader and protocol handlers together.
    peerInfo*: PeerInfo
    connManager*: ConnManager
    transports*: seq[Transport]
    ms*: MultistreamSelect
    acceptFuts: seq[Future[void]] # one accept loop per started transport
    dialer*: Dial
    peerStore*: PeerStore
    nameResolver*: NameResolver
    started: bool
    services*: seq[Service]

  UpgradeError* = object of LPError
    ## Raised when upgrading a raw connection to a muxed session fails.

  Service* = ref object of RootObj
    ## Base for auxiliary services run alongside the switch.
    inUse: bool # guards against double setup / double stop

method setup*(
    self: Service, switch: Switch
): Future[bool] {.base, async: (raises: [CancelledError]).} =
  ## Mark the service as in use; returns false if already set up.
  if self.inUse:
    warn "service setup has already been called"
    return false
  self.inUse = true
  return true

method run*(self: Service, switch: Switch) {.base, async: (raises: [CancelledError]).} =
  ## Abstract: the service's main loop. Must be overridden.
  doAssert(false, "[Service.run] abstract method not implemented!")

method stop*(
    self: Service, switch: Switch
): Future[bool] {.base, async: (raises: [CancelledError]).} =
  ## Mark the service as stopped; returns false if it wasn't running.
  if not self.inUse:
    warn "service is already stopped"
    return false
  self.inUse = false
  return true
proc addConnEventHandler*(
    s: Switch, handler: ConnEventHandler, kind: ConnEventKind
) {.public.} =
  ## Adds a ConnEventHandler, which will be triggered when
  ## a connection to a peer is created or dropped.
  ## There may be multiple connections per peer.
  ##
  ## The handler should not raise.
  s.connManager.addConnEventHandler(handler, kind)

proc removeConnEventHandler*(
    s: Switch, handler: ConnEventHandler, kind: ConnEventKind
) {.public.} =
  ## Removes a previously registered ConnEventHandler.
  s.connManager.removeConnEventHandler(handler, kind)

proc addPeerEventHandler*(
    s: Switch, handler: PeerEventHandler, kind: PeerEventKind
) {.public.} =
  ## Adds a PeerEventHandler, which will be triggered when
  ## a peer connects or disconnects from us.
  ##
  ## The handler should not raise.
  s.connManager.addPeerEventHandler(handler, kind)

proc removePeerEventHandler*(
    s: Switch, handler: PeerEventHandler, kind: PeerEventKind
) {.public.} =
  ## Removes a previously registered PeerEventHandler.
  s.connManager.removePeerEventHandler(handler, kind)

method addTransport*(s: Switch, t: Transport) =
  ## Register a transport with both the switch and its dialer.
  s.transports &= t
  s.dialer.addTransport(t)

proc connectedPeers*(s: Switch, dir: Direction): seq[PeerId] =
  ## Peers with at least one connection in the given direction.
  s.connManager.connectedPeers(dir)

proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} =
  ## returns true if the peer has one or more
  ## associated connections
  ##
  peerId in s.connManager
proc disconnect*(
    s: Switch, peerId: PeerId
) {.public, async: (raises: [CancelledError]).} =
  ## Disconnect from a peer, waiting for the connection(s) to be dropped
  await s.connManager.dropPeer(peerId)

method connect*(
    s: Switch,
    peerId: PeerId,
    addrs: seq[MultiAddress],
    forceDial = false,
    reuseConnection = true,
    dir = Direction.Out,
): Future[void] {.
    public, async: (raises: [DialFailedError, CancelledError], raw: true)
.} =
  ## Connects to a peer without opening a stream to it
  s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir)

method connect*(
    s: Switch, address: MultiAddress, allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [DialFailedError, CancelledError], raw: true).} =
  ## Connects to a peer and retrieve its PeerId
  ##
  ## If the P2P part is missing from the MA and `allowUnknownPeerId` is set
  ## to true, this will discover the PeerId while connecting. This exposes
  ## you to MiTM attacks, so it shouldn't be used without care!
  s.dialer.connect(address, allowUnknownPeerId)

method dial*(
    s: Switch, peerId: PeerId, protos: seq[string]
): Future[Connection] {.
    public, async: (raises: [DialFailedError, CancelledError], raw: true)
.} =
  ## Open a stream to a connected peer with the specified `protos`
  s.dialer.dial(peerId, protos)

proc dial*(
    s: Switch, peerId: PeerId, proto: string
): Future[Connection] {.
    public, async: (raises: [DialFailedError, CancelledError], raw: true)
.} =
  ## Open a stream to a connected peer with the specified `proto`
  dial(s, peerId, @[proto])

method dial*(
    s: Switch,
    peerId: PeerId,
    addrs: seq[MultiAddress],
    protos: seq[string],
    forceDial = false,
): Future[Connection] {.
    public, async: (raises: [DialFailedError, CancelledError], raw: true)
.} =
  ## Connected to a peer and open a stream
  ## with the specified `protos`
  s.dialer.dial(peerId, addrs, protos, forceDial)

proc dial*(
    s: Switch, peerId: PeerId, addrs: seq[MultiAddress], proto: string
): Future[Connection] {.
    public, async: (raises: [DialFailedError, CancelledError], raw: true)
.} =
  ## Connected to a peer and open a stream
  ## with the specified `proto`
  dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](
    s: Switch, proto: T, matcher: Matcher = nil
) {.gcsafe, raises: [LPError], public.} =
  ## mount a protocol to the switch
  ##
  ## Registers the protocol's codecs with multistream-select and
  ## advertises the codec in the switch's PeerInfo.
  ## Raises LPError if the protocol has no handler, no codec, or
  ## (when the switch is already running) has not been started.
  if isNil(proto.handler):
    raise newException(LPError, "Protocol has to define a handle method or proc")

  if proto.codec.len == 0:
    raise newException(LPError, "Protocol has to define a codec string")

  if s.started and not proto.started:
    raise newException(LPError, "Protocol not started")

  s.ms.addHandler(proto.codecs, proto, matcher)
  s.peerInfo.protocols.add(proto.codec)
proc upgrader(
    switch: Switch, trans: Transport, conn: Connection
) {.async: (raises: [CancelledError, UpgradeError]).} =
  ## Upgrade an accepted raw connection into a muxed session, store it in
  ## the connection manager, identify the peer and fire the Identified event.
  ## Any non-cancellation failure is wrapped into UpgradeError.
  try:
    let muxed = await trans.upgrade(conn, Opt.none(PeerId))
    switch.connManager.storeMuxer(muxed)
    await switch.peerStore.identify(muxed, conn.transportDir)
    await switch.connManager.triggerPeerEvents(
      muxed.connection.peerId,
      PeerEvent(kind: PeerEventKind.Identified, initiator: false),
    )
  except CancelledError as e:
    raise e
  except CatchableError as e:
    raise newException(UpgradeError, "catchable error upgrader: " & e.msg, e)

proc upgradeMonitor(
    switch: Switch, trans: Transport, conn: Connection, upgrades: AsyncSemaphore
) {.async: (raises: []).} =
  ## Run `upgrader` with a 30s deadline; on any failure close the connection.
  ## Always releases the upgrade-semaphore slot acquired by the accept loop.
  var upgradeSuccessful = false
  try:
    await switch.upgrader(trans, conn).wait(30.seconds)
    trace "Connection upgrade succeeded"
    upgradeSuccessful = true
  except CancelledError:
    trace "Connection upgrade cancelled", conn
  except AsyncTimeoutError:
    trace "Connection upgrade timeout", conn
    libp2p_failed_upgrades_incoming.inc()
  except UpgradeError as e:
    trace "Connection upgrade failed", description = e.msg, conn
    libp2p_failed_upgrades_incoming.inc()
  finally:
    # On failure the connection is closed here; the semaphore slot is
    # released unconditionally so the accept loop can make progress.
    if (not upgradeSuccessful) and (not isNil(conn)):
      await conn.close()
    try:
      upgrades.release()
    except AsyncSemaphoreError:
      raiseAssert "semaphore released without acquire"
proc accept(s: Switch, transport: Transport) {.async: (raises: []).} =
  ## switch accept loop, ran for every transport
  ##
  ## Throttles concurrent upgrades via a semaphore and per-connection
  ## incoming slots from the connection manager. Slot/semaphore release
  ## ordering is load-bearing: on every early exit both must be returned.
  let upgrades = newAsyncSemaphore(ConcurrentUpgrades)
  while transport.running:
    try:
      await upgrades.acquire() # first wait for an upgrade slot to become available
    except CancelledError:
      return
    var conn: Connection
    try:
      debug "About to accept incoming connection"
      # remember to always release the slot when
      # the upgrade succeeds or fails, this is
      # currently done by the `upgradeMonitor`
      let slot = await s.connManager.getIncomingSlot()
      conn =
        try:
          await transport.accept()
        except CancelledError as exc:
          slot.release()
          raise exc
        except CatchableError as exc:
          slot.release()
          raise
            newException(CatchableError, "failed to accept connection: " & exc.msg, exc)
      if isNil(conn):
        # A nil connection means that we might have hit a
        # file-handle limit (or another non-fatal error),
        # we can get one on the next try
        debug "Unable to get a connection"
        slot.release()
        try:
          upgrades.release()
        except AsyncSemaphoreError:
          raiseAssert "semaphore released without acquire"
        continue
      slot.trackConnection(conn)
      # set the direction of this bottom level transport
      # in order to be able to consume this information in gossipsub if required
      # gossipsub gives priority to connections we make
      conn.transportDir = Direction.In
      debug "Accepted an incoming connection", conn
      asyncSpawn s.upgradeMonitor(transport, conn, upgrades)
    except CancelledError:
      try:
        upgrades.release()
      except AsyncSemaphoreError:
        raiseAssert "semaphore released without acquire"
      return
    except CatchableError as exc:
      # Fatal accept failure: close the pending connection (if any),
      # return the semaphore slot and exit the loop.
      error "Exception in accept loop, exiting", description = exc.msg
      if not isNil(conn):
        await conn.close()
      try:
        upgrades.release()
      except AsyncSemaphoreError:
        raiseAssert "semaphore released without acquire"
      return
proc stop*(s: Switch) {.public, async: (raises: [CancelledError]).} =
  ## Stop listening on every transport, and
  ## close every active connections
  ##
  ## Order: cancel accept loops (1s grace), stop services, close all
  ## connections via the connection manager, stop transports, stop
  ## multistream-select.
  trace "Stopping switch"
  s.started = false
  try:
    # Stop accepting incoming connections
    await allFutures(s.acceptFuts.mapIt(it.cancelAndWait())).wait(1.seconds)
  except CatchableError as exc:
    debug "Cannot cancel accepts", description = exc.msg

  for service in s.services:
    discard await service.stop(s)

  # close and cleanup all connections
  await s.connManager.close()

  for transp in s.transports:
    try:
      await transp.stop()
    except CancelledError as exc:
      raise exc
    except CatchableError as exc:
      warn "error cleaning up transports", description = exc.msg

  await s.ms.stop()
  trace "Switch stopped"
proc start*(s: Switch) {.public, async: (raises: [CancelledError, LPError]).} =
  ## Start listening on every transport
  ##
  ## TCP transports are awaited and get their accept loop immediately;
  ## other transports are started concurrently and get accept loops after
  ## all starts complete. If any transport fails to start, the switch is
  ## stopped and LPError is raised.
  if s.started:
    warn "Switch has already been started"
    return

  debug "starting switch for peer", peerInfo = s.peerInfo
  var startFuts: seq[Future[void]]
  for t in s.transports:
    let addrs = s.peerInfo.listenAddrs.filterIt(t.handles(it))
    s.peerInfo.listenAddrs.keepItIf(it notin addrs)
    if addrs.len > 0 or t.running:
      let fut = t.start(addrs)
      startFuts.add(fut)
      if t of TcpTransport:
        await fut
        s.acceptFuts.add(s.accept(t))
        s.peerInfo.listenAddrs &= t.addrs

  # some transports require some services to be running
  # in order to finish their startup process
  for service in s.services:
    discard await service.setup(s)

  await allFutures(startFuts)
  for fut in startFuts:
    if fut.failed:
      await s.stop()
      raise newException(
        LPError, "starting transports failed: " & $fut.error.msg, fut.error
      )

  for t in s.transports: # for each transport
    if t.addrs.len > 0 or t.running:
      if t of TcpTransport:
        continue # already added previously
      s.acceptFuts.add(s.accept(t))
      s.peerInfo.listenAddrs &= t.addrs

  await s.peerInfo.update()
  await s.ms.start()
  s.started = true
  debug "Started libp2p node", peer = s.peerInfo
proc newSwitch*(
    peerInfo: PeerInfo,
    transports: seq[Transport],
    secureManagers: openArray[Secure] = [],
    connManager: ConnManager,
    ms: MultistreamSelect,
    peerStore: PeerStore,
    nameResolver: NameResolver = nil,
    services = newSeq[Service](),
): Switch {.raises: [LPError].} =
  ## Construct a Switch from its collaborators.
  ## Raises LPError when no secure manager is supplied — an unencrypted
  ## switch is never valid.
  if secureManagers.len == 0:
    raise newException(LPError, "Provide at least one secure manager")

  let switch = Switch(
    peerInfo: peerInfo,
    ms: ms,
    transports: transports,
    connManager: connManager,
    peerStore: peerStore,
    dialer:
      Dialer.new(peerInfo.peerId, connManager, peerStore, transports, nameResolver),
    nameResolver: nameResolver,
    services: services,
  )

  # back-reference so the connection manager can update the peer store
  switch.connManager.peerStore = peerStore
  return switch

View File

@ -0,0 +1,455 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
import std/[sequtils, sets]
import chronos, chronicles, metrics, results
import lsquic
import
../wire,
../multiaddress,
../multicodec,
../muxers/muxer,
../stream/connection,
../upgrademngrs/upgrade
import ./transport
import tls/certificate
export multiaddress
export multicodec
export connection
export transport
logScope:
topics = "libp2p quictransport"
type
  P2PConnection = connection.Connection # disambiguate from lsquic.Connection
  QuicConnection = lsquic.Connection
  QuicTransportError* = object of transport.TransportError
  QuicTransportDialError* = object of transport.TransportDialError
  QuicTransportAcceptStopped* = object of QuicTransportError
    ## Raised by accept() once the transport has been stopped.
  QuicStream* = ref object of P2PConnection
    ## A single QUIC stream exposed as a libp2p Connection.
    session: QuicSession # parent session the stream belongs to
    stream: Stream # underlying lsquic stream
  QuicSession* = ref object of P2PConnection
    ## A QUIC connection exposed as a libp2p Connection; owns its streams.
    connection: QuicConnection
    streams: seq[QuicStream] # live streams, used by getStreams and close

const alpn = "libp2p" # ALPN protocol id negotiated during the TLS handshake

# one-time global lsquic library initialization at module load
initializeLsquic()
proc new(
    _: type QuicStream,
    stream: Stream,
    dir: Direction,
    session: QuicSession,
    oaddr: Opt[MultiAddress],
    laddr: Opt[MultiAddress],
    peerId: PeerId,
): QuicStream =
  ## Wrap an lsquic stream in a libp2p Connection, inheriting addresses
  ## and peer id from the parent session.
  let quicstream = QuicStream(
    session: session,
    stream: stream,
    observedAddr: oaddr,
    localAddr: laddr,
    peerId: peerId,
  )
  quicstream.objName = "QuicStream"
  quicstream.dir = dir
  # run base-class stream initialization (ids, metrics, lifetime futures)
  procCall P2PConnection(quicstream).initStream()
  quicstream
method getWrapped*(self: QuicStream): P2PConnection =
  ## QuicStream wraps nothing below it — returns itself.
  self

method readOnce*(
    stream: QuicStream, pbytes: pointer, nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
  ## Read up to `nbytes` into `pbytes`; returns the byte count.
  ## A zero-length read marks EOF; after EOF further reads raise.
  if stream.atEof:
    raise newLPStreamRemoteClosedError()
  let readLen =
    try:
      await stream.stream.readOnce(cast[ptr byte](pbytes), nbytes)
    except StreamError as e:
      raise (ref LPStreamError)(msg: "error in readOnce: " & e.msg, parent: e)
  if readLen == 0:
    stream.isEof = true
    return 0
  stream.activity = true
  libp2p_network_bytes.inc(readLen.int64, labelValues = ["in"])
  return readLen

method write*(
    stream: QuicStream, bytes: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
  ## Write `bytes` to the stream; a StreamError is surfaced as
  ## remote-closed to callers.
  try:
    await stream.stream.write(bytes)
    libp2p_network_bytes.inc(bytes.len.int64, labelValues = ["out"])
  except StreamError:
    raise newLPStreamRemoteClosedError()

method closeWrite*(stream: QuicStream) {.async: (raises: []).} =
  ## Close the write side of the QUIC stream
  try:
    await stream.stream.close()
  except CancelledError, StreamError:
    discard

method closeImpl*(stream: QuicStream) {.async: (raises: []).} =
  ## Close the underlying lsquic stream (best-effort) then run base
  ## connection teardown.
  try:
    await stream.stream.close()
  except CancelledError, StreamError:
    discard
  await procCall P2PConnection(stream).closeImpl()
# Session
method closed*(session: QuicSession): bool {.raises: [].} =
  ## A session counts as closed if either the libp2p wrapper or the
  ## underlying QUIC connection is closed (patch: guards getStream).
  procCall P2PConnection(session).isClosed or session.connection.isClosed

method close*(session: QuicSession) {.async: (raises: []).} =
  ## Close all child streams, then the QUIC connection, then the wrapper.
  await noCancel allFutures(session.streams.mapIt(it.close()))
  session.connection.close()
  await procCall P2PConnection(session).close()

proc getStream(
    session: QuicSession, direction = Direction.In
): Future[QuicStream] {.async: (raises: [CancelledError, ConnectionError]).} =
  ## Accept (In) or open (Out) a QUIC stream and wrap it as QuicStream.
  ## Raises ConnectionClosedError if the session is already closed.
  if session.closed:
    raise newException(ConnectionClosedError, "session is closed")
  var stream: Stream
  case direction
  of Direction.In:
    stream = await session.connection.incomingStream()
  of Direction.Out:
    stream = await session.connection.openStream()
  let qs = QuicStream.new(
    stream, direction, session, session.observedAddr, session.localAddr, session.peerId
  )
  when defined(libp2p_agents_metrics):
    qs.shortAgent = session.shortAgent
  # Inherit transportDir from parent session for GossipSub outbound peer tracking
  qs.transportDir = session.transportDir
  session.streams.add(qs)
  return qs

method getWrapped*(self: QuicSession): P2PConnection =
  ## QuicSession wraps nothing below it — returns itself.
  self
# Muxer
type QuicMuxer* = ref object of Muxer
  ## QUIC is natively multiplexed, so the muxer is a thin facade over the
  ## session rather than a framing layer.
  session: QuicSession
  handleFut: Future[void] # the running handle() loop, cancelled on close

proc new*(
    _: type QuicMuxer, conn: P2PConnection, peerId: Opt[PeerId] = Opt.none(PeerId)
): QuicMuxer {.raises: [CertificateParsingError, LPError].} =
  ## Build a muxer over an established QuicSession. If no peer id is
  ## supplied it is derived from the single TLS certificate presented by
  ## the remote; anything other than exactly one certificate is an error.
  let session = QuicSession(conn)
  session.peerId = peerId.valueOr:
    let certificates = session.connection.certificates()
    if certificates.len != 1:
      raise (ref QuicTransportError)(msg: "expected one certificate in connection")
    let cert = parse(certificates[0])
    cert.peerId()
  QuicMuxer(session: session, connection: conn)

when defined(libp2p_agents_metrics):
  method setShortAgent*(m: QuicMuxer, shortAgent: string) =
    ## Propagate the agent string to the session and every live stream.
    m.session.shortAgent = shortAgent
    for s in m.session.streams:
      s.shortAgent = shortAgent
    m.connection.shortAgent = shortAgent

method newStream*(
    m: QuicMuxer, name: string = "", lazy: bool = false
): Future[P2PConnection] {.
    async: (raises: [CancelledError, LPStreamError, MuxerError])
.} =
  ## Open an outbound QUIC stream; `name`/`lazy` are unused here.
  try:
    return await m.session.getStream(Direction.Out)
  except ConnectionError as e:
    raise newException(MuxerError, "error in newStream: " & e.msg, e)
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
  ## Accept loop: keep pulling incoming streams and dispatching them to
  ## the stream handler until the session reaches EOF or closes.
  proc handleStream(stream: QuicStream) {.async: (raises: []).} =
    ## call the muxer stream handler for this channel
    ##
    await m.streamHandler(stream)
    trace "finished handling stream"
    doAssert(stream.closed, "connection not closed by handler!")

  while not (m.session.atEof or m.session.closed):
    try:
      let stream = await m.session.getStream(Direction.In)
      asyncSpawn handleStream(stream)
    except ConnectionClosedError:
      break # stop handling, connection was closed
    except CancelledError:
      continue # keep handling, until connection is closed
    except ConnectionError as e:
      # keep handling, until connection is closed.
      # this stream failed but we need to keep handling for other streams.
      trace "QuicMuxer.handler got error while opening stream", msg = e.msg
  # ensure the session is torn down once the loop exits
  if not m.session.isClosed:
    await m.session.close()
method getStreams*(m: QuicMuxer): seq[P2PConnection] {.gcsafe.} =
  ## Snapshot of the session's live streams, upcast to the generic
  ## connection type (overrides the base that returns an empty seq).
  m.session.streams.mapIt(P2PConnection(it))
method close*(m: QuicMuxer) {.async: (raises: []).} =
  ## Close the session, then cancel the accept loop if it is running.
  ## Best-effort: any error during teardown is swallowed.
  try:
    await m.session.close()
    if not isNil(m.handleFut):
      m.handleFut.cancelSoon()
  except CatchableError:
    discard
# Transport
type QuicUpgrade = ref object of Upgrade

type CertGenerator =
  proc(kp: KeyPair): CertificateX509 {.gcsafe, raises: [TLSCertificateError].}
  ## Produces the TLS certificate used for the libp2p QUIC handshake.

type QuicTransport* = ref object of Transport
  listener: Listener # server side; nil until start()
  client: Opt[QuicClient] # lazily created on first dial
  privateKey: PrivateKey
  connections: seq[P2PConnection] # sessions tracked for stop()
  rng: ref HmacDrbgContext
  certGenerator: CertGenerator

proc makeCertificateVerifier(): CertificateVerifier =
  ## Custom verifier: accept exactly one certificate and require it to
  ## pass libp2p certificate verification (self-signed is expected).
  proc certificateVerifier(serverName: string, certificatesDer: seq[seq[byte]]): bool =
    if certificatesDer.len != 1:
      trace "CertificateVerifier: expected one certificate in the chain",
        cert_count = certificatesDer.len
      return false
    let cert =
      try:
        parse(certificatesDer[0])
      except CertificateParsingError as e:
        trace "CertificateVerifier: failed to parse certificate", msg = e.msg
        return false
    return cert.verify()

  return CustomCertificateVerifier.init(certificateVerifier)

proc defaultCertGenerator(
    kp: KeyPair
): CertificateX509 {.gcsafe, raises: [TLSCertificateError].} =
  ## Default generator: PEM-encoded X509 certificate bound to the key pair.
  return generateX509(kp, encodingFormat = EncodingFormat.PEM)
proc new*(
    _: type QuicTransport,
    u: Upgrade,
    privateKey: PrivateKey,
    certGenerator: CertGenerator,
): QuicTransport =
  ## Create a QUIC transport with a caller-supplied certificate generator
  ## (e.g. to inject test certificates).
  let self = QuicTransport(
    upgrader: QuicUpgrade(ms: u.ms),
    privateKey: privateKey,
    certGenerator: certGenerator,
  )
  procCall Transport(self).initialize()
  self

proc new*(_: type QuicTransport, u: Upgrade, privateKey: PrivateKey): QuicTransport =
  ## Create a QUIC transport using the default X509 certificate generator.
  ## Delegates to the generator-taking overload so construction logic
  ## lives in one place.
  QuicTransport.new(u, privateKey, defaultCertGenerator)
method handles*(transport: QuicTransport, address: MultiAddress): bool {.raises: [].} =
  ## True when the base transport accepts the address and it matches
  ## the /quic-v1 pattern.
  if not procCall Transport(transport).handles(address):
    return false
  QUIC_V1.match(address)

proc makeConfig(self: QuicTransport): TLSConfig =
  ## Build the TLS config: certificate from the transport's key pair,
  ## libp2p ALPN, and the custom single-certificate verifier.
  let pubkey = self.privateKey.getPublicKey().valueOr:
    raiseAssert "could not obtain public key"
  let cert = self.certGenerator(KeyPair(seckey: self.privateKey, pubkey: pubkey))
  let certVerifier = makeCertificateVerifier()
  let tlsConfig = TLSConfig.new(
    cert.certificate, cert.privateKey, @[alpn].toHashSet(), Opt.some(certVerifier)
  )
  return tlsConfig

proc getRng(self: QuicTransport): ref HmacDrbgContext =
  ## Lazily created RNG. NOTE(review): no caller is visible in this file —
  ## presumably used elsewhere or kept for API parity; confirm before removal.
  if self.rng.isNil:
    self.rng = newRng()
  return self.rng

proc toMultiAddress(ta: TransportAddress): MultiAddress {.raises: [MaError].} =
  ## Returns quic MultiAddress from TransportAddress
  MultiAddress.init(ta, IPPROTO_UDP).get() & MultiAddress.init("/quic-v1").get()
method start*(
    self: QuicTransport, addrs: seq[MultiAddress]
) {.async: (raises: [LPError, transport.TransportError, CancelledError]).} =
  ## Start listening on the first address in `addrs`.
  ## QuicConfigError is treated as a programming error (assert), all
  ## other failures are wrapped into QuicTransportError.
  doAssert self.listener.isNil, "start() already called"
  # TODO(#1663): handle multiple addr
  try:
    let server = QuicServer.new(self.makeConfig())
    self.listener = server.listen(initTAddress(addrs[0]).tryGet)
    let listenMA = @[toMultiAddress(self.listener.localAddress())]
    await procCall Transport(self).start(listenMA)
  except QuicConfigError as exc:
    raiseAssert "invalid quic setup: " & $exc.msg
  except TLSCertificateError as exc:
    raise (ref QuicTransportError)(
      msg: "tlscert error in quic start: " & exc.msg, parent: exc
    )
  except QuicError as exc:
    raise
      (ref QuicTransportError)(msg: "quicerror in quic start: " & exc.msg, parent: exc)
  except TransportOsError as exc:
    raise (ref QuicTransportError)(
      msg: "transport error in quic start: " & exc.msg, parent: exc
    )

method stop*(transport: QuicTransport) {.async: (raises: []).} =
  ## Close all tracked sessions, stop the listener and the shared dial
  ## client, then run base-transport teardown.
  let futs = transport.connections.mapIt(it.close())
  await noCancel allFutures(futs)
  if not transport.listener.isNil:
    try:
      await transport.listener.stop()
    except CatchableError as exc:
      trace "Error shutting down Quic transport", description = exc.msg
    transport.listener = nil
  transport.client.withValue(client):
    await noCancel client.stop()
    transport.client = Opt.none(QuicClient)
  await procCall Transport(transport).stop()
proc wrapConnection(
    transport: QuicTransport, connection: QuicConnection, transportDir: Direction
): QuicSession {.raises: [TransportOsError].} =
  ## Wrap a raw QUIC connection into a tracked QuicSession and spawn a
  ## watcher that removes it from the transport once it closes.
  var observedAddr: MultiAddress
  var localAddr: MultiAddress
  try:
    observedAddr = toMultiAddress(connection.remoteAddress())
    localAddr = toMultiAddress(connection.localAddress())
  except MaError as e:
    raiseAssert "Multiaddr Error" & e.msg

  let session = QuicSession(
    dir: transportDir,
    objName: "QuicSession",
    connection: connection,
    observedAddr: Opt.some(observedAddr),
    localAddr: Opt.some(localAddr),
  )
  session.initStream()
  # Set the transport direction for outbound peer tracking in GossipSub 1.1
  session.transportDir = transportDir

  transport.connections.add(session)

  proc onClose() {.async: (raises: []).} =
    ## Untrack the session once its lifetime future completes.
    await noCancel session.join()
    transport.connections.keepItIf(it != session)
    trace "Cleaned up client"

  asyncSpawn onClose()
  return session
method accept*(
    self: QuicTransport
): Future[connection.Connection] {.
    async: (raises: [transport.TransportError, CancelledError])
.} =
  ## Accept one incoming QUIC connection. Transient errors are logged and
  ## produce a nil result (the switch's accept loop treats nil as retryable);
  ## only a stopped transport raises.
  if not self.running:
    # stop accept only when transport is stopped (not when error occurs)
    raise newException(QuicTransportAcceptStopped, "Quic transport stopped")
  doAssert not self.listener.isNil, "call start() before calling accept()"
  try:
    let connection = await self.listener.accept()
    return self.wrapConnection(connection, Direction.In)
  except QuicError as exc:
    debug "Quic Error", description = exc.msg
  except common.TransportError as exc:
    debug "Transport Error", description = exc.msg
  except TransportOsError as exc:
    debug "OS Error", description = exc.msg
method dial*(
    self: QuicTransport,
    hostname: string,
    address: MultiAddress,
    peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[connection.Connection] {.
    async: (raises: [transport.TransportError, CancelledError])
.} =
  ## Dial `address` over QUIC and return the wrapped outbound session.
  ## All dial failures are surfaced as QuicTransportDialError.
  ## NOTE(review): `hostname` and `peerId` are unused here — presumably
  ## kept for the Transport.dial interface; verification happens at upgrade.
  let taAddress =
    try:
      initTAddress(address).tryGet
    except LPError as e:
      # fixed typo in user-facing error message: "invald" -> "invalid"
      raise newException(
        QuicTransportDialError, "error in quic dial: invalid address: " & e.msg, e
      )
  try:
    # The QuicClient (and its TLS config) is created lazily on first dial
    # and reused for all subsequent dials.
    if not self.client.isSome:
      self.client = Opt.some(QuicClient.new(self.makeConfig()))
    let client = self.client.get()
    let quicConnection = await client.dial(taAddress)
    return self.wrapConnection(quicConnection, Direction.Out)
  except QuicConfigError as e:
    raise newException(
      QuicTransportDialError, "error in quic dial: invalid tls config:" & e.msg, e
    )
  except TLSCertificateError as e:
    raise newException(
      QuicTransportDialError, "error in quic dial: tls certificate error:" & e.msg, e
    )
  except TransportOsError as e:
    raise newException(QuicTransportDialError, "error in quic dial:" & e.msg, e)
  except DialError as e:
    raise newException(QuicTransportDialError, "error in quic dial:" & e.msg, e)
  except QuicError as e:
    raise newException(QuicTransportDialError, "error in quic dial:" & e.msg, e)
method upgrade*(
    self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId]
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
  ## QUIC connections are already secured and muxed: wrap the session in a
  ## QuicMuxer, attach the multistream dispatcher as stream handler, and
  ## kick off its accept loop.
  let muxer = QuicMuxer.new(conn, peerId)
  muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} =
    trace "Starting stream handler"
    try:
      await self.upgrader.ms.handle(conn) # handle incoming connection
    except CancelledError as exc:
      return
    except CatchableError as exc:
      trace "exception in stream handler", conn, msg = exc.msg
    finally:
      await conn.closeWithEOF()
    trace "Stream handler done", conn
  muxer.handleFut = muxer.handle()
  return muxer

View File

@ -0,0 +1,164 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
import results
import chronicles
import chronos
import chronos/osdefs
import ./[context, io, stream]
import ../[lsquic_ffi, errors, tlsconfig, timeout, stream, certificates]
import ../helpers/sequninit
proc onNewConn(
    stream_if_ctx: pointer, conn: ptr lsquic_conn_t
): ptr lsquic_conn_ctx_t {.cdecl.} =
  ## lsquic on_new_conn callback (client side): the conn ctx was already
  ## attached at lsquic_engine_connect time, so just echo it back.
  debug "New connection established: client"
  let conn_ctx = lsquic_conn_get_ctx(conn)
  cast[ptr lsquic_conn_ctx_t](conn_ctx)

proc onHandshakeDone(
    conn: ptr lsquic_conn_t, status: enum_lsquic_hsk_status
) {.cdecl.} =
  ## lsquic on_hsk_done callback: resolve the pending `connectedFut` —
  ## fail it on handshake failure, otherwise capture the peer's cert
  ## chain and complete it.
  debug "Handshake done", status
  let conn_ctx = lsquic_conn_get_ctx(conn)
  if conn_ctx.isNil:
    debug "conn_ctx is nil in onHandshakeDone"
    return
  let quicClientConn = cast[QuicConnection](conn_ctx)
  if quicClientConn.connectedFut.finished:
    # already resolved (e.g. connection closed first); nothing to do
    return
  if status == LSQ_HSK_FAIL or status == LSQ_HSK_RESUMED_FAIL:
    quicClientConn.connectedFut.fail(
      newException(DialError, "could not connect to server. Handshake failed")
    )
  else:
    let x509chain = lsquic_conn_get_full_cert_chain(quicClientConn.lsquicConn)
    let certChain = x509chain.getCertChain()
    # getCertChain copied what we need; free the OpenSSL stack wrapper
    OPENSSL_sk_free(cast[ptr OPENSSL_STACK](x509chain))
    quicClientConn.certChain = certChain
    quicClientConn.connectedFut.complete()
proc onConnClosed(conn: ptr lsquic_conn_t) {.cdecl.} =
  ## lsquic on_conn_closed callback: fail a still-pending dial, cancel
  ## pending operations, null out `lsquicConn` (patch: prevents a dangling
  ## pointer — lsquic frees the conn after this callback), run the user
  ## onClose hook, and drop the GC pin taken in `dial`.
  debug "Connection closed: client"
  let conn_ctx = lsquic_conn_get_ctx(conn)
  if not conn_ctx.isNil:
    let quicClientConn = cast[QuicConnection](conn_ctx)
    if not quicClientConn.connectedFut.finished:
      # Not connected yet
      var buf: array[256, char]
      let connStatus =
        lsquic_conn_status(conn, cast[cstring](addr buf[0]), buf.len.csize_t)
      let msg = $cast[cstring](addr buf[0])
      quicClientConn.connectedFut.fail(
        newException(
          DialError, "could not connect to server. Status: " & $connStatus & ". " & msg
        )
      )
    quicClientConn.cancelPending()
    quicClientConn.lsquicConn = nil
    quicClientConn.onClose()
    GC_unref(quicClientConn)
    # detach the ctx so later callbacks see nil instead of a freed object
    lsquic_conn_set_ctx(conn, nil)
method dial*(
    ctx: ClientContext,
    local: TransportAddress,
    remote: TransportAddress,
    connectedFut: Future[void],
    onClose: proc() {.gcsafe, raises: [].},
): Result[QuicConnection, string] {.raises: [], gcsafe.} =
  ## Initiate an outgoing QUIC connection via lsquic_engine_connect.
  ## `connectedFut` is resolved later by the handshake/close callbacks.
  ## Returns err() if lsquic refuses the connection attempt synchronously.
  var
    localAddress: Sockaddr_storage
    localAddrLen: SockLen
    remoteAddress: Sockaddr_storage
    remoteAddrLen: SockLen
  local.toSAddr(localAddress, localAddrLen)
  remote.toSAddr(remoteAddress, remoteAddrLen)

  # TODO: should use constructor
  let quicClientConn = QuicConnection(
    isOutgoing: true,
    connectedFut: connectedFut,
    local: local,
    remote: remote,
    incoming: newAsyncQueue[Stream](),
    onClose: onClose,
  )
  GC_ref(quicClientConn) # Keep it pinned until on_conn_closed is called

  # the conn object is passed as lsquic's conn ctx; callbacks cast it back
  let conn = lsquic_engine_connect(
    ctx.engine,
    N_LSQVER,
    cast[ptr SockAddr](addr localAddress),
    cast[ptr SockAddr](addr remoteAddress),
    cast[pointer](ctx),
    cast[ptr lsquic_conn_ctx_t](quicClientConn),
    nil,
    0,
    nil,
    0,
    nil,
    0,
  )
  if conn.isNil:
    # NOTE(review): the GC_ref above is not unreffed on this early-exit
    # path — on_conn_closed never fires for a failed connect; confirm
    # whether this leaks the QuicConnection.
    return err("could not dial: " & $remote)
  quicClientConn.lsquicConn = conn
  ok(quicClientConn)
const Cubic = 1 # lsquic congestion-control algorithm id for CUBIC

proc new*(T: typedesc[ClientContext], tlsConfig: TLSConfig): Result[T, string] =
  ## Creates a client-side lsquic engine wrapped in a ClientContext:
  ## IETF QUIC v1 only, CUBIC congestion control, DPLPMTUD enabled,
  ## flow-control windows, the stream/connection callback table, and the
  ## first engine tick armed. Returns err if the engine cannot be created.
  var ctx = ClientContext()
  ctx.tlsConfig = tlsConfig
  ctx.setupSSLContext()
  lsquic_engine_init_settings(addr ctx.settings, 0)
  ctx.settings.es_versions = 1.cuint shl LSQVER_I001.cuint #IETF QUIC v1
  ctx.settings.es_cc_algo = Cubic
  ctx.settings.es_dplpmtud = 1 # datagram PLPMTU discovery
  ctx.settings.es_base_plpmtu = 1280
  ctx.settings.es_max_plpmtu = 0 # 0 presumably means engine default — TODO confirm
  ctx.settings.es_pace_packets = 1
  # Connection- (cfcw) and stream-level (sfcw) flow-control windows, bytes.
  ctx.settings.es_cfcw = 1 * 1024 * 1024
  ctx.settings.es_max_cfcw = 2 * 1024 * 1024
  ctx.settings.es_sfcw = 256 * 1024
  ctx.settings.es_max_sfcw = 512 * 1024
  ctx.settings.es_init_max_stream_data_bidi_local = ctx.settings.es_sfcw
  ctx.settings.es_init_max_stream_data_bidi_remote = ctx.settings.es_sfcw
  ctx.settings.es_max_batch_size = 32
  ctx.stream_if = struct_lsquic_stream_if(
    on_new_conn: onNewConn,
    on_hsk_done: onHandshakeDone,
    on_conn_closed: onConnClosed,
    on_new_stream: onNewStream,
    on_read: onRead,
    on_write: onWrite,
    on_close: onClose,
  )
  ctx.api = struct_lsquic_engine_api(
    ea_settings: addr ctx.settings,
    ea_stream_if_ctx: cast[pointer](ctx),
    ea_packets_out_ctx: cast[pointer](ctx),
    ea_stream_if: addr ctx.stream_if,
    ea_get_ssl_ctx: getSSLCtx,
    ea_packets_out: sendPacketsOut,
  )
  ctx.engine = lsquic_engine_new(0, addr ctx.api)
  if ctx.engine.isNil:
    return err("failed to create lsquic engine")
  # The timer closure keeps the context alive and drives engine progress.
  ctx.tickTimeout = newTimeout(
    proc() =
      ctx.engine_process()
  )
  ctx.tickTimeout.set(Moment.now()) # kick off the first tick immediately
  return ok(ctx)

View File

@ -0,0 +1,269 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
import chronos
import chronos/osdefs
import chronicles
import
../[lsquic_ffi, errors, tlsconfig, timeout, certificates, certificateverifier, stream]
# Global SSL_CTX ex-data slot used to stash the QuicContext on each SSL_CTX
# so TLS callbacks (e.g. verifyCertificate) can recover it.
let SSL_CTX_ID = SSL_CTX_get_ex_new_index(0, nil, nil, nil, nil) # Yes, this is global
doAssert SSL_CTX_ID >= 0, "could not generate global ssl_ctx id"

type QuicContext* = ref object of RootObj
  ## Shared base for client/server QUIC endpoints: owns one lsquic engine,
  ## its settings and callback tables, TLS configuration, and the tick timer
  ## that drives engine_process.
  settings*: struct_lsquic_engine_settings
  api*: struct_lsquic_engine_api
  engine*: ptr struct_lsquic_engine
  stream_if*: struct_lsquic_stream_if
  tlsConfig*: TLSConfig
  tickTimeout*: Timeout # schedules the next engine_process tick
  sslCtx*: ptr SSL_CTX # built by setupSSLContext
  fd*: cint # presumably the UDP socket fd, set by the I/O layer — TODO confirm
proc engine_process*(ctx: QuicContext) =
  ## One engine tick: process all connections, flush any unsent packets,
  ## then re-arm the tick timer from lsquic's next advisory tick time.
  lsquic_engine_process_conns(ctx.engine)
  if lsquic_engine_has_unsent_packets(ctx.engine) != 0:
    lsquic_engine_send_unsent_packets(ctx.engine)
  var diff: cint
  if lsquic_engine_earliest_adv_tick(ctx.engine, addr diff) == 0:
    # No connection needs ticking; leave the timer unarmed until
    # processWhenReady is called again.
    return
  let delta =
    if diff < 0: LSQUIC_DF_CLOCK_GRANULARITY.microseconds else: diff.microseconds
  ctx.tickTimeout.set(delta)
type PendingStream = object
  # A locally-requested stream waiting for lsquic's on_new_stream callback
  # to bind its native handle; `created` resolves when that happens.
  stream: Stream
  created: Future[void].Raising([CancelledError, ConnectionError])

type QuicConnection* = ref object of RootObj
  ## One QUIC connection (either direction), bridging an lsquic_conn_t to
  ## async Nim code.
  isOutgoing*: bool # true when we dialed (client role)
  local*: TransportAddress
  remote*: TransportAddress
  lsquicConn*: ptr lsquic_conn_t # nil once the engine closes the connection
  onClose*: proc() {.gcsafe, raises: [].}
  closedLocal*: bool
  closedRemote*: bool
  incoming*: AsyncQueue[Stream] # remotely-initiated streams awaiting accept
  connectedFut*: Future[void] # handshake outcome (client side)
  pendingStreams: seq[PendingStream] # local streams awaiting native handles
  certChain*: seq[seq[byte]] # peer DER certificates captured at handshake

type ClientContext* = ref object of QuicContext

type ServerContext* = ref object of QuicContext
  incoming*: AsyncQueue[QuicConnection] # accepted connections awaiting pickup
proc processWhenReady*(quicContext: QuicContext) =
  ## Schedules an immediate engine tick (fires engine_process ASAP).
  quicContext.tickTimeout.set(Moment.now())
proc incomingStream*(
    quicConn: QuicConnection
): Future[Stream] {.async: (raises: [CancelledError]).} =
  ## Waits for and returns the next remotely-initiated stream on this
  ## connection.
  let nextStream = await quicConn.incoming.get()
  return nextStream
proc addPendingStream*(
    quicConn: QuicConnection, s: Stream
): Future[void].Raising([CancelledError, ConnectionError]) {.raises: [], gcsafe.} =
  ## Queues `s` to be bound to the next locally-created lsquic stream.
  ## Returns a future that completes when the native handle is attached
  ## (popPendingStream) or fails when the connection can no longer open
  ## streams (cancelPending).
  let created = Future[void].Raising([CancelledError, ConnectionError]).init(
    "QuicConnection.addPendingStream"
  )
  quicConn.pendingStreams.add(PendingStream(stream: s, created: created))
  created
proc popPendingStream*(
    quicConn: QuicConnection, stream: ptr lsquic_stream_t
): Opt[Stream] {.raises: [], gcsafe.} =
  ## Binds a freshly created lsquic stream handle to a queued Stream and
  ## completes its creation future. Returns none when nothing is pending.
  ## NOTE(review): `pop` takes the most recently queued entry (LIFO); this
  ## is only order-correct if at most one stream creation is in flight at a
  ## time — confirm against make_stream usage.
  if quicConn.pendingStreams.len == 0:
    debug "no pending streams!"
    return Opt.none(Stream)
  let pending = quicConn.pendingStreams.pop()
  pending.stream.quicStream = stream
  pending.created.complete()
  Opt.some(pending.stream)
proc cancelPending*(quicConn: QuicConnection) =
  ## Fails every queued stream-creation future with ConnectionError. Called
  ## when the connection is going away and no further streams can be opened.
  for pending in quicConn.pendingStreams:
    # Guard: fail() on an already-finished future raises a Defect.
    if not pending.created.finished:
      pending.created.fail(newException(ConnectionError, "can't open new streams"))
  # BUG FIX: drop the entries so a later popPendingStream cannot call
  # complete() on an already-failed future (double resolution is a Defect).
  quicConn.pendingStreams.setLen(0)
proc alpnSelectProtoCB(
    ssl: ptr SSL,
    outv: ptr ptr uint8,
    outlen: ptr uint8,
    inv: ptr uint8,
    inlen: cuint,
    userData: pointer,
): cint {.cdecl.} =
  ## BoringSSL server-side ALPN selection callback: picks a mutually
  ## supported protocol from the server's wire-format ALPN list against the
  ## client's offer, otherwise fails the handshake with a fatal alert.
  let serverCtx = cast[ServerContext](userData)
  if (
    SSL_select_next_proto(
      outv,
      outlen,
      cast[ptr uint8](serverCtx.tlsConfig.alpnWire.cstring),
      cast[cuint](serverCtx.tlsConfig.alpnWire.len),
      inv,
      inlen,
    ) == OPENSSL_NPN_NEGOTIATED
  ):
    return SSL_TLSEXT_ERR_OK
  return SSL_TLSEXT_ERR_ALERT_FATAL
proc verifyCertificate(
    ssl: ptr SSL, out_alert: ptr uint8
): enum_ssl_verify_result_t {.cdecl.} =
  ## BoringSSL custom-verify callback: delegates chain validation to the
  ## application-supplied certVerifier stored on the QuicContext, recovered
  ## through the global SSL_CTX ex-data slot. Only installed when a verifier
  ## is configured (see setupSSLContext), hence the doAssert below.
  let sslCtx = SSL_get_SSL_CTX(ssl)
  let quicCtx = cast[QuicContext](SSL_CTX_get_ex_data(sslCtx, SSL_CTX_ID))
  if quicCtx.isNil:
    raiseAssert "could not obtain context"
  let derCertificates = getFullCertChain(ssl)
  let serverName = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name)
  doAssert quicCtx.tlsConfig.certVerifier.isSome, "no custom validator set"
  if quicCtx.tlsConfig.certVerifier.get().verify($serverName, derCertificates):
    return ssl_verify_ok
  else:
    # Reject: tell the peer its certificate was not accepted.
    out_alert[] = SSL_AD_CERTIFICATE_UNKNOWN
    return ssl_verify_invalid
proc setupSSLContext*(quicCtx: QuicContext) =
  ## Builds the BoringSSL context lsquic will use for this endpoint:
  ## TLS 1.3 only, optional certificate/key, optional custom certificate
  ## verification, and ALPN (server: selection callback; client: advertised
  ## list). Stores the result in quicCtx.sslCtx.
  ## Raises Defect (raiseAssert) on any setup failure — these are
  ## configuration errors, not recoverable runtime conditions.
  let sslCtx = SSL_CTX_new(
    # BUG FIX: `quicCtx is ServerContext` is a *static* type test in Nim and
    # is always false here (the declared type is QuicContext), so servers
    # were configured with TLS_client_method(). `of` performs the runtime
    # check, matching the `quicCtx of ServerContext` test used below.
    if quicCtx of ServerContext:
      TLS_server_method()
    else:
      TLS_client_method()
  )
  if sslCtx.isNil:
    raiseAssert "failed to create sslCtx"
  # Make this QuicContext reachable from TLS callbacks (verifyCertificate).
  if SSL_CTX_set_ex_data(sslCtx, SSL_CTX_ID, cast[pointer](quicCtx)) != 1:
    raiseAssert "could not set data in sslCtx"
  # Disable all legacy protocol versions; TLS 1.3 is pinned further below.
  var opts =
    0 or SSL_OP_NO_SSLv2 or SSL_OP_NO_SSLv3 or SSL_OP_NO_TLSv1 or SSL_OP_NO_TLSv1_1 or
    SSL_OP_CIPHER_SERVER_PREFERENCE
  discard SSL_CTX_set_options(sslCtx, opts.uint32)
  if quicCtx.tlsConfig.key.len != 0 and quicCtx.tlsConfig.certificate.len != 0:
    let pkey = quicCtx.tlsConfig.key.toPKey().valueOr:
      raiseAssert "could not convert certificate to pkey: " & error
    let cert = quicCtx.tlsConfig.certificate.toX509().valueOr:
      raiseAssert "could not convert certificate to x509: " & error
    defer:
      X509_free(cert)
      EVP_PKEY_free(pkey)
    if SSL_CTX_use_certificate(sslCtx, cert) != 1:
      raiseAssert "could not use certificate"
    if SSL_CTX_use_PrivateKey(sslCtx, pkey) != 1:
      raiseAssert "could not use private key"
    if SSL_CTX_check_private_key(sslCtx) != 1:
      raiseAssert "cant use private key with certificate"
  if (SSL_CTX_set1_sigalgs_list(sslCtx, "ed25519:ecdsa_secp256r1_sha256") != 1):
    raiseAssert "could not set supported algorithm list"
  if quicCtx.tlsConfig.certVerifier.isSome:
    SSL_CTX_set_custom_verify(
      sslCtx, SSL_VERIFY_PEER or SSL_VERIFY_FAIL_IF_NO_PEER_CERT, verifyCertificate
    )
  if quicCtx of ServerContext:
    SSL_CTX_set_alpn_select_cb(sslCtx, alpnSelectProtoCB, cast[pointer](quicCtx))
  else:
    if SSL_CTX_set_alpn_protos(
      sslCtx,
      cast[ptr uint8](quicCtx.tlsConfig.alpnWire.cstring),
      cast[cuint](quicCtx.tlsConfig.alpnWire.len),
    ) != 0:
      raiseAssert "can't set client alpn"
  discard SSL_CTX_set_min_proto_version(sslCtx, TLS1_3_VERSION)
  discard SSL_CTX_set_max_proto_version(sslCtx, TLS1_3_VERSION)
  quicCtx.sslCtx = sslCtx
proc getSSLCtx*(peer_ctx: pointer, sockaddr: ptr SockAddr): ptr SSL_CTX {.cdecl.} =
  ## lsquic ea_get_ssl_ctx hook: returns the single SSL_CTX built in
  ## setupSSLContext, regardless of peer address.
  let quicCtx = cast[QuicContext](peer_ctx)
  quicCtx.sslCtx
proc stop*(ctx: QuicContext) {.raises: [].} =
  ## Stops the tick timer and destroys the lsquic engine. The context must
  ## not be used afterwards.
  ctx.tickTimeout.stop()
  lsquic_engine_destroy(ctx.engine)
proc close*(ctx: QuicContext, conn: QuicConnection) =
  ## Initiates a graceful close of `conn` (no-op once detached from the
  ## engine) and schedules an immediate tick to push out close packets.
  if conn != nil and conn.lsquicConn != nil:
    lsquic_conn_close(conn.lsquicConn)
    ctx.processWhenReady()
proc abort*(ctx: QuicContext, conn: QuicConnection) =
  ## Aborts `conn` immediately (no graceful shutdown; no-op once detached)
  ## and schedules an immediate tick to push out the reset packets.
  if conn != nil and conn.lsquicConn != nil:
    lsquic_conn_abort(conn.lsquicConn)
    ctx.processWhenReady()
method dial*(
    ctx: QuicContext,
    local: TransportAddress,
    remote: TransportAddress,
    connectedFut: Future[void],
    onClose: proc() {.gcsafe, raises: [].},
): Result[QuicConnection, string] {.base, gcsafe, raises: [].} =
  ## Base implementation: only ClientContext supports outgoing connections;
  ## calling this on any other context is a programming error (Defect).
  raiseAssert "dial not implemented"
proc makeStream*(ctx: QuicContext, quicConn: QuicConnection) {.raises: [].} =
  ## Asks lsquic to open a new locally-initiated stream on `quicConn`; the
  ## handle is delivered later through the on_new_stream callback. Silently
  ## does nothing when the connection has already been torn down.
  debug "Creating stream"
  let connectionGone = quicConn == nil or quicConn.lsquicConn == nil
  if connectionGone:
    debug "Cannot create stream: connection is nil"
    return
  lsquic_conn_make_stream(quicConn.lsquicConn)
proc onNewStream*(
    stream_if_ctx: pointer, stream: ptr lsquic_stream_t
): ptr lsquic_stream_ctx_t {.cdecl.} =
  ## lsquic callback for every new stream, both directions. Uses the QUIC
  ## stream-id initiator bit to decide whether the stream was opened locally
  ## (bind it to a queued pending Stream, start by writing) or by the peer
  ## (wrap it, queue for accept, start by reading). The returned pointer
  ## becomes the per-stream lsquic context.
  debug "New stream created"
  let conn = lsquic_stream_conn(stream)
  let conn_ctx = lsquic_conn_get_ctx(conn)
  if conn_ctx.isNil:
    debug "conn_ctx is nil in onNewStream"
    return nil
  let quicConn = cast[QuicConnection](conn_ctx)
  let stream_id = lsquic_stream_id(stream).int
  # QUIC stream ids encode the initiator in bit 0 (0 = client-initiated,
  # 1 = server-initiated), so "local" depends on our connection role.
  let isLocal =
    if quicConn.isOutgoing:
      (stream_id and 1) == 0
    else:
      (stream_id and 1) == 1
  let streamCtx =
    if isLocal:
      let s = quicConn.popPendingStream(stream).valueOr:
        # No queued request matches: return nil stream context.
        return
      # Whoever opens the stream writes first
      discard lsquic_stream_wantread(stream, 0)
      discard lsquic_stream_wantwrite(stream, 1)
      s
    else:
      let s = Stream.new(stream)
      quicConn.incoming.putNoWait(s)
      # Whoever opens the stream reads first
      discard lsquic_stream_wantread(stream, 1)
      discard lsquic_stream_wantwrite(stream, 0)
      s
  return cast[ptr lsquic_stream_ctx_t](streamCtx)
proc certificates*(
    ctx: QuicContext, conn: QuicConnection
): seq[seq[byte]] {.raises: [].} =
  ## Returns the peer's DER certificate chain captured at handshake time.
  result = conn.certChain

View File

@ -0,0 +1,102 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
import results
import chronicles
import chronos
import chronos/osdefs
import ./[context, io, stream]
import ../[lsquic_ffi, tlsconfig, timeout, stream, certificates]
import ../helpers/[sequninit, transportaddr]
proc onNewConn(
    stream_if_ctx: pointer, conn: ptr lsquic_conn_t
): ptr lsquic_conn_ctx_t {.cdecl.} =
  ## lsquic callback: a client connected to this server. Captures the
  ## addresses and peer certificate chain, builds the QuicConnection, pins
  ## it (released in onConnClosed) and queues it for acceptors. The returned
  ## pointer becomes the lsquic connection context.
  debug "New connection established: server"
  var local: ptr SockAddr
  var remote: ptr SockAddr
  discard lsquic_conn_get_sockaddr(conn, addr local, addr remote)
  let x509chain = lsquic_conn_get_full_cert_chain(conn)
  let certChain = x509chain.getCertChain()
  # Free only the stack container; NOTE(review): assumes getCertChain
  # copies the certificate bytes out — confirm in certificates.nim.
  OPENSSL_sk_free(cast[ptr OPENSSL_STACK](x509chain))
  # TODO: should use a constructor
  let quicConn = QuicConnection(
    isOutgoing: false,
    incoming: newAsyncQueue[Stream](),
    local: local.toTransportAddress(),
    remote: remote.toTransportAddress(),
    lsquicConn: conn,
    certChain: certChain,
    onClose: proc() =
      discard,
  )
  GC_ref(quicConn) # Keep it pinned until on_conn_closed is called
  let serverCtx = cast[ServerContext](stream_if_ctx)
  serverCtx.incoming.putNoWait(quicConn)
  cast[ptr lsquic_conn_ctx_t](quicConn)
proc onConnClosed(conn: ptr lsquic_conn_t) {.cdecl.} =
  ## lsquic callback: a server-side connection has been fully closed.
  ## Detaches the Nim object from the native handle, runs the close hook
  ## and drops the manual ref taken in onNewConn.
  debug "Connection closed: server"
  let conn_ctx = lsquic_conn_get_ctx(conn)
  if not conn_ctx.isNil:
    let quicConn = cast[QuicConnection](conn_ctx)
    # Nil the native pointer first so nothing run from onClose can touch
    # the now-dangling lsquic connection.
    quicConn.lsquicConn = nil
    quicConn.onClose()
    GC_unref(quicConn) # balances GC_ref in onNewConn
    lsquic_conn_set_ctx(conn, nil)
const Cubic = 1 # lsquic congestion-control algorithm id for CUBIC

proc new*(T: typedesc[ServerContext], tlsConfig: TLSConfig): Result[T, string] =
  ## Creates a server-side lsquic engine wrapped in a ServerContext:
  ## IETF QUIC v1 only, CUBIC congestion control, DPLPMTUD enabled,
  ## flow-control windows and the callback table (no on_hsk_done — the
  ## server learns of new connections via on_new_conn). Arms the first
  ## engine tick. Returns err if the engine cannot be created.
  var ctx = ServerContext()
  ctx.tlsConfig = tlsConfig
  ctx.incoming = newAsyncQueue[QuicConnection]()
  ctx.setupSSLContext()
  lsquic_engine_init_settings(addr ctx.settings, LSENG_SERVER)
  ctx.settings.es_versions = 1.cuint shl LSQVER_I001.cuint #IETF QUIC v1
  ctx.settings.es_cc_algo = Cubic
  ctx.settings.es_dplpmtud = 1 # datagram PLPMTU discovery
  ctx.settings.es_base_plpmtu = 1280
  ctx.settings.es_max_plpmtu = 0 # 0 presumably means engine default — TODO confirm
  ctx.settings.es_pace_packets = 1
  # Connection- (cfcw) and stream-level (sfcw) flow-control windows, bytes.
  ctx.settings.es_cfcw = 1 * 1024 * 1024
  ctx.settings.es_max_cfcw = 2 * 1024 * 1024
  ctx.settings.es_sfcw = 256 * 1024
  ctx.settings.es_max_sfcw = 512 * 1024
  ctx.settings.es_init_max_stream_data_bidi_local = ctx.settings.es_sfcw
  ctx.settings.es_init_max_stream_data_bidi_remote = ctx.settings.es_sfcw
  ctx.settings.es_max_batch_size = 32
  ctx.stream_if = struct_lsquic_stream_if(
    on_new_conn: onNewConn,
    on_conn_closed: onConnClosed,
    on_new_stream: onNewStream,
    on_read: onRead,
    on_write: onWrite,
    on_close: onClose,
  )
  ctx.api = struct_lsquic_engine_api(
    ea_settings: addr ctx.settings,
    ea_stream_if_ctx: cast[pointer](ctx),
    ea_packets_out_ctx: cast[pointer](ctx),
    ea_stream_if: addr ctx.stream_if,
    ea_get_ssl_ctx: getSSLCtx,
    ea_packets_out: sendPacketsOut,
  )
  ctx.engine = lsquic_engine_new(LSENG_SERVER, addr ctx.api)
  if ctx.engine.isNil:
    return err("failed to create lsquic engine")
  # The timer closure keeps the context alive and drives engine progress.
  ctx.tickTimeout = newTimeout(
    proc() =
      ctx.engine_process()
  )
  ctx.tickTimeout.set(Moment.now()) # kick off the first tick immediately
  return ok(ctx)

194
temp/vendor/nim-lsquic/lsquic/stream.nim vendored Normal file
View File

@ -0,0 +1,194 @@
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright (c) Status Research & Development GmbH
import std/[deques, posix]
import chronos
import chronicles
import ./[lsquic_ffi, errors]
type WriteTask* = object
  # One queued write: raw pointer+length into the caller's buffer, how much
  # has already been written, and a future fired when fully flushed.
  data*: ptr byte
  dataLen*: int
  offset*: int
  doneFut*: Future[void].Raising([CancelledError, StreamError])

type ReadTask* = object
  # One queued read: destination buffer; the future resolves with the
  # number of bytes read.
  data*: ptr byte
  dataLen*: int
  doneFut*: Future[int].Raising([CancelledError, StreamError])

type Stream* = ref object
  quicStream*: ptr lsquic_stream_t # native handle; bound late for local streams
  closedByEngine*: bool # lsquic ran on_close for this stream
  closeWrite*: bool # our write side has been shut down
  # This is called when on_close callback is executed
  closed*: AsyncEvent
  # Reuse a single closed-event waiter to minimize allocations on hot paths.
  # (no per call allocation)
  closedWaiter*: Future[void].Raising([CancelledError])
  writeLock*: AsyncLock # serializes write()
  toWrite*: Opt[WriteTask] # at most one in-flight write task
  readLock*: AsyncLock # serializes readOnce()
  isEof*: bool # Received a FIN from remote
  toRead*: Opt[ReadTask] # at most one in-flight read task
  doProcess*: proc() {.gcsafe, raises: [].} # pokes the owning engine to tick
proc new*(T: typedesc[Stream], quicStream: ptr lsquic_stream_t = nil): T =
  ## Creates a Stream wrapper. `quicStream` may be nil for locally-initiated
  ## streams whose native handle is attached later via popPendingStream.
  ## NOTE(review): `doProcess` is left nil here and must be assigned by the
  ## owner before read/write/abort are used — confirm at call sites.
  let closed = newAsyncEvent()
  let closedWaiter = closed.wait()
  let s = Stream(
    quicStream: quicStream,
    closed: closed,
    closedWaiter: closedWaiter,
    readLock: newAsyncLock(),
    writeLock: newAsyncLock(),
  )
  GC_ref(s) # Keep it pinned until stream_if.on_close is executed
  s
proc abortPendingWrites*(stream: Stream, reason: string = "") =
  ## Fails the in-flight write task (if any) with StreamError and clears it.
  let task = stream.toWrite.valueOr:
    return
  # BUG FIX: guard against a task whose future is already finished —
  # write() may have failed it itself on the closed-stream race path (see
  # the same `if not doneFut.finished` guard there); fail() on a finished
  # future raises a Defect.
  if not task.doneFut.finished:
    task.doneFut.fail(newException(StreamError, reason))
  stream.toWrite = Opt.none(WriteTask)
proc abort*(stream: Stream) =
  ## Hard-closes the stream: closes the native handle (unless the engine
  ## already did), marks both directions done, fails any pending write and
  ## fires the closed event so waiters wake up.
  if stream.closeWrite and stream.isEof:
    # Both directions already finished; just make sure waiters are released.
    if not stream.closed.isSet():
      stream.closed.fire()
    stream.abortPendingWrites("stream aborted")
    return
  if not stream.closedByEngine:
    let ret = lsquic_stream_close(stream.quicStream)
    if ret != 0:
      trace "could not abort stream", streamId = lsquic_stream_id(stream.quicStream)
    stream.doProcess()
  stream.closeWrite = true
  stream.isEof = true
  stream.abortPendingWrites("stream aborted")
  stream.closed.fire()
proc close*(stream: Stream) {.async: (raises: [StreamError, CancelledError]).} =
  ## Gracefully closes the write side (sends FIN). If the read side already
  ## reached EOF, fully closes the native stream. Pending writes are failed
  ## and `closeWrite` rejects any further writes.
  if stream.closeWrite or stream.closedByEngine:
    return
  # Closing only the write side
  let ret = lsquic_stream_shutdown(stream.quicStream, 1)
  if ret == 0:
    if stream.isEof:
      # Both directions done: release the native stream entirely.
      if lsquic_stream_close(stream.quicStream) != 0:
        stream.abort()
        raise newException(StreamError, "could not close the stream")
    stream.doProcess()
  # BUG FIX: the error message previously read "steam closed".
  stream.abortPendingWrites("stream closed")
  stream.closeWrite = true
proc readOnce*(
    stream: Stream, dst: ptr byte, dstLen: int
): Future[int] {.async: (raises: [CancelledError, StreamError]).} =
  ## Reads at most `dstLen` bytes into `dst`. Returns the number of bytes
  ## read, or 0 on EOF / closed stream. Tries a synchronous lsquic read
  ## first; on EWOULDBLOCK it queues a ReadTask completed from the engine's
  ## on_read callback. Raises StreamError on hard read errors (after
  ## aborting the stream).
  if dstLen == 0 or dst.isNil:
    raiseAssert "dst cannot be nil"
  if stream.isEof or stream.closedByEngine:
    return 0
  await stream.readLock.acquire()
  defer:
    try:
      stream.readLock.release()
    except AsyncLockError:
      discard # should not happen - lock acquired directly above
  # In case stream was closed while waiting for lock being acquired
  if stream.closedByEngine:
    return 0
  let n = lsquic_stream_read(stream.quicStream, dst, dstLen.csize_t)
  if n == 0:
    # lsquic signals remote FIN with a zero-byte read.
    stream.isEof = true
    return 0
  elif n > 0:
    return n
  if n < 0 and errno != EWOULDBLOCK:
    stream.abort()
    raise newException(StreamError, "could not read: " & $errno)
  # No data available yet: register interest and park a ReadTask that the
  # on_read callback will complete.
  if lsquic_stream_wantread(stream.quicStream, 1) == -1:
    stream.abort()
    raise newException(StreamError, "could not set wantread")
  let doneFut =
    Future[int].Raising([CancelledError, StreamError]).init("Stream.readOnce")
  stream.toRead = Opt.some(ReadTask(data: dst, dataLen: dstLen, doneFut: doneFut))
  stream.doProcess()
  let raceFut = await race(stream.closedWaiter, doneFut)
  if raceFut == stream.closedWaiter:
    # Stream was closed while waiting: treat as EOF.
    await doneFut.cancelAndWait()
    stream.isEof = true
    stream.closeWrite = true
    return 0
  return await doneFut
template readOnce*(stream: Stream, dst: var openArray[byte]): untyped =
  ## Convenience helper that forwards an openArray/seq to the pointer-based API.
  ## NOTE(review): a zero-length `dst` forwards (nil, 0), which trips the
  ## nil/zero raiseAssert in the pointer overload — confirm this is intended.
  (if dst.len == 0: stream.readOnce(nil, 0)
  else: stream.readOnce(dst[0].addr, dst.len))
proc write*(
    stream: Stream, data: seq[byte]
) {.async: (raises: [CancelledError, StreamError]).} =
  ## Writes all of `data` to the stream. Tries a synchronous lsquic write
  ## first; if only part (or none) was accepted, queues the remainder as a
  ## WriteTask completed from the engine's on_write callback. Raises
  ## StreamError if the stream is (or becomes) closed.
  if data.len == 0:
    return
  if stream.closeWrite or stream.closedByEngine:
    raise newException(StreamError, "stream closed")
  await stream.writeLock.acquire()
  defer:
    try:
      stream.writeLock.release()
    except AsyncLockError:
      discard # should not happen - lock acquired directly above
  # Re-check: the stream may have been closed while waiting for the lock.
  if stream.closedByEngine:
    raise newException(StreamError, "stream closed")
  # Try to write immediately
  let p = data[0].addr
  let n = lsquic_stream_write(stream.quicStream, p, data.len.csize_t)
  if n >= data.len:
    if lsquic_stream_flush(stream.quicStream) != 0:
      stream.abort()
    # Tick the engine so the flushed data is actually sent (fix for
    # stalled sends on the immediate-write path).
    stream.doProcess()
    return
  elif n < 0:
    error "could not write to stream", streamId = lsquic_stream_id(stream.quicStream), n
    raise newException(StreamError, "could not write")
  # Enqueue otherwise. NOTE(review): WriteTask stores a raw pointer into
  # `data`; it stays valid because this frame awaits doneFut below before
  # returning — confirm on_write never uses the task after that.
  let doneFut = Future[void].Raising([CancelledError, StreamError]).init("Stream.write")
  stream.toWrite = Opt.some(
    WriteTask(data: data[0].addr, dataLen: data.len, doneFut: doneFut, offset: n)
  )
  discard lsquic_stream_wantwrite(stream.quicStream, 1)
  stream.doProcess()
  let raceFut = await race(stream.closedWaiter, doneFut)
  if raceFut == stream.closedWaiter:
    # Stream closed mid-write: fail the task unless on_close already did.
    if not doneFut.finished:
      doneFut.fail(newException(StreamError, "stream closed"))
    stream.closeWrite = true
  await doneFut

View File

@ -60,9 +60,9 @@ suite "Peer Manager":
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, listenIp, Port(3000))
server = newTestWakuNode(serverKey, listenIp, Port(3000), quicEnabled = false)
serverPeerStore = server.peerManager.switch.peerStore
client = newTestWakuNode(clientKey, listenIp, Port(3001))
client = newTestWakuNode(clientKey, listenIp, Port(3001), quicEnabled = false)
clientPeerStore = client.peerManager.switch.peerStore
await allFutures(server.start(), client.start())

View File

@ -270,9 +270,15 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage
generateSecp256k1Key(),
getPrimaryIPAddr(),
Port(44048),
peerStorage = storage,
quicEnabled = false,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023), quicEnabled = false
)
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
@ -311,6 +317,7 @@ procSuite "Peer Manager":
parseIpAddress("127.0.0.1"),
Port(56037),
peerStorage = storage,
quicEnabled = false,
)
node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
@ -343,9 +350,15 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage
generateSecp256k1Key(),
getPrimaryIPAddr(),
Port(44048),
peerStorage = storage,
quicEnabled = false,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023), quicEnabled = false
)
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
@ -384,6 +397,7 @@ procSuite "Peer Manager":
parseIpAddress("127.0.0.1"),
Port(56037),
peerStorage = storage,
quicEnabled = false,
)
node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")

View File

@ -185,20 +185,21 @@ suite "WakuNode":
bindPort = Port(61006)
extIp = some(getPrimaryIPAddr())
extPort = some(Port(61008))
node = newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort)
node =
newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort, quicEnabled = false)
let
bindEndpoint = MultiAddress.init(bindIp, tcpProtocol, bindPort)
announcedEndpoint = MultiAddress.init(extIp.get(), tcpProtocol, extPort.get())
check:
# Check that underlying peer info contains only bindIp before starting
node.switch.peerInfo.listenAddrs.len == 1
# Check that underlying peer info contains bindIp before starting
node.switch.peerInfo.listenAddrs.len >= 1
node.switch.peerInfo.listenAddrs.contains(bindEndpoint)
# Underlying peer info has not updated addrs before starting
node.switch.peerInfo.addrs.len == 0
node.announcedAddresses.len == 1
node.announcedAddresses.len >= 1
node.announcedAddresses.contains(announcedEndpoint)
await node.start()
@ -206,14 +207,52 @@ suite "WakuNode":
check:
node.started
# Underlying peer info listenAddrs has not changed
node.switch.peerInfo.listenAddrs.len == 1
node.switch.peerInfo.listenAddrs.len >= 1
node.switch.peerInfo.listenAddrs.contains(bindEndpoint)
# Check that underlying peer info is updated with announced address
node.switch.peerInfo.addrs.len == 1
node.switch.peerInfo.addrs.len >= 1
node.switch.peerInfo.addrs.contains(announcedEndpoint)
await node.stop()
asyncTest "Peer info updates with correct announced addresses (QUIC)":
let
nodeKey = generateSecp256k1Key()
bindIp = parseIpAddress("0.0.0.0")
bindPort = Port(61006)
quicPort = Port(0)
extIp = some(getPrimaryIPAddr())
extPort = some(Port(61008))
node = newTestWakuNode(
nodeKey,
bindIp,
bindPort,
extIp,
extPort,
quicEnabled = true,
quicBindPort = quicPort,
)
let tcpAnnounced = MultiAddress.init(extIp.get(), tcpProtocol, extPort.get())
check:
node.switch.peerInfo.listenAddrs.len >= 2
node.switch.peerInfo.addrs.len == 0
node.announcedAddresses.len >= 2
node.announcedAddresses.contains(tcpAnnounced)
node.announcedAddresses.anyIt("/quic-v1" in $it)
await node.start()
check:
node.started
node.switch.peerInfo.listenAddrs.len >= 2
node.switch.peerInfo.addrs.len >= 2
node.switch.peerInfo.addrs.contains(tcpAnnounced)
node.switch.peerInfo.addrs.anyIt("/quic-v1" in $it)
await node.stop()
asyncTest "Node can use dns4 in announced addresses":
let
nodeKey = generateSecp256k1Key()
@ -229,7 +268,7 @@ suite "WakuNode":
)
check:
node.announcedAddresses.len == 1
node.announcedAddresses.len >= 1
node.announcedAddresses.contains(expectedDns4Addr)
asyncTest "Node uses dns4 resolved ip in announced addresses if no extIp is provided":

View File

@ -60,6 +60,8 @@ proc newTestWakuNode*(
wsBindPort: Port = (Port) 8000,
wsEnabled: bool = false,
wssEnabled: bool = false,
quicBindPort: Port = (Port) 0,
quicEnabled: bool = true,
secureKey: string = "",
secureCert: string = "",
wakuFlags = none(CapabilitiesBitfield),
@ -74,6 +76,12 @@ proc newTestWakuNode*(
): WakuNode =
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
let bindIp =
if quicEnabled and $bindIp == "0.0.0.0":
parseIpAddress("127.0.0.1")
else:
bindIp
var resolvedExtIp = extIp
# Update extPort to default value if it's missing and there's an extIp or a DNS domain
@ -105,6 +113,8 @@ proc newTestWakuNode*(
wsBindPort = some(wsBindPort),
wsEnabled = wsEnabled,
wssEnabled = wssEnabled,
quicBindPort = some(quicBindPort),
quicEnabled = quicEnabled,
dns4DomainName = dns4DomainName,
discv5UdpPort = discv5UdpPort,
wakuFlags = wakuFlags,

View File

@ -2255,12 +2255,24 @@ suite "Waku Filter - End to End":
contentTopic = DefaultContentTopic
contentTopicSeq = @[contentTopic]
client =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23450))
server =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23451))
client2nd =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23452))
client = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(23450),
quicEnabled = false,
)
server = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(23451),
quicEnabled = false,
)
client2nd = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(23452),
quicEnabled = false,
)
await allFutures(server.start(), client.start(), client2nd.start())

View File

@ -50,6 +50,7 @@ suite "Waku Peer Exchange":
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
quicEnabled = false,
)
nodeKey2 = generateSecp256k1Key()
@ -62,6 +63,7 @@ suite "Waku Peer Exchange":
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
quicEnabled = false,
)
nodeKey3 = generateSecp256k1Key()
@ -74,6 +76,7 @@ suite "Waku Peer Exchange":
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
quicEnabled = false,
)
# discv5
@ -304,10 +307,18 @@ suite "Waku Peer Exchange":
asyncTest "Request with invalid peer info":
# Given two valid nodes with PeerExchange
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
quicEnabled = false,
)
node2 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
quicEnabled = false,
)
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
@ -326,7 +337,12 @@ suite "Waku Peer Exchange":
asyncTest "Connections are closed after response is sent":
# Create 3 nodes
let nodes = toSeq(0 ..< 3).mapIt(
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
quicEnabled = false,
)
)
await allFutures(nodes.mapIt(it.start()))

View File

@ -400,7 +400,9 @@ suite "WakuNode - Relay":
asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)":
let
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), bindPort = Port(0))
node1 = newTestWakuNode(
nodeKey1, parseIpAddress("0.0.0.0"), bindPort = Port(0), quicEnabled = false
)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(
nodeKey2,
@ -408,6 +410,7 @@ suite "WakuNode - Relay":
bindPort = Port(0),
wsBindPort = Port(0),
wsEnabled = true,
quicEnabled = false,
)
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
@ -629,10 +632,14 @@ suite "WakuNode - Relay":
for j in 0 ..< 50:
discard await nodes[0].wakuRelay.publish(topic, urandom(1 * (10 ^ 3)))
# long wait, must be higher than the configured decayInterval (how often score is updated)
await sleepAsync(20.seconds)
# wait for decayInterval to pass so gossipsub scores update and bad peer is disconnected
let deadline = Moment.now() + 30.seconds
while Moment.now() < deadline:
if nodes[0].peerManager.switch.connManager.getConnections().len == 0:
break
await sleepAsync(200.millis)
# all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes)
# all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes)
for i in 1 ..< 5:
check:
nodes[i].wakuRelay.peerStats[nodes[0].switch.peerInfo.peerId].score == -249999.9

View File

@ -55,8 +55,10 @@ suite "Waku v2 REST API - Debug":
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.listenAddresses ==
@[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId]
response.data.listenAddresses.len >= 1
response.data.listenAddresses.contains(
$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId
)
await restServer.stop()
await restServer.closeWait()

View File

@ -35,7 +35,8 @@ proc testWakuNode(): WakuNode =
extIp = parseIpAddress("127.0.0.1")
port = Port(0)
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
return
newTestWakuNode(privkey, bindIp, port, some(extIp), some(port), quicEnabled = false)
type RestFilterTest = object
serviceNode: WakuNode

View File

@ -680,6 +680,19 @@ with the drawback of consuming some more bandwidth.""",
name: "websocket-secure-cert-path"
.}: string
## QUIC transport config
quicSupport* {.
desc: "Enable QUIC transport: true|false",
defaultValue: false,
name: "quic-support"
.}: bool
quicPort* {.
desc: "QUIC transport listening port (UDP).",
defaultValue: 9090,
name: "quic-port"
.}: Port
## Rate limitation config, if not set, rate limit checks will not be performed
rateLimits* {.
desc:
@ -1111,6 +1124,9 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.webSocketConf.withKeyPath(n.websocketSecureKeyPath)
b.webSocketConf.withCertPath(n.websocketSecureCertPath)
b.quicConf.withEnabled(n.quicSupport)
b.quicConf.withQuicPort(n.quicPort)
if n.rateLimits.len > 0:
b.rateLimitConf.withRateLimits(n.rateLimits)

View File

@ -187,6 +187,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
privKey = builder.nodekey,
address = builder.netConfig.get().hostAddress,
wsAddress = builder.netConfig.get().wsHostAddress,
quicAddress = builder.netConfig.get().quicHostAddress,
transportFlags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay},
rng = rng,
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),

View File

@ -7,6 +7,7 @@ import
./dns_discovery_conf_builder,
./discv5_conf_builder,
./web_socket_conf_builder,
./quic_conf_builder,
./metrics_server_conf_builder,
./rate_limit_conf_builder,
./rln_relay_conf_builder,
@ -16,6 +17,6 @@ import
export
waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder,
store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder,
discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder,
rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder,
kademlia_discovery_conf_builder
discv5_conf_builder, web_socket_conf_builder, quic_conf_builder,
metrics_server_conf_builder, rate_limit_conf_builder, rln_relay_conf_builder,
mix_conf_builder, kademlia_discovery_conf_builder

View File

@ -0,0 +1,33 @@
import chronicles, std/[net, options], results
import waku/factory/waku_conf
logScope:
topics = "waku conf builder quic"
#########################
## QUIC Config Builder ##
#########################
type QuicConfBuilder* = object
enabled*: Option[bool]
quicPort*: Option[Port]
proc init*(T: type QuicConfBuilder): QuicConfBuilder =
QuicConfBuilder()
proc withEnabled*(b: var QuicConfBuilder, enabled: bool) =
b.enabled = some(enabled)
proc withQuicPort*(b: var QuicConfBuilder, quicPort: Port) =
b.quicPort = some(quicPort)
proc withQuicPort*(b: var QuicConfBuilder, quicPort: uint16) =
b.quicPort = some(Port(quicPort))
proc build*(b: QuicConfBuilder): Result[Option[QuicConf], string] =
if not b.enabled.get(false):
return ok(none(QuicConf))
if b.quicPort.isNone():
return err("quic.port is not specified")
return ok(some(QuicConf(port: b.quicPort.get())))

View File

@ -23,6 +23,7 @@ import
./dns_discovery_conf_builder,
./discv5_conf_builder,
./web_socket_conf_builder,
./quic_conf_builder,
./metrics_server_conf_builder,
./rate_limit_conf_builder,
./rln_relay_conf_builder,
@ -81,6 +82,7 @@ type WakuConfBuilder* = object
storeServiceConf*: StoreServiceConfBuilder
mixConf*: MixConfBuilder
webSocketConf*: WebSocketConfBuilder
quicConf*: QuicConfBuilder
rateLimitConf*: RateLimitConfBuilder
kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder
# End conf builders
@ -142,6 +144,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder =
rlnRelayConf: RlnRelayConfBuilder.init(),
storeServiceConf: StoreServiceConfBuilder.init(),
webSocketConf: WebSocketConfBuilder.init(),
quicConf: QuicConfBuilder.init(),
rateLimitConf: RateLimitConfBuilder.init(),
kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(),
)
@ -546,6 +549,9 @@ proc build*(
let webSocketConf = builder.webSocketConf.build().valueOr:
return err("WebSocket Conf building failed: " & $error)
let quicConf = builder.quicConf.build().valueOr:
return err("QUIC Conf building failed: " & $error)
let rateLimit = builder.rateLimitConf.build().valueOr:
return err("Rate limits Conf building failed: " & $error)
@ -707,6 +713,7 @@ proc build*(
),
portsShift: portsShift,
webSocketConf: webSocketConf,
quicConf: quicConf,
dnsAddrsNameServers: dnsAddrsNameServers,
peerPersistence: peerPersistence,
peerStoreCapacity: builder.peerStoreCapacity,

View File

@ -59,6 +59,7 @@ proc networkConfiguration*(
conf: EndpointConf,
discv5Conf: Option[Discv5Conf],
webSocketConf: Option[WebSocketConf],
quicConf: Option[QuicConf],
wakuFlags: CapabilitiesBitfield,
dnsAddrsNameServers: seq[IpAddress],
portsShift: uint16,
@ -109,6 +110,13 @@ proc networkConfiguration*(
else:
(false, none(Port), false)
let (quicEnabled, quicBindPort) =
if quicConf.isSome:
let qConf = quicConf.get()
(true, some(Port(qConf.port.uint16 + portsShift)))
else:
(false, none(Port))
# Wrap in none because NetConfig does not have a default constructor
# TODO: We could change bindIp in NetConfig to be something less restrictive
# than IpAddress, which doesn't allow default construction
@ -123,6 +131,8 @@ proc networkConfiguration*(
wsBindPort = wsBindPort,
wsEnabled = wsEnabled,
wssEnabled = wssEnabled,
quicBindPort = quicBindPort,
quicEnabled = quicEnabled,
dns4DomainName = conf.dns4DomainName,
discv5UdpPort = discv5UdpPort,
wakuFlags = some(wakuFlags),

View File

@ -463,8 +463,8 @@ proc setupNode*(
let netConfig = (
await networkConfiguration(
wakuConf.clusterId, wakuConf.endpointConf, wakuConf.discv5Conf,
wakuConf.webSocketConf, wakuConf.wakuFlags, wakuConf.dnsAddrsNameServers,
wakuConf.portsShift, clientId,
wakuConf.webSocketConf, wakuConf.quicConf, wakuConf.wakuFlags,
wakuConf.dnsAddrsNameServers, wakuConf.portsShift, clientId,
)
).valueOr:
error "failed to create internal config", error = error

View File

@ -263,7 +263,7 @@ proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.as
let netConf = (
await networkConfiguration(
conf.clusterId, conf.endpointConf, conf.discv5Conf, conf.webSocketConf,
conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId,
conf.quicConf, conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId,
)
).valueOr:
return err("Could not update NetConfig: " & error)

View File

@ -32,6 +32,9 @@ type WebSocketConf* = object
port*: Port
secureConf*: Option[WebSocketSecureConf]
type QuicConf* = object
port*: Port
# TODO: should be defined in validator_signed.nim and imported here
type ProtectedShard* {.requiresInit.} = object
shard*: uint16
@ -112,6 +115,7 @@ type WakuConf* {.requiresInit.} = ref object
restServerConf*: Option[RestServerConf]
metricsServerConf*: Option[MetricsServerConf]
webSocketConf*: Option[WebSocketConf]
quicConf*: Option[QuicConf]
mixConf*: Option[MixConf]
kademliaDiscoveryConf*: Option[KademliaDiscoveryConf]

View File

@ -9,6 +9,7 @@ type NetConfig* = object
hostAddress*: MultiAddress
clusterId*: uint16
wsHostAddress*: Option[MultiAddress]
quicHostAddress*: Option[MultiAddress]
hostExtAddress*: Option[MultiAddress]
wsExtAddress*: Option[MultiAddress]
wssEnabled*: bool
@ -74,6 +75,8 @@ proc init*(
wsBindPort: Option[Port] = some(DefaultWsBindPort),
wsEnabled: bool = false,
wssEnabled: bool = false,
quicBindPort = none(Port),
quicEnabled: bool = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
clusterId: uint16 = 0,
@ -94,6 +97,17 @@ proc init*(
except CatchableError:
return err(getCurrentExceptionMsg())
var quicHostAddress = none(MultiAddress)
if quicEnabled and quicBindPort.isSome():
try:
quicHostAddress = some(
MultiAddress
.init("/ip4/" & $bindIp & "/udp/" & $quicBindPort.get() & "/quic-v1")
.tryGet()
)
except CatchableError:
return err(getCurrentExceptionMsg())
let enrIp =
if extIp.isSome():
extIp
@ -106,7 +120,7 @@ proc init*(
some(bindPort)
# Setup external addresses, if available
var hostExtAddress, wsExtAddress = none(MultiAddress)
var hostExtAddress, wsExtAddress, quicExtAddress = none(MultiAddress)
if dns4DomainName.isSome():
# Use dns4 for externally announced addresses
@ -123,6 +137,19 @@ proc init*(
)
except CatchableError:
return err(getCurrentExceptionMsg())
if quicHostAddress.isSome():
try:
quicExtAddress = some(
MultiAddress
.init(
"/dns4/" & dns4DomainName.get() & "/udp/" & $quicBindPort.get() &
"/quic-v1"
)
.tryGet()
)
except CatchableError:
return err(getCurrentExceptionMsg())
else:
# No public domain name, use ext IP if available
if extIp.isSome() and extPort.isSome():
@ -137,6 +164,18 @@ proc init*(
except CatchableError:
return err(getCurrentExceptionMsg())
if quicHostAddress.isSome():
try:
quicExtAddress = some(
MultiAddress
.init("/ip4/" & $extIp.get() & "/udp/" & $quicBindPort.get() & "/quic-v1")
.tryGet()
)
except CatchableError:
return err(
"Failed to create external QUIC multiaddress: " & getCurrentExceptionMsg()
)
var announcedAddresses = newSeq[MultiAddress]()
if not extMultiAddrsOnly:
@ -152,6 +191,11 @@ proc init*(
# Only publish wsHostAddress if a WS address is not set in extMultiAddrs
announcedAddresses.add(wsHostAddress.get())
if quicExtAddress.isSome():
announcedAddresses.add(quicExtAddress.get())
elif quicHostAddress.isSome():
announcedAddresses.add(formatListenAddress(quicHostAddress.get()))
# External multiaddrs that the operator may have configured
if extMultiAddrs.len > 0:
announcedAddresses.add(extMultiAddrs)
@ -161,7 +205,7 @@ proc init*(
# https://rfc.vac.dev/spec/31/#many-connection-types
enrMultiaddrs = announcedAddresses.filterIt(
it.hasProtocol("dns4") or it.hasProtocol("dns6") or it.hasProtocol("ws") or
it.hasProtocol("wss")
it.hasProtocol("wss") or it.hasProtocol("quic-v1")
)
ok(
@ -169,6 +213,7 @@ proc init*(
hostAddress: hostAddress,
clusterId: clusterId,
wsHostAddress: wsHostAddress,
quicHostAddress: quicHostAddress,
hostExtAddress: hostExtAddress,
wsExtAddress: wsExtAddress,
extIp: extIp,

View File

@ -7,7 +7,13 @@ import
chronos,
chronicles,
metrics,
libp2p/[multistream, muxers/muxer, nameresolving/nameresolver, peerstore],
libp2p/[
multistream,
muxers/muxer,
nameresolving/nameresolver,
peerstore,
transports/quictransport,
],
waku/[
waku_core,
waku_relay,

View File

@ -57,6 +57,7 @@ proc newWakuSwitch*(
privKey = none(crypto.PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
wsAddress = none(MultiAddress),
quicAddress = none(MultiAddress),
secureManagers: openarray[SecureProtocol] = [SecureProtocol.Noise],
transportFlags: set[ServerFlags] = {},
rng: ref HmacDrbgContext,
@ -101,15 +102,26 @@ proc newWakuSwitch*(
b = b.withAgentVersion(agentString.get())
if privKey.isSome():
b = b.withPrivateKey(privKey.get())
if wsAddress.isSome():
b = b.withAddresses(@[wsAddress.get(), address])
# Collect all listen addresses
var addresses: seq[MultiAddress]
if wsAddress.isSome():
addresses.add(wsAddress.get())
addresses.add(address)
if quicAddress.isSome():
addresses.add(quicAddress.get())
b = b.withAddresses(addresses)
# Add WS transport if enabled
if wsAddress.isSome():
if wssEnabled:
b = b.withWssTransport(secureKeyPath, secureCertPath)
else:
b = b.withWsTransport()
else:
b = b.withAddress(address)
if quicAddress.isSome():
b = b.withQuicTransport()
if not rendezvous.isNil():
b = b.withRendezVous(rendezvous)

View File

@ -1,5 +1,6 @@
{.push raises: [].}
import std/strutils
import libp2p/[peerinfo, switch]
import ./peers
@ -8,14 +9,18 @@ proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string =
# Constructs a multiaddress with both wire address and p2p identity
return $wireaddr & "/p2p/" & $peerId
proc firstAddr(addrs: seq[MultiAddress]): MultiAddress =
  ## Picks a dialable address for display: the first address that is
  ## not a QUIC (`/quic-v1`) address if one exists, otherwise the very
  ## first address. Assumes `addrs` is non-empty — callers check the
  ## length before invoking this helper.
  result = addrs[0]
  for candidate in addrs:
    if not ($candidate).contains("/quic-v1"):
      return candidate
proc constructMultiaddrStr*(peerInfo: PeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if peerInfo.listenAddrs.len == 0:
return ""
return constructMultiaddrStr(peerInfo.listenAddrs[0], peerInfo.peerId)
return constructMultiaddrStr(firstAddr(peerInfo.listenAddrs), peerInfo.peerId)
proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if remotePeerInfo.addrs.len == 0:
return ""
return constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)
return constructMultiaddrStr(firstAddr(remotePeerInfo.addrs), remotePeerInfo.peerId)

View File

@ -330,7 +330,9 @@ converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
## Useful for testing or internal connections
RemotePeerInfo(
peerId: peerInfo.peerId,
addrs: peerInfo.listenAddrs,
addrs:
peerInfo.listenAddrs.filterIt("/quic-v1" in $it) &
peerInfo.listenAddrs.filterIt("/quic-v1" notin $it),
enr: none(enr.Record),
protocols: peerInfo.protocols,
shards: @[],