fix compilation issues and tests

This commit is contained in:
Diego 2024-09-02 19:12:34 +02:00
parent 54031430dd
commit e61a190f66
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
5 changed files with 140 additions and 152 deletions

View File

@ -408,7 +408,10 @@ const
UDP_IP* = mapAnd(IP, mapEq("udp")) UDP_IP* = mapAnd(IP, mapEq("udp"))
UDP* = mapOr(UDP_DNS, UDP_IP) UDP* = mapOr(UDP_DNS, UDP_IP)
UTP* = mapAnd(UDP, mapEq("utp")) UTP* = mapAnd(UDP, mapEq("utp"))
QUIC* = mapAnd(UDP, mapEq("quic")) QUIC* = mapEq("quic")
QUIC_V1_IP* = mapAnd(UDP_IP, mapEq("quic-v1"))
QUIC_V1_DNS* = mapAnd(UDP_DNS, mapEq("quic-v1"))
QUIC_V1* = mapOr(QUIC_V1_DNS, QUIC_V1_IP)
UNIX* = mapEq("unix") UNIX* = mapEq("unix")
WS_DNS* = mapAnd(TCP_DNS, mapEq("ws")) WS_DNS* = mapAnd(TCP_DNS, mapEq("ws"))
WS_IP* = mapAnd(TCP_IP, mapEq("ws")) WS_IP* = mapAnd(TCP_IP, mapEq("ws"))

View File

