From 27a7f8c948f5317a9051eeff24329c33dda5dd4a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 29 May 2020 10:24:38 -0600 Subject: [PATCH] move EOF flag after local close and comments --- libp2p/muxers/mplex/lpchannel.nim | 36 ++++++++++++++++++------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 0133bc100..cff668e6c 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -31,6 +31,19 @@ logScope: ## | Write | No | Yes ## +# TODO: this is one place where we need to use +# a proper state machine, but I've opted out of +# it for now for two reasons: +# +# 1) we don't have that many states to manage +# 2) I'm not sure if adding the state machine +# would have simplified or complicated the code +# +# But now that this is in place, we should perhaps +# reconsider reworking it again, this time with a +# more formal approach. +# + type LPChannel* = ref object of BufferStream id*: uint64 # channel id @@ -38,7 +51,7 @@ type conn*: Connection # wrapped connection used to for writing initiator*: bool # initiated remotely or locally flag isLazy*: bool # is channel lazy - isOpen*: bool # has channel been oppened (only used with isLazy) + isOpen*: bool # has channel been opened (only used with isLazy) closedLocal*: bool # has channel been closed locally msgCode*: MessageType # cached in/out message code closeCode*: MessageType # cached in/out close code @@ -99,7 +112,7 @@ proc newChannel*(id: uint64, chan.msgCode, data).wait(2.minutes) # write header except AsyncTimeoutError: - trace "timeout writting channel, resetting" + trace "timeout writing channel, resetting" asyncCheck chan.reset() except CatchableError as exc: trace "unable to write in bufferstream handler", exc = exc.msg @@ -139,7 +152,7 @@ proc open*(s: LPChannel) {.async, gcsafe.} = ## which is locked withEOFExceptions: await s.conn.writeMsg(s.id, MessageType.New, s.name) - trace "oppened channel", oid = s.oid, + trace "opened channel", oid = s.oid, name = s.name, initiator = s.initiator s.isOpen = true @@ -155,9 +168,9 @@ proc closeRemote*(s: LPChannel) {.async.} = await s.dataReadEvent.wait() s.dataReadEvent.clear() - await s.close() # close local end s.isEof = true # set EOF immediately to prevent further reads - # call to avoid leacks + await s.close() # close local end + # call to avoid leaks await procCall BufferStream(s).close() # close parent bufferstream trace "channel closed on EOF", id = s.id, @@ -168,7 +181,7 @@ proc closeRemote*(s: LPChannel) {.async.} = method closed*(s: LPChannel): bool = ## this emulates half-closed behavior ## when closed locally writing is - ## dissabled - see the table in the + ## disabled - see the table in the ## header of the file s.closedLocal @@ -177,9 +190,6 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = # might be dead already - reset is always # optimistic asyncCheck s.resetMessage() - # # because of the async check above, - # # give the message time to depart - # await sleepAsync(100.millis) await procCall BufferStream(s).close() s.isEof = true s.closedLocal = true @@ -198,16 +208,12 @@ method close*(s: LPChannel) {.async, gcsafe.} = initiator = s.initiator, name = s.name, oid = s.oid - # TODO: we should install a timer that on expire - # will make sure the channel did close by the remote - # so the hald-closed flow completed, if it didn't - # we should send a `reset` and move on. await s.closeMessage().wait(2.minutes) s.closedLocal = true - if s.atEof: # already closed by remote close parent buffer imediately + if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() except AsyncTimeoutError: - trace "close timeoud, reset channel" + trace "close timed out, reset channel" asyncCheck s.reset() # reset on timeout except CatchableError as exc: trace "exception closing channel"