## Nim-LibP2P
## Copyright (c) 2019 Status Research & Development GmbH
## Licensed under either of
##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

import std/[oids, strformat]
import chronos, chronicles, metrics
import ./coder,
       ../muxer,
       nimcrypto/utils,
       ../../stream/[bufferstream, connection, streamseq],
       ../../peerinfo

export connection

logScope:
  topics = "mplexchannel"

## Channel half-closed states
##
## | State | Closed local      | Closed remote
## |=============================================
## | Read  | Yes (until EOF)   | No
## | Write | No                | Yes
##
## Channels are considered fully closed when both the outgoing and incoming
## directions are closed and the reader of the channel has read the
## EOF marker

const
  MaxWrites = 1024 ##\
    ## Maximum number of in-flight writes - after this, we disconnect the peer

  LPChannelTrackerName* = "LPChannel"

type
  LPChannel* = ref object of BufferStream
    id*: uint64                   # channel id
    name*: string                 # name of the channel (for debugging)
    conn*: Connection             # wrapped connection used for writing
    initiator*: bool              # true when this side initiated the channel
                                  # (`init` maps it to Direction.Out and the
                                  # `*Out` message codes)
    isOpen*: bool                 # has the channel been opened - set by `open`
                                  # once the New message was written;
                                  # non-initiator channels start out open
    closedLocal*: bool            # has the channel been closed locally
    msgCode*: MessageType         # cached in/out message code
    closeCode*: MessageType       # cached in/out close code
    resetCode*: MessageType       # cached in/out reset code
    writes*: int                  # number of in-flight writes

# Forward declaration - the implementation follows `shortLog` below.
proc open*(s: LPChannel) {.async, gcsafe.}

func shortLog*(s: LPChannel): auto =
  ## Compact textual identifier of the channel used in log output:
  ## `"LPChannel(nil)"` for a nil channel, just the oid when the underlying
  ## connection has no peer info, otherwise `peerId:oid` with `:name`
  ## appended when a non-empty name that differs from the oid is set.
  if s.isNil: "LPChannel(nil)"
  elif s.conn.peerInfo.isNil: $s.oid
  elif s.name != $s.oid and s.name.len > 0:
    &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}"
  else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}"
# Make chronicles render LPChannel log arguments through `shortLog`
chronicles.formatIt(LPChannel): shortLog(it)

proc open*(s: LPChannel) {.async, gcsafe.} =
  ## Announce the channel to the other side by writing a `MessageType.New`
  ## message carrying the channel id and name on the wrapped connection, then
  ## mark the channel as open.
  trace "Opening channel", s, conn = s.conn
  await s.conn.writeMsg(s.id, MessageType.New, s.name)
  s.isOpen = true

method closed*(s: LPChannel): bool =
  ## A channel reports itself as closed as soon as it was closed locally.
  s.closedLocal

proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
  ## Channels may be closed for reading and writing in any order - we'll close
  ## the underlying bufferstream when both directions are closed
  if s.closedLocal and s.atEof():
    await procCall BufferStream(s).close()

proc reset*(s: LPChannel) {.async, gcsafe.} =
  ## Reset the channel. Does nothing (besides a trace) when `isClosed` is
  ## already set; otherwise marks the channel closed/EOF and discards any
  ## buffered read data.
  ##
  ## NOTE(review): only the beginning of this proc is present in this copy of
  ## the file - see the truncation note further down.
  if s.isClosed:
    trace "Already closed", s
    return

  s.isClosed = true

  trace "Resetting channel", s, len = s.len

  # First, make sure any new calls to `readOnce` and `pushData` etc will fail -
  # there may already be such calls in the event queue however
  s.closedLocal = true
  s.isEof = true
  s.readBuf = StreamSeq()
  s.pushedEof = true

  let pushing = s.pushing # s.pushing changes while iterating
  # NOTE(review): the text of this file appears to be truncated inside the
  # next statement, between `0..` and `= MaxWrites` (possibly a `<...>` span
  # that was stripped by some tool). The remainder of `reset`, whatever
  # definitions followed it, and the header and opening statements of the
  # routine that the statements below belong to are missing. The tokens are
  # kept exactly as found; this does not compile as-is - restore the missing
  # part from version control.
  for i in 0..= MaxWrites:
    # NOTE(review): from here on this looks like the tail of a write routine
    # (it sends `msg` with `s.msgCode`) - presumably this branch was guarded
    # by a comparison of `s.writes` against `MaxWrites`; confirm against the
    # original. Too many in-flight writes: reset the channel and close the
    # whole connection.
    debug "Closing connection, too many in-flight writes on channel",
      s, conn = s.conn, writes = s.writes
    await s.reset()
    await s.conn.close()
    return

  # Count this write as in-flight; the `finally` below undoes the increment
  s.writes += 1

  try:
    # Open the channel on first use if that has not happened yet
    if not s.isOpen:
      await s.open()

    # writes should happen in sequence
    trace "write msg", s, conn = s.conn, len = msg.len

    await s.conn.writeMsg(s.id, s.msgCode, msg)
    s.activity = true
  except CatchableError as exc:
    # On any failure, reset the channel and close the connection before
    # propagating the error to the caller
    trace "exception in lpchannel write handler", s, msg = exc.msg
    await s.reset()
    await s.conn.close()
    raise exc
  finally:
    s.writes -= 1

proc init*(
  L: type LPChannel,
  id: uint64,
  conn: Connection,
  initiator: bool,
  name: string = "",
  timeout: Duration = DefaultChanTimeout): LPChannel =
  ## Create a new channel with the given `id` on top of `conn`.
  ##
  ## `initiator` selects the direction (`Direction.Out` when true,
  ## `Direction.In` otherwise) and the matching cached message, close and
  ## reset codes (`*Out` when true, `*In` otherwise). Channels created with
  ## `initiator = true` start with `isOpen = false`; all others start open.
  ##
  ## `name` is an optional label for debugging and `timeout` is stored in the
  ## stream's `timeout` field. `initStream` is called on the new channel
  ## before it is returned.
  let chann = L(
    id: id,
    name: name,
    conn: conn,
    initiator: initiator,
    timeout: timeout,
    isOpen: if initiator: false else: true,
    msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
    closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
    resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
    dir: if initiator: Direction.Out else: Direction.In)

  chann.initStream()

  # Only when compiled with TRACE logging: fall back to the oid as the name
  # so that unnamed channels are still identifiable
  when chronicles.enabledLogLevel == LogLevel.TRACE:
    chann.name = if chann.name.len > 0: chann.name else: $chann.oid

  trace "Created new lpchannel", chann, id, initiator

  return chann