testing: chronos stream reads and writes

Ben 2025-01-09 11:52:48 +01:00
parent d9a821e7be
commit 87e800db34
4 changed files with 486 additions and 2 deletions


@@ -1,2 +1,2 @@
-docker build --build-arg MAKE_PARALLEL=4 --build-arg NIMFLAGS="-d:disableMarchNative -d:codex_enable_api_debug_peers=true -d:codex_enable_proof_failures=true -d:codex_use_hardhat=false -d:codex_enable_log_counter=true -d:verify_circuit=true" --build-arg NAT_IP_AUTO=true -t thatbenbierens/nim-codex:blkex-cancelpresence-20-s -f codex.Dockerfile ..
-docker push thatbenbierens/nim-codex:blkex-cancelpresence-20-s
+docker build --build-arg MAKE_PARALLEL=4 --build-arg NIMFLAGS="-d:disableMarchNative -d:codex_enable_api_debug_peers=true -d:codex_enable_proof_failures=true -d:codex_use_hardhat=false -d:codex_enable_log_counter=true -d:verify_circuit=true" --build-arg NAT_IP_AUTO=true -t thatbenbierens/nim-codex:blkex-cancelpresence-25-f -f codex.Dockerfile ..
+docker push thatbenbierens/nim-codex:blkex-cancelpresence-25-f
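
Each -d: switch in the NIMFLAGS above defines a compile-time symbol that Nim code can branch on; this is the mechanism that turns on the optional metrics blocks in the patched files below. A minimal sketch of how such flags are consumed, assuming only standard Nim define handling (the symbol names come from the build command, the echo bodies are illustrative):

# compile with: nim c -d:codex_enable_log_counter=true -d:codex_use_hardhat=false sketch.nim
const codex_use_hardhat {.booldefine.} = true  # -d:codex_use_hardhat=false overrides this default

when defined(codex_enable_log_counter):
  echo "log counter compiled in"   # branch kept at compile time
else:
  echo "log counter compiled out"  # branch dropped at compile time

Note that defined() only tests whether the symbol exists, which is why boolean-valued flags are read with {.booldefine.} instead.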

docker/chronosstream.nim (new file, 175 lines)

@@ -0,0 +1,175 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import std/[strformat]
import std/monotimes
import stew/results
import chronos, chronicles, metrics
import connection
import ../utility

export results

logScope:
  topics = "libp2p chronosstream custom"

const
  DefaultChronosStreamTimeout = 10.minutes
  ChronosStreamTrackerName* = "ChronosStream"

type
  ChronosStream* = ref object of Connection
    client: StreamTransport
    when defined(libp2p_agents_metrics):
      tracked: bool

when defined(libp2p_agents_metrics):
  declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"])
  declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
  declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])

declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"])
func shortLog*(conn: ChronosStream): auto =
  try:
    if conn.isNil: "ChronosStream(nil)"
    else: &"{shortLog(conn.peerId)}:{conn.oid}"
  except ValueError as exc:
    raise newException(Defect, exc.msg)

chronicles.formatIt(ChronosStream): shortLog(it)

method initStream*(s: ChronosStream) =
  if s.objName.len == 0:
    s.objName = ChronosStreamTrackerName

  s.timeoutHandler = proc() {.async.} =
    trace "Idle timeout expired, closing ChronosStream", s
    await s.close()

  procCall Connection(s).initStream()

proc init*(C: type ChronosStream,
           client: StreamTransport,
           dir: Direction,
           timeout = DefaultChronosStreamTimeout,
           observedAddr: Opt[MultiAddress]): ChronosStream =
  result = C(client: client,
             timeout: timeout,
             dir: dir,
             observedAddr: observedAddr)
  result.initStream()
template withExceptions(body: untyped) =
  try:
    body
  except CancelledError as exc:
    raise exc
  except TransportIncompleteError:
    # for all intents and purposes this is an EOF
    raise newLPStreamIncompleteError()
  except TransportLimitError:
    raise newLPStreamLimitError()
  except TransportUseClosedError:
    raise newLPStreamEOFError()
  except TransportError:
    # TODO https://github.com/status-im/nim-chronos/pull/99
    raise newLPStreamEOFError()

when defined(libp2p_agents_metrics):
  proc trackPeerIdentity(s: ChronosStream) =
    if not s.tracked and s.shortAgent.len > 0:
      libp2p_peers_identity.inc(labelValues = [s.shortAgent])
      s.tracked = true

  proc untrackPeerIdentity(s: ChronosStream) =
    if s.tracked:
      libp2p_peers_identity.dec(labelValues = [s.shortAgent])
      s.tracked = false
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
  if s.atEof:
    raise newLPStreamEOFError()
  withExceptions:
    let start = getMonoTime().ticks
    result = await s.client.readOnce(pbytes, nbytes)
    let timeTaken = getMonoTime().ticks - start
    trace "chronosread", ticks = timeTaken
    s.activity = true # reset activity flag
    libp2p_network_bytes.inc(result.int64, labelValues = ["in"])
    when defined(libp2p_agents_metrics):
      s.trackPeerIdentity()
      if s.tracked:
        libp2p_peers_traffic_read.inc(result.int64, labelValues = [s.shortAgent])

proc completeWrite(
    s: ChronosStream, fut: Future[int], msgLen: int): Future[void] {.async.} =
  withExceptions:
    # StreamTransport will only return written < msg.len on fatal failures where
    # further writing is not possible - in such cases, we'll raise here,
    # since we don't return partial write lengths
    let start = getMonoTime().ticks
    var written = await fut
    let timeTaken = getMonoTime().ticks - start
    trace "chronoswrite", ticks = timeTaken

    if written < msgLen:
      raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")

    s.activity = true # reset activity flag
    libp2p_network_bytes.inc(msgLen.int64, labelValues = ["out"])

    when defined(libp2p_agents_metrics):
      s.trackPeerIdentity()
      if s.tracked:
        libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent])
method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
  # Avoid a copy of msg being kept in the closure created by `{.async.}` as this
  # drives up memory usage
  if msg.len == 0:
    trace "Empty byte seq, nothing to write"
    let fut = newFuture[void]("chronosstream.write.empty")
    fut.complete()
    return fut
  if s.closed:
    let fut = newFuture[void]("chronosstream.write.closed")
    fut.fail(newLPStreamClosedError())
    return fut

  s.completeWrite(s.client.write(msg), msg.len)

method closed*(s: ChronosStream): bool =
  s.client.closed

method atEof*(s: ChronosStream): bool =
  s.client.atEof()

method closeImpl*(s: ChronosStream) {.async.} =
  try:
    trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s
    if not s.client.closed():
      await s.client.closeWait()
    trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    trace "Error closing chronosstream", s, msg = exc.msg

  when defined(libp2p_agents_metrics):
    # do this after closing!
    s.untrackPeerIdentity()

  await procCall Connection(s).closeImpl()

method getWrapped*(s: ChronosStream): Connection = nil
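
The chronosread and chronoswrite trace lines added above log raw getMonoTime().ticks deltas. A minimal sketch of the same measurement pattern in readable units, using the fact that subtracting two MonoTime values yields a times.Duration (names are illustrative, not from the file):

import std/[monotimes, times]

let start = getMonoTime()
# ... the awaited read or write would go here ...
let elapsed = getMonoTime() - start            # MonoTime difference is a Duration
echo "elapsed: ", elapsed.inMicroseconds, " us"

On platforms where std/monotimes has nanosecond resolution, the ticks deltas in the trace output can be read directly as nanoseconds.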


@@ -27,6 +27,8 @@ RUN echo "export PATH=$PATH:$HOME/.cargo/bin" >> $BASH_ENV
 WORKDIR ${BUILD_HOME}
 COPY . .
 RUN make -j ${MAKE_PARALLEL} update
+COPY ./docker/lpchannel.nim ./vendor/nim-libp2p/libp2p/muxers/mplex/lpchannel.nim
+COPY ./docker/chronosstream.nim ./vendor/nim-libp2p/libp2p/stream/chronosstream.nim
 RUN make -j ${MAKE_PARALLEL}
 # Create

docker/lpchannel.nim (new file, 307 lines)

@@ -0,0 +1,307 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import std/[oids, strformat]
import pkg/[chronos, chronicles, metrics]
import ./coder,
       ../muxer,
       ../../stream/[bufferstream, connection, streamseq],
       ../../peerinfo

export connection

logScope:
  topics = "libp2p mplexchannel custom"

when defined(libp2p_mplex_metrics):
  declareHistogram libp2p_mplex_qlen, "message queue length",
    buckets = [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0]
  declareCounter libp2p_mplex_qlenclose, "closed because of max queuelen"
  declareHistogram libp2p_mplex_qtime, "message queuing time"

when defined(libp2p_network_protocols_metrics):
  declareCounter libp2p_protocols_bytes, "total sent or received bytes",
    ["protocol", "direction"]

## Channel half-closed states
##
## | State | Closed local    | Closed remote
## |=========================================
## | Read  | Yes (until EOF) | No
## | Write | No              | Yes
##
## Channels are considered fully closed when both outgoing and incoming
## directions are closed and when the reader of the channel has read the
## EOF marker
const
  MaxWrites = 1024 ##\
    ## Maximum number of in-flight writes - after this, we disconnect the peer

  LPChannelTrackerName* = "LPChannel"

type
  LPChannel* = ref object of BufferStream
    id*: uint64              # channel id
    name*: string            # name of the channel (for debugging)
    conn*: Connection        # wrapped connection used for writing
    initiator*: bool         # initiated remotely or locally flag
    isOpen*: bool            # has channel been opened
    closedLocal*: bool       # has channel been closed locally
    remoteReset*: bool       # has channel been remotely reset
    localReset*: bool        # has channel been reset locally
    msgCode*: MessageType    # cached in/out message code
    closeCode*: MessageType  # cached in/out close code
    resetCode*: MessageType  # cached in/out reset code
    writes*: int             # in-flight writes
func shortLog*(s: LPChannel): auto =
  try:
    if s.isNil: "LPChannel(nil)"
    elif s.name != $s.oid and s.name.len > 0:
      &"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}"
    else: &"{shortLog(s.conn.peerId)}:{s.oid}"
  except ValueError as exc:
    raise newException(Defect, exc.msg)

chronicles.formatIt(LPChannel): shortLog(it)

proc open*(s: LPChannel) {.async.} =
  trace "Opening channel", s, conn = s.conn
  if s.conn.isClosed:
    return
  try:
    await s.conn.writeMsg(s.id, MessageType.New, s.name)
    s.isOpen = true
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    await s.conn.close()
    raise exc

method closed*(s: LPChannel): bool =
  s.closedLocal

proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
  ## Channels may be closed for reading and writing in any order - we'll close
  ## the underlying bufferstream when both directions are closed
  if s.closedLocal and s.atEof():
    await procCall BufferStream(s).close()
proc reset*(s: LPChannel) {.async.} =
  if s.isClosed:
    trace "Already closed", s
    return

  s.isClosed = true
  s.closedLocal = true
  s.localReset = not s.remoteReset

  trace "Resetting channel", s, len = s.len

  if s.isOpen and not s.conn.isClosed:
    # If the connection is still active, notify the other end
    proc resetMessage() {.async.} =
      try:
        trace "sending reset message", s, conn = s.conn
        await s.conn.writeMsg(s.id, s.resetCode) # write reset
      except CatchableError as exc:
        # No cancellations
        await s.conn.close()
        trace "Can't send reset message", s, conn = s.conn, msg = exc.msg

    asyncSpawn resetMessage()

  await s.closeImpl() # noraises, nocancels

  trace "Channel reset", s

method close*(s: LPChannel) {.async.} =
  ## Close channel for writing - a message will be sent to the other peer
  ## informing them that the channel is closed and that we're waiting for
  ## their acknowledgement.
  if s.closedLocal:
    trace "Already closed", s
    return
  s.closedLocal = true

  trace "Closing channel", s, conn = s.conn, len = s.len

  if s.isOpen and not s.conn.isClosed:
    try:
      await s.conn.writeMsg(s.id, s.closeCode) # write close
    except CancelledError as exc:
      await s.conn.close()
      raise exc
    except CatchableError as exc:
      # It's harmless that the close message cannot be sent - the connection is
      # likely down already
      await s.conn.close()
      trace "Cannot send close message", s, id = s.id, msg = exc.msg

  await s.closeUnderlying() # maybe already eofed

  trace "Closed channel", s, len = s.len

method initStream*(s: LPChannel) =
  if s.objName.len == 0:
    s.objName = LPChannelTrackerName

  s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
    trace "Idle timeout expired, resetting LPChannel", s
    s.reset()

  procCall BufferStream(s).initStream()
method readOnce*(s: LPChannel,
                 pbytes: pointer,
                 nbytes: int):
                 Future[int] {.async.} =
  ## Mplex relies on reading being done regularly from every channel, or all
  ## channels are blocked - in particular, this means that reading from one
  ## channel must not be done from within a callback / read handler of another
  ## or the reads will lock each other.
  if s.remoteReset:
    raise newLPStreamResetError()
  if s.localReset:
    raise newLPStreamClosedError()
  if s.atEof():
    raise newLPStreamRemoteClosedError()
  if s.conn.closed:
    raise newLPStreamConnDownError()
  try:
    let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
    when defined(libp2p_network_protocols_metrics):
      if s.protocol.len > 0:
        libp2p_protocols_bytes.inc(bytes.int64, labelValues = [s.protocol, "in"])

    trace "readOnce", s, bytes
    if bytes == 0:
      await s.closeUnderlying()
    return bytes
  except CatchableError as exc:
    # readOnce in BufferStream generally raises on EOF or cancellation - for
    # the former, resetting is harmless, for the latter it's necessary because
    # data has been lost in s.readBuf and there's no way to gracefully recover /
    # use the channel any more
    await s.reset()
    raise newLPStreamConnDownError(exc)
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
  # prepareWrite is the slow path of writing a message - see conditions in
  # write
  if s.remoteReset:
    raise newLPStreamResetError()
  if s.closedLocal:
    raise newLPStreamClosedError()
  if s.conn.closed:
    raise newLPStreamConnDownError()

  if msg.len == 0:
    return

  if s.writes >= MaxWrites:
    debug "Closing connection, too many in-flight writes on channel",
      s, conn = s.conn, writes = s.writes
    when defined(libp2p_mplex_metrics):
      libp2p_mplex_qlenclose.inc()
    await s.reset()
    await s.conn.close()
    return

  if not s.isOpen:
    await s.open()

  await s.conn.writeMsg(s.id, s.msgCode, msg)
proc completeWrite(
    s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
  try:
    s.writes += 1

    when defined(libp2p_mplex_metrics):
      libp2p_mplex_qlen.observe(s.writes.int64 - 1)
      libp2p_mplex_qtime.time:
        await fut
    else:
      await fut

    when defined(libp2p_network_protocols_metrics):
      if s.protocol.len > 0:
        libp2p_protocols_bytes.inc(msgLen.int64, labelValues = [s.protocol, "out"])

    s.activity = true
  except CancelledError as exc:
    # Chronos may still send the data
    raise exc
  except LPStreamConnDownError as exc:
    await s.reset()
    await s.conn.close()
    raise exc
  except LPStreamEOFError as exc:
    raise exc
  except CatchableError as exc:
    trace "exception in lpchannel write handler", s, msg = exc.msg
    await s.reset()
    await s.conn.close()
    raise newLPStreamConnDownError(exc)
  finally:
    s.writes -= 1
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
  ## Write to mplex channel - there may be up to MaxWrites concurrent writes
  ## pending after which the peer is disconnected

  let
    closed = s.closedLocal or s.conn.closed

  let fut =
    if (not closed) and msg.len > 0 and s.writes < MaxWrites and s.isOpen:
      # Fast path: Avoid a copy of msg being kept in the closure created by
      # `{.async.}` as this drives up memory usage - the conditions are laid out
      # in prepareWrite
      trace "lpc-write-fast"
      s.conn.writeMsg(s.id, s.msgCode, msg)
    else:
      trace "lpc-write-prepare"
      prepareWrite(s, msg)

  s.completeWrite(fut, msg.len)

method getWrapped*(s: LPChannel): Connection = s.conn
proc init*(
    L: type LPChannel,
    id: uint64,
    conn: Connection,
    initiator: bool,
    name: string = "",
    timeout: Duration = DefaultChanTimeout): LPChannel =
  let chann = L(
    id: id,
    name: name,
    conn: conn,
    initiator: initiator,
    timeout: timeout,
    isOpen: if initiator: false else: true,
    msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
    closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
    resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
    dir: if initiator: Direction.Out else: Direction.In)

  chann.initStream()

  when chronicles.enabledLogLevel == LogLevel.TRACE:
    chann.name = if chann.name.len > 0: chann.name else: $chann.oid

  trace "Created new lpchannel", s = chann, id, initiator

  return chann
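
The lpc-write-fast and lpc-write-prepare traces above distinguish the two write paths, and MaxWrites bounds how many writes may be in flight per channel before the peer is disconnected. A stripped-down sketch of that accounting pattern under chronos, with an illustrative Chan type that is not part of the file:

import chronos

const MaxWritesSketch = 1024     # plays the role of MaxWrites

type Chan = ref object
  writes: int                    # in-flight writes, like LPChannel.writes

proc send(c: Chan, fut: Future[void]) {.async.} =
  # guard, as in prepareWrite: refuse new writes once the cap is reached
  if c.writes >= MaxWritesSketch:
    raise newException(CatchableError, "too many in-flight writes")
  # accounting, as in completeWrite: count only while the future is pending
  inc c.writes
  try:
    await fut
  finally:
    dec c.writes                 # restore the count even if the write fails

Incrementing before the await and decrementing in a finally block keeps the counter exact even when a write raises, which is what makes the fast path's s.writes < MaxWrites check in write safe.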