diff --git a/.pinned b/.pinned index 8a4794ec8..d996ac9cd 100644 --- a/.pinned +++ b/.pinned @@ -1,12 +1,14 @@ bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a -chronos;https://github.com/status-im/nim-chronos@#dc3847e4d6733dfc3811454c2a9c384b87343e26 +chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0 dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8 faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309 httputils;https://github.com/status-im/nim-http-utils@#3b491a40c60aad9e8d3407443f46f62511e63b18 json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff +ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288 +quic;https://github.com/status-im/nim-quic.git@#ddcb31ffb74b5460ab37fd13547eca90594248bc results;https://github.com/arnetheduck/nim-results@#f3c666a272c69d70cb41e7245e7f6844797303ad secp256k1;https://github.com/status-im/nim-secp256k1@#7246d91c667f4cc3759fdd50339caa45a2ecd8be serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe54049950e352bb969aab97173b35 diff --git a/libp2p.nim b/libp2p.nim index cbfcac975..e3e6942fc 100644 --- a/libp2p.nim +++ b/libp2p.nim @@ -52,6 +52,7 @@ else: stream/connection, transports/transport, transports/tcptransport, + transports/quictransport, protocols/secure/noise, cid, multihash, diff --git a/libp2p.nimble b/libp2p.nimble index ad05d0ee5..0762f2442 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -8,9 +8,10 @@ license = "MIT" skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"] requires "nim >= 1.6.0", - "nimcrypto >= 0.4.1", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5", - "chronicles >= 0.10.2", "chronos >= 4.0.2", "metrics", "secp256k1", "stew#head", - "websock", "unittest2" + "nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5", + "chronicles >= 0.10.2", "chronos >= 4.0.3", "metrics", "secp256k1", "stew#head", + "websock", "unittest2", + "https://github.com/status-im/nim-quic.git#ddcb31ffb74b5460ab37fd13547eca90594248bc" let nimc = getEnv("NIMC", "nim") # Which nim compiler to use let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js) diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index e1668dade..1e76d59d2 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -411,7 +411,12 @@ const UDP_IP* = mapAnd(IP, mapEq("udp")) UDP* = mapOr(UDP_DNS, UDP_IP) UTP* = mapAnd(UDP, mapEq("utp")) - QUIC* = mapAnd(UDP, mapEq("quic")) + QUIC_IP* = mapAnd(UDP_IP, mapEq("quic")) + QUIC_DNS* = mapAnd(UDP_DNS, mapEq("quic")) + QUIC* = mapOr(QUIC_DNS, QUIC_IP) + QUIC_V1_IP* = mapAnd(UDP_IP, mapEq("quic-v1")) + QUIC_V1_DNS* = mapAnd(UDP_DNS, mapEq("quic-v1")) + QUIC_V1* = mapOr(QUIC_V1_DNS, QUIC_V1_IP) UNIX* = mapEq("unix") WS_DNS* = mapAnd(TCP_DNS, mapEq("ws")) WS_IP* = mapAnd(TCP_IP, mapEq("ws")) diff --git a/libp2p/transports/quictransport.nim b/libp2p/transports/quictransport.nim new file mode 100644 index 000000000..f2515f82c --- /dev/null +++ b/libp2p/transports/quictransport.nim @@ -0,0 +1,224 @@ +import std/sequtils +import pkg/chronos +import pkg/chronicles +import pkg/quic +import ../multiaddress +import ../multicodec +import ../stream/connection +import ../wire +import ../muxers/muxer +import ../upgrademngrs/upgrade +import ./transport + +export multiaddress +export multicodec +export connection +export transport + +logScope: + topics = "libp2p quictransport" + +type + P2PConnection = connection.Connection + QuicConnection = quic.Connection + +# Stream +type QuicStream* = ref object of P2PConnection + stream: Stream + cached: seq[byte] + +proc new( + _: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId +): QuicStream = + let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) + procCall P2PConnection(quicstream).initStream() + quicstream + +template mapExceptions(body: untyped) = + try: + body + except QuicError: + raise newLPStreamEOFError() + except CatchableError: + raise newLPStreamEOFError() + +method readOnce*( + stream: QuicStream, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = + try: + if stream.cached.len == 0: + stream.cached = await stream.stream.read() + result = min(nbytes, stream.cached.len) + copyMem(pbytes, addr stream.cached[0], result) + stream.cached = stream.cached[result ..^ 1] + except CatchableError as exc: + raise newLPStreamEOFError() + +{.push warning[LockLevel]: off.} +method write*( + stream: QuicStream, bytes: seq[byte] +) {.async: (raises: [CancelledError, LPStreamError]).} = + mapExceptions(await stream.stream.write(bytes)) + +{.pop.} + +method closeImpl*(stream: QuicStream) {.async: (raises: []).} = + try: + await stream.stream.close() + except CatchableError as exc: + discard + await procCall P2PConnection(stream).closeImpl() + +# Session +type QuicSession* = ref object of P2PConnection + connection: QuicConnection + +method close*(session: QuicSession) {.async, base.} = + await session.connection.close() + await procCall P2PConnection(session).close() + +proc getStream*( + session: QuicSession, direction = Direction.In +): Future[QuicStream] {.async.} = + var stream: Stream + case direction + of Direction.In: + stream = await session.connection.incomingStream() + of Direction.Out: + stream = await session.connection.openStream() + await stream.write(@[]) # QUIC streams do not exist until data is sent + return QuicStream.new(stream, session.observedAddr, session.peerId) + +method getWrapped*(self: QuicSession): P2PConnection = + nil + +# Muxer +type QuicMuxer = ref object of Muxer + quicSession: QuicSession + handleFut: Future[void] + +method newStream*( + m: QuicMuxer, name: string = "", lazy: bool = false +): Future[P2PConnection] {. + async: (raises: [CancelledError, LPStreamError, MuxerError]) +.} = + try: + return await m.quicSession.getStream(Direction.Out) + except CatchableError as exc: + raise newException(MuxerError, exc.msg, exc) + +proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = + ## call the muxer stream handler for this channel + ## + try: + await m.streamHandler(chann) + trace "finished handling stream" + doAssert(chann.closed, "connection not closed by handler!") + except CatchableError as exc: + trace "Exception in mplex stream handler", msg = exc.msg + await chann.close() + +method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} = + try: + while not m.quicSession.atEof: + let incomingStream = await m.quicSession.getStream(Direction.In) + asyncSpawn m.handleStream(incomingStream) + except CatchableError as exc: + trace "Exception in mplex handler", msg = exc.msg + +method close*(m: QuicMuxer) {.async: (raises: []).} = + try: + await m.quicSession.close() + m.handleFut.cancel() + except CatchableError as exc: + discard + +# Transport +type QuicUpgrade = ref object of Upgrade + +type QuicTransport* = ref object of Transport + listener: Listener + connections: seq[P2PConnection] + +func new*(_: type QuicTransport, u: Upgrade): QuicTransport = + QuicTransport(upgrader: QuicUpgrade(ms: u.ms)) + +method handles*(transport: QuicTransport, address: MultiAddress): bool = + if not procCall Transport(transport).handles(address): + return false + QUIC_V1.match(address) + +method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = + doAssert transport.listener.isNil, "start() already called" + #TODO handle multiple addr + transport.listener = listen(initTAddress(addrs[0]).tryGet) + await procCall Transport(transport).start(addrs) + transport.addrs[0] = + MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() & + MultiAddress.init("/quic-v1").get() + transport.running = true + +method stop*(transport: QuicTransport) {.async.} = + if transport.running: + for c in transport.connections: + await c.close() + await procCall Transport(transport).stop() + await transport.listener.stop() + transport.running = false + transport.listener = nil + +proc wrapConnection( + transport: QuicTransport, connection: QuicConnection +): P2PConnection {.raises: [Defect, TransportOsError, LPError].} = + let + remoteAddr = connection.remoteAddress() + observedAddr = + MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & + MultiAddress.init("/quic-v1").get() + conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr)) + conres.initStream() + + transport.connections.add(conres) + proc onClose() {.async.} = + await conres.join() + transport.connections.keepItIf(it != conres) + trace "Cleaned up client" + + asyncSpawn onClose() + return conres + +method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} = + doAssert not transport.listener.isNil, "call start() before calling accept()" + let connection = await transport.listener.accept() + return transport.wrapConnection(connection) + +method dial*( + transport: QuicTransport, + hostname: string, + address: MultiAddress, + peerId: Opt[PeerId] = Opt.none(PeerId), +): Future[P2PConnection] {.async, gcsafe.} = + let connection = await dial(initTAddress(address).tryGet) + return transport.wrapConnection(connection) + +method upgrade*( + self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId] +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = + let qs = QuicSession(conn) + if peerId.isSome: + qs.peerId = peerId.get() + + let muxer = QuicMuxer(quicSession: qs, connection: conn) + muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} = + trace "Starting stream handler" + try: + await self.upgrader.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + return + except CatchableError as exc: + trace "exception in stream handler", conn, msg = exc.msg + finally: + await conn.closeWithEOF() + trace "Stream handler done", conn + muxer.handleFut = muxer.handle() + return muxer diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 05625db03..b37b585fd 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -20,7 +20,7 @@ when defined(windows): import winlean else: import posix const RTRANSPMA* = mapOr(TCP, WebSockets, UNIX) - TRANSPMA* = mapOr(RTRANSPMA, UDP) + TRANSPMA* = mapOr(RTRANSPMA, QUIC, UDP) proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = ## Initialize ``TransportAddress`` with MultiAddress ``ma``. @@ -28,7 +28,7 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] = ## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``. ## - if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP).match(ma): + if mapOr(TCP_IP, WebSockets_IP, UNIX, UDP_IP, QUIC_V1_IP).match(ma): var pbuf: array[2, byte] let code = (?(?ma[0]).protoCode()) if code == multiCodec("unix"): diff --git a/tests/testmultiaddress.nim b/tests/testmultiaddress.nim index ee7f6364f..acdcf0544 100644 --- a/tests/testmultiaddress.nim +++ b/tests/testmultiaddress.nim @@ -26,10 +26,11 @@ const SuccessVectors = [ "/ip4/1.2.3.4", "/ip4/0.0.0.0", "/ip6/::1", "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21", - "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic", "/ip6zone/x/ip6/fe80::1", - "/ip6zone/x%y/ip6/fe80::1", "/ip6zone/x%y/ip6/::", - "/ip6zone/x/ip6/fe80::1/udp/1234/quic", "/onion/timaq4ygg2iegci7:1234", - "/onion/timaq4ygg2iegci7:80/http", + "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic", + "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/udp/1234/quic-v1", + "/ip6zone/x/ip6/fe80::1", "/ip6zone/x%y/ip6/fe80::1", "/ip6zone/x%y/ip6/::", + "/ip6zone/x/ip6/fe80::1/udp/1234/quic", "/ip6zone/x/ip6/fe80::1/udp/1234/quic-v1", + "/onion/timaq4ygg2iegci7:1234", "/onion/timaq4ygg2iegci7:80/http", "/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234", "/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:80/http", "/udp/0", "/tcp/0", "/sctp/0", "/udp/1234", "/tcp/1234", "/sctp/1234", "/udp/65535", @@ -57,7 +58,7 @@ const FailureVectors = [ "", "/", "/ip4", "/ip4/::1", "/ip4/fdpsofodsajfdoisa", "/ip6", "/ip6zone", "/ip6zone/", "/ip6zone//ip6/fe80::1", "/udp", "/tcp", "/sctp", "/udp/65536", - "/tcp/65536", "/quic/65536", "/onion/9imaq4ygg2iegci7:80", + "/tcp/65536", "/quic/65536", "/quic-v1/65536", "/onion/9imaq4ygg2iegci7:80", "/onion/aaimaq4ygg2iegci7:80", "/onion/timaq4ygg2iegci7:0", "/onion/timaq4ygg2iegci7:-1", "/onion/timaq4ygg2iegci7", "/onion/timaq4ygg2iegci@:666", @@ -70,8 +71,8 @@ const "/udp/1234/sctp", "/udp/1234/udt/1234", "/udp/1234/utp/1234", "/ip4/127.0.0.1/udp/jfodsajfidosajfoidsa", "/ip4/127.0.0.1/udp", "/ip4/127.0.0.1/tcp/jfodsajfidosajfoidsa", "/ip4/127.0.0.1/tcp", - "/ip4/127.0.0.1/quic/1234", "/ip4/127.0.0.1/ipfs", "/ip4/127.0.0.1/ipfs/tcp", - "/ip4/127.0.0.1/p2p", "/ip4/127.0.0.1/p2p/tcp", "/unix", + "/ip4/127.0.0.1/quic/1234", "/ip4/127.0.0.1/quic-v1/1234", "/ip4/127.0.0.1/ipfs", + "/ip4/127.0.0.1/ipfs/tcp", "/ip4/127.0.0.1/p2p", "/ip4/127.0.0.1/p2p/tcp", "/unix", ] RustSuccessVectors = [ @@ -160,6 +161,15 @@ const "/quic", ], ), + PatternVector( + pattern: QUIC_V1, + good: @["/ip4/1.2.3.4/udp/1234/quic-v1", "/ip6/::/udp/1234/quic-v1"], + bad: + @[ + "/ip4/0.0.0.0/tcp/12345/quic-v1", "/ip6/fc00::/ip4/0.0.0.0/udp/1234/quic-v1", + "/quic-v1", + ], + ), PatternVector( pattern: IPFS, good: diff --git a/tests/testquic.nim b/tests/testquic.nim new file mode 100644 index 000000000..4d342184f --- /dev/null +++ b/tests/testquic.nim @@ -0,0 +1,24 @@ +{.used.} + +import sequtils +import chronos, stew/byteutils +import + ../libp2p/[ + stream/connection, + transports/transport, + transports/quictransport, + upgrademngrs/upgrade, + multiaddress, + errors, + wire, + ] + +import ./helpers, ./commontransport + +suite "Quic transport": + asyncTest "can handle local address": + let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()] + let transport1 = QuicTransport.new() + await transport1.start(ma) + check transport1.handles(transport1.addrs[0]) + await transport1.stop() diff --git a/tests/testswitch.nim b/tests/testswitch.nim index d15e4f8bf..f58e1d7d6 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -34,6 +34,7 @@ import utils/semaphore, transports/tcptransport, transports/wstransport, + transports/quictransport, ] import ./helpers @@ -988,6 +989,42 @@ suite "Switch": await srcWsSwitch.stop() await srcTcpSwitch.stop() + asyncTest "e2e quic transport": + let + quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet() + quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet() + + srcSwitch = SwitchBuilder + .new() + .withAddress(quicAddress1) + .withRng(crypto.newRng()) + .withTransport( + proc(upgr: Upgrade): Transport = + QuicTransport.new(upgr) + ) + .withNoise() + .build() + + destSwitch = SwitchBuilder + .new() + .withAddress(quicAddress2) + .withRng(crypto.newRng()) + .withTransport( + proc(upgr: Upgrade): Transport = + QuicTransport.new(upgr) + ) + .withNoise() + .build() + + await destSwitch.start() + await srcSwitch.start() + + await srcSwitch.connect(destSwitch.peerInfo.peerId, destSwitch.peerInfo.addrs) + check srcSwitch.isConnected(destSwitch.peerInfo.peerId) + + await destSwitch.stop() + await srcSwitch.stop() + asyncTest "mount unstarted protocol": proc handle(conn: Connection, proto: string) {.async.} = check "test123" == string.fromBytes(await conn.readLp(1024))