From e61a190f66224c85b981027bb2d0e38073430922 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 2 Sep 2024 19:12:34 +0200 Subject: [PATCH] fix compilation issues and tests --- libp2p/multiaddress.nim | 5 +- libp2p/transports/quictransport.nim | 238 +++++++++++++--------------- libp2p/wire.nim | 8 +- tests/testquic.nim | 19 ++- tests/testswitch.nim | 22 ++- 5 files changed, 140 insertions(+), 152 deletions(-) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 69915bd0d..594e20cae 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -408,7 +408,10 @@ const UDP_IP* = mapAnd(IP, mapEq("udp")) UDP* = mapOr(UDP_DNS, UDP_IP) UTP* = mapAnd(UDP, mapEq("utp")) - QUIC* = mapAnd(UDP, mapEq("quic")) + QUIC* = mapEq("quic") + QUIC_V1_IP* = mapAnd(UDP_IP, mapEq("quic-v1")) + QUIC_V1_DNS* = mapAnd(UDP_DNS, mapEq("quic-v1")) + QUIC_V1* = mapOr(QUIC_V1_DNS, QUIC_V1_IP) UNIX* = mapEq("unix") WS_DNS* = mapAnd(TCP_DNS, mapEq("ws")) WS_IP* = mapAnd(TCP_IP, mapEq("ws")) diff --git a/libp2p/transports/quictransport.nim b/libp2p/transports/quictransport.nim index 1147e4aff..78b739c1a 100644 --- a/libp2p/transports/quictransport.nim +++ b/libp2p/transports/quictransport.nim @@ -28,12 +28,13 @@ type QuicConnection = quic.Connection # Stream -type - QuicStream* = ref object of P2PConnection - stream: Stream - cached: seq[byte] +type QuicStream* = ref object of P2PConnection + stream: Stream + cached: seq[byte] -proc new(_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId): QuicStream = +proc new( + _: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId +): QuicStream = let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) procCall P2PConnection(quicstream).initStream() quicstream @@ -43,53 +44,73 @@ template mapExceptions(body: untyped) = body except QuicError: raise newLPStreamEOFError() + except CatchableError: + raise newLPStreamEOFError() -method readOnce*(stream: QuicStream, - pbytes: pointer, - nbytes: int): Future[int] {.async.} = - if stream.cached.len == 0: - stream.cached = await mapExceptions(stream.stream.read()) - result = min(nbytes, stream.cached.len) - copyMem(pbytes, addr stream.cached[0], result) - stream.cached = stream.cached[result..^1] +method readOnce*( + stream: QuicStream, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = + try: + if stream.cached.len == 0: + stream.cached = await stream.stream.read() + result = min(nbytes, stream.cached.len) + copyMem(pbytes, addr stream.cached[0], result) + stream.cached = stream.cached[result ..^ 1] + except CatchableError as exc: + raise newLPStreamEOFError() {.push warning[LockLevel]: off.} -method write*(stream: QuicStream, bytes: seq[byte]) {.async.} = +method write*( + stream: QuicStream, bytes: seq[byte] +) {.async: (raises: [CancelledError, LPStreamError]).} = mapExceptions(await stream.stream.write(bytes)) + {.pop.} -method closeImpl*(stream: QuicStream) {.async.} = - await stream.stream.close() +method closeImpl*(stream: QuicStream) {.async: (raises: []).} = + try: + await stream.stream.close() + except CatchableError as exc: + discard await procCall P2PConnection(stream).closeImpl() # Session -type - QuicSession* = ref object of P2PConnection - connection: QuicConnection +type QuicSession* = ref object of P2PConnection + connection: QuicConnection -method close*(session: QuicSession) {.async.} = +method close*(session: QuicSession) {.async, base.} = await session.connection.close() await procCall P2PConnection(session).close() -proc getStream*(session: QuicSession, - direction = Direction.In): Future[QuicStream] {.async.} = +proc getStream*( + session: QuicSession, direction = Direction.In +): Future[QuicStream] {.async.} = var stream: Stream - case direction: - of Direction.In: - stream = await session.connection.incomingStream() - of Direction.Out: - stream = await session.connection.openStream() - await stream.write(@[]) # QUIC streams do not exist until data is sent + case direction + of Direction.In: + stream = await session.connection.incomingStream() + of Direction.Out: + stream = await session.connection.openStream() + await stream.write(@[]) # QUIC streams do not exist until data is sent return QuicStream.new(stream, session.observedAddr, session.peerId) -# Muxer -type - QuicMuxer = ref object of Muxer - quicSession: QuicSession - handleFut: Future[void] +method getWrapped*(self: QuicSession): P2PConnection = + nil -method newStream*(m: QuicMuxer, name: string = "", lazy: bool = false): Future[P2PConnection] {.async, gcsafe.} = - return await m.quicSession.getStream(Direction.Out) +# Muxer +type QuicMuxer = ref object of Muxer + quicSession: QuicSession + handleFut: Future[void] + +method newStream*( + m: QuicMuxer, name: string = "", lazy: bool = false +): Future[P2PConnection] {. + async: (raises: [CancelledError, LPStreamError, MuxerError]) +.} = + try: + return await m.quicSession.getStream(Direction.Out) + except CatchableError as exc: + raise newException(MuxerError, exc.msg, exc) proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = ## call the muxer stream handler for this channel @@ -102,101 +123,35 @@ proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = trace "Exception in mplex stream handler", msg = exc.msg await chann.close() - -method handle*(m: QuicMuxer): Future[void] {.async, gcsafe.} = - while not m.quicSession.atEof: - let incomingStream = await m.quicSession.getStream(Direction.In) - asyncSpawn m.handleStream(incomingStream) - -method close*(m: QuicMuxer) {.async, gcsafe.} = - await m.quicSession.close() - m.handleFut.cancel() - -# Upgrader -type - QuicUpgrade = ref object of Upgrade - -proc identify( - self: QuicUpgrade, - conn: QuicSession - ) {.async, gcsafe.} = - # new stream for identify - let muxer = QuicMuxer(quicSession: conn, connection: conn) - muxer.streamHandler = proc(conn: P2PConnection) {.async, gcsafe.} = - trace "Starting stream handler" - try: - await self.ms.handle(conn) # handle incoming connection - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception in stream handler", conn, msg = exc.msg - finally: - await conn.closeWithEOF() - trace "Stream handler done", conn - - self.connManager.storeConn(conn) - # store it in muxed connections if we have a peer for it - muxer.handleFut = muxer.handle() - self.connManager.storeMuxer(muxer, muxer.handleFut) - - var stream = await conn.getStream(Direction.Out) - if stream == nil: - return - +method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} = try: - await self.identify(stream) - finally: - await stream.closeWithEOF() - -method upgradeIncoming*( - self: QuicUpgrade, - conn: P2PConnection): Future[void] {.async.} = - let qs = QuicSession(conn) - #TODO home made shortcut to get the Peer's id - # in the future, Quic encryption should be used - # instead - let stream = await qs.getStream(Direction.Out) - await stream.writeLp(self.identity.peerInfo.peerId.getBytes()) - assert qs.peerId.init(await stream.readLp(1024)) - await stream.close() - - try: - await self.identify(qs) + while not m.quicSession.atEof: + let incomingStream = await m.quicSession.getStream(Direction.In) + asyncSpawn m.handleStream(incomingStream) except CatchableError as exc: - info "Failed to upgrade incoming connection", msg=exc.msg + trace "Exception in mplex handler", msg = exc.msg -method upgradeOutgoing*( - self: QuicUpgrade, - conn: P2PConnection): Future[P2PConnection] {.async.} = - let qs = QuicSession(conn) - #TODO home made shortcut to get the Peer's id - let stream = await qs.getStream(Direction.In) - await stream.writeLp(self.identity.peerInfo.peerId.getBytes()) - assert qs.peerId.init(await stream.readLp(1024)) - await stream.close() - - await self.identify(qs) - return conn +method close*(m: QuicMuxer) {.async: (raises: []).} = + try: + await m.quicSession.close() + m.handleFut.cancel() + except CatchableError as exc: + discard # Transport -type - QuicTransport* = ref object of Transport - listener: Listener - connections: seq[P2PConnection] +type QuicUpgrade = ref object of Upgrade + +type QuicTransport* = ref object of Transport + listener: Listener + connections: seq[P2PConnection] func new*(_: type QuicTransport, u: Upgrade): QuicTransport = - QuicTransport( - upgrader: QuicUpgrade( - ms: u.ms, - identity: u.identity, - connManager: u.connManager - ) - ) + QuicTransport(upgrader: QuicUpgrade(ms: u.ms)) method handles*(transport: QuicTransport, address: MultiAddress): bool = if not procCall Transport(transport).handles(address): return false - QUIC.match(address) + QUIC_V1.match(address) method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = doAssert transport.listener.isNil, "start() already called" @@ -204,10 +159,8 @@ method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = transport.listener = listen(initTAddress(addrs[0]).tryGet) await procCall Transport(transport).start(addrs) transport.addrs[0] = - MultiAddress.init( - transport.listener.localAddress(), - IPPROTO_UDP - ).tryGet() & MultiAddress.init("/quic").get() + MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() & + MultiAddress.init("/quic-v1").get() transport.running = true method stop*(transport: QuicTransport) {.async.} = @@ -220,12 +173,13 @@ method stop*(transport: QuicTransport) {.async.} = transport.listener = nil proc wrapConnection( - transport: QuicTransport, - connection: QuicConnection): P2PConnection {.raises: [Defect, TransportOsError, LPError].} = + transport: QuicTransport, connection: QuicConnection +): P2PConnection {.raises: [Defect, TransportOsError, LPError].} = let remoteAddr = connection.remoteAddress() observedAddr = - MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & MultiAddress.init("/quic").get() + MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & + MultiAddress.init("/quic-v1").get() conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr)) conres.initStream() @@ -234,6 +188,7 @@ proc wrapConnection( await conres.join() transport.connections.keepItIf(it != conres) trace "Cleaned up client" + asyncSpawn onClose() return conres @@ -242,8 +197,33 @@ method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} = let connection = await transport.listener.accept() return transport.wrapConnection(connection) -method dial*(transport: QuicTransport, - hostname: string, - address: MultiAddress): Future[P2PConnection] {.async.} = +method dial*( + transport: QuicTransport, + hostname: string, + address: MultiAddress, + peerId: Opt[PeerId] = Opt.none(PeerId), +): Future[P2PConnection] {.async, gcsafe.} = let connection = await dial(initTAddress(address).tryGet) return transport.wrapConnection(connection) + +method upgrade*( + self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId] +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = + let qs = QuicSession(conn) + if peerId.isSome: + qs.peerId = peerId.get() + + let muxer = QuicMuxer(quicSession: qs, connection: conn) + muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} = + trace "Starting stream handler" + try: + await self.upgrader.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + return + except CatchableError as exc: + trace "exception in stream handler", conn, msg = exc.msg + finally: + await conn.closeWithEOF() + trace "Stream handler done", conn + muxer.handleFut = muxer.handle() + return muxer diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 1c38580bd..b37b585fd 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -20,11 +20,7 @@ when defined(windows): import winlean else: import posix const RTRANSPMA* = mapOr(TCP, WebSockets, UNIX) - TRANSPMA* = mapOr( - RTRANSPMA, - QUIC, - UDP - ) + TRANSPMA* = mapOr(RTRANSPMA, QUIC, UDP) proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = ## Initialize ``TransportAddress`` with MultiAddress ``ma``. @@ -32,7 +28,7 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = ## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``. ## - if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP).match(ma): + if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP, QUIC_V1_IP).match(ma): var pbuf: array[2, byte] let code = (?(?ma[0]).protoCode()) if code == multiCodec("unix"): diff --git a/tests/testquic.nim b/tests/testquic.nim index 26ef10fef..4d342184f 100644 --- a/tests/testquic.nim +++ b/tests/testquic.nim @@ -2,19 +2,22 @@ import sequtils import chronos, stew/byteutils -import ../libp2p/[stream/connection, - transports/transport, - transports/quictransport, - upgrademngrs/upgrade, - multiaddress, - errors, - wire] +import + ../libp2p/[ + stream/connection, + transports/transport, + transports/quictransport, + upgrademngrs/upgrade, + multiaddress, + errors, + wire, + ] import ./helpers, ./commontransport suite "Quic transport": asyncTest "can handle local address": - let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic").tryGet()] + let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()] let transport1 = QuicTransport.new() await transport1.start(ma) check transport1.handles(transport1.addrs[0]) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 547e1366c..f58e1d7d6 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -991,22 +991,28 @@ suite "Switch": asyncTest "e2e quic transport": let - quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic").tryGet() - quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic").tryGet() + quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet() + quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet() - srcSwitch = - SwitchBuilder.new() + srcSwitch = SwitchBuilder + .new() .withAddress(quicAddress1) .withRng(crypto.newRng()) - .withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr)) + .withTransport( + proc(upgr: Upgrade): Transport = + QuicTransport.new(upgr) + ) .withNoise() .build() - destSwitch = - SwitchBuilder.new() + destSwitch = SwitchBuilder + .new() .withAddress(quicAddress2) .withRng(crypto.newRng()) - .withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr)) + .withTransport( + proc(upgr: Upgrade): Transport = + QuicTransport.new(upgr) + ) .withNoise() .build()