diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index a4cd4b021..6e44bd0ae 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos +import chronos, options, sequtils import types, ../../connection, ../../varint, @@ -16,41 +16,57 @@ import types, nimcrypto/utils type - Phase = enum Header, Size + Msg* = tuple + id: uint + msgType: MessageType + data: seq[byte] -proc readMplexVarint(conn: Connection): Future[uint] {.async, gcsafe.} = +proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} = var varint: uint length: int res: VarintStatus var buffer = newSeq[byte](10) + result = none(uint) try: for i in 0.. 0.uint: - data = await conn.read(dataLen.int) - result = (header shr 3, MessageType(header and 0x7), data) + if dataLenVarint.isSome and dataLenVarint.get() > 0.uint: + data = await conn.read(dataLenVarint.get().int) + + let header = headerVarint.get() + result = some((header shr 3, MessageType(header and 0x7), data)) proc writeMsg*(conn: Connection, - id: uint, - msgType: MessageType, - data: seq[byte] = @[]) {.async, gcsafe.} = + id: uint, + msgType: MessageType, + data: seq[byte] = @[]) {.async, gcsafe.} = ## write lenght prefixed var buf = initVBuffer() - buf.writeVarint((id shl 3) or ord(msgType).uint) + let header = (id shl 3 or ord(msgType).uint) + buf.writeVarint(id shl 3 or ord(msgType).uint) buf.writeVarint(data.len().uint) # size should be always sent buf.finish() await conn.write(buf.buffer & data) + +proc writeMsg*(conn: Connection, + id: uint, + msgType: MessageType, + data: string) {.async, gcsafe.} = + result = conn.writeMsg(id, msgType, cast[seq[byte]](toSeq(data.items))) \ No newline at end of file