fast path for writes (#659)
avoids several copies of the various message buffers being kept alive for the lifetime of the future
This commit is contained in:
parent
47a35e26d7
commit
c49932b55a
|
@ -175,9 +175,9 @@ method readOnce*(s: LPChannel,
|
||||||
await s.reset()
|
await s.reset()
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
||||||
## Write to mplex channel - there may be up to MaxWrite concurrent writes
|
# prepareWrite is the slow path of writing a message - see conditions in
|
||||||
## pending after which the peer is disconnected
|
# write
|
||||||
if s.closedLocal or s.conn.closed:
|
if s.closedLocal or s.conn.closed:
|
||||||
raise newLPStreamClosedError()
|
raise newLPStreamClosedError()
|
||||||
|
|
||||||
|
@ -191,19 +191,20 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
||||||
await s.conn.close()
|
await s.conn.close()
|
||||||
return
|
return
|
||||||
|
|
||||||
s.writes += 1
|
if not s.isOpen:
|
||||||
|
await s.open()
|
||||||
|
|
||||||
|
await s.conn.writeMsg(s.id, s.msgCode, msg)
|
||||||
|
|
||||||
|
proc completeWrite(
|
||||||
|
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
|
||||||
try:
|
try:
|
||||||
if not s.isOpen:
|
s.writes += 1
|
||||||
await s.open()
|
|
||||||
|
|
||||||
# writes should happen in sequence
|
|
||||||
trace "write msg", s, conn = s.conn, len = msg.len
|
|
||||||
|
|
||||||
await s.conn.writeMsg(s.id, s.msgCode, msg)
|
|
||||||
|
|
||||||
|
await fut
|
||||||
when defined(libp2p_network_protocols_metrics):
|
when defined(libp2p_network_protocols_metrics):
|
||||||
if s.tag.len > 0:
|
if s.tag.len > 0:
|
||||||
libp2p_protocols_bytes.inc(msg.len.int64, labelValues=[s.tag, "out"])
|
libp2p_protocols_bytes.inc(msgLen.int64, labelValues=[s.tag, "out"])
|
||||||
|
|
||||||
s.activity = true
|
s.activity = true
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
|
@ -214,6 +215,24 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
|
||||||
finally:
|
finally:
|
||||||
s.writes -= 1
|
s.writes -= 1
|
||||||
|
|
||||||
|
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
|
||||||
|
## Write to mplex channel - there may be up to MaxWrite concurrent writes
|
||||||
|
## pending after which the peer is disconnected
|
||||||
|
|
||||||
|
let
|
||||||
|
closed = s.closedLocal or s.conn.closed
|
||||||
|
|
||||||
|
let fut =
|
||||||
|
if (not closed) and msg.len > 0 and s.writes < MaxWrites and s.isOpen:
|
||||||
|
# Fast path: Avoid a copy of msg being kept in the closure created by
|
||||||
|
# `{.async.}` as this drives up memory usage - the conditions are laid out
|
||||||
|
# in prepareWrite
|
||||||
|
s.conn.writeMsg(s.id, s.msgCode, msg)
|
||||||
|
else:
|
||||||
|
prepareWrite(s, msg)
|
||||||
|
|
||||||
|
s.completeWrite(fut, msg.len)
|
||||||
|
|
||||||
proc init*(
|
proc init*(
|
||||||
L: type LPChannel,
|
L: type LPChannel,
|
||||||
id: uint64,
|
id: uint64,
|
||||||
|
|
|
@ -460,10 +460,8 @@ proc encryptFrame(
|
||||||
|
|
||||||
cipherFrame[2 + src.len()..<cipherFrame.len] = tag
|
cipherFrame[2 + src.len()..<cipherFrame.len] = tag
|
||||||
|
|
||||||
method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.} =
|
method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] =
|
||||||
if message.len == 0:
|
# Fast path: `{.async.}` would introduce a copy of `message`
|
||||||
return
|
|
||||||
|
|
||||||
const FramingSize = 2 + sizeof(ChaChaPolyTag)
|
const FramingSize = 2 + sizeof(ChaChaPolyTag)
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -479,10 +477,16 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.
|
||||||
let
|
let
|
||||||
chunkSize = min(MaxPlainSize, left)
|
chunkSize = min(MaxPlainSize, left)
|
||||||
|
|
||||||
encryptFrame(
|
try:
|
||||||
sconn,
|
encryptFrame(
|
||||||
cipherFrames.toOpenArray(woffset, woffset + chunkSize + FramingSize - 1),
|
sconn,
|
||||||
message.toOpenArray(offset, offset + chunkSize - 1))
|
cipherFrames.toOpenArray(woffset, woffset + chunkSize + FramingSize - 1),
|
||||||
|
message.toOpenArray(offset, offset + chunkSize - 1))
|
||||||
|
except NoiseNonceMaxError as exc:
|
||||||
|
debug "Noise nonce exceeded"
|
||||||
|
let fut = newFuture[void]("noise.write.nonce")
|
||||||
|
fut.fail(exc)
|
||||||
|
return fut
|
||||||
|
|
||||||
when defined(libp2p_dump):
|
when defined(libp2p_dump):
|
||||||
dumpMessage(
|
dumpMessage(
|
||||||
|
@ -492,9 +496,12 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.
|
||||||
left = left - chunkSize
|
left = left - chunkSize
|
||||||
offset += chunkSize
|
offset += chunkSize
|
||||||
woffset += chunkSize + FramingSize
|
woffset += chunkSize + FramingSize
|
||||||
sconn.activity = true
|
|
||||||
|
|
||||||
await sconn.stream.write(cipherFrames)
|
sconn.activity = true
|
||||||
|
|
||||||
|
# Write all `cipherFrames` in a single write, to avoid interleaving /
|
||||||
|
# sequencing issues
|
||||||
|
sconn.stream.write(cipherFrames)
|
||||||
|
|
||||||
method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} =
|
method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureConn] {.async.} =
|
||||||
trace "Starting Noise handshake", conn, initiator
|
trace "Starting Noise handshake", conn, initiator
|
||||||
|
|
|
@ -94,7 +94,6 @@ when defined(libp2p_agents_metrics):
|
||||||
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||||
if s.atEof:
|
if s.atEof:
|
||||||
raise newLPStreamEOFError()
|
raise newLPStreamEOFError()
|
||||||
|
|
||||||
withExceptions:
|
withExceptions:
|
||||||
result = await s.client.readOnce(pbytes, nbytes)
|
result = await s.client.readOnce(pbytes, nbytes)
|
||||||
s.activity = true # reset activity flag
|
s.activity = true # reset activity flag
|
||||||
|
@ -104,31 +103,36 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
|
||||||
if s.tracked:
|
if s.tracked:
|
||||||
libp2p_peers_traffic_read.inc(nbytes.int64, labelValues = [s.shortAgent])
|
libp2p_peers_traffic_read.inc(nbytes.int64, labelValues = [s.shortAgent])
|
||||||
|
|
||||||
method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
|
proc completeWrite(
|
||||||
if s.closed:
|
s: ChronosStream, fut: Future[int], msgLen: int): Future[void] {.async.} =
|
||||||
raise newLPStreamClosedError()
|
|
||||||
|
|
||||||
if msg.len == 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
withExceptions:
|
withExceptions:
|
||||||
# StreamTransport will only return written < msg.len on fatal failures where
|
# StreamTransport will only return written < msg.len on fatal failures where
|
||||||
# further writing is not possible - in such cases, we'll raise here,
|
# further writing is not possible - in such cases, we'll raise here,
|
||||||
# since we don't return partial writes lengths
|
# since we don't return partial writes lengths
|
||||||
var written = await s.client.write(msg)
|
var written = await fut
|
||||||
|
|
||||||
if written < msg.len:
|
if written < msgLen:
|
||||||
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
|
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
|
||||||
|
|
||||||
s.activity = true # reset activity flag
|
s.activity = true # reset activity flag
|
||||||
libp2p_network_bytes.inc(msg.len.int64, labelValues = ["out"])
|
libp2p_network_bytes.inc(msgLen.int64, labelValues = ["out"])
|
||||||
when defined(libp2p_agents_metrics):
|
when defined(libp2p_agents_metrics):
|
||||||
s.trackPeerIdentity()
|
s.trackPeerIdentity()
|
||||||
if s.tracked:
|
if s.tracked:
|
||||||
libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
|
libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent])
|
||||||
|
|
||||||
|
method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
|
||||||
|
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
|
||||||
|
# drives up memory usage
|
||||||
|
if s.closed:
|
||||||
|
let fut = newFuture[void]("chronosstream.write.closed")
|
||||||
|
fut.fail(newLPStreamClosedError())
|
||||||
|
return fut
|
||||||
|
|
||||||
|
s.completeWrite(s.client.write(msg), msg.len)
|
||||||
|
|
||||||
method closed*(s: ChronosStream): bool =
|
method closed*(s: ChronosStream): bool =
|
||||||
result = s.client.closed
|
s.client.closed
|
||||||
|
|
||||||
method atEof*(s: ChronosStream): bool =
|
method atEof*(s: ChronosStream): bool =
|
||||||
s.client.atEof()
|
s.client.atEof()
|
||||||
|
|
Loading…
Reference in New Issue