From ec351cc2b01f09b2c3cf5cebec30ad699399c57e Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 4 Sep 2019 10:40:05 -0600 Subject: [PATCH] misc: cleanup mplex --- .gitignore | 1 + libp2p/muxers/mplex/channel.nim | 10 +++++----- libp2p/muxers/mplex/mplex.nim | 16 ++++++++++++---- libp2p/muxers/muxer.nim | 1 + tests/testmplex.nim | 8 ++++---- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/.gitignore b/.gitignore index ef56d2862..cadf35384 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ build/ *.la *.exe *.dll +.vscode/ diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/channel.nim index 08eba9e40..562360c53 100644 --- a/libp2p/muxers/mplex/channel.nim +++ b/libp2p/muxers/mplex/channel.nim @@ -50,7 +50,7 @@ proc closeMessage(s: Channel) {.async, gcsafe.} = proc closed*(s: Channel): bool = s.closedLocal -proc closeRemote*(s: Channel) {.async.} = +proc closedByRemote*(s: Channel) {.async.} = s.closedRemote = true method close*(s: Channel) {.async, gcsafe.} = @@ -58,14 +58,14 @@ method close*(s: Channel) {.async, gcsafe.} = await s.closeMessage() proc resetMessage(s: Channel) {.async, gcsafe.} = - await s.conn.writeHeader(s.id, s.resetCode, 0) # write header + await s.conn.writeHeader(s.id, s.resetCode, 0) -proc remoteReset*(s: Channel) {.async, gcsafe.} = - await allFutures(s.close(), s.closeRemote()) +proc resetByRemote*(s: Channel) {.async, gcsafe.} = + await allFutures(s.close(), s.closedByRemote()) s.isReset = true proc reset*(s: Channel) {.async.} = - await allFutures(s.resetMessage(), s.remoteReset()) + await allFutures(s.resetMessage(), s.resetByRemote()) proc isReadEof(s: Channel): bool = bool((s.closedRemote or s.closedLocal) and s.len() < 1) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 8b956b357..5f27acc80 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -7,6 +7,13 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +## TODO: I have to be carefull to clean up channels correctly +## both by removing them from the internal tables as well as +## cleaning up when the channel is completelly finished, this +## is complicated because half closed makes it non-deterministic. +## This still needs to be implemented properly - I'm leaving it here +## to not forget that this needs to be fixed ASAP. + import tables, sequtils import chronos import ../../varint, ../../connection, @@ -44,7 +51,7 @@ proc newStreamInternal*(m: Mplex, proc newStreamInternal*(m: Mplex): Future[Channel] {.gcsafe.} = result = m.newStreamInternal(true, 0) -proc handle*(m: Mplex): Future[void] {.async, gcsafe.} = +method handle*(m: Mplex): Future[void] {.async, gcsafe.} = try: while not m.connection.closed: let (id, msgType) = await m.connection.readHeader() @@ -59,10 +66,11 @@ proc handle*(m: Mplex): Future[void] {.async, gcsafe.} = await channel.pushTo(msg) of MessageType.CloseIn, MessageType.CloseOut: let channel = m.getChannelList(initiator)[id.int] - await channel.closeRemote() + await channel.closedByRemote() + m.getChannelList(initiator).del(id.int) of MessageType.ResetIn, MessageType.ResetOut: let channel = m.getChannelList(initiator)[id.int] - await channel.reset() + await channel.resetByRemote() else: raise newMplexUnknownMsgError() except Exception as exc: #TODO: add proper loging @@ -88,4 +96,4 @@ method newStream*(m: Mplex): Future[Connection] {.async, gcsafe.} = method close*(m: Mplex) {.async, gcsafe.} = await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.close())), allFutures(toSeq(m.local.values).mapIt(it.close()))]) - await m.connection.close() + m.connection.reset() diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index e617208c0..5623169a2 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -32,3 +32,4 @@ method init(c: MuxerProvider) = method newStream*(m: Muxer): Future[Connection] {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard +method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 67ba3a667..6a2fbe922 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -47,10 +47,10 @@ suite "Mplex": test "encode header": proc testEncodeHeader(): Future[bool] {.async.} = proc encHandler(msg: seq[byte]) = - check msg == fromHex("880102") + check msg == fromHex("886f04") let conn = newConnection(newTestEncodeStream(encHandler)) - await conn.writeHeader(17, MessageType.New, 2) + await conn.writeHeader(1777, MessageType.New, 4) result = true check: @@ -143,7 +143,7 @@ suite "Mplex": test "half closed - channel should close for read": proc testClosedForRead(): Future[void] {.async.} = let chann = newChannel(1, newConnection(new LPStream), true) - await chann.closeRemote() + await chann.closedByRemote() asyncDiscard chann.read() expect LPStreamClosedError: @@ -182,7 +182,7 @@ suite "Mplex": test "should not allow pushing data to channel when remote end closed": proc testResetWrite(): Future[void] {.async.} = let chann = newChannel(1, newConnection(new LPStream), true) - await chann.closeRemote() + await chann.closedByRemote() await chann.pushTo(@[byte(1)]) expect LPStreamClosedError: