diff --git a/docker/build.bat b/docker/build.bat
index c4fe79a3..a3fe6888 100644
--- a/docker/build.bat
+++ b/docker/build.bat
@@ -1,2 +1,2 @@
-docker build --build-arg MAKE_PARALLEL=4 --build-arg NIMFLAGS="-d:disableMarchNative -d:codex_enable_api_debug_peers=true -d:codex_enable_proof_failures=true -d:codex_use_hardhat=false -d:codex_enable_log_counter=true -d:verify_circuit=true" --build-arg NAT_IP_AUTO=true -t thatbenbierens/nim-codex:blkex-cancelpresence-20-s -f codex.Dockerfile ..
-docker push thatbenbierens/nim-codex:blkex-cancelpresence-20-s
+docker build --build-arg MAKE_PARALLEL=4 --build-arg NIMFLAGS="-d:disableMarchNative -d:codex_enable_api_debug_peers=true -d:codex_enable_proof_failures=true -d:codex_use_hardhat=false -d:codex_enable_log_counter=true -d:verify_circuit=true" --build-arg NAT_IP_AUTO=true -t thatbenbierens/nim-codex:blkex-cancelpresence-25-f -f codex.Dockerfile ..
+docker push thatbenbierens/nim-codex:blkex-cancelpresence-25-f
diff --git a/docker/chronosstream.nim b/docker/chronosstream.nim
new file mode 100644
index 00000000..a92a25aa
--- /dev/null
+++ b/docker/chronosstream.nim
@@ -0,0 +1,175 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import std/[strformat]
import std/monotimes
import stew/results
import chronos, chronicles, metrics
import connection
import ../utility

export results

logScope:
  topics = "libp2p chronosstream custom"

const
  DefaultChronosStreamTimeout = 10.minutes
  ChronosStreamTrackerName* = "ChronosStream"

type
  ChronosStream* = ref object of Connection
    client: StreamTransport
    when defined(libp2p_agents_metrics):
      tracked: bool

when defined(libp2p_agents_metrics):
  declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"])
  declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
  declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])

declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"])

func shortLog*(conn: ChronosStream): auto =
  try:
    if conn.isNil: "ChronosStream(nil)"
    else: &"{shortLog(conn.peerId)}:{conn.oid}"
  except ValueError as exc:
    raise newException(Defect, exc.msg)

chronicles.formatIt(ChronosStream): shortLog(it)

method initStream*(s: ChronosStream) =
  if s.objName.len == 0:
    s.objName = ChronosStreamTrackerName

  s.timeoutHandler = proc() {.async.} =
    trace "Idle timeout expired, closing ChronosStream", s
    await s.close()

  procCall Connection(s).initStream()

proc init*(C: type ChronosStream,
           client: StreamTransport,
           dir: Direction,
           timeout = DefaultChronosStreamTimeout,
           observedAddr: Opt[MultiAddress]): ChronosStream =
  result = C(client: client,
             timeout: timeout,
             dir: dir,
             observedAddr: observedAddr)
  result.initStream()

template withExceptions(body: untyped) =
  try:
    body
  except CancelledError as exc:
    raise exc
  except TransportIncompleteError:
    # for all intents and purposes this is an EOF
    raise newLPStreamIncompleteError()
  except TransportLimitError:
    raise newLPStreamLimitError()
  except TransportUseClosedError:
    raise newLPStreamEOFError()
  except TransportError:
    # TODO https://github.com/status-im/nim-chronos/pull/99
    raise newLPStreamEOFError()
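# The template above funnels chronos transport failures into libp2p stream
# errors, so the read/write paths below only ever surface LPStream* errors,
# while CancelledError is re-raised as-is to keep cancellation working.
# A minimal usage sketch - illustrative only, `buf` is a hypothetical buffer:
#
#   withExceptions:
#     let n = await s.client.readOnce(addr buf[0], buf.len)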
when defined(libp2p_agents_metrics):
  proc trackPeerIdentity(s: ChronosStream) =
    if not s.tracked and s.shortAgent.len > 0:
      libp2p_peers_identity.inc(labelValues = [s.shortAgent])
      s.tracked = true

  proc untrackPeerIdentity(s: ChronosStream) =
    if s.tracked:
      libp2p_peers_identity.dec(labelValues = [s.shortAgent])
      s.tracked = false

method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
  if s.atEof:
    raise newLPStreamEOFError()
  withExceptions:
    let start = getMonoTime().ticks
    result = await s.client.readOnce(pbytes, nbytes)
    let timeTaken = getMonoTime().ticks - start
    trace "chronosread", ticks = timeTaken
    s.activity = true # reset activity flag
    libp2p_network_bytes.inc(result.int64, labelValues = ["in"])
    when defined(libp2p_agents_metrics):
      s.trackPeerIdentity()
      if s.tracked:
        libp2p_peers_traffic_read.inc(result.int64, labelValues = [s.shortAgent])

proc completeWrite(
    s: ChronosStream, fut: Future[int], msgLen: int): Future[void] {.async.} =
  withExceptions:
    # StreamTransport will only return written < msg.len on fatal failures where
    # further writing is not possible - in such cases, we'll raise here,
    # since we don't return partial write lengths
    let start = getMonoTime().ticks
    var written = await fut
    let timeTaken = getMonoTime().ticks - start
    trace "chronoswrite", ticks = timeTaken

    if written < msgLen:
      raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")

    s.activity = true # reset activity flag
    libp2p_network_bytes.inc(msgLen.int64, labelValues = ["out"])
    when defined(libp2p_agents_metrics):
      s.trackPeerIdentity()
      if s.tracked:
        libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent])

method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
  # Avoid a copy of msg being kept in the closure created by `{.async.}` as this
  # drives up memory usage
  if msg.len == 0:
    trace "Empty byte seq, nothing to write"
    let fut = newFuture[void]("chronosstream.write.empty")
    fut.complete()
    return fut
  if s.closed:
    let fut = newFuture[void]("chronosstream.write.closed")
    fut.fail(newLPStreamClosedError())
    return fut

  s.completeWrite(s.client.write(msg), msg.len)

method closed*(s: ChronosStream): bool =
  s.client.closed

method atEof*(s: ChronosStream): bool =
  s.client.atEof()

method closeImpl*(s: ChronosStream) {.async.} =
  try:
    trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s

    if not s.client.closed():
      await s.client.closeWait()

    trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    trace "Error closing chronosstream", s, msg = exc.msg

  when defined(libp2p_agents_metrics):
    # do this after closing!
    s.untrackPeerIdentity()

  await procCall Connection(s).closeImpl()

method getWrapped*(s: ChronosStream): Connection = nil
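# The "chronosread"/"chronoswrite" traces added in this patched copy report
# raw monotonic ticks, which std/monotimes defines as nanoseconds on the
# supported platforms. An illustrative conversion when reading the logs
# (sketch only, not used by this file):
#
#   let elapsedMs = timeTaken.float / 1_000_000.0  # ticks (ns) -> ms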
diff --git a/docker/codex.Dockerfile b/docker/codex.Dockerfile
index 66261118..be949764 100644
--- a/docker/codex.Dockerfile
+++ b/docker/codex.Dockerfile
@@ -27,6 +27,8 @@ RUN echo "export PATH=$PATH:$HOME/.cargo/bin" >> $BASH_ENV
 WORKDIR ${BUILD_HOME}
 COPY . .
 RUN make -j ${MAKE_PARALLEL} update
+COPY ./docker/lpchannel.nim ./vendor/nim-libp2p/libp2p/muxers/mplex/lpchannel.nim
+COPY ./docker/chronosstream.nim ./vendor/nim-libp2p/libp2p/stream/chronosstream.nim
 RUN make -j ${MAKE_PARALLEL}

 # Create
diff --git a/docker/lpchannel.nim b/docker/lpchannel.nim
new file mode 100644
index 00000000..2416da03
--- /dev/null
+++ b/docker/lpchannel.nim
@@ -0,0 +1,307 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import std/[oids, strformat]
import pkg/[chronos, chronicles, metrics]
import ./coder,
       ../muxer,
       ../../stream/[bufferstream, connection, streamseq],
       ../../peerinfo

export connection

logScope:
  topics = "libp2p mplexchannel custom"

when defined(libp2p_mplex_metrics):
  declareHistogram libp2p_mplex_qlen, "message queue length",
    buckets = [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0]
  declareCounter libp2p_mplex_qlenclose, "closed because of max queuelen"
  declareHistogram libp2p_mplex_qtime, "message queuing time"

when defined(libp2p_network_protocols_metrics):
  declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]

## Channel half-closed states
##
## | State | Closed local    | Closed remote |
## |=======|=================|===============|
## | Read  | Yes (until EOF) | No            |
## | Write | No              | Yes           |
##
## Channels are considered fully closed when both outgoing and incoming
## directions are closed and when the reader of the channel has read the
## EOF marker

const
  MaxWrites = 1024 ##\
    ## Maximum number of in-flight writes - after this, we disconnect the peer

  LPChannelTrackerName* = "LPChannel"

type
  LPChannel* = ref object of BufferStream
    id*: uint64              # channel id
    name*: string            # name of the channel (for debugging)
    conn*: Connection        # wrapped connection used for writing
    initiator*: bool         # initiated remotely or locally flag
    isOpen*: bool            # has channel been opened
    closedLocal*: bool       # has channel been closed locally
    remoteReset*: bool       # has channel been remotely reset
    localReset*: bool        # has channel been reset locally
    msgCode*: MessageType    # cached in/out message code
    closeCode*: MessageType  # cached in/out close code
    resetCode*: MessageType  # cached in/out reset code
    writes*: int             # in-flight writes

func shortLog*(s: LPChannel): auto =
  try:
    if s.isNil: "LPChannel(nil)"
    elif s.name != $s.oid and s.name.len > 0:
      &"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}"
    else: &"{shortLog(s.conn.peerId)}:{s.oid}"
  except ValueError as exc:
    raise newException(Defect, exc.msg)

chronicles.formatIt(LPChannel): shortLog(it)

proc open*(s: LPChannel) {.async.} =
  trace "Opening channel", s, conn = s.conn
  if s.conn.isClosed:
    return
  try:
    await s.conn.writeMsg(s.id, MessageType.New, s.name)
    s.isOpen = true
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    await s.conn.close()
    raise exc

method closed*(s: LPChannel): bool =
  s.closedLocal

proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
  ## Channels may be closed for reading and writing in any order - we'll close
  ## the underlying bufferstream when both directions are closed
  if s.closedLocal and s.atEof():
    await procCall BufferStream(s).close()
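# Sketch of how the flags above drive a full close, per the half-closed table
# at the top of this file (assuming a locally initiated shutdown of a
# hypothetical channel `chan`):
#
#   await chan.close()    # closedLocal = true, Close msg sent; reads still ok
#   # ... the peer eventually sends its own Close -> chan.atEof() becomes true
#   # -> closeUnderlying() now closes the wrapped BufferStream for good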
proc reset*(s: LPChannel) {.async.} =
  if s.isClosed:
    trace "Already closed", s
    return

  s.isClosed = true
  s.closedLocal = true
  s.localReset = not s.remoteReset

  trace "Resetting channel", s, len = s.len

  if s.isOpen and not s.conn.isClosed:
    # If the connection is still active, notify the other end
    proc resetMessage() {.async.} =
      try:
        trace "sending reset message", s, conn = s.conn
        await s.conn.writeMsg(s.id, s.resetCode) # write reset
      except CatchableError as exc:
        # No cancellations
        await s.conn.close()
        trace "Can't send reset message", s, conn = s.conn, msg = exc.msg

    asyncSpawn resetMessage()

  await s.closeImpl() # noraises, nocancels

  trace "Channel reset", s

method close*(s: LPChannel) {.async.} =
  ## Close channel for writing - a message will be sent to the other peer
  ## informing them that the channel is closed and that we're waiting for
  ## their acknowledgement.
  if s.closedLocal:
    trace "Already closed", s
    return
  s.closedLocal = true

  trace "Closing channel", s, conn = s.conn, len = s.len

  if s.isOpen and not s.conn.isClosed:
    try:
      await s.conn.writeMsg(s.id, s.closeCode) # write close
    except CancelledError as exc:
      await s.conn.close()
      raise exc
    except CatchableError as exc:
      # It's harmless that the close message cannot be sent - the connection is
      # likely down already
      await s.conn.close()
      trace "Cannot send close message", s, id = s.id, msg = exc.msg

  await s.closeUnderlying() # maybe already eofed

  trace "Closed channel", s, len = s.len

method initStream*(s: LPChannel) =
  if s.objName.len == 0:
    s.objName = LPChannelTrackerName

  s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
    trace "Idle timeout expired, resetting LPChannel", s
    s.reset()

  procCall BufferStream(s).initStream()

method readOnce*(s: LPChannel,
                 pbytes: pointer,
                 nbytes: int):
                 Future[int] {.async.} =
  ## Mplex relies on reading being done regularly from every channel, or all
  ## channels are blocked - in particular, this means that reading from one
  ## channel must not be done from within a callback / read handler of another
  ## or the reads will lock each other.
  if s.remoteReset:
    raise newLPStreamResetError()
  if s.localReset:
    raise newLPStreamClosedError()
  if s.atEof():
    raise newLPStreamRemoteClosedError()
  if s.conn.closed:
    raise newLPStreamConnDownError()
  try:
    let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
    when defined(libp2p_network_protocols_metrics):
      if s.protocol.len > 0:
        libp2p_protocols_bytes.inc(bytes.int64, labelValues = [s.protocol, "in"])

    trace "readOnce", s, bytes
    if bytes == 0:
      await s.closeUnderlying()
    return bytes
  except CatchableError as exc:
    # readOnce in BufferStream generally raises on EOF or cancellation - for
    # the former, resetting is harmless, for the latter it's necessary because
    # data has been lost in s.readBuf and there's no way to gracefully recover /
    # use the channel any more
    await s.reset()
    raise newLPStreamConnDownError(exc)
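# A typical consumer loop over readOnce - a sketch only (`chan`, `buf` and
# `process` are hypothetical): the first read after the remote close returns
# 0 bytes, and later reads raise an LPStream* error, so 0 is the EOF signal.
#
#   while true:
#     let n = await chan.readOnce(addr buf[0], buf.len)
#     if n == 0: break                     # remote closed its write side
#     process(buf.toOpenArray(0, n - 1))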
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
  # prepareWrite is the slow path of writing a message - see conditions in
  # write
  if s.remoteReset:
    raise newLPStreamResetError()
  if s.closedLocal:
    raise newLPStreamClosedError()
  if s.conn.closed:
    raise newLPStreamConnDownError()

  if msg.len == 0:
    return

  if s.writes >= MaxWrites:
    debug "Closing connection, too many in-flight writes on channel",
      s, conn = s.conn, writes = s.writes
    when defined(libp2p_mplex_metrics):
      libp2p_mplex_qlenclose.inc()
    await s.reset()
    await s.conn.close()
    return

  if not s.isOpen:
    await s.open()

  await s.conn.writeMsg(s.id, s.msgCode, msg)

proc completeWrite(
    s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
  try:
    s.writes += 1

    when defined(libp2p_mplex_metrics):
      libp2p_mplex_qlen.observe(s.writes.int64 - 1)
      libp2p_mplex_qtime.time:
        await fut
    else:
      await fut

    when defined(libp2p_network_protocols_metrics):
      if s.protocol.len > 0:
        libp2p_protocols_bytes.inc(msgLen.int64, labelValues = [s.protocol, "out"])

    s.activity = true
  except CancelledError as exc:
    # Chronos may still send the data
    raise exc
  except LPStreamConnDownError as exc:
    await s.reset()
    await s.conn.close()
    raise exc
  except LPStreamEOFError as exc:
    raise exc
  except CatchableError as exc:
    trace "exception in lpchannel write handler", s, msg = exc.msg
    await s.reset()
    await s.conn.close()
    raise newLPStreamConnDownError(exc)
  finally:
    s.writes -= 1

method write*(s: LPChannel, msg: seq[byte]): Future[void] =
  ## Write to mplex channel - there may be up to MaxWrites concurrent writes
  ## pending after which the peer is disconnected

  let
    closed = s.closedLocal or s.conn.closed

  let fut =
    if (not closed) and msg.len > 0 and s.writes < MaxWrites and s.isOpen:
      # Fast path: avoid a copy of msg being kept in the closure created by
      # `{.async.}` as this drives up memory usage - the conditions are laid
      # out in prepareWrite
      trace "lpc-write-fast"
      s.conn.writeMsg(s.id, s.msgCode, msg)
    else:
      trace "lpc-write-prepare"
      prepareWrite(s, msg)

  s.completeWrite(fut, msg.len)
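# Note on the dispatch above (a summary of the conditions, not extra logic):
#
#   channel open, msg nonempty, writes < MaxWrites  -> "lpc-write-fast":
#       write directly, skipping the `{.async.}` closure copy of `msg`
#   anything else                                   -> "lpc-write-prepare":
#       the slow path, which may open the channel or fail/reset it
#
# Crossing MaxWrites (1024 in-flight writes) resets the channel and closes
# the connection rather than queueing without bound.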
method getWrapped*(s: LPChannel): Connection = s.conn

proc init*(
  L: type LPChannel,
  id: uint64,
  conn: Connection,
  initiator: bool,
  name: string = "",
  timeout: Duration = DefaultChanTimeout): LPChannel =

  let chann = L(
    id: id,
    name: name,
    conn: conn,
    initiator: initiator,
    timeout: timeout,
    isOpen: if initiator: false else: true,
    msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
    closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
    resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
    dir: if initiator: Direction.Out else: Direction.In)

  chann.initStream()

  when chronicles.enabledLogLevel == LogLevel.TRACE:
    chann.name = if chann.name.len > 0: chann.name else: $chann.oid

  trace "Created new lpchannel", s = chann, id, initiator

  return chann
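# The chronicles tags added in these patched copies ("lpc-write-fast" and
# "lpc-write-prepare" here; "chronosread"/"chronoswrite" in chronosstream.nim)
# only appear at TRACE level. One way to pull them out of a running
# container's logs (the container name is illustrative):
#
#   docker logs codex-node 2>&1 | grep -E 'chronosread|chronoswrite|lpc-write'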