From 6058a3fc694b763b92d1fb9e602ecbc03920e4a8 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 3 Sep 2019 21:08:51 -0600 Subject: [PATCH] split mplex --- libp2p/muxers/mplex/channel.nim | 34 +++++++++++ libp2p/muxers/mplex/coder.nim | 43 ++++++++++++++ libp2p/muxers/{ => mplex}/mplex.nim | 92 +++-------------------------- libp2p/muxers/mplex/types.nim | 28 +++++++++ tests/testmplex.nim | 12 +++- 5 files changed, 122 insertions(+), 87 deletions(-) create mode 100644 libp2p/muxers/mplex/channel.nim create mode 100644 libp2p/muxers/mplex/coder.nim rename libp2p/muxers/{ => mplex}/mplex.nim (58%) create mode 100644 libp2p/muxers/mplex/types.nim diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/channel.nim new file mode 100644 index 0000000..ecf7940 --- /dev/null +++ b/libp2p/muxers/mplex/channel.nim @@ -0,0 +1,34 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import ../../stream/bufferstream +import types + +type + Channel* = ref object of BufferStream + id*: int + initiator*: bool + isReset*: bool + closedLocal*: bool + closedRemote*: bool + handlerFuture*: Future[void] + +proc newChannel*(id: int, + initiator: bool, + handler: WriteHandler, + size: int = MaxMsgSize): Channel = + new result + result.id = id + result.initiator = initiator + result.initBufferStream(handler, size) + +proc closed*(s: Channel): bool = s.closedLocal and s.closedRemote +proc close*(s: Channel) {.async.} = discard +proc reset*(s: Channel) {.async.} = discard diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim new file mode 100644 index 0000000..141e2a2 --- /dev/null +++ b/libp2p/muxers/mplex/coder.nim @@ -0,0 +1,43 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import ../../connection, ../../varint, + ../../vbuffer, mplex, types, + ../../stream/lpstream + +proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} = + var + header: uint + length: int + res: VarintStatus + var buffer = newSeq[byte](10) + try: + for i in 0.. 0: + buf.writeVarint(LPSomeUVarint(size.uint)) + buf.finish() + result = conn.write(buf.buffer) diff --git a/libp2p/muxers/mplex.nim b/libp2p/muxers/mplex/mplex.nim similarity index 58% rename from libp2p/muxers/mplex.nim rename to libp2p/muxers/mplex/mplex.nim index 1e3d93d..62b4bf4 100644 --- a/libp2p/muxers/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -9,27 +9,13 @@ import tables, sequtils import chronos -import ../varint, ../connection, - ../vbuffer, ../protocol, - ../stream/bufferstream, ../stream/lpstream, - muxer +import ../../varint, ../../connection, + ../../vbuffer, ../../protocol, + ../../stream/bufferstream, + ../../stream/lpstream, ../muxer, + coder, types, channel -const MaxMsgSize* = 1 shl 20 # 1mb -const MaxChannels* = 1000 -const MplexCodec* = "/mplex/6.7.0" - -type - MplexUnknownMsgError* = object of CatchableError - MessageType* {.pure.} = enum - New, - MsgIn, - MsgOut, - CloseIn, - CloseOut, - ResetIn, - ResetOut - - StreamHandler = proc(conn: Connection): Future[void] {.gcsafe.} +type Mplex* = ref object of Muxer remote*: Table[int, Channel] local*: Table[int, Channel] @@ -37,71 +23,9 @@ type maxChannels*: uint streamHandler*: StreamHandler - Channel* = ref object of BufferStream - id*: int - initiator*: bool - isReset*: bool - closedLocal*: bool - closedRemote*: bool - mplex*: Mplex - handlerFuture*: Future[void] - -proc newMplexUnknownMsgError*(): ref MplexUnknownMsgError = +proc newMplexUnknownMsgError(): ref MplexUnknownMsgError = result = newException(MplexUnknownMsgError, "Unknown mplex message type") -########################################## -## Read/Write Helpers -########################################## - -proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} = - var - header: uint - length: int - res: VarintStatus - var buffer = newSeq[byte](10) - try: - for i in 0.. 0: - buf.writeVarint(LPSomeUVarint(size.uint)) - buf.finish() - result = conn.write(buf.buffer) - -########################################## -## Channel -########################################## - -proc newChannel*(mplex: Mplex, - id: int, - initiator: bool, - handler: WriteHandler, - size: int = MaxMsgSize): Channel = - new result - result.id = id - result.mplex = mplex - result.initiator = initiator - result.initBufferStream(handler, size) - -proc closed*(s: Channel): bool = s.closedLocal and s.closedRemote -proc close*(s: Channel) {.async.} = discard -proc reset*(s: Channel) {.async.} = discard - ########################################## ## Mplex ########################################## @@ -123,7 +47,7 @@ proc newStreamInternal*(m: Mplex, await m.connection.writeHeader(id, msgType, data.len) # write header await m.connection.write(data) # write data - result = newChannel(m, id, initiator, writeHandler) + result = newChannel(id, initiator, writeHandler) m.getChannelList(initiator)[id] = result proc newStreamInternal*(m: Mplex): Future[Channel] {.gcsafe.} = diff --git a/libp2p/muxers/mplex/types.nim b/libp2p/muxers/mplex/types.nim new file mode 100644 index 0000000..8a7067f --- /dev/null +++ b/libp2p/muxers/mplex/types.nim @@ -0,0 +1,28 @@ +## Nim-LibP2P +## Copyright (c) 2018 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import ../../connection + +const MaxMsgSize* = 1 shl 20 # 1mb +const MaxChannels* = 1000 +const MplexCodec* = "/mplex/6.7.0" + +type + MplexUnknownMsgError* = object of CatchableError + MessageType* {.pure.} = enum + New, + MsgIn, + MsgOut, + CloseIn, + CloseOut, + ResetIn, + ResetOut + + StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 7f3531f..2899c25 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -1,8 +1,14 @@ import unittest, sequtils, sugar import chronos, nimcrypto/utils -import ../libp2p/muxers/mplex, ../libp2p/connection, - ../libp2p/stream/lpstream, ../libp2p/tcptransport, - ../libp2p/transport, ../libp2p/multiaddress +import ../libp2p/connection, + ../libp2p/stream/lpstream, + ../libp2p/tcptransport, + ../libp2p/transport, + ../libp2p/multiaddress, + ../libp2p/muxers/mplex/mplex, + ../libp2p/muxers/mplex/coder, + ../libp2p/muxers/mplex/types, + ../libp2p/muxers/mplex/channel type TestEncodeStream = ref object of LPStream