From 255bf740ea2fd73d8545f13fe56f8a8caa33f64b Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 8 Nov 2024 15:06:56 +0100 Subject: [PATCH] feat: webrtc-direct end to end --- .pinned | 6 +- libp2p/multiaddress.nim | 108 +++------------------ libp2p/multicodec.nim | 1 + libp2p/transports/webrtctransport.nim | 129 +++++++++++++++----------- testwebrtc.nim | 4 +- 5 files changed, 93 insertions(+), 155 deletions(-) diff --git a/.pinned b/.pinned index 2a609c4b8..e5cd3f3f4 100644 --- a/.pinned +++ b/.pinned @@ -1,11 +1,12 @@ bearssl;https://github.com/status-im/nim-bearssl@#667b40440a53a58e9f922e29e20818720c62d9ac +binary_serialization;https://github.com/status-im/nim-binary-serialization@#38a73a70fd43f3835ca01a877353858b19e39d70 chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a chronos;https://github.com/status-im/nim-chronos@#c04576d829b8a0a1b12baaa8bc92037501b3a4a0 dnsclient;https://github.com/ba0f3/dnsclient.nim@#23214235d4784d24aceed99bbfe153379ea557c8 faststreams;https://github.com/status-im/nim-faststreams@#720fc5e5c8e428d9d0af618e1e27c44b42350309 httputils;https://github.com/status-im/nim-http-utils@#87b7cbf032c90b9e6b446081f4a647e950362cec json_serialization;https://github.com/status-im/nim-json-serialization@#85b7ea093cb85ee4f433a617b97571bd709d30df -mbedtls;https://github.com/status-im/nim-mbedtls.git@#308f3edaa0edcc880b54ce22156fb2f4e2a2bcc7 +mbedtls;https://github.com/status-im/nim-mbedtls.git@#740fb2f469511adc1772c5cb32395f4076b9e0c5 metrics;https://github.com/status-im/nim-metrics@#6142e433fc8ea9b73379770a788017ac528d46ff ngtcp2;https://github.com/status-im/nim-ngtcp2@#6834f4756b6af58356ac9c4fef3d71db3c3ae5fe nimcrypto;https://github.com/cheatfate/nimcrypto@#1c8d6e3caf3abc572136ae9a1da81730c4eb4288 @@ -16,6 +17,7 @@ serialization;https://github.com/status-im/nim-serialization@#4bdbc29e54fe540499 stew;https://github.com/status-im/nim-stew@#3159137d9a3110edb4024145ce0ba778975de40e testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#2300fa9924a76e6c96bc4ea79d043e3a0f27120c -webrtc;https://github.com/status-im/nim-webrtc.git@#465992489c8c31803ec790f8ff6638ee64f924bc +usrsctp;https://github.com/status-im/nim-usrsctp@#c6a8d4bab44447df790e97dfc8099f7af93d435e +webrtc;https://github.com/status-im/nim-webrtc.git@#10d2389ab4d314ababe9544f71e462497d8c8dea websock;https://github.com/status-im/nim-websock@#f8ed9b40a5ff27ad02a3c237c4905b0924e3f982 zlib;https://github.com/status-im/nim-zlib@#a2f44bb7f65571a894227ff6fde9298a104e03a5 diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 55533b30d..f30df8530 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -382,6 +382,11 @@ const ) TranscoderDNS* = Transcoder(stringToBuffer: dnsStB, bufferToString: dnsBtS, validateBuffer: dnsVB) + TranscoderCertHash* = Transcoder( + stringToBuffer: certHashStB, + bufferToString: certHashBtS, + validateBuffer: certHashVB + ) ProtocolsList = [ MAProtocol(mcodec: multiCodec("ip4"), kind: Fixed, size: 4, coder: TranscoderIP4), MAProtocol(mcodec: multiCodec("tcp"), kind: Fixed, size: 2, coder: TranscoderPort), @@ -416,105 +421,13 @@ const MAProtocol( mcodec: multiCodec("dnsaddr"), kind: Length, size: 0, coder: TranscoderDNS ), -<<<<<<< HEAD - MAProtocol( - mcodec: multiCodec("dccp"), kind: Fixed, size: 2, - coder: TranscoderPort - ), - MAProtocol( - mcodec: multiCodec("sctp"), kind: Fixed, size: 2, - coder: TranscoderPort - ), - MAProtocol( - mcodec: multiCodec("udt"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("utp"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("http"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("https"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("quic"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("ip6zone"), kind: Length, size: 0, - coder: TranscoderIP6Zone - ), - MAProtocol( - mcodec: multiCodec("onion"), kind: Fixed, size: 10, - coder: TranscoderOnion - ), - MAProtocol( - mcodec: multiCodec("onion3"), kind: Fixed, size: 37, - coder: TranscoderOnion3 - ), - MAProtocol( - mcodec: multiCodec("ws"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("wss"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("tls"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("ipfs"), kind: Length, size: 0, - coder: TranscoderP2P - ), - MAProtocol( - mcodec: multiCodec("p2p"), kind: Length, size: 0, - coder: TranscoderP2P - ), - MAProtocol( - mcodec: multiCodec("unix"), kind: Path, size: 0, - coder: TranscoderUnix - ), - MAProtocol( - mcodec: multiCodec("dns"), kind: Length, size: 0, - coder: TranscoderDNS - ), - MAProtocol( - mcodec: multiCodec("dns4"), kind: Length, size: 0, - coder: TranscoderDNS - ), - MAProtocol( - mcodec: multiCodec("dns6"), kind: Length, size: 0, - coder: TranscoderDNS - ), - MAProtocol( - mcodec: multiCodec("dnsaddr"), kind: Length, size: 0, - coder: TranscoderDNS - ), - MAProtocol( - mcodec: multiCodec("p2p-circuit"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("p2p-websocket-star"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("p2p-webrtc-star"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("webrtc"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0 - ), - MAProtocol( - mcodec: multiCodec("certhash"), kind: Length, size: 0, - coder: TranscoderCertHash - ), MAProtocol(mcodec: multiCodec("p2p-circuit"), kind: Marker, size: 0), MAProtocol(mcodec: multiCodec("p2p-websocket-star"), kind: Marker, size: 0), MAProtocol(mcodec: multiCodec("p2p-webrtc-star"), kind: Marker, size: 0), MAProtocol(mcodec: multiCodec("p2p-webrtc-direct"), kind: Marker, size: 0), + MAProtocol(mcodec: multiCodec("webrtc"), kind: Marker, size: 0), + MAProtocol(mcodec: multiCodec("webrtc-direct"), kind: Marker, size: 0), + MAProtocol(mcodec: multiCodec("certhash"), kind: Length, size: 0, coder: TranscoderCertHash), ] DNSANY* = mapEq("dns") @@ -550,7 +463,6 @@ const WebSockets_DNS* = mapOr(WS_DNS, WSS_DNS) WebSockets_IP* = mapOr(WS_IP, WSS_IP) WebSockets* = mapOr(WS, WSS) - WebRtcDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash")) Onion3* = mapEq("onion3") TcpOnion3* = mapAnd(TCP, Onion3) @@ -570,10 +482,12 @@ const mapAnd(TCP, mapEq("https")), mapAnd(IP, mapEq("https")), mapAnd(DNS, mapEq("https")) ) - WebRTCDirect* = mapOr( + WebRTCDirect* {.deprecated.} = mapOr( mapAnd(HTTP, mapEq("p2p-webrtc-direct")), mapAnd(HTTPS, mapEq("p2p-webrtc-direct")) ) + WebRTCDirect2* = mapAnd(UDP, mapEq("webrtc-direct"), mapEq("certhash")) + CircuitRelay* = mapEq("p2p-circuit") proc initMultiAddressCodeTable(): Table[MultiCodec, MAProtocol] {.compileTime.} = diff --git a/libp2p/multicodec.nim b/libp2p/multicodec.nim index 613b0cc6a..81d9913c8 100644 --- a/libp2p/multicodec.nim +++ b/libp2p/multicodec.nim @@ -387,6 +387,7 @@ const MultiCodecList = [ ("tls", 0x01C0), ("quic", 0x01CC), ("quic-v1", 0x01CD), + ("certhash", 0x01D2), ("ws", 0x01DD), ("wss", 0x01DE), ("p2p-websocket-star", 0x01DF), # not in multicodec list diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index b26be1c97..c4907a053 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -30,7 +30,7 @@ import transport, ../protocols/secure/noise, ../utility -import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls +import webrtc/webrtc, webrtc/datachannel, webrtc/dtls/dtls_transport, webrtc/errors logScope: topics = "libp2p webrtctransport" @@ -113,22 +113,28 @@ type proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream = RawWebRtcStream(dataChannel: dataChannel) -method closeImpl*(s: RawWebRtcStream): Future[void] = +method closeImpl*(s: RawWebRtcStream): Future[void] {.async: (raises: []).} = # TODO: close datachannel discard -method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] = +method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = trace "RawWebrtcStream write", msg, len=msg.len() - s.dataChannel.write(msg) + try: + await s.dataChannel.write(msg) + except WebRtcError as exc: + raise newException(LPStreamError, exc.msg, exc) -method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = +method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = # TODO: # if s.isClosed: # raise newLPStreamEOFError() if s.readData.len() == 0: - let rawData = await s.dataChannel.read() - s.readData = rawData + try: + let rawData = await s.dataChannel.read() + s.readData = rawData + except WebRtcError as exc: + raise newException(LPStreamError, exc.msg, exc) trace "readOnce RawWebRtcStream", data = s.readData, nbytes result = min(nbytes, s.readData.len) @@ -143,7 +149,7 @@ type WebRtcStream = ref object of Connection rawStream: RawWebRtcStream - sendQueue: seq[(seq[byte], Future[void])] + sendQueue: seq[(seq[byte], Future[void].Raising([CancelledError, LPStreamError]))] sendLoop: Future[void] readData: seq[byte] txState: WebRtcState # Transmission @@ -159,30 +165,29 @@ proc new( procCall Connection(stream).initStream() stream -proc sender(s: WebRtcStream) {.async.} = +proc sender(s: WebRtcStream) {.async: (raises: [CancelledError, LPStreamError]).} = while s.sendQueue.len > 0: let (message, fut) = s.sendQueue.pop() #TODO handle exceptions await s.rawStream.writeLp(message) if not fut.isNil: fut.complete() -proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) = +proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void].Raising([CancelledError, LPStreamError]) = nil) = let wrappedMessage = msg.encode() s.sendQueue.insert((wrappedMessage, fut)) if s.sendLoop == nil or s.sendLoop.finished: s.sendLoop = s.sender() -method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = +method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = # We need to make sure we send all of our data before another write # Otherwise, two concurrent writes could get intertwined # We avoid this by filling the s.sendQueue synchronously var msg = msg2 trace "WebrtcStream write", msg, len=msg.len() - let retFuture = newFuture[void]("WebRtcStream.write") + var retFuture = Future[void].Raising([CancelledError, LPStreamError]).init("WebRtcStream.write") if s.txState != Sending: - retFuture.fail(newLPStreamClosedError()) - return retFuture + raise newException(LPStreamClosedError, "whatever") var messages: seq[seq[byte]] while msg.len > MaxMessageSize - 16: @@ -196,16 +201,16 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = wrappedMessage = WebRtcMessage(data: msg) s.send(wrappedMessage, retFuture) - return retFuture + await retFuture -proc actuallyClose(s: WebRtcStream) {.async.} = +proc actuallyClose(s: WebRtcStream) {.async: (raises: [CancelledError, LPStreamError]).} = debug "stream closed", rxState=s.rxState, txState=s.txState if s.rxState == Closed and s.txState == Closed and s.readData.len == 0: #TODO add support to DataChannel #await s.dataChannel.close() await procCall Connection(s).closeImpl() -method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = +method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = if s.rxState == Closed: raise newLPStreamEOFError() @@ -216,35 +221,44 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a await s.actuallyClose() return 0 - let - #TODO handle exceptions - message = await s.rawStream.readLp(MaxMessageSize) - decoded = WebRtcMessage.decode(message).tryGet() + try: + let + #TODO handle exceptions + message = await s.rawStream.readLp(MaxMessageSize) + decoded = WebRtcMessage.decode(message).tryGet() - s.readData = s.readData.concat(decoded.data) + s.readData = s.readData.concat(decoded.data) + + decoded.flag.withValue(flag): + case flag: + of Fin: + # Peer won't send any more data + s.rxState = Closed + s.send(WebRtcMessage(flag: Opt.some(FinAck))) + of FinAck: + s.txState = Closed + await s.actuallyClose() + if nbytes == 0: + return 0 + else: discard + except CatchableError as exc: + raise newException(LPStreamError, exc.msg, exc) - decoded.flag.withValue(flag): - case flag: - of Fin: - # Peer won't send any more data - s.rxState = Closed - s.send(WebRtcMessage(flag: Opt.some(FinAck))) - of FinAck: - s.txState = Closed - await s.actuallyClose() - if nbytes == 0: - return 0 - else: discard result = min(nbytes, s.readData.len) copyMem(pbytes, addr s.readData[0], result) s.readData = s.readData[result..^1] -method closeImpl*(s: WebRtcStream) {.async.} = +method closeImpl*(s: WebRtcStream) {.async: (raises: []).} = s.send(WebRtcMessage(flag: Opt.some(Fin))) s.txState = Closing while s.txState != Closed: - discard await s.readOnce(nil, 0) + try: + discard await s.readOnce(nil, 0) + except CatchableError as exc: + discard + except CancelledError as exc: + discard # -- Connection -- @@ -252,7 +266,7 @@ type WebRtcConnection = ref object of Connection connection: DataChannelConnection remoteAddress: MultiAddress -method close*(conn: WebRtcConnection) {.async.} = +method close*(conn: WebRtcConnection) {.async: (raises: []).} = #TODO discard @@ -267,13 +281,16 @@ proc new( proc getStream*(conn: WebRtcConnection, direction: Direction, - noiseHandshake: bool = false): Future[WebRtcStream] {.async.} = + noiseHandshake: bool = false): Future[WebRtcStream] {.async: (raises: [CancelledError, LPStreamError]).} = var datachannel = case direction: of Direction.In: await conn.connection.accept() of Direction.Out: - await conn.connection.openStream(noiseHandshake) + try: + await conn.connection.openStream(noiseHandshake) + except WebRtcError as exc: + raise newException(LPStreamError, exc.msg, exc) return WebRtcStream.new(datachannel, conn.observedAddr, conn.peerId) # -- Muxer -- @@ -282,7 +299,7 @@ type WebRtcMuxer = ref object of Muxer webRtcConn: WebRtcConnection handleFut: Future[void] -method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = +method newStream*(m: WebRtcMuxer, name: string = "", lazy: bool = false): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} = return await m.webRtcConn.getStream(Direction.Out) proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} = @@ -296,31 +313,34 @@ proc handleStream(m: WebRtcMuxer, chann: WebRtcStream) {.async.} = #TODO add atEof -method handle*(m: WebRtcMuxer): Future[void] {.async, gcsafe.} = +method handle*(m: WebRtcMuxer): Future[void] {.async: (raises: []).} = try: #while not m.webRtcConn.atEof: while true: let incomingStream = await m.webRtcConn.getStream(Direction.In) asyncSpawn m.handleStream(incomingStream) + except CatchableError as exc: + discard + except CancelledError as exc: + discard finally: await m.webRtcConn.close() -method close*(m: WebRtcMuxer) {.async, gcsafe.} = +method close*(m: WebRtcMuxer) {.async: (raises: []).} = m.handleFut.cancel() await m.webRtcConn.close() # -- Upgrader -- type - WebRtcStreamHandler = proc(conn: Connection): Future[void] {.gcsafe, raises: [].} + WebRtcStreamHandler = proc(conn: Connection): Future[void] {.async: (raises: []).} WebRtcUpgrade = ref object of Upgrade streamHandler: WebRtcStreamHandler method upgrade*( self: WebRtcUpgrade, conn: Connection, - direction: Direction, - peerId: Opt[PeerId]): Future[Muxer] {.async.} = + peerId: Opt[PeerId]): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = let webRtcConn = WebRtcConnection(conn) result = WebRtcMuxer(connection: conn, webRtcConn: webRtcConn) @@ -330,8 +350,8 @@ method upgrade*( assert noiseHandler.len > 0 let xx = "libp2p-webrtc-noise:".toBytes() - let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.localCertificate()).get().data.buffer - let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.conn.conn.remoteCertificate()).get().data.buffer + let localCert = MultiHash.digest("sha2-256", webRtcConn.connection.localCertificate()).get().data.buffer + let remoteCert = MultiHash.digest("sha2-256", webRtcConn.connection.remoteCertificate()).get().data.buffer ((Noise)noiseHandler[0]).commonPrologue = xx & remoteCert & localCert echo "=> ", ((Noise)noiseHandler[0]).commonPrologue @@ -356,7 +376,7 @@ type WebRtcTransport* = ref object of Transport connectionsTimeout: Duration servers: seq[WebRtc] - acceptFuts: seq[Future[DataChannelConnection]] + acceptFuts: seq[Future[DataChannelConnection].Raising([CancelledError, WebRtcError])] clients: array[Direction, seq[DataChannelConnection]] WebRtcTransportTracker* = ref object of TrackerBase @@ -395,16 +415,15 @@ proc new*( connectionsTimeout = 10.minutes): T {.public.} = let upgrader = WebRtcUpgrade(ms: upgrade.ms, secureManagers: upgrade.secureManagers) - upgrader.streamHandler = proc(conn: Connection) - {.async, gcsafe, raises: [].} = + upgrader.streamHandler = proc(conn: Connection) {.async: (raises: []).} = # TODO: replace echo by trace and find why it fails compiling echo "Starting stream handler"#, conn try: await upgrader.ms.handle(conn) # handle incoming connection except CancelledError as exc: - raise exc + echo "Stream handler cancelled" except CatchableError as exc: - echo "exception in stream handler", exc.msg#, conn, msg = exc.msg + echo "Exception in stream handler", exc.msg finally: await conn.closeWithEOF() echo "Stream handler done"#, conn @@ -443,10 +462,10 @@ method start*( self.servers &= server let - cert = server.dtls.localCertificate() + cert = server.localCertificate() certHash = MultiHash.digest("sha2-256", cert).get().data.buffer encodedCertHash = MultiBase.encode("base64", certHash).get() - self.addrs[i] = MultiAddress.init(server.udp.laddr, IPPROTO_UDP).tryGet() & + self.addrs[i] = MultiAddress.init(server.localAddress(), IPPROTO_UDP).tryGet() & MultiAddress.init(multiCodec("webrtc-direct")).tryGet() & MultiAddress.init(multiCodec("certhash"), certHash).tryGet() @@ -529,4 +548,4 @@ method accept*(self: WebRtcTransport): Future[Connection] {.async, gcsafe.} = method handles*(t: WebRtcTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): if address.protocols.isOk: - return WebRtcDirect2.match(address) + return WebRTCDirect2.match(address) diff --git a/testwebrtc.nim b/testwebrtc.nim index 461a910ac..79484bcce 100644 --- a/testwebrtc.nim +++ b/testwebrtc.nim @@ -18,9 +18,11 @@ proc echoHandler(conn: Connection, proto: string) {.async.} = break proc main {.async.} = + let ma = MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g") + echo ma let switch = SwitchBuilder.new() - .withAddress(MultiAddress.init("/ip4/127.0.0.1/udp/4242/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g").tryGet()) #TODO the certhash shouldn't be necessary + .withAddress(ma.tryGet()) #TODO the certhash shouldn't be necessary .withRng(crypto.newRng()) .withMplex() .withYamux()