From 30e93e7c0a113630e88ae8352977fc6efd69ef3f Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 13 Oct 2023 18:07:52 +0200 Subject: [PATCH] almost compiling --- .pinned | 4 +- libp2p/transports/webrtctransport.nim | 116 ++++++++++++++++---------- testwebrtc.nim | 16 ++++ 3 files changed, 90 insertions(+), 46 deletions(-) create mode 100644 testwebrtc.nim diff --git a/.pinned b/.pinned index dfa76df1d..bb6c61dd7 100644 --- a/.pinned +++ b/.pinned @@ -15,6 +15,6 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#6174511f5b8f89152252f39404e8f30e6dcd7c4c +webrtc;https://github.com/status-im/nim-webrtc.git@#7dfb18eefdbd2e722cdec6d6a7b255015365d308 websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 -zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 \ No newline at end of file +zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 033856f53..d06a1795c 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -13,18 +13,23 @@ {.push raises: [].} import std/[sequtils] -import stew/results +import stew/[endians2, byteutils, objects, results] import chronos, chronicles import transport, ../errors, ../wire, ../multicodec, + ../protobuf/minprotobuf, ../connmanager, + ../muxers/muxer, ../multiaddress, ../stream/connection, ../upgrademngrs/upgrade, + ../protocols/secure/noise, ../utility +import webrtc/webrtc, webrtc/datachannel + logScope: topics = "libp2p webrtctransport" @@ -50,22 +55,22 @@ proc decode(_: type WebRtcMessage, bytes: seq[byte]): Opt[WebRtcMessage] = pb = initProtoBuffer(bytes) flagOrd: uint32 res: WebRtcMessage - if ? pb.getField(1, flagOrd): + if ? pb.getField(1, flagOrd).toOpt(): var flag: MessageFlag - if flag.checkEnumAssign(flagOrd): + if flag.checkedEnumAssign(flagOrd): res.flag = Opt.some(flag) - discard ? pb.getField(2, res.data) + discard ? pb.getField(2, res.data).toOpt() Opt.some(res) proc encode(msg: WebRtcMessage): seq[byte] = var pb = initProtoBuffer() msg.flag.withValue(val): - pb.writeField(1, val) + pb.write(1, uint32(val)) if msg.data.len > 0: - pb.writeField(2, msg.data) + pb.write(2, msg.data) pb.finish() pb.buffer @@ -78,7 +83,7 @@ type Sending, Closing, Closed WebRtcStream = ref object of Connection - dataChannel: DataChannel + dataChannel: DataChannelStream sendQueue: seq[(seq[byte], Future[void])] sendLoop: Future[void] readData: seq[byte] @@ -87,33 +92,34 @@ type proc new( _: type WebRtcStream, - dataChannel: DataChannel, + dataChannel: DataChannelStream, oaddr: Opt[MultiAddress], peerId: PeerId): WebRtcStream = - let stream = WebRtcStream(stream: stream, observedAddr: oaddr, peerId: peerId) + let stream = WebRtcStream(dataChannel: dataChannel, observedAddr: oaddr, peerId: peerId) procCall Connection(stream).initStream() stream -proc sender(s: WebRtcConnection) {.async.} = +proc sender(s: WebRtcStream) {.async.} = while s.sendQueue.len > 0: let (message, fut) = s.sendQueue.pop() #TODO handle exceptions await s.dataChannel.write(message) if not fut.isNil: fut.complete() -proc send(s: WebRtcConnection, msg: WebRtcMessage, fut: Future[void] = nil) = +proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) = let wrappedMessage = msg.encode() s.sendQueue.insert((wrappedMessage, fut)) if s.sendLoop == nil or s.sendLoop.finished: s.sendLoop = s.sender() -method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] = +method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = # We need to make sure we send all of our data before another write # Otherwise, two concurrent writes could get intertwined # We avoid this by filling the s.sendQueue synchronously - let retFuture = newFuture[void]("WebRtcConnection.write") + var msg = msg2 + let retFuture = newFuture[void]("WebRtcStream.write") if s.txState != Sending: retFuture.fail(newLPStreamClosedError()) return retFuture @@ -132,25 +138,25 @@ method write*(s: WebRtcConnection, msg: seq[byte]): Future[void] = return retFuture -proc actuallyClose(s: WebRtcConnection) {.async.} = +proc actuallyClose(s: WebRtcStream) {.async.} = if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: - await s.conn.close() + #TODO add support to DataChannel + #await s.dataChannel.close() await procCall Connection(s).closeImpl() -method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int] {.async.} = - if s.atEof: +method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = + if s.rxState == Closed: raise newLPStreamEOFError() while s.readData.len == 0: if s.rxState == Closed: - s.atEof = true await s.actuallyClose() return 0 let #TODO handle exceptions - message = await s.conn.read() - decoded = WebRtcMessage.decode(message) + message = await s.dataChannel.read() + decoded = WebRtcMessage.decode(message).tryGet() decoded.flag.withValue(flag): case flag: @@ -161,14 +167,15 @@ method readOnce*(s: WebRtcConnection, pbytes: pointer, nbytes: int): Future[int] of FinAck: s.txState = Closed await s.actuallyClose() + else: discard s.readData = decoded.data result = min(nbytes, s.readData.len) - copyMem(pbytes, addr s.readData[0], toCopy) - stream.cached = stream.cached[result..^1] + copyMem(pbytes, addr s.readData[0], result) + s.readData = s.readData[result..^1] -method closeImpl*(s: WebRtcConnection) {.async.} = +method closeImpl*(s: WebRtcStream) {.async.} = s.send(WebRtcMessage(flag: Opt.some(Fin))) s.txState = Closing await s.join() #TODO ?? @@ -181,14 +188,23 @@ method close*(conn: WebRtcConnection) {.async.} = #TODO discard +proc new( + _: type WebRtcConnection, + conn: DataChannelConnection, + observedAddr: Opt[MultiAddress] + ): WebRtcConnection = + let co = WebRtcConnection(connection: conn, observedAddr: observedAddr) + procCall Connection(co).initStream() + co + proc getStream*(conn: WebRtcConnection, direction: Direction): Future[WebRtcStream] {.async.} = var datachannel = case direction: of Direction.In: - await conn.connection.incomingStream() + await conn.connection.accept() of Direction.Out: - await conn.connection.openStream() + await conn.connection.openStream(0) #TODO don't hardcode stream id (should be in nim-webrtc) return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) # -- Muxer -- @@ -208,9 +224,12 @@ proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} = trace "Exception in mplex stream handler", msg = exc.msg await chann.close() +#TODO add atEof + method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} = try: - while not m.webRtcConn.atEof: + #while not m.webRtcConn.atEof: + while true: let incomingStream = await m.webRtcConn.getStream(Direction.In) asyncSpawn m.handleStream(incomingStream) finally: @@ -237,11 +256,11 @@ method upgrade*( assert noiseHandler.len > 0 let - stream = webRtcConn.getStream(Out) #TODO add channelId: 0 - secureStream = noiseHandler[0].handshake( + stream = await webRtcConn.getStream(Out) #TODO add channelId: 0 + secureStream = await noiseHandler[0].handshake( stream, - initiator: true, # we are always the initiator in webrtc-direct - peerId: peerId + initiator = true, # we are always the initiator in webrtc-direct + peerId = peerId #TODO: add prelude data ) @@ -254,6 +273,7 @@ type WebRtcTransport* = ref object of Transport connectionsTimeout: Duration servers: seq[WebRtc] + acceptFuts: seq[Future[DataChannelConnection]] clients: array[Direction, seq[DataChannelConnection]] WebRtcTransportTracker* = ref object of TrackerBase @@ -318,14 +338,17 @@ method start*( continue let - transportAddress = initTAddress(ma) + transportAddress = initTAddress(ma[0..1].tryGet()).tryGet() server = WebRtc.new(transportAddress) - server.start() + server.listen() self.servers &= server - self.addrs[i] = MultiAddress.init(server.getLocalAddress(), IPPROTO_UDP).tryGet() - #TODO add webrtc-direct & certhash + let + cert = server.dtls.localCertificate + certHash = MultiHash.digest("sha2-256", cert).get().data.buffer + encodedCertHash = MultiBase.encode("base64", certHash).get() + self.addrs[i] = (MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & MultiAddress.init(multiCodec("cert-hash"), encodedCertHash).tryGet()).tryGet() trace "Listening on", address = self.addrs[i] @@ -341,15 +364,15 @@ proc connHandler(self: WebRtcTransport, let conn: Connection = WebRtcConnection.new( - client = client, - dir = dir, - observedAddr = observedAddr, - timeout = self.connectionsTimeout + conn = client, + # dir = dir, + observedAddr = observedAddr + # timeout = self.connectionsTimeout ) proc onClose() {.async.} = try: - let futs = @[client.join(), conn.join()] + let futs = @[conn.join(), conn.join()] #TODO that's stupid await futs[0] or futs[1] for f in futs: if not f.finished: await f.cancelAndWait() # cancel outstanding join() @@ -358,8 +381,9 @@ proc connHandler(self: WebRtcTransport, conn self.clients[dir].keepItIf( it != client ) - await allFuturesThrowing( - conn.close(), client.closeWait()) + #TODO + #await allFuturesThrowing( + # conn.close(), client.closeWait()) trace "Cleaned up client", addrs = $client.remoteAddress, conn @@ -392,14 +416,18 @@ method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} = let transp = await finished try: - let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() + #TODO add remoteAddress to DataChannelConnection + #let observedAddr = MultiAddress.init(transp.remoteAddress).tryGet() #TODO add /webrtc-direct + let observedAddr = MultiAddress.init("/ip4/127.0.0.1").tryGet() return await self.connHandler(transp, Opt.some(observedAddr), Direction.In) except CancelledError as exc: - transp.close() + #TODO + #transp.close() raise exc except CatchableError as exc: debug "Failed to handle connection", exc = exc.msg - transp.close() + #TODO + #transp.close() method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): diff --git a/testwebrtc.nim b/testwebrtc.nim new file mode 100644 index 000000000..17fb6f60a --- /dev/null +++ b/testwebrtc.nim @@ -0,0 +1,16 @@ +import chronos, libp2p, libp2p/transports/webrtctransport + +proc main {.async.} = + let switch = + SwitchBuilder.new() + .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary + .withRng(crypto.newRng()) + .withMplex() + .withTransport(proc (upgr: Upgrade): Transport = WebRtcTransport.new(upgr)) + .withNoise() + .build() + + await switch.start() + await sleepAsync(1.hours) + +waitFor main()