194 lines
6.6 KiB
Nim
194 lines
6.6 KiB
Nim
## Nim-LibP2P
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
import oids, deques
|
|
import chronos, chronicles
|
|
import types,
|
|
coder,
|
|
nimcrypto/utils,
|
|
../../stream/bufferstream,
|
|
../../stream/lpstream,
|
|
../../connection,
|
|
../../utility,
|
|
../../errors
|
|
|
|
export lpstream
|
|
|
|
logScope:
|
|
topic = "MplexChannel"
|
|
|
|
## Channel half-closed states
|
|
##
|
|
## | State | Closed local | Closed remote
|
|
## |=============================================
|
|
## | Read | Yes (until EOF) | No
|
|
## | Write | No | Yes
|
|
##
|
|
|
|
type
|
|
LPChannel* = ref object of BufferStream
|
|
id*: uint64 # channel id
|
|
name*: string # name of the channel (for debugging)
|
|
conn*: Connection # wrapped connection used to for writing
|
|
initiator*: bool # initiated remotely or locally flag
|
|
isLazy*: bool # is channel lazy
|
|
isOpen*: bool # has channel been oppened (only used with isLazy)
|
|
closedLocal*: bool # has channel been closed locally
|
|
msgCode*: MessageType # cached in/out message code
|
|
closeCode*: MessageType # cached in/out close code
|
|
resetCode*: MessageType # cached in/out reset code
|
|
|
|
proc open*(s: LPChannel) {.async, gcsafe.}
|
|
|
|
template withWriteLock(lock: AsyncLock, body: untyped): untyped =
|
|
try:
|
|
await lock.acquire()
|
|
body
|
|
finally:
|
|
lock.release()
|
|
|
|
template withEOFExceptions(body: untyped): untyped =
|
|
try:
|
|
body
|
|
except LPStreamEOFError as exc:
|
|
trace "muxed connection EOF", exc = exc.msg
|
|
except LPStreamClosedError as exc:
|
|
trace "muxed connection closed", exc = exc.msg
|
|
except LPStreamIncompleteError as exc:
|
|
trace "incomplete message", exc = exc.msg
|
|
|
|
proc newChannel*(id: uint64,
|
|
conn: Connection,
|
|
initiator: bool,
|
|
name: string = "",
|
|
size: int = DefaultBufferSize,
|
|
lazy: bool = false): LPChannel =
|
|
new result
|
|
result.id = id
|
|
result.name = name
|
|
result.conn = conn
|
|
result.initiator = initiator
|
|
result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn
|
|
result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn
|
|
result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn
|
|
result.isLazy = lazy
|
|
|
|
let chan = result
|
|
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
|
|
if chan.isLazy and not(chan.isOpen):
|
|
await chan.open()
|
|
|
|
# writes should happen in sequence
|
|
trace "sending data", data = data.shortLog,
|
|
id = chan.id,
|
|
initiator = chan.initiator,
|
|
name = chan.name,
|
|
oid = chan.oid
|
|
|
|
await conn.writeMsg(chan.id, chan.msgCode, data) # write header
|
|
|
|
result.initBufferStream(writeHandler, size)
|
|
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
|
result.name = if result.name.len > 0: result.name else: $result.oid
|
|
|
|
trace "created new lpchannel", id = result.id,
|
|
oid = result.oid,
|
|
initiator = result.initiator,
|
|
name = result.name
|
|
|
|
proc closeMessage(s: LPChannel) {.async.} =
|
|
withEOFExceptions:
|
|
withWriteLock(s.writeLock):
|
|
trace "sending close message", id = s.id,
|
|
initiator = s.initiator,
|
|
name = s.name,
|
|
oid = s.oid
|
|
|
|
await s.conn.writeMsg(s.id, s.closeCode) # write close
|
|
|
|
proc resetMessage(s: LPChannel) {.async.} =
|
|
withEOFExceptions:
|
|
withWriteLock(s.writeLock):
|
|
trace "sending reset message", id = s.id,
|
|
initiator = s.initiator,
|
|
name = s.name,
|
|
oid = s.oid
|
|
|
|
await s.conn.writeMsg(s.id, s.resetCode) # write reset
|
|
|
|
proc open*(s: LPChannel) {.async, gcsafe.} =
|
|
## NOTE: Don't call withExcAndLock or withWriteLock,
|
|
## because this already gets called from writeHandler
|
|
## which is locked
|
|
withEOFExceptions:
|
|
await s.conn.writeMsg(s.id, MessageType.New, s.name)
|
|
trace "oppened channel", oid = s.oid,
|
|
name = s.name,
|
|
initiator = s.initiator
|
|
s.isOpen = true
|
|
|
|
proc closeRemote*(s: LPChannel) {.async.} =
|
|
trace "got EOF, closing channel", id = s.id,
|
|
initiator = s.initiator,
|
|
name = s.name,
|
|
oid = s.oid
|
|
|
|
# wait for all data in the buffer to be consumed
|
|
while s.len > 0:
|
|
await s.dataReadEvent.wait()
|
|
s.dataReadEvent.clear()
|
|
|
|
# TODO: Not sure if this needs to be set here or bfore consuming
|
|
# the buffer
|
|
s.isEof = true # set EOF immediately to prevent further reads
|
|
await procCall BufferStream(s).close() # close parent bufferstream
|
|
|
|
trace "channel closed on EOF", id = s.id,
|
|
initiator = s.initiator,
|
|
oid = s.oid,
|
|
name = s.name
|
|
|
|
method closed*(s: LPChannel): bool =
|
|
## this emulates half-closed behavior
|
|
## when closed locally writing is
|
|
## dissabled - see the table in the
|
|
## header of the file
|
|
s.closedLocal
|
|
|
|
method close*(s: LPChannel) {.async, gcsafe.} =
|
|
if s.closedLocal:
|
|
return
|
|
|
|
trace "closing local lpchannel", id = s.id,
|
|
initiator = s.initiator,
|
|
name = s.name,
|
|
oid = s.oid
|
|
# TODO: we should install a timer that on expire
|
|
# will make sure the channel did close by the remote
|
|
# so the hald-closed flow completed, if it didn't
|
|
# we should send a `reset` and move on.
|
|
await s.closeMessage()
|
|
s.closedLocal = true
|
|
if s.atEof: # already closed by remote close parent buffer imediately
|
|
await procCall BufferStream(s).close()
|
|
|
|
trace "lpchannel closed local", id = s.id,
|
|
initiator = s.initiator,
|
|
name = s.name,
|
|
oid = s.oid
|
|
|
|
method reset*(s: LPChannel) {.base, async.} =
|
|
# we asyncCheck here because the other end
|
|
# might be dead already - reset is always
|
|
# optimistic
|
|
asyncCheck s.resetMessage()
|
|
await procCall BufferStream(s).close()
|
|
s.isEof = true
|
|
s.closedLocal = true
|