diff --git a/libp2p/mplex.nim b/libp2p/mplex.nim deleted file mode 100644 index 1ecb9cd21..000000000 --- a/libp2p/mplex.nim +++ /dev/null @@ -1,145 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2018 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import chronos -import ../varint, ../connection, - ../vbuffer, ../protocol - -const DefaultReadSize: uint = 1024 - -const MaxMsgSize* = 1 shl 20 # 1mb -const MaxChannels* = 1000 - -type - MplexUnknownMsgError* = object of CatchableError - - MessageType* {.pure.} = enum - New, - MsgIn, - MsgOut, - CloseIn, - CloseOut, - ResetIn, - ResetOut - - ChannelHandler* = proc(conn: Connection) {.gcsafe.} - - Mplex* = ref object of LPProtocol - remote*: seq[Connection] - local*: seq[Connection] - channelHandler*: ChannelHandler - currentId*: uint - - Channel* = ref object of BufferStream - id*: int - initiator*: bool - reset*: bool - closedLocal*: bool - closedRemote*: bool - mplex*: Mplex - -proc newMplexUnknownMsgError*(): ref MplexUnknownMsgError = - result = newException(MplexUnknownMsgError, "Unknown mplex message type") - -proc readLp*(conn: Connection): Future[tuple[id: uint, msgType: MessageType]] {.gcsafe.} = - var - header: uint - length: int - res: VarintStatus - var buffer = newSeq[byte](10) - try: - for i in 0.. -1: chanId else: m.currentId + proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = + let msgType = if initiator: MessageType.MsgIn else: MessageType.MsgOut + await m.connection.writeHeader(id, msgType, data.len) # write header + await m.connection.write(data) # write data + + let channel = newChannel(m, id, initiator, writeHandler) + m.getChannelList(initiator)[id] = channel + result = newConnection(channel) + +proc handle*(m: Mplex) {.async, gcsafe.} = + while not m.connection.closed: + let (id, msgType) = await m.connection.readHeader() + let initiator = bool(ord(msgType) and 1) + case msgType: + of MessageType.New: + await m.streamHandler(await m.newStream(id.int, false)) + of MessageType.MsgIn, MessageType.MsgOut: + await m.getChannelList(initiator)[id.int].pushTo(await m.connection.readLp()) + of MessageType.CloseIn, MessageType.CloseOut: + await m.getChannelList(initiator)[id.int].close() + of MessageType.ResetIn, MessageType.ResetOut: + await m.getChannelList(initiator)[id.int].reset() + else: raise newMplexUnknownMsgError() + +proc newMplex*(conn: Connection, + streamHandler: StreamHandler, + maxChanns: uint = MaxChannels): Mplex = + new result + result.connection = conn + result.maxChannels = maxChanns + result.streamHandler = streamHandler + +method newStream*(m: Mplex): Future[Connection] {.gcsafe.} = + result = m.newStream(true) + +method close(m: Mplex) {.async, gcsafe.} = + let futs = @[allFutures(m.remote.mapIt(it.close())), + allFutures(m.local.mapIt(it.close()))] + await allFutures(futs) + await m.connection.close() diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim new file mode 100644 index 000000000..e617208c0 --- /dev/null +++ b/libp2p/muxers/muxer.nim @@ -0,0 +1,34 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import ../protocol, ../connection + +type + Muxer* = ref object of RootObj + connection*: Connection + + MuxerCreator* = proc(conn: Connection): Muxer {.gcsafe, closure.} + # this wraps a creator proc that knows how to make muxers + MuxerProvider* = ref object of LPProtocol + newMuxer*: MuxerCreator + +proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gcsafe.} = + new result + result.newMuxer = creator + result.codec = codec + +method init(c: MuxerProvider) = + proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let muxer = c.newMuxer(conn) + + c.handler = handler + +method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard +method close*(m: Muxer) {.base, async, gcsafe.} = discard diff --git a/tests/testmplex.nim b/tests/testmplex.nim new file mode 100644 index 000000000..ee8386757 --- /dev/null +++ b/tests/testmplex.nim @@ -0,0 +1,90 @@ +import unittest, sequtils, sugar +import chronos, nimcrypto/utils +import ../libp2p/muxers/mplex, ../libp2p/connection, + ../libp2p/stream/lpstream, ../libp2p/tcptransport, + ../libp2p/transport, ../libp2p/multiaddress + +type + TestEncodeStream = ref object of LPStream + handler*: proc(data: seq[byte]) + +method write*(s: TestEncodeStream, + msg: seq[byte], + msglen = -1): + Future[void] {.gcsafe.} = + s.handler(msg) + +proc newTestEncodeStream(handler: proc(data: seq[byte])): TestEncodeStream = + new result + result.handler = handler + +type + TestDecodeStream = ref object of LPStream + handler*: proc(data: seq[byte]) + step*: int + msg*: seq[byte] + +method readExactly*(s: TestDecodeStream, + pbytes: pointer, + nbytes: int): Future[void] {.async, gcsafe.} = + let buff: seq[byte] = s.msg + copyMem(pbytes, unsafeAddr buff[s.step], nbytes) + s.step += nbytes + +proc newTestDecodeStream(): TestDecodeStream = + new result + result.step = 0 + result.msg = fromHex("8801023137") + +suite "Mplex": + # test "encode header": + # proc testEncodeHeader(): Future[bool] {.async.} = + # proc encHandler(msg: seq[byte]) = + # check msg == fromHex("880102") + + # let conn = newConnection(newTestEncodeStream(encHandler)) + # await conn.writeHeader(uint(17), MessageType.New, 2) + # result = true + + # check: + # waitFor(testEncodeHeader()) == true + + # test "decode header": + # proc testDecodeHeader(): Future[bool] {.async.} = + # let conn = newConnection(newTestDecodeStream()) + # let (id, msgType) = await conn.readHeader() + + # check id == 17 + # check msgType == MessageType.New + # let data = await conn.readLp() + # check cast[string](data) == "17" + # result = true + + # check: + # waitFor(testDecodeHeader()) == true + + test "e2e - new stream": + proc testNewStream(): Future[bool] {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53351") + + proc connHandler(conn: Connection) {.async, gcsafe.} = + proc handleListen(stream: Connection) {.async, gcsafe.} = + await stream.writeLp("Hello from stream!") + + let mplexListen = newMplex(conn, handleListen) + await mplexListen.handle() + + let transport1: TcpTransport = newTransport(TcpTransport) + await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = newTransport(TcpTransport) + let conn = await transport2.dial(ma) + proc handleDial(stream: Connection) {.async, gcsafe.} = + let msg = await stream.readLp() + + let mplexDial = newMplex(conn, handleDial) + let handleFut = mplexDial.handle() + result = true + + check: + waitFor(testNewStream()) == true