almost compiling

This commit is contained in:
Tanguy 2023-10-13 18:07:52 +02:00
parent e0f2b00f9a
commit 30e93e7c0a
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
3 changed files with 90 additions and 46 deletions

View File

@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499
stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c
webrtc;https://github.com/status-im/nim-webrtc.git@#6174511f5b8f89152252f39404e8f30e6dcd7c4c webrtc;https://github.com/status-im/nim-webrtc.git@#7dfb18eefdbd2e722cdec6d6a7b255015365d308
websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982
zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5

View File

@ -13,18 +13,23 @@
{.push raises: [].} {.push raises: [].}
import std/[sequtils] import std/[sequtils]
import stew/results import stew/[endians2, byteutils, objects, results]
import chronos, chronicles import chronos, chronicles
import transport, import transport,
../errors, ../errors,
../wire, ../wire,
../multicodec, ../multicodec,
../protobuf/minprotobuf,
../connmanager, ../connmanager,
../muxers/muxer,
../multiaddress, ../multiaddress,
../stream/connection, ../stream/connection,
../upgrademngrs/upgrade, ../upgrademngrs/upgrade,
../protocols/secure/noise,
../utility ../utility
import webrtc/webrtc, webrtc/datachannel
logScope: logScope:
topics = "libp2p webrtctransport" topics = "libp2p webrtctransport"
@ -50,22 +55,22 @@ proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] =
pb = initProtoBuffer(bytes) pb = initProtoBuffer(bytes)
flagOrd: uint32 flagOrd: uint32
res: WebRtcMessage res: WebRtcMessage
if ? pb.getField(1, flagOrd): if ? pb.getField(1, flagOrd).toOpt():
var flag: MessageFlag var flag: MessageFlag
if flag.checkEnumAssign(flagOrd): if flag.checkedEnumAssign(flagOrd):
res.flag = Opt.some(flag) res.flag = Opt.some(flag)
discard ? pb.getField(2, res.data) discard ? pb.getField(2, res.data).toOpt()
Opt.some(res) Opt.some(res)
proc encode(msg: WebRtcMessage): seq[byte] = proc encode(msg: WebRtcMessage): seq[byte] =
var pb = initProtoBuffer() var pb = initProtoBuffer()
msg.flag.withValue(val): msg.flag.withValue(val):
pb.writeField(1, val) pb.write(1, uint32(val))
if msg.data.len > 0: if msg.data.len > 0:
pb.writeField(2, msg.data) pb.write(2, msg.data)
pb.finish() pb.finish()
pb.buffer pb.buffer
@ -78,7 +83,7 @@ type
Sending, Closing, Closed Sending, Closing, Closed
WebRtcStream = ref object of Connection WebRtcStream = ref object of Connection
dataChannel: DataChannel dataChannel: DataChannelStream
sendQueue: seq[(seq[byte], Future[void])] sendQueue: seq[(seq[byte], Future[void])]
sendLoop: Future[void] sendLoop: Future[void]
readData: seq[byte] readData: seq[byte]
@ -87,33 +92,34 @@ type
proc new( proc new(
_: type WebRtcStream, _: type WebRtcStream,
dataChannel: DataChannel, dataChannel: DataChannelStream,
oaddr: Opt[MultiAddress], oaddr: Opt[MultiAddress],
peerId: PeerId): WebRtcStream = peerId: PeerId): WebRtcStream =
let stream = WebRtcStream(stream: stream, observedAddr: oaddr, peerId: peerId) let stream = WebRtcStream(dataChannel: dataChannel, observedAddr: oaddr, peerId: peerId)
procCall Connection(stream).initStream() procCall Connection(stream).initStream()
stream stream
proc sender(s: WebRtcConnection) {.async.} = proc sender(s: WebRtcStream) {.async.} =
while s.sendQueue.len > 0: while s.sendQueue.len > 0:
let (message, fut) = s.sendQueue.pop() let (message, fut) = s.sendQueue.pop()
#TODO handle exceptions #TODO handle exceptions
await s.dataChannel.write(message) await s.dataChannel.write(message)
if not fut.isNil: fut.complete() if not fut.isNil: fut.complete()
proc send(s: WebRtcConnection, msg: WebRtcMessage, fut: Future[void] = nil) = proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) =
let wrappedMessage = msg.encode() let wrappedMessage = msg.encode()
s.sendQueue.insert((wrappedMessage, fut)) s.sendQueue.insert((wrappedMessage, fut))
if s.sendLoop == nil or s.sendLoop.finished: if s.sendLoop == nil or s.sendLoop.finished:
s.sendLoop = s.sender() s.sendLoop = s.sender()
method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] = method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] =
# We need to make sure we send all of our data before another write # We need to make sure we send all of our data before another write
# Otherwise, two concurrent writes could get intertwined # Otherwise, two concurrent writes could get intertwined
# We avoid this by filling the s.sendQueue synchronously # We avoid this by filling the s.sendQueue synchronously
let retFuture = newFuture[void]("WebRtcConnection.write") var msg = msg2
let retFuture = newFuture[void]("WebRtcStream.write")
if s.txState != Sending: if s.txState != Sending:
retFuture.fail(newLPStreamClosedError()) retFuture.fail(newLPStreamClosedError())
return retFuture return retFuture
@ -132,25 +138,25 @@ method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] =
return retFuture return retFuture
proc actuallyClose(s: WebRtcConnection) {.async.} = proc actuallyClose(s: WebRtcStream) {.async.} =
if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: if s.rxState == Closed and s.txState == Closed and s.readData.len == 0:
await s.conn.close() #TODO add support to DataChannel
#await s.dataChannel.close()
await procCall Connection(s).closeImpl() await procCall Connection(s).closeImpl()
method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int] {.async.} = method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
if s.atEof: if s.rxState == Closed:
raise newLPStreamEOFError() raise newLPStreamEOFError()
while s.readData.len == 0: while s.readData.len == 0:
if s.rxState == Closed: if s.rxState == Closed:
s.atEof = true
await s.actuallyClose() await s.actuallyClose()
return 0 return 0
let let
#TODO handle exceptions #TODO handle exceptions
message = await s.conn.read() message = await s.dataChannel.read()
decoded = WebRtcMessage.decode(message) decoded = WebRtcMessage.decode(message).tryGet()
decoded.flag.withValue(flag): decoded.flag.withValue(flag):
case flag: case flag:
@ -161,14 +167,15 @@ method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int]
of FinAck: of FinAck:
s.txState = Closed s.txState = Closed
await s.actuallyClose() await s.actuallyClose()
else: discard
s.readData = decoded.data s.readData = decoded.data
result = min(nbytes, s.readData.len) result = min(nbytes, s.readData.len)
copyMem(pbytes, addr s.readData[0], toCopy) copyMem(pbytes, addr s.readData[0], result)
stream.cached = stream.cached[result..^1] s.readData = s.readData[result..^1]
method closeImpl*(s: WebRtcConnection) {.async.} = method closeImpl*(s: WebRtcStream) {.async.} =
s.send(WebRtcMessage(flag: Opt.some(Fin))) s.send(WebRtcMessage(flag: Opt.some(Fin)))
s.txState = Closing s.txState = Closing
await s.join() #TODO ?? await s.join() #TODO ??
@ -181,14 +188,23 @@ method close*(conn: WebRtcConnection) {.async.} =
#TODO #TODO
discard discard
proc new(
_: type WebRtcConnection,
conn: DataChannelConnection,
observedAddr: Opt[MultiAddress]
): WebRtcConnection =
let co = WebRtcConnection(connection: conn, observedAddr: observedAddr)
procCall Connection(co).initStream()
co
proc getStream*(conn: WebRtcConnection, proc getStream*(conn: WebRtcConnection,
direction: Direction): Future[WebRtcStream] {.async.} = direction: Direction): Future[WebRtcStream] {.async.} =
var datachannel = var datachannel =
case direction: case direction:
of Direction.In: of Direction.In:
await conn.connection.incomingStream() await conn.connection.accept()
of Direction.Out: of Direction.Out:
await conn.connection.openStream() await conn.connection.openStream(0) #TODO don't hardcode stream id (should be in nim-webrtc)
return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId)
# -- Muxer -- # -- Muxer --
@ -208,9 +224,12 @@ proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} =
trace "Exception in mplex stream handler", msg = exc.msg trace "Exception in mplex stream handler", msg = exc.msg
await chann.close() await chann.close()
#TODO add atEof
method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} = method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} =
try: try:
while not m.webRtcConn.atEof: #while not m.webRtcConn.atEof:
while true:
let incomingStream = await m.webRtcConn.getStream(Direction.In) let incomingStream = await m.webRtcConn.getStream(Direction.In)
asyncSpawn m.handleStream(incomingStream) asyncSpawn m.handleStream(incomingStream)
finally: finally:
@ -237,11 +256,11 @@ method upgrade*(
assert noiseHandler.len > 0 assert noiseHandler.len > 0
let let
stream = webRtcConn.getStream(Out) #TODO add channelId: 0 stream = await webRtcConn.getStream(Out) #TODO add channelId: 0
secureStream = noiseHandler[0].handshake( secureStream = await noiseHandler[0].handshake(
stream, stream,
initiator: true, # we are always the initiator in webrtc-direct initiator = true, # we are always the initiator in webrtc-direct
peerId: peerId peerId = peerId
#TODO: add prelude data #TODO: add prelude data
) )
@ -254,6 +273,7 @@ type
WebRtcTransport* = ref object of Transport WebRtcTransport* = ref object of Transport
connectionsTimeout: Duration connectionsTimeout: Duration
servers: seq[WebRtc] servers: seq[WebRtc]
acceptFuts: seq[Future[DataChannelConnection]]
clients: array[Direction, seq[DataChannelConnection]] clients: array[Direction, seq[DataChannelConnection]]
WebRtcTransportTracker* = ref object of TrackerBase WebRtcTransportTracker* = ref object of TrackerBase
@ -318,14 +338,17 @@ method start*(
continue continue
let let
transportAddress = initTAddress(ma) transportAddress = initTAddress(ma[0..1].tryGet()).tryGet()
server = WebRtc.new(transportAddress) server = WebRtc.new(transportAddress)
server.start() server.listen()
self.servers &= server self.servers &= server
self.addrs[i] = MultiAddress.init(server.getLocalAddress(), IPPROTO_UDP).tryGet() let
#TODO add webrtc-direct & certhash cert = server.dtls.localCertificate
certHash = MultiHash.digest("sha2-256", cert).get().data.buffer
encodedCertHash = MultiBase.encode("base64", certHash).get()
self.addrs[i] = (MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & MultiAddress.init(multiCodec("cert-hash"), encodedCertHash).tryGet()).tryGet()
trace "Listening on", address = self.addrs[i] trace "Listening on", address = self.addrs[i]
@ -341,15 +364,15 @@ proc connHandler(self: WebRtcTransport,
let conn: Connection = let conn: Connection =
WebRtcConnection.new( WebRtcConnection.new(
client = client, conn = client,
dir = dir, # dir = dir,
observedAddr = observedAddr, observedAddr = observedAddr
timeout = self.connectionsTimeout # timeout = self.connectionsTimeout
) )
proc onClose() {.async.} = proc onClose() {.async.} =
try: try:
let futs = @[client.join(), conn.join()] let futs = @[conn.join(), conn.join()] #TODO that's stupid
await futs[0] or futs[1] await futs[0] or futs[1]
for f in futs: for f in futs:
if not f.finished: await f.cancelAndWait() # cancel outstanding join() if not f.finished: await f.cancelAndWait() # cancel outstanding join()
@ -358,8 +381,9 @@ proc connHandler(self: WebRtcTransport,
conn conn
self.clients[dir].keepItIf( it != client ) self.clients[dir].keepItIf( it != client )
await allFuturesThrowing( #TODO
conn.close(), client.closeWait()) #await allFuturesThrowing(
# conn.close(), client.closeWait())
trace "Cleaned up client", addrs = $client.remoteAddress, trace "Cleaned up client", addrs = $client.remoteAddress,
conn conn
@ -392,14 +416,18 @@ method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} =
let transp = await finished let transp = await finished
try: try:
let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add remoteAddress to DataChannelConnection
#let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct
let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet()
return await self.connHandler(transp, Opt.some(observedAddr), Direction.In) return await self.connHandler(transp, Opt.some(observedAddr), Direction.In)
except CancelledError as exc: except CancelledError as exc:
transp.close() #TODO
#transp.close()
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
debug "Failed to handle connection", exc = exc.msg debug "Failed to handle connection", exc = exc.msg
transp.close() #TODO
#transp.close()
method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} = method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address): if procCall Transport(t).handles(address):

16
testwebrtc.nim Normal file
View File

@ -0,0 +1,16 @@
import chronos, libp2p, libp2p/transports/webrtctransport
proc main {.async.} =
let switch =
SwitchBuilder.new()
.withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary
.withRng(crypto.newRng())
.withMplex()
.withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr))
.withNoise()
.build()
await switch.start()
await sleepAsync(1.hours)
waitFor main()