@ -28,12 +28,13 @@ type
QuicConnection = quic.Connection QuicConnection = quic.Connection
# Stream # Stream
type type QuicStream* = ref object of P2PConnection
QuicStream* = ref object of P2PConnection
stream: Stream stream: Stream
cached: seq[byte] cached: seq[byte]
proc new(_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId): QuicStream = proc new(
_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId
): QuicStream =
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId)
procCall P2PConnection(quicstream).initStream() procCall P2PConnection(quicstream).initStream()
quicstream quicstream
@ -43,38 +44,49 @@ template mapExceptions(body: untyped) =
body body
except QuicError: except QuicError:
raise newLPStreamEOFError() raise newLPStreamEOFError()
except CatchableError:
raise newLPStreamEOFError()
method readOnce*(stream: QuicStream, method readOnce*(
pbytes: pointer, stream: QuicStream, pbytes: pointer, nbytes: int
nbytes: int): Future[int] {.async.} = ): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
try:
if stream.cached.len == 0: if stream.cached.len == 0:
stream.cached = await mapExceptions(stream.stream.read()) stream.cached = await stream.stream.read()
result = min(nbytes, stream.cached.len) result = min(nbytes, stream.cached.len)
copyMem(pbytes, addr stream.cached[0], result) copyMem(pbytes, addr stream.cached[0], result)
stream.cached = stream.cached[result..^1] stream.cached = stream.cached[result ..^ 1]
except CatchableError as exc:
raise newLPStreamEOFError()
{.push warning[LockLevel]: off.} {.push warning[LockLevel]: off.}
method write*(stream: QuicStream, bytes: seq[byte]) {.async.} = method write*(
stream: QuicStream, bytes: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
mapExceptions(await stream.stream.write(bytes)) mapExceptions(await stream.stream.write(bytes))
{.pop.} {.pop.}
method closeImpl*(stream: QuicStream) {.async.} = method closeImpl*(stream: QuicStream) {.async: (raises: []).} =
try:
await stream.stream.close() await stream.stream.close()
except CatchableError as exc:
discard
await procCall P2PConnection(stream).closeImpl() await procCall P2PConnection(stream).closeImpl()
# Session # Session
type type QuicSession* = ref object of P2PConnection
QuicSession* = ref object of P2PConnection
connection: QuicConnection connection: QuicConnection
method close*(session: QuicSession) {.async.} = method close*(session: QuicSession) {.async, base.} =
await session.connection.close() await session.connection.close()
await procCall P2PConnection(session).close() await procCall P2PConnection(session).close()
proc getStream*(session: QuicSession, proc getStream*(
direction = Direction.In): Future[QuicStream] {.async.} = session: QuicSession, direction = Direction.In
): Future[QuicStream] {.async.} =
var stream: Stream var stream: Stream
case direction: case direction
of Direction.In: of Direction.In:
stream = await session.connection.incomingStream() stream = await session.connection.incomingStream()
of Direction.Out: of Direction.Out:
@ -82,14 +94,23 @@ proc getStream*(session: QuicSession,
await stream.write(@[]) # QUIC streams do not exist until data is sent await stream.write(@[]) # QUIC streams do not exist until data is sent
return QuicStream.new(stream, session.observedAddr, session.peerId) return QuicStream.new(stream, session.observedAddr, session.peerId)
method getWrapped*(self: QuicSession): P2PConnection =
nil
# Muxer # Muxer
type type QuicMuxer = ref object of Muxer
QuicMuxer = ref object of Muxer
quicSession: QuicSession quicSession: QuicSession
handleFut: Future[void] handleFut: Future[void]
method newStream*(m: QuicMuxer, name: string = "", lazy: bool = false): Future[P2PConnection] {.async, gcsafe.} = method newStream*(
m: QuicMuxer, name: string = "", lazy: bool = false
): Future[P2PConnection] {.
async: (raises: [CancelledError, LPStreamError, MuxerError])
.} =
try:
return await m.quicSession.getStream(Direction.Out) return await m.quicSession.getStream(Direction.Out)
except CatchableError as exc:
raise newException(MuxerError, exc.msg, exc)
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} =
## call the muxer stream handler for this channel ## call the muxer stream handler for this channel
@ -102,101 +123,35 @@ proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} =
trace "Exception in mplex stream handler", msg = exc.msg trace "Exception in mplex stream handler", msg = exc.msg
await chann.close() await chann.close()
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} =
method handle*(m: QuicMuxer): Future[void] {.async, gcsafe.} = try:
while not m.quicSession.atEof: while not m.quicSession.atEof:
let incomingStream = await m.quicSession.getStream(Direction.In) let incomingStream = await m.quicSession.getStream(Direction.In)
asyncSpawn m.handleStream(incomingStream) asyncSpawn m.handleStream(incomingStream)
except CatchableError as exc:
trace "Exception in mplex handler", msg = exc.msg
method close*(m: QuicMuxer) {.async, gcsafe.} = method close*(m: QuicMuxer) {.async: (raises: []).} =
try:
await m.quicSession.close() await m.quicSession.close()
m.handleFut.cancel() m.handleFut.cancel()
# Upgrader
type
QuicUpgrade = ref object of Upgrade
proc identify(
self: QuicUpgrade,
conn: QuicSession
) {.async, gcsafe.} =
# new stream for identify
let muxer = QuicMuxer(quicSession: conn, connection: conn)
muxer.streamHandler = proc(conn: P2PConnection) {.async, gcsafe.} =
trace "Starting stream handler"
try:
await self.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception in stream handler", conn, msg = exc.msg discard
finally:
await conn.closeWithEOF()
trace "Stream handler done", conn
self.connManager.storeConn(conn)
# store it in muxed connections if we have a peer for it
muxer.handleFut = muxer.handle()
self.connManager.storeMuxer(muxer, muxer.handleFut)
var stream = await conn.getStream(Direction.Out)
if stream == nil:
return
try:
await self.identify(stream)
finally:
await stream.closeWithEOF()
method upgradeIncoming*(
self: QuicUpgrade,
conn: P2PConnection): Future[void] {.async.} =
let qs = QuicSession(conn)
#TODO home made shortcut to get the Peer's id
# in the future, Quic encryption should be used
# instead
let stream = await qs.getStream(Direction.Out)
await stream.writeLp(self.identity.peerInfo.peerId.getBytes())
assert qs.peerId.init(await stream.readLp(1024))
await stream.close()
try:
await self.identify(qs)
except CatchableError as exc:
info "Failed to upgrade incoming connection", msg=exc.msg
method upgradeOutgoing*(
self: QuicUpgrade,
conn: P2PConnection): Future[P2PConnection] {.async.} =
let qs = QuicSession(conn)
#TODO home made shortcut to get the Peer's id
let stream = await qs.getStream(Direction.In)
await stream.writeLp(self.identity.peerInfo.peerId.getBytes())
assert qs.peerId.init(await stream.readLp(1024))
await stream.close()
await self.identify(qs)
return conn
# Transport # Transport
type type QuicUpgrade = ref object of Upgrade
QuicTransport* = ref object of Transport
type QuicTransport* = ref object of Transport
listener: Listener listener: Listener
connections: seq[P2PConnection] connections: seq[P2PConnection]
func new*(_: type QuicTransport, u: Upgrade): QuicTransport = func new*(_: type QuicTransport, u: Upgrade): QuicTransport =
QuicTransport( QuicTransport(upgrader: QuicUpgrade(ms: u.ms))
upgrader: QuicUpgrade(
ms: u.ms,
identity: u.identity,
connManager: u.connManager
)
)
method handles*(transport: QuicTransport, address: MultiAddress): bool = method handles*(transport: QuicTransport, address: MultiAddress): bool =
if not procCall Transport(transport).handles(address): if not procCall Transport(transport).handles(address):
return false return false
QUIC.match(address) QUIC_V1.match(address)
method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} =
doAssert transport.listener.isNil, "start() already called" doAssert transport.listener.isNil, "start() already called"
@ -204,10 +159,8 @@ method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} =
transport.listener = listen(initTAddress(addrs[0]).tryGet) transport.listener = listen(initTAddress(addrs[0]).tryGet)
await procCall Transport(transport).start(addrs) await procCall Transport(transport).start(addrs)
transport.addrs[0] = transport.addrs[0] =
MultiAddress.init( MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() &
transport.listener.localAddress(), MultiAddress.init("/quic-v1").get()
IPPROTO_UDP
).tryGet() & MultiAddress.init("/quic").get()
transport.running = true transport.running = true
method stop*(transport: QuicTransport) {.async.} = method stop*(transport: QuicTransport) {.async.} =
@ -220,12 +173,13 @@ method stop*(transport: QuicTransport) {.async.} =
transport.listener = nil transport.listener = nil
proc wrapConnection( proc wrapConnection(
transport: QuicTransport, transport: QuicTransport, connection: QuicConnection
connection: QuicConnection): P2PConnection {.raises: [Defect, TransportOsError, LPError].} = ): P2PConnection {.raises: [Defect, TransportOsError, LPError].} =
let let
remoteAddr = connection.remoteAddress() remoteAddr = connection.remoteAddress()
observedAddr = observedAddr =
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & MultiAddress.init("/quic").get() MultiAddress.init(remoteAddr, IPPROTO_UDP).get() &
MultiAddress.init("/quic-v1").get()
conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr)) conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr))
conres.initStream() conres.initStream()
@ -234,6 +188,7 @@ proc wrapConnection(
await conres.join() await conres.join()
transport.connections.keepItIf(it != conres) transport.connections.keepItIf(it != conres)
trace "Cleaned up client" trace "Cleaned up client"
asyncSpawn onClose() asyncSpawn onClose()
return conres return conres
@ -242,8 +197,33 @@ method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} =
let connection = await transport.listener.accept() let connection = await transport.listener.accept()
return transport.wrapConnection(connection) return transport.wrapConnection(connection)
method dial*(transport: QuicTransport, method dial*(
transport: QuicTransport,
hostname: string, hostname: string,
address: MultiAddress): Future[P2PConnection] {.async.} = address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[P2PConnection] {.async, gcsafe.} =
let connection = await dial(initTAddress(address).tryGet) let connection = await dial(initTAddress(address).tryGet)
return transport.wrapConnection(connection) return transport.wrapConnection(connection)
method upgrade*(
self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId]
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
let qs = QuicSession(conn)
if peerId.isSome:
qs.peerId = peerId.get()
let muxer = QuicMuxer(quicSession: qs, connection: conn)
muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} =
trace "Starting stream handler"
try:
await self.upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
return
except CatchableError as exc:
trace "exception in stream handler", conn, msg = exc.msg
finally:
await conn.closeWithEOF()
trace "Stream handler done", conn
muxer.handleFut = muxer.handle()
return muxer

View File

@ -20,11 +20,7 @@ when defined(windows): import winlean else: import posix
const const
RTRANSPMA* = mapOr(TCP, WebSockets, UNIX) RTRANSPMA* = mapOr(TCP, WebSockets, UNIX)
TRANSPMA* = mapOr( TRANSPMA* = mapOr(RTRANSPMA, QUIC, UDP)
RTRANSPMA,
QUIC,
UDP
)
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## Initialize ``TransportAddress`` with MultiAddress ``ma``. ## Initialize ``TransportAddress`` with MultiAddress ``ma``.
@ -32,7 +28,7 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``. ## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``.
## ##
if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP).match(ma): if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP, QUIC_V1_IP).match(ma):
var pbuf: array[2, byte] var pbuf: array[2, byte]
let code = (?(?ma[0]).protoCode()) let code = (?(?ma[0]).protoCode())
if code == multiCodec("unix"): if code == multiCodec("unix"):

View File

@ -2,19 +2,22 @@
import sequtils import sequtils
import chronos, stew/byteutils import chronos, stew/byteutils
import ../libp2p/[stream/connection, import
../libp2p/[
stream/connection,
transports/transport, transports/transport,
transports/quictransport, transports/quictransport,
upgrademngrs/upgrade, upgrademngrs/upgrade,
multiaddress, multiaddress,
errors, errors,
wire] wire,
]
import ./helpers, ./commontransport import ./helpers, ./commontransport
suite "Quic transport": suite "Quic transport":
asyncTest "can handle local address": asyncTest "can handle local address":
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic").tryGet()] let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()]
let transport1 = QuicTransport.new() let transport1 = QuicTransport.new()
await transport1.start(ma) await transport1.start(ma)
check transport1.handles(transport1.addrs[0]) check transport1.handles(transport1.addrs[0])

View File

@ -991,22 +991,28 @@ suite "Switch":
asyncTest "e2e quic transport": asyncTest "e2e quic transport":
let let
quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic").tryGet() quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()
quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic").tryGet() quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()
srcSwitch = srcSwitch = SwitchBuilder
SwitchBuilder.new() .new()
.withAddress(quicAddress1) .withAddress(quicAddress1)
.withRng(crypto.newRng()) .withRng(crypto.newRng())
.withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr)) .withTransport(
proc(upgr: Upgrade): Transport =
QuicTransport.new(upgr)
)
.withNoise() .withNoise()
.build() .build()
destSwitch = destSwitch = SwitchBuilder
SwitchBuilder.new() .new()
.withAddress(quicAddress2) .withAddress(quicAddress2)
.withRng(crypto.newRng()) .withRng(crypto.newRng())
.withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr)) .withTransport(
proc(upgr: Upgrade): Transport =
QuicTransport.new(upgr)
)
.withNoise() .withNoise()
.build() .build()