move EOF flag after local close and comments

This commit is contained in:
Dmitriy Ryajov 2020-05-29 10:24:38 -06:00
parent 815282a5da
commit 27a7f8c948
1 changed files with 21 additions and 15 deletions

View File

@ -31,6 +31,19 @@ logScope:
## | Write | No | Yes ## | Write | No | Yes
## ##
# TODO: this is one place where we need to use
# a proper state machine, but I've opted out of
# it for now for two reasons:
#
# 1) we don't have that many states to manage
# 2) I'm not sure if adding the state machine
# would have simplified or complicated the code
#
# But now that this is in place, we should perhaps
# reconsider reworking it again, this time with a
# more formal approach.
#
type type
LPChannel* = ref object of BufferStream LPChannel* = ref object of BufferStream
id*: uint64 # channel id id*: uint64 # channel id
@ -38,7 +51,7 @@ type
conn*: Connection # wrapped connection used to for writing conn*: Connection # wrapped connection used to for writing
initiator*: bool # initiated remotely or locally flag initiator*: bool # initiated remotely or locally flag
isLazy*: bool # is channel lazy isLazy*: bool # is channel lazy
isOpen*: bool # has channel been oppened (only used with isLazy) isOpen*: bool # has channel been opened (only used with isLazy)
closedLocal*: bool # has channel been closed locally closedLocal*: bool # has channel been closed locally
msgCode*: MessageType # cached in/out message code msgCode*: MessageType # cached in/out message code
closeCode*: MessageType # cached in/out close code closeCode*: MessageType # cached in/out close code
@ -99,7 +112,7 @@ proc newChannel*(id: uint64,
chan.msgCode, chan.msgCode,
data).wait(2.minutes) # write header data).wait(2.minutes) # write header
except AsyncTimeoutError: except AsyncTimeoutError:
trace "timeout writting channel, resetting" trace "timeout writing channel, resetting"
asyncCheck chan.reset() asyncCheck chan.reset()
except CatchableError as exc: except CatchableError as exc:
trace "unable to write in bufferstream handler", exc = exc.msg trace "unable to write in bufferstream handler", exc = exc.msg
@ -139,7 +152,7 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
## which is locked ## which is locked
withEOFExceptions: withEOFExceptions:
await s.conn.writeMsg(s.id, MessageType.New, s.name) await s.conn.writeMsg(s.id, MessageType.New, s.name)
trace "oppened channel", oid = s.oid, trace "opened channel", oid = s.oid,
name = s.name, name = s.name,
initiator = s.initiator initiator = s.initiator
s.isOpen = true s.isOpen = true
@ -155,9 +168,9 @@ proc closeRemote*(s: LPChannel) {.async.} =
await s.dataReadEvent.wait() await s.dataReadEvent.wait()
s.dataReadEvent.clear() s.dataReadEvent.clear()
await s.close() # close local end
s.isEof = true # set EOF immediately to prevent further reads s.isEof = true # set EOF immediately to prevent further reads
# call to avoid leacks await s.close() # close local end
# call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream await procCall BufferStream(s).close() # close parent bufferstream
trace "channel closed on EOF", id = s.id, trace "channel closed on EOF", id = s.id,
@ -168,7 +181,7 @@ proc closeRemote*(s: LPChannel) {.async.} =
method closed*(s: LPChannel): bool = method closed*(s: LPChannel): bool =
## this emulates half-closed behavior ## this emulates half-closed behavior
## when closed locally writing is ## when closed locally writing is
## dissabled - see the table in the ## disabled - see the table in the
## header of the file ## header of the file
s.closedLocal s.closedLocal
@ -177,9 +190,6 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
# might be dead already - reset is always # might be dead already - reset is always
# optimistic # optimistic
asyncCheck s.resetMessage() asyncCheck s.resetMessage()
# # because of the async check above,
# # give the message time to depart
# await sleepAsync(100.millis)
await procCall BufferStream(s).close() await procCall BufferStream(s).close()
s.isEof = true s.isEof = true
s.closedLocal = true s.closedLocal = true
@ -198,16 +208,12 @@ method close*(s: LPChannel) {.async, gcsafe.} =
initiator = s.initiator, initiator = s.initiator,
name = s.name, name = s.name,
oid = s.oid oid = s.oid
# TODO: we should install a timer that on expire
# will make sure the channel did close by the remote
# so the hald-closed flow completed, if it didn't
# we should send a `reset` and move on.
await s.closeMessage().wait(2.minutes) await s.closeMessage().wait(2.minutes)
s.closedLocal = true s.closedLocal = true
if s.atEof: # already closed by remote close parent buffer imediately if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close() await procCall BufferStream(s).close()
except AsyncTimeoutError: except AsyncTimeoutError:
trace "close timeoud, reset channel" trace "close timed out, reset channel"
asyncCheck s.reset() # reset on timeout asyncCheck s.reset() # reset on timeout
except CatchableError as exc: except CatchableError as exc:
trace "exception closing channel" trace "exception closing channel"