diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/channel.nim index a8ef82848..f501e97fc 100644 --- a/libp2p/muxers/mplex/channel.nim +++ b/libp2p/muxers/mplex/channel.nim @@ -14,9 +14,12 @@ import ../../stream/bufferstream, nimcrypto/utils, types, coder +const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb + type Channel* = ref object of BufferStream id*: int + name*: string conn*: Connection initiator*: bool isReset*: bool @@ -30,9 +33,11 @@ type proc newChannel*(id: int, conn: Connection, initiator: bool, - size: int = MaxMsgSize): Channel = + name: string = "", + size: int = DefaultChannelSize): Channel = new result result.id = id + result.name = name result.conn = conn result.initiator = initiator result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn @@ -41,7 +46,7 @@ proc newChannel*(id: int, let chan = result proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = - await conn.writeHeader(id, chan.msgCode, data.len) # write header + await conn.writeHeader(chan.id, chan.msgCode, data.len) # write header await conn.write(data) result.initBufferStream(writeHandler, size) diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 80fe7e507..93648ff4f 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -15,6 +15,9 @@ import types, ../../stream/lpstream, nimcrypto/utils +type + Phase = enum Header, Size + proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} = var header: uint @@ -26,13 +29,14 @@ proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe. await conn.readExactly(addr buffer[i], 1) res = LP.getUVarint(buffer.toOpenArray(0, i), length, header) if res == VarintStatus.Success: + let (id, msg) = (header shr 3, MessageType(header and 0x7)) return (header shr 3, MessageType(header and 0x7)) if res != VarintStatus.Success: buffer.setLen(0) return - except LPStreamIncompleteError: + except TransportIncompleteError: buffer.setLen(0) - raise newException(CatchableError, "Could not decode header!") + raise newLPStreamIncompleteError() proc writeHeader*(conn: Connection, id: int, @@ -40,8 +44,7 @@ proc writeHeader*(conn: Connection, size: int = 0) {.async, gcsafe.} = ## write lenght prefixed var buf = initVBuffer() - buf.writeVarint(LPSomeUVarint(id.uint shl 3 or msgType.uint)) - if size > 0: - buf.writeVarint(LPSomeUVarint(size.uint)) + buf.writeVarint((id.uint shl 3) or msgType.uint) + buf.writeVarint(size.uint) # size should be always sent buf.finish() - result = conn.write(buf.buffer) + await conn.write(buf.buffer) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 2bc193082..8a975e134 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -16,7 +16,7 @@ ## This still needs to be implemented properly - I'm leaving it ## here to not forget that this needs to be fixed ASAP. -import tables, sequtils, strformat +import tables, sequtils, strformat, options import chronos import coder, types, channel, ../../varint, @@ -34,6 +34,9 @@ type currentId*: int maxChannels*: uint +proc newMplexNoSuchChannel(id: int, msgType: MessageType): ref MplexNoSuchChannel = + result = newException(MplexNoSuchChannel, &"No such channel id {$id} and message {$msgType}") + proc newMplexUnknownMsgError(): ref MplexUnknownMsgError = result = newException(MplexUnknownMsgError, "Unknown mplex message type") @@ -45,11 +48,12 @@ proc getChannelList(m: Mplex, initiator: bool): var Table[int, Channel] = proc newStreamInternal*(m: Mplex, initiator: bool = true, - chanId: int): + chanId: int, + name: string = ""): Future[Channel] {.async, gcsafe.} = ## create new channel/stream let id = if initiator: m.currentId.inc(); m.currentId else: chanId - result = newChannel(id, m.connection, initiator) + result = newChannel(id, m.connection, initiator, name) m.getChannelList(initiator)[id] = result proc newStreamInternal*(m: Mplex): Future[Channel] {.gcsafe.} = @@ -60,26 +64,36 @@ method handle*(m: Mplex): Future[void] {.async, gcsafe.} = while not m.connection.closed: let (id, msgType) = await m.connection.readHeader() let initiator = bool(ord(msgType) and 1) + var channel: Channel + if MessageType(msgType) != MessageType.New: + let channels = m.getChannelList(initiator) + if not channels.contains(id.int): + raise newMplexNoSuchChannel(id.int, msgType) + channel = channels[id.int] + case msgType: of MessageType.New: - let channel = await m.newStreamInternal(false, id.int) + var name: seq[byte] + try: + name = await m.connection.readLp() + except LPStreamIncompleteError as exc: + echo exc.msg + except Exception as exc: + echo exc.msg + raise + + let channel = await m.newStreamInternal(false, id.int, cast[string](name)) if not isNil(m.streamHandler): channel.handlerFuture = m.streamHandler(newConnection(channel)) of MessageType.MsgIn, MessageType.MsgOut: - let channel = m.getChannelList(initiator)[id.int] let msg = await m.connection.readLp() await channel.pushTo(msg) of MessageType.CloseIn, MessageType.CloseOut: - let channel = m.getChannelList(initiator)[id.int] await channel.closedByRemote() m.getChannelList(initiator).del(id.int) of MessageType.ResetIn, MessageType.ResetOut: - let channel = m.getChannelList(initiator)[id.int] await channel.resetByRemote() else: raise newMplexUnknownMsgError() - except Exception as exc: - #TODO: add proper loging - discard finally: await m.connection.close() @@ -91,9 +105,11 @@ proc newMplex*(conn: Connection, result.remote = initTable[int, Channel]() result.local = initTable[int, Channel]() -method newStream*(m: Mplex): Future[Connection] {.async, gcsafe.} = +method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = let channel = await m.newStreamInternal() - await m.connection.writeHeader(channel.id, MessageType.New) + await m.connection.writeHeader(channel.id, MessageType.New, len(name)) + if name.len > 0: + await m.connection.write(name) result = newConnection(channel) method close*(m: Mplex) {.async, gcsafe.} = diff --git a/libp2p/muxers/mplex/types.nim b/libp2p/muxers/mplex/types.nim index f83bb37a7..acb529681 100644 --- a/libp2p/muxers/mplex/types.nim +++ b/libp2p/muxers/mplex/types.nim @@ -17,6 +17,8 @@ const MaxReadWriteTime* = 5.seconds type MplexUnknownMsgError* = object of CatchableError + MplexNoSuchChannel* = object of CatchableError + MessageType* {.pure.} = enum New, MsgIn, diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index c855ba669..630633253 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -24,7 +24,7 @@ type newMuxer*: MuxerCreator streamHandler*: StreamHandler -method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard +method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} =