mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-27 20:10:50 +00:00
Initial implem
Co-authored-by: markspanbroek <mark@spanbroek.net>
This commit is contained in:
parent
36f3132d9a
commit
c7eef2e0ae
10
.pinned
10
.pinned
@ -1,17 +1,21 @@
|
|||||||
asynctest;https://github.com/markspanbroek/asynctest@#5347c59b4b057443a014722aa40800cd8bb95c69
|
asynctest;https://github.com/markspanbroek/asynctest@#5347c59b4b057443a014722aa40800cd8bb95c69
|
||||||
bearssl;https://github.com/status-im/nim-bearssl@#0ebb1d7a4af5f4b4d4756a9b6dbfe5d411fa55d9
|
bearssl;https://github.com/status-im/nim-bearssl@#0ebb1d7a4af5f4b4d4756a9b6dbfe5d411fa55d9
|
||||||
chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882
|
chronicles;https://github.com/status-im/nim-chronicles@#2a2681b60289aaf7895b7056f22616081eb1a882
|
||||||
chronos;https://github.com/status-im/nim-chronos@#875d7d8e6ef0803ae1c331dbf76b1981b0caeb15
|
chronos;https://github.com/status-im/nim-chronos@#b3548583fcc768d93654685e7ea55126c1752c29
|
||||||
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be
|
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fbb76f8af8a33ab818184a7d4406d9fee20993be
|
||||||
faststreams;https://github.com/status-im/nim-faststreams@#49e2c52eb5dda46b1c9c10d079abe7bffe6cea89
|
faststreams;https://github.com/status-im/nim-faststreams@#49e2c52eb5dda46b1c9c10d079abe7bffe6cea89
|
||||||
httputils;https://github.com/status-im/nim-http-utils@#f83fbce4d6ec7927b75be3f85e4fa905fcb69788
|
httputils;https://github.com/status-im/nim-http-utils@#f83fbce4d6ec7927b75be3f85e4fa905fcb69788
|
||||||
json_serialization;https://github.com/status-im/nim-json-serialization@#3509706517f3562cbcbe9d94988eccdd80474ab8
|
json_serialization;https://github.com/status-im/nim-json-serialization@#3509706517f3562cbcbe9d94988eccdd80474ab8
|
||||||
metrics;https://github.com/status-im/nim-metrics@#11edec862f96e42374bc2d584c84cc88d5d1f95f
|
metrics;https://github.com/status-im/nim-metrics@#11edec862f96e42374bc2d584c84cc88d5d1f95f
|
||||||
|
ngtcp2;https://github.com/status-im/nim-ngtcp2@#fe5e54ee6ccd98ba5a5f162db886371d97d7d5a4
|
||||||
nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00
|
nimcrypto;https://github.com/cheatfate/nimcrypto@#a5742a9a214ac33f91615f3862c7b099aec43b00
|
||||||
|
questionable;https://github.com/markspanbroek/questionable@#d7e9f0bf7fec14df13a26e699437a2fe577b26ba
|
||||||
|
quic;https://github.com/status-im/nim-quic.git@#626d18dec86a7fc12f2ccc61ae8a4ce7eca9f3ee
|
||||||
secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13
|
secp256k1;https://github.com/status-im/nim-secp256k1@#e092373a5cbe1fa25abfc62e0f2a5f138dc3fb13
|
||||||
serialization;https://github.com/status-im/nim-serialization@#9631fbd1c81c8b25ff8740df440ca7ba87fa6131
|
serialization;https://github.com/status-im/nim-serialization@#9631fbd1c81c8b25ff8740df440ca7ba87fa6131
|
||||||
stew;https://github.com/status-im/nim-stew@#cdb1f213d073fd2ecbdaf35a866417657da9294c
|
stew;https://github.com/status-im/nim-stew@#412a691f5d29c93bee8f083d213ee8f2c6578bed
|
||||||
testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2
|
testutils;https://github.com/status-im/nim-testutils@#aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2
|
||||||
unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c
|
unittest2;https://github.com/status-im/nim-unittest2@#4e2893eacb916c7678fdc4935ff7420f13bf3a9c
|
||||||
websock;https://github.com/status-im/nim-websock@#8927db93f6ca96abaacfea39f8ca50ce9d41bcdb
|
upraises;https://github.com/markspanbroek/upraises@#d9f268db1021959fe0f2c7a5e49fba741f9932a0
|
||||||
|
websock;https://github.com/status-im/nim-websock@#47b486b52f850d3534b8a1e778fcf9cf40ffe7f6
|
||||||
zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2
|
zlib;https://github.com/status-im/nim-zlib@#74cdeb54b21bededb5a515d36f608bc1850555a2
|
@ -13,6 +13,7 @@ requires "nim >= 1.2.0",
|
|||||||
"bearssl >= 0.1.4",
|
"bearssl >= 0.1.4",
|
||||||
"chronicles >= 0.10.2",
|
"chronicles >= 0.10.2",
|
||||||
"chronos >= 3.0.6",
|
"chronos >= 3.0.6",
|
||||||
|
"https://github.com/status-im/nim-quic.git",
|
||||||
"metrics",
|
"metrics",
|
||||||
"secp256k1",
|
"secp256k1",
|
||||||
"stew#head",
|
"stew#head",
|
||||||
|
242
libp2p/transports/quictransport.nim
Normal file
242
libp2p/transports/quictransport.nim
Normal file
@ -0,0 +1,242 @@
|
|||||||
|
import std/sequtils
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/chronicles
|
||||||
|
import pkg/quic
|
||||||
|
import ../multiaddress
|
||||||
|
import ../multicodec
|
||||||
|
import ../stream/connection
|
||||||
|
import ../wire
|
||||||
|
import ../muxers/muxer
|
||||||
|
import ../upgrademngrs/upgrade
|
||||||
|
import ./transport
|
||||||
|
|
||||||
|
export multiaddress
|
||||||
|
export multicodec
|
||||||
|
export connection
|
||||||
|
export transport
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "libp2p quictransport"
|
||||||
|
|
||||||
|
type
|
||||||
|
P2PConnection = connection.Connection
|
||||||
|
QuicConnection = quic.Connection
|
||||||
|
|
||||||
|
# Stream
|
||||||
|
type
|
||||||
|
QuicStream* = ref object of P2PConnection
|
||||||
|
stream: Stream
|
||||||
|
cached: seq[byte]
|
||||||
|
|
||||||
|
proc new(_: type QuicStream, stream: Stream, oaddr: MultiAddress, peerId: PeerId): QuicStream =
|
||||||
|
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId)
|
||||||
|
procCall P2PConnection(quicstream).initStream()
|
||||||
|
quicstream
|
||||||
|
|
||||||
|
template mapExceptions(body: untyped) =
|
||||||
|
try:
|
||||||
|
body
|
||||||
|
except QuicError:
|
||||||
|
raise newLPStreamEOFError()
|
||||||
|
|
||||||
|
method readOnce*(stream: QuicStream,
|
||||||
|
pbytes: pointer,
|
||||||
|
nbytes: int): Future[int] {.async.} =
|
||||||
|
if stream.cached.len == 0:
|
||||||
|
stream.cached = await mapExceptions(stream.stream.read())
|
||||||
|
if stream.cached.len <= nbytes:
|
||||||
|
copyMem(pbytes, addr stream.cached[0], stream.cached.len)
|
||||||
|
result = stream.cached.len
|
||||||
|
stream.cached = @[]
|
||||||
|
else:
|
||||||
|
copyMem(pbytes, addr stream.cached[0], nbytes)
|
||||||
|
result = nbytes
|
||||||
|
stream.cached = stream.cached[nbytes..^1]
|
||||||
|
|
||||||
|
{.push warning[LockLevel]: off.}
|
||||||
|
method write*(stream: QuicStream, bytes: seq[byte]) {.async.} =
|
||||||
|
mapExceptions(await stream.stream.write(bytes))
|
||||||
|
{.pop.}
|
||||||
|
|
||||||
|
method closeImpl*(stream: QuicStream) {.async.} =
|
||||||
|
await stream.stream.close()
|
||||||
|
await procCall P2PConnection(stream).closeImpl()
|
||||||
|
|
||||||
|
# Session
|
||||||
|
type
|
||||||
|
QuicSession* = ref object of P2PConnection
|
||||||
|
connection: QuicConnection
|
||||||
|
|
||||||
|
method close*(session: QuicSession) {.async.} =
|
||||||
|
await session.connection.close()
|
||||||
|
await procCall P2PConnection(session).close()
|
||||||
|
|
||||||
|
proc getStream*(session: QuicSession,
|
||||||
|
direction = Direction.In): Future[QuicStream] {.async.} =
|
||||||
|
var stream: Stream
|
||||||
|
case direction:
|
||||||
|
of Direction.In:
|
||||||
|
stream = await session.connection.incomingStream()
|
||||||
|
of Direction.Out:
|
||||||
|
stream = await session.connection.openStream()
|
||||||
|
await stream.write(@[]) # QUIC streams do not exist until data is sent
|
||||||
|
return QuicStream.new(stream, session.observedAddr, session.peerId)
|
||||||
|
|
||||||
|
# Muxer
|
||||||
|
type
|
||||||
|
QuicMuxer = ref object of Muxer
|
||||||
|
quicSession: QuicSession
|
||||||
|
handleFut: Future[void]
|
||||||
|
|
||||||
|
method newStream*(m: QuicMuxer, name: string = "", lazy: bool = false): Future[P2PConnection] {.async, gcsafe.} =
|
||||||
|
return await m.quicSession.getStream(Direction.Out)
|
||||||
|
|
||||||
|
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} =
|
||||||
|
## call the muxer stream handler for this channel
|
||||||
|
##
|
||||||
|
try:
|
||||||
|
await m.streamHandler(chann)
|
||||||
|
trace "finished handling stream"
|
||||||
|
doAssert(chann.closed, "connection not closed by handler!")
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Exception in mplex stream handler", msg = exc.msg
|
||||||
|
await chann.close()
|
||||||
|
|
||||||
|
|
||||||
|
method handle*(m: QuicMuxer): Future[void] {.async, gcsafe.} =
|
||||||
|
while not m.quicSession.atEof:
|
||||||
|
let incomingStream = await m.quicSession.getStream(Direction.In)
|
||||||
|
asyncSpawn m.handleStream(incomingStream)
|
||||||
|
|
||||||
|
method close*(m: QuicMuxer) {.async, gcsafe.} =
|
||||||
|
await m.quicSession.close()
|
||||||
|
m.handleFut.cancel()
|
||||||
|
|
||||||
|
# Upgrader
|
||||||
|
type
|
||||||
|
QuicUpgrade = ref object of Upgrade
|
||||||
|
|
||||||
|
proc identify(
|
||||||
|
self: QuicUpgrade,
|
||||||
|
conn: QuicSession
|
||||||
|
) {.async, gcsafe.} =
|
||||||
|
# new stream for identify
|
||||||
|
let muxer = QuicMuxer(quicSession: conn, connection: conn)
|
||||||
|
muxer.streamHandler = proc(conn: P2PConnection) {.async, gcsafe, raises: [Defect].} =
|
||||||
|
trace "Starting stream handler"
|
||||||
|
try:
|
||||||
|
await self.ms.handle(conn) # handle incoming connection
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "exception in stream handler", conn, msg = exc.msg
|
||||||
|
finally:
|
||||||
|
await conn.closeWithEOF()
|
||||||
|
trace "Stream handler done", conn
|
||||||
|
|
||||||
|
self.connManager.storeConn(conn)
|
||||||
|
# store it in muxed connections if we have a peer for it
|
||||||
|
muxer.handleFut = muxer.handle()
|
||||||
|
self.connManager.storeMuxer(muxer, muxer.handleFut)
|
||||||
|
|
||||||
|
var stream = await conn.getStream(Direction.Out)
|
||||||
|
if stream == nil:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.identify(stream)
|
||||||
|
finally:
|
||||||
|
await stream.closeWithEOF()
|
||||||
|
|
||||||
|
method upgradeIncoming*(
|
||||||
|
self: QuicUpgrade,
|
||||||
|
conn: P2PConnection): Future[void] {.async.} =
|
||||||
|
let qs = QuicSession(conn)
|
||||||
|
#TODO home made shortcut to get the Peer's id
|
||||||
|
# in the future, Quic encryption should be used
|
||||||
|
# instead
|
||||||
|
let stream = await qs.getStream(Direction.Out)
|
||||||
|
await stream.writeLp(self.identity.peerInfo.peerId.getBytes())
|
||||||
|
assert qs.peerId.init(await stream.readLp(1024))
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.identify(qs)
|
||||||
|
except CatchableError as exc:
|
||||||
|
info "Failed to upgrade incoming connection", msg=exc.msg
|
||||||
|
|
||||||
|
method upgradeOutgoing*(
|
||||||
|
self: QuicUpgrade,
|
||||||
|
conn: P2PConnection): Future[P2PConnection] {.async.} =
|
||||||
|
let qs = QuicSession(conn)
|
||||||
|
#TODO home made shortcut to get the Peer's id
|
||||||
|
let stream = await qs.getStream(Direction.In)
|
||||||
|
await stream.writeLp(self.identity.peerInfo.peerId.getBytes())
|
||||||
|
assert qs.peerId.init(await stream.readLp(1024))
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
await self.identify(qs)
|
||||||
|
return conn
|
||||||
|
|
||||||
|
# Transport
|
||||||
|
type
|
||||||
|
QuicTransport* = ref object of Transport
|
||||||
|
listener: Listener
|
||||||
|
connections: seq[P2PConnection]
|
||||||
|
|
||||||
|
func new*(_: type QuicTransport, u: Upgrade): QuicTransport =
|
||||||
|
QuicTransport(
|
||||||
|
upgrader: QuicUpgrade(
|
||||||
|
ms: u.ms,
|
||||||
|
identity: u.identity,
|
||||||
|
connManager: u.connManager
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
method handles*(transport: QuicTransport, address: MultiAddress): bool =
|
||||||
|
if not procCall Transport(transport).handles(address):
|
||||||
|
return false
|
||||||
|
QUIC.match(address)
|
||||||
|
|
||||||
|
method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} =
|
||||||
|
doAssert transport.listener.isNil, "start() already called"
|
||||||
|
#TODO handle multiple addr
|
||||||
|
#TODO resolve used address
|
||||||
|
transport.listener = listen(initTAddress(addrs[0]).tryGet)
|
||||||
|
await procCall Transport(transport).start(addrs)
|
||||||
|
transport.running = true
|
||||||
|
|
||||||
|
method stop*(transport: QuicTransport) {.async.} =
|
||||||
|
if transport.running:
|
||||||
|
for c in transport.connections:
|
||||||
|
await c.close()
|
||||||
|
await procCall Transport(transport).stop()
|
||||||
|
await transport.listener.stop()
|
||||||
|
transport.running = false
|
||||||
|
transport.listener = nil
|
||||||
|
|
||||||
|
proc wrapConnection(transport: QuicTransport, connection: QuicConnection): P2PConnection =
|
||||||
|
#TODO currently not exposed from nim-quic
|
||||||
|
let
|
||||||
|
observedAddr = MultiAddress.init("/ip4/0.0.0.0/udp/0/quic").tryGet()
|
||||||
|
conres = QuicSession(connection: connection, observedAddr: observedAddr)
|
||||||
|
conres.initStream()
|
||||||
|
|
||||||
|
transport.connections.add(conres)
|
||||||
|
proc onClose() {.async.} =
|
||||||
|
await conres.join()
|
||||||
|
transport.connections.keepItIf(it != conres)
|
||||||
|
trace "Cleaned up client"
|
||||||
|
asyncSpawn onClose()
|
||||||
|
return conres
|
||||||
|
|
||||||
|
method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} =
|
||||||
|
doAssert not transport.listener.isNil, "call start() before calling accept()"
|
||||||
|
let connection = await transport.listener.accept()
|
||||||
|
return transport.wrapConnection(connection)
|
||||||
|
|
||||||
|
method dial*(transport: QuicTransport,
|
||||||
|
hostname: string,
|
||||||
|
address: MultiAddress): Future[P2PConnection] {.async.} =
|
||||||
|
let connection = await dial(initTAddress(address).tryGet)
|
||||||
|
return transport.wrapConnection(connection)
|
@ -27,6 +27,7 @@ const
|
|||||||
|
|
||||||
TRANSPMA* = mapOr(
|
TRANSPMA* = mapOr(
|
||||||
RTRANSPMA,
|
RTRANSPMA,
|
||||||
|
QUIC,
|
||||||
UDP
|
UDP
|
||||||
)
|
)
|
||||||
|
|
||||||
|
21
tests/testquic.nim
Normal file
21
tests/testquic.nim
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import sequtils
|
||||||
|
import chronos, stew/byteutils
|
||||||
|
import ../libp2p/[stream/connection,
|
||||||
|
transports/transport,
|
||||||
|
transports/quictransport,
|
||||||
|
upgrademngrs/upgrade,
|
||||||
|
multiaddress,
|
||||||
|
errors,
|
||||||
|
wire]
|
||||||
|
|
||||||
|
import ./helpers, ./commontransport
|
||||||
|
|
||||||
|
suite "Quic transport":
|
||||||
|
asyncTest "can handle local address":
|
||||||
|
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/45894/quic").tryGet()]
|
||||||
|
let transport1 = QuicTransport.new()
|
||||||
|
await transport1.start(ma)
|
||||||
|
check transport1.handles(transport1.addrs[0])
|
||||||
|
await transport1.stop()
|
@ -22,7 +22,8 @@ import ../libp2p/[errors,
|
|||||||
nameresolving/mockresolver,
|
nameresolving/mockresolver,
|
||||||
stream/chronosstream,
|
stream/chronosstream,
|
||||||
transports/tcptransport,
|
transports/tcptransport,
|
||||||
transports/wstransport]
|
transports/wstransport,
|
||||||
|
transports/quictransport]
|
||||||
import ./helpers
|
import ./helpers
|
||||||
|
|
||||||
const
|
const
|
||||||
@ -979,3 +980,34 @@ suite "Switch":
|
|||||||
await destSwitch.stop()
|
await destSwitch.stop()
|
||||||
await srcWsSwitch.stop()
|
await srcWsSwitch.stop()
|
||||||
await srcTcpSwitch.stop()
|
await srcTcpSwitch.stop()
|
||||||
|
|
||||||
|
asyncTest "e2e quic transport":
|
||||||
|
let
|
||||||
|
#TODO port 0 doesn't work yet
|
||||||
|
quicAddress1 = MultiAddress.init("/ip4/127.0.0.1/udp/4567/quic").tryGet()
|
||||||
|
quicAddress2 = MultiAddress.init("/ip4/127.0.0.1/udp/4566/quic").tryGet()
|
||||||
|
|
||||||
|
srcSwitch =
|
||||||
|
SwitchBuilder.new()
|
||||||
|
.withAddress(quicAddress1)
|
||||||
|
.withRng(crypto.newRng())
|
||||||
|
.withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr))
|
||||||
|
.withNoise()
|
||||||
|
.build()
|
||||||
|
|
||||||
|
destSwitch =
|
||||||
|
SwitchBuilder.new()
|
||||||
|
.withAddress(quicAddress2)
|
||||||
|
.withRng(crypto.newRng())
|
||||||
|
.withTransport(proc (upgr: Upgrade): Transport = QuicTransport.new(upgr))
|
||||||
|
.withNoise()
|
||||||
|
.build()
|
||||||
|
|
||||||
|
await destSwitch.start()
|
||||||
|
await srcSwitch.start()
|
||||||
|
|
||||||
|
await srcSwitch.connect(destSwitch.peerInfo.peerId, destSwitch.peerInfo.addrs)
|
||||||
|
check srcSwitch.isConnected(destSwitch.peerInfo.peerId)
|
||||||
|
|
||||||
|
await destSwitch.stop()
|
||||||
|
await srcSwitch.stop()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user