diff --git a/libp2p/transports/webrtctransport.nim b/libp2p/transports/webrtctransport.nim index 28057330f..d5d839204 100644 --- a/libp2p/transports/webrtctransport.nim +++ b/libp2p/transports/webrtctransport.nim @@ -77,6 +77,39 @@ proc encode(msg: WebRtcMessage): seq[byte] = pb.finish() pb.buffer +# -- Raw WebRTC Stream -- + +type + RawWebRtcStream = ref object of Connection + dataChannel: DataChannelStream + readData: seq[byte] + +proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream = + let stream = RawWebRtcStream(dataChannel: dataChannel) + stream + +method closeImpl*(s: RawWebRtcStream): Future[void] = + # TODO: close datachannel + discard + +method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] = + trace "RawWebrtcStream write", msg, len=msg.len() + s.dataChannel.write(msg) + +method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = + # TODO: + # if s.isClosed: + # raise newLPStreamEOFError() + + if s.readData.len() == 0: + let rawData = await s.dataChannel.read() + s.readData = rawData + trace "readOnce RawWebRtcStream", data = s.readData, nbytes + + result = min(nbytes, s.readData.len) + copyMem(pbytes, addr s.readData[0], result) + s.readData = s.readData[result..^1] + # -- Stream -- const MaxMessageSize = 16384 # 16KiB @@ -85,7 +118,7 @@ type Sending, Closing, Closed WebRtcStream = ref object of Connection - dataChannel: DataChannelStream + rawStream: RawWebRtcStream sendQueue: seq[(seq[byte], Future[void])] sendLoop: Future[void] readData: seq[byte] @@ -97,7 +130,8 @@ proc new( dataChannel: DataChannelStream, oaddr: Opt[MultiAddress], peerId: PeerId): WebRtcStream = - let stream = WebRtcStream(dataChannel: dataChannel, observedAddr: oaddr, peerId: peerId) + let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel), + observedAddr: oaddr, peerId: peerId) procCall Connection(stream).initStream() stream @@ -105,7 +139,7 @@ proc sender(s: WebRtcStream) {.async.} = while s.sendQueue.len > 0: let (message, fut) = s.sendQueue.pop() #TODO handle exceptions - await s.dataChannel.write(message) + await s.rawStream.writeLp(message) if not fut.isNil: fut.complete() proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) = @@ -121,6 +155,7 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] = # We avoid this by filling the s.sendQueue synchronously var msg = msg2 + trace "WebrtcStream write", msg, len=msg.len() let retFuture = newFuture[void]("WebRtcStream.write") if s.txState != Sending: retFuture.fail(newLPStreamClosedError()) @@ -158,7 +193,7 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a let #TODO handle exceptions - message = await s.dataChannel.read() + message = await s.rawStream.readLp(MaxMessageSize) decoded = WebRtcMessage.decode(message).tryGet() decoded.flag.withValue(flag): diff --git a/testwebrtc.nim b/testwebrtc.nim index 17fb6f60a..efedbd56a 100644 --- a/testwebrtc.nim +++ b/testwebrtc.nim @@ -1,4 +1,18 @@ import chronos, libp2p, libp2p/transports/webrtctransport +import stew/byteutils + +proc echoHandler(conn: Connection, proto: string) {.async.} = + defer: await conn.close() + while true: + try: + echo "\e[35;1m => Echo Handler <=\e[0m" + let msg = string.fromBytes(await conn.readLp(1024)) + echo " => Echo Handler Receive: ", msg, " <=" + echo " => Echo Handler Try Send: ", msg & "1", " <=" + await conn.writeLp(msg & "1") + except CatchableError as e: + echo " => Echo Handler Error: ", e.msg, " <=" + break proc main {.async.} = let switch = @@ -10,7 +24,15 @@ proc main {.async.} = .withNoise() .build() + let + codec = "/echo/1.0.0" + proto = new LPProtocol + proto.handler = echoHandler + proto.codec = codec + + switch.mount(proto) await switch.start() + echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m" await sleepAsync(1.hours) waitFor main()