From c49932b55ad75d4fab7f0485aa0425d447c7c5aa Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 14 Dec 2021 10:55:17 +0100 Subject: [PATCH] fast path for writes (#659) avoids several copies of the various message buffers being kept alive for the lifetime of the future --- libp2p/muxers/mplex/lpchannel.nim | 43 ++++++++++++++++++++++--------- libp2p/protocols/secure/noise.nim | 27 ++++++++++++------- libp2p/stream/chronosstream.nim | 30 +++++++++++---------- 3 files changed, 65 insertions(+), 35 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 44d9e6e19..224c6b408 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -175,9 +175,9 @@ method readOnce*(s: LPChannel, await s.reset() raise exc -method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = - ## Write to mplex channel - there may be up to MaxWrite concurrent writes - ## pending after which the peer is disconnected +proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = + # prepareWrite is the slow path of writing a message - see conditions in + # write if s.closedLocal or s.conn.closed: raise newLPStreamClosedError() @@ -191,19 +191,20 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = await s.conn.close() return - s.writes += 1 + if not s.isOpen: + await s.open() + + await s.conn.writeMsg(s.id, s.msgCode, msg) + +proc completeWrite( + s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} = try: - if not s.isOpen: - await s.open() - - # writes should happen in sequence - trace "write msg", s, conn = s.conn, len = msg.len - - await s.conn.writeMsg(s.id, s.msgCode, msg) + s.writes += 1 + await fut when defined(libp2p_network_protocols_metrics): if s.tag.len > 0: - libp2p_protocols_bytes.inc(msg.len.int64, labelValues=[s.tag, "out"]) + libp2p_protocols_bytes.inc(msgLen.int64, labelValues=[s.tag, "out"]) s.activity = true except CatchableError as exc: @@ -214,6 +215,24 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = finally: s.writes -= 1 +method write*(s: LPChannel, msg: seq[byte]): Future[void] = + ## Write to mplex channel - there may be up to MaxWrite concurrent writes + ## pending after which the peer is disconnected + + let + closed = s.closedLocal or s.conn.closed + + let fut = + if (not closed) and msg.len > 0 and s.writes < MaxWrites and s.isOpen: + # Fast path: Avoid a copy of msg being kept in the closure created by + # `{.async.}` as this drives up memory usage - the conditions are laid out + # in prepareWrite + s.conn.writeMsg(s.id, s.msgCode, msg) + else: + prepareWrite(s, msg) + + s.completeWrite(fut, msg.len) + proc init*( L: type LPChannel, id: uint64, diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index 79cedb22f..a88fceb18 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -460,10 +460,8 @@ proc encryptFrame( cipherFrame[2 + src.len()..