fix webrtcstream

This commit is contained in:
Ludovic Chenut 2024-02-05 17:44:35 +01:00
parent 60d48e644b
commit 03ff023e94
No known key found for this signature in database
GPG Key ID: D9A59B1907F1D50C
2 changed files with 61 additions and 4 deletions

View File

@ -77,6 +77,39 @@ proc encode(msg: WebRtcMessage): seq[byte] =
pb.finish()
pb.buffer
# -- Raw WebRTC Stream --
type
RawWebRtcStream = ref object of Connection
dataChannel: DataChannelStream
readData: seq[byte]
proc new(_: type RawWebRtcStream, dataChannel: DataChannelStream): RawWebRtcStream =
let stream = RawWebRtcStream(dataChannel: dataChannel)
stream
method closeImpl*(s: RawWebRtcStream): Future[void] =
# TODO: close datachannel
discard
method write*(s: RawWebRtcStream, msg: seq[byte]): Future[void] =
trace "RawWebrtcStream write", msg, len=msg.len()
s.dataChannel.write(msg)
method readOnce*(s: RawWebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
# TODO:
# if s.isClosed:
# raise newLPStreamEOFError()
if s.readData.len() == 0:
let rawData = await s.dataChannel.read()
s.readData = rawData
trace "readOnce RawWebRtcStream", data = s.readData, nbytes
result = min(nbytes, s.readData.len)
copyMem(pbytes, addr s.readData[0], result)
s.readData = s.readData[result..^1]
# -- Stream --
const MaxMessageSize = 16384 # 16KiB
@ -85,7 +118,7 @@ type
Sending, Closing, Closed
WebRtcStream = ref object of Connection
dataChannel: DataChannelStream
rawStream: RawWebRtcStream
sendQueue: seq[(seq[byte], Future[void])]
sendLoop: Future[void]
readData: seq[byte]
@ -97,7 +130,8 @@ proc new(
dataChannel: DataChannelStream,
oaddr: Opt[MultiAddress],
peerId: PeerId): WebRtcStream =
let stream = WebRtcStream(dataChannel: dataChannel, observedAddr: oaddr, peerId: peerId)
let stream = WebRtcStream(rawStream: RawWebRtcStream.new(dataChannel),
observedAddr: oaddr, peerId: peerId)
procCall Connection(stream).initStream()
stream
@ -105,7 +139,7 @@ proc sender(s: WebRtcStream) {.async.} =
while s.sendQueue.len > 0:
let (message, fut) = s.sendQueue.pop()
#TODO handle exceptions
await s.dataChannel.write(message)
await s.rawStream.writeLp(message)
if not fut.isNil: fut.complete()
proc send(s: WebRtcStream, msg: WebRtcMessage, fut: Future[void] = nil) =
@ -121,6 +155,7 @@ method write*(s: WebRtcStream, msg2: seq[byte]): Future[void] =
# We avoid this by filling the s.sendQueue synchronously
var msg = msg2
trace "WebrtcStream write", msg, len=msg.len()
let retFuture = newFuture[void]("WebRtcStream.write")
if s.txState != Sending:
retFuture.fail(newLPStreamClosedError())
@ -158,7 +193,7 @@ method readOnce*(s: WebRtcStream, pbytes: pointer, nbytes: int): Future[int] {.a
let
#TODO handle exceptions
message = await s.dataChannel.read()
message = await s.rawStream.readLp(MaxMessageSize)
decoded = WebRtcMessage.decode(message).tryGet()
decoded.flag.withValue(flag):

View File

@ -1,4 +1,18 @@
import chronos, libp2p, libp2p/transports/webrtctransport
import stew/byteutils
proc echoHandler(conn: Connection, proto: string) {.async.} =
defer: await conn.close()
while true:
try:
echo "\e[35;1m => Echo Handler <=\e[0m"
let msg = string.fromBytes(await conn.readLp(1024))
echo " => Echo Handler Receive: ", msg, " <="
echo " => Echo Handler Try Send: ", msg & "1", " <="
await conn.writeLp(msg & "1")
except CatchableError as e:
echo " => Echo Handler Error: ", e.msg, " <="
break
proc main {.async.} =
let switch =
@ -10,7 +24,15 @@ proc main {.async.} =
.withNoise()
.build()
let
codec = "/echo/1.0.0"
proto = new LPProtocol
proto.handler = echoHandler
proto.codec = codec
switch.mount(proto)
await switch.start()
echo "\e[31;1m", $(switch.peerInfo.addrs[0]), "/p2p/", $(switch.peerInfo.peerId), "\e[0m"
await sleepAsync(1.hours)
waitFor main()