From 8338a16aab14af674fc38df2178908acbe4eafb7 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 6 Sep 2019 00:51:19 -0600 Subject: [PATCH] testing mplex --- libp2p/muxers/mplex/channel.nim | 10 ++++++---- libp2p/muxers/mplex/coder.nim | 12 ++++++++---- libp2p/muxers/mplex/mplex.nim | 15 +++++++++------ libp2p/muxers/muxer.nim | 26 +++++++++++++++++--------- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/channel.nim index 562360c53..a8ef82848 100644 --- a/libp2p/muxers/mplex/channel.nim +++ b/libp2p/muxers/mplex/channel.nim @@ -7,10 +7,12 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos +import chronos, strformat import ../../stream/bufferstream, ../../stream/lpstream, - types, coder, ../../connection + ../../connection, + nimcrypto/utils, + types, coder type Channel* = ref object of BufferStream @@ -45,7 +47,7 @@ proc newChannel*(id: int, result.initBufferStream(writeHandler, size) proc closeMessage(s: Channel) {.async, gcsafe.} = - await s.conn.writeHeader(s.id, s.closeCode, 0) # write header + await s.conn.writeHeader(s.id, s.closeCode) # write header proc closed*(s: Channel): bool = s.closedLocal @@ -58,7 +60,7 @@ method close*(s: Channel) {.async, gcsafe.} = await s.closeMessage() proc resetMessage(s: Channel) {.async, gcsafe.} = - await s.conn.writeHeader(s.id, s.resetCode, 0) + await s.conn.writeHeader(s.id, s.resetCode) proc resetByRemote*(s: Channel) {.async, gcsafe.} = await allFutures(s.close(), s.closedByRemote()) diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 7ba8f2c53..80fe7e507 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -8,9 +8,12 @@ ## those terms. import chronos -import ../../connection, ../../varint, - ../../vbuffer, types, - ../../stream/lpstream +import types, + ../../connection, + ../../varint, + ../../vbuffer, + ../../stream/lpstream, + nimcrypto/utils proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe.} = var @@ -29,11 +32,12 @@ proc readHeader*(conn: Connection): Future[(uint, MessageType)] {.async, gcsafe. return except LPStreamIncompleteError: buffer.setLen(0) + raise newException(CatchableError, "Could not decode header!") proc writeHeader*(conn: Connection, id: int, msgType: MessageType, - size: int) {.async, gcsafe.} = + size: int = 0) {.async, gcsafe.} = ## write lenght prefixed var buf = initVBuffer() buf.writeVarint(LPSomeUVarint(id.uint shl 3 or msgType.uint)) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 04a8039ff..2bc193082 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -16,13 +16,16 @@ ## This still needs to be implemented properly - I'm leaving it ## here to not forget that this needs to be fixed ASAP. -import tables, sequtils +import tables, sequtils, strformat import chronos -import ../../varint, ../../connection, - ../../vbuffer, ../../protocol, +import coder, types, channel, + ../../varint, + ../../connection, + ../../vbuffer, + ../../protocols/protocol, ../../stream/bufferstream, - ../../stream/lpstream, ../muxer, - coder, types, channel + ../../stream/lpstream, + ../muxer type Mplex* = ref object of Muxer @@ -90,7 +93,7 @@ proc newMplex*(conn: Connection, method newStream*(m: Mplex): Future[Connection] {.async, gcsafe.} = let channel = await m.newStreamInternal() - await m.connection.writeHeader(channel.id, MessageType.New, 0) + await m.connection.writeHeader(channel.id, MessageType.New) result = newConnection(channel) method close*(m: Mplex) {.async, gcsafe.} = diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 03d5dcfce..c855ba669 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -8,7 +8,8 @@ ## those terms. import chronos -import ../protocol, ../connection +import ../protocols/protocol, + ../connection type StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} @@ -21,21 +22,28 @@ type # this wraps a creator proc that knows how to make muxers MuxerProvider* = ref object of LPProtocol newMuxer*: MuxerCreator + streamHandler*: StreamHandler + +method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard +method close*(m: Muxer) {.base, async, gcsafe.} = discard +method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard +method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} = + m.streamHandler = handler proc newMuxerProvider*(creator: MuxerCreator, codec: string): MuxerProvider {.gcsafe.} = new result result.newMuxer = creator result.codec = codec - + result.init() + +method `=streamHandler`*(m: MuxerProvider, handler: StreamHandler) {.base, gcsafe.} = + m.streamHandler = handler + method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = let muxer = c.newMuxer(conn) + if not isNil(c.streamHandler): + muxer.streamHandler = c.streamHandler + await muxer.handle() c.handler = handler - -method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard -method close*(m: Muxer) {.base, async, gcsafe.} = discard -method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard - -method `=streamHandler`*(m: Muxer, handler: StreamHandler) {.base, gcsafe.} = - m.streamHandler = handler