From c7eef2e0ae6c9497208bd79052f2539807b6cc9f Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 31 May 2022 16:25:57 +0200 Subject: [PATCH] Initial implem Co-authored-by: markspanbroek --- .pinned | 10 +- libp2p.nimble | 1 + libp2p/transports/quictransport.nim | 242 ++++++++++++++++++++++++++++ libp2p/wire.nim | 1 + tests/testquic.nim | 21 +++ tests/testswitch.nim | 34 +++- 6 files changed, 305 insertions(+), 4 deletions(-) create mode 100644 libp2p/transports/quictransport.nim create mode 100644 tests/testquic.nim diff --git a/.pinned b/.pinned index 46f7cb1a4..287c93dba 100644 --- a/.pinned +++ b/.pinned @@ -1,17 +1,21 @@ asynctest;https://github.com/markspanbroek/asynctest@#5347c59b4b057443a014722aa40800cd8bb95c69 bearssl;https://github.com/status-im/nim-bearssl@#0ebb1d7a4af5f4b4d4756a9b6dbfe5d411fa55d9 chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882 -chronos;https://github.com/status-im/nim-chronos@#875d7d8e6ef0803ae1c331dbf76b1981b0caeb15 +chronos;https://github.com/status-im/nim-chronos@#b3548583fcc768d93654685e7ea55126c1752c29 dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be faststreams;https://github.com/status-im/nim-faststreams@#49e2c52eb5dda46b1c9c10d079abe7bffe6cea89 httputils;https://github.com/status-im/nim-http-utils@#f83fbce4d6ec7927b75be3f85e4fa905fcb69788 json_serialization;https://github.com/status-im/nim-json-serialization@#3509706517f3562cbcbe9d94988eccdd80474ab8 metrics;https://github.com/status-im/nim-metrics@#11edec862f96e42374bc2d584c84cc88d5d1f95f +ngtcp2;https://github.com/status-im/nim-ngtcp2@#fe5e54ee6ccd98ba5a5f162db886371d97d7d5a4 nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00 +questionable;https://github.com/markspanbroek/questionable@#d7e9f0bf7fec14df13a26e699437a2fe577b26ba +quic;https://github.com/status-im/nim-quic.git@#626d18dec86a7fc12f2ccc61ae8a4ce7eca9f3ee secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13 serialization;https://github.com/status-im/nim-serialization@#9631fbd1c81c8b25ff8740df440ca7ba87fa6131 -stew;https://github.com/status-im/nim-stew@#cdb1f213d073fd2ecbdaf35a866417657da9294c +stew;https://github.com/status-im/nim-stew@#412a691f5d29c93bee8f083d213ee8f2c6578bed testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2 unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c -websock;https://github.com/status-im/nim-websock@#8927db93f6ca96abaacfea39f8ca50ce9d41bcdb +upraises;https://github.com/markspanbroek/upraises@#d9f268db1021959fe0f2c7a5e49fba741f9932a0 +websock;https://github.com/status-im/nim-websock@#47b486b52f850d3534b8a1e778fcf9cf40ffe7f6 zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2 \ No newline at end of file diff --git a/libp2p.nimble b/libp2p.nimble index 513d83491..1e7216c24 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -13,6 +13,7 @@ requires "nim >= 1.2.0", "bearssl >= 0.1.4", "chronicles >= 0.10.2", "chronos >= 3.0.6", + "https://github.com/status-im/nim-quic.git", "metrics", "secp256k1", "stew#head", diff --git a/libp2p/transports/quictransport.nim b/libp2p/transports/quictransport.nim new file mode 100644 index 000000000..0dceac133 --- /dev/null +++ b/libp2p/transports/quictransport.nim @@ -0,0 +1,242 @@ +import std/sequtils +import pkg/chronos +import pkg/chronicles +import pkg/quic +import ../multiaddress +import ../multicodec +import ../stream/connection +import ../wire +import ../muxers/muxer +import ../upgrademngrs/upgrade +import ./transport + +export multiaddress +export multicodec +export connection +export transport + +logScope: + topics = "libp2p quictransport" + +type + P2PConnection = connection.Connection + QuicConnection = quic.Connection + +# Stream +type + QuicStream* = ref object of P2PConnection + stream: Stream + cached: seq[byte] + +proc new(_: type QuicStream, stream: Stream, oaddr: MultiAddress, peerId: PeerId): QuicStream = + let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) + procCall P2PConnection(quicstream).initStream() + quicstream + +template mapExceptions(body: untyped) = + try: + body + except QuicError: + raise newLPStreamEOFError() + +method readOnce*(stream: QuicStream, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = + if stream.cached.len == 0: + stream.cached = await mapExceptions(stream.stream.read()) + if stream.cached.len <= nbytes: + copyMem(pbytes, addr stream.cached[0], stream.cached.len) + result = stream.cached.len + stream.cached = @[] + else: + copyMem(pbytes, addr stream.cached[0], nbytes) + result = nbytes + stream.cached = stream.cached[nbytes..^1] + +{.push warning[LockLevel]: off.} +method write*(stream: QuicStream, bytes: seq[byte]) {.async.} = + mapExceptions(await stream.stream.write(bytes)) +{.pop.} + +method closeImpl*(stream: QuicStream) {.async.} = + await stream.stream.close() + await procCall P2PConnection(stream).closeImpl() + +# Session +type + QuicSession* = ref object of P2PConnection + connection: QuicConnection + +method close*(session: QuicSession) {.async.} = + await session.connection.close() + await procCall P2PConnection(session).close() + +proc getStream*(session: QuicSession, + direction = Direction.In): Future[QuicStream] {.async.} = + var stream: Stream + case direction: + of Direction.In: + stream = await session.connection.incomingStream() + of Direction.Out: + stream = await session.connection.openStream() + await stream.write(@[]) # QUIC streams do not exist until data is sent + return QuicStream.new(stream, session.observedAddr, session.peerId) + +# Muxer +type + QuicMuxer = ref object of Muxer + quicSession: QuicSession + handleFut: Future[void] + +method newStream*(m: QuicMuxer, name: string = "", lazy: bool = false): Future[P2PConnection] {.async, gcsafe.} = + return await m.quicSession.getStream(Direction.Out) + +proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = + ## call the muxer stream handler for this channel + ## + try: + await m.streamHandler(chann) + trace "finished handling stream" + doAssert(chann.closed, "connection not closed by handler!") + except CatchableError as exc: + trace "Exception in mplex stream handler", msg = exc.msg + await chann.close() + + +method handle*(m: QuicMuxer): Future[void] {.async, gcsafe.} = + while not m.quicSession.atEof: + let incomingStream = await m.quicSession.getStream(Direction.In) + asyncSpawn m.handleStream(incomingStream) + +method close*(m: QuicMuxer) {.async, gcsafe.} = + await m.quicSession.close() + m.handleFut.cancel() + +# Upgrader +type + QuicUpgrade = ref object of Upgrade + +proc identify( + self: QuicUpgrade, + conn: QuicSession + ) {.async, gcsafe.} = + # new stream for identify + let muxer = QuicMuxer(quicSession: conn, connection: conn) + muxer.streamHandler = proc(conn: P2PConnection) {.async, gcsafe, raises: [Defect].} = + trace "Starting stream handler" + try: + await self.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception in stream handler", conn, msg = exc.msg + finally: + await conn.closeWithEOF() + trace "Stream handler done", conn + + self.connManager.storeConn(conn) + # store it in muxed connections if we have a peer for it + muxer.handleFut = muxer.handle() + self.connManager.storeMuxer(muxer, muxer.handleFut) + + var stream = await conn.getStream(Direction.Out) + if stream == nil: + return + + try: + await self.identify(stream) + finally: + await stream.closeWithEOF() + +method upgradeIncoming*( + self: QuicUpgrade, + conn: P2PConnection): Future[void] {.async.} = + let qs = QuicSession(conn) + #TODO home made shortcut to get the Peer's id + # in the future, Quic encryption should be used + # instead + let stream = await qs.getStream(Direction.Out) + await stream.writeLp(self.identity.peerInfo.peerId.getBytes()) + assert qs.peerId.init(await stream.readLp(1024)) + await stream.close() + + try: + await self.identify(qs) + except CatchableError as exc: + info "Failed to upgrade incoming connection", msg=exc.msg + +method upgradeOutgoing*( + self: QuicUpgrade, + conn: P2PConnection): Future[P2PConnection] {.async.} = + let qs = QuicSession(conn) + #TODO home made shortcut to get the Peer's id + let stream = await qs.getStream(Direction.In) + await stream.writeLp(self.identity.peerInfo.peerId.getBytes()) + assert qs.peerId.init(await stream.readLp(1024)) + await stream.close() + + await self.identify(qs) + return conn + +# Transport +type + QuicTransport* = ref object of Transport + listener: Listener + connections: seq[P2PConnection] + +func new*(_: type QuicTransport, u: Upgrade): QuicTransport = + QuicTransport( + upgrader: QuicUpgrade( + ms: u.ms, + identity: u.identity, + connManager: u.connManager + ) + ) + +method handles*(transport: QuicTransport, address: MultiAddress): bool = + if not procCall Transport(transport).handles(address): + return false + QUIC.match(address) + +method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = + doAssert transport.listener.isNil, "start() already called" + #TODO handle multiple addr + #TODO resolve used address + transport.listener = listen(initTAddress(addrs[0]).tryGet) + await procCall Transport(transport).start(addrs) + transport.running = true + +method stop*(transport: QuicTransport) {.async.} = + if transport.running: + for c in transport.connections: + await c.close() + await procCall Transport(transport).stop() + await transport.listener.stop() + transport.running = false + transport.listener = nil + +proc wrapConnection(transport: QuicTransport, connection: QuicConnection): P2PConnection = + #TODO currently not exposed from nim-quic + let + observedAddr = MultiAddress.init("/ip4/0.0.0.0/udp/0/quic").tryGet() + conres = QuicSession(connection: connection, observedAddr: observedAddr) + conres.initStream() + + transport.connections.add(conres) + proc onClose() {.async.} = + await conres.join() + transport.connections.keepItIf(it != conres) + trace "Cleaned up client" + asyncSpawn onClose() + return conres + +method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} = + doAssert not transport.listener.isNil, "call start() before calling accept()" + let connection = await transport.listener.accept() + return transport.wrapConnection(connection) + +method dial*(transport: QuicTransport, + hostname: string, + address: MultiAddress): Future[P2PConnection] {.async.} = + let connection = await dial(initTAddress(address).tryGet) + return transport.wrapConnection(connection) diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 761a6fdb5..7f1406b55 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -27,6 +27,7 @@ const TRANSPMA* = mapOr( RTRANSPMA, + QUIC, UDP ) diff --git a/tests/testquic.nim b/tests/testquic.nim new file mode 100644 index 000000000..36eb382ed --- /dev/null +++ b/tests/testquic.nim @@ -0,0 +1,21 @@ +{.used.} + +import sequtils +import chronos, stew/byteutils +import ../libp2p/[stream/connection, + transports/transport, + transports/quictransport, + upgrademngrs/upgrade, + multiaddress, + errors, + wire] + +import ./helpers, ./commontransport + +suite "Quic transport": + asyncTest "can handle local address": + let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/45894/quic").tryGet()] + let transport1 = QuicTransport.new() + await transport1.start(ma) + check transport1.handles(transport1.addrs[0]) + await transport1.stop() diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 23cb6ed0b..08ac5bf1c 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -22,7 +22,8 @@ import ../libp2p/[errors, nameresolving/mockresolver, stream/chronosstream, transports/tcptransport, - transports/wstransport] + transports/wstransport, + transports/quictransport] import ./helpers const @@ -979,3 +980,34 @@ suite "Switch": await destSwitch.stop() await srcWsSwitch.stop() await srcTcpSwitch.stop() + + asyncTest "e2e quic transport": + let + #TODO port 0 doesn't work yet + quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/4567/quic").tryGet() + quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/4566/quic").tryGet() + + srcSwitch = + SwitchBuilder.new() + .withAddress(quicAddress1) + .withRng(crypto.newRng()) + .withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr)) + .withNoise() + .build() + + destSwitch = + SwitchBuilder.new() + .withAddress(quicAddress2) + .withRng(crypto.newRng()) + .withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr)) + .withNoise() + .build() + + await destSwitch.start() + await srcSwitch.start() + + await srcSwitch.connect(destSwitch.peerInfo.peerId, destSwitch.peerInfo.addrs) + check srcSwitch.isConnected(destSwitch.peerInfo.peerId) + + await destSwitch.stop() + await srcSwitch.stop()