mirror of
synced 2025-03-03 17:40:38 +00:00
This change modifies how the backpressure algorithm in bufferstream works - in particular, instead of working byte-by-byte, it will now work seq-by-seq. When data arrives, it usually does so in packets - in the current bufferstream, the packet is read then split into bytes which are fed one by one to the bufferstream. On the reading side, the bytes are popped off the bufferstream, again byte by byte, to satisfy `readOnce` requests - this introduces a lot of synchronization traffic because the checks for full buffer and for async event handling must be done for every byte. In this PR, a queue of length 1 is used instead - this means there will exist at most one "packet" in `pushTo`, one in the queue and one in the slush buffer that is used to store incomplete reads. * avoid byte-by-byte copy to buffer, with synchronization in-between * reuse AsyncQueue synchronization logic instead of rolling own * avoid writeHandler callback - implement `write` method instead * simplify EOF signalling by only setting EOF flag in queue reader (and reset) * remove BufferStream pipes (unused) * fixes drainBuffer deadlock when drain is called from within read loop and thus blocks draining * fix lpchannel init order
265 lines
9.1 KiB
265 lines
9.1 KiB
## Nim-LibP2P
## Copyright (c) 2019 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[hashes, options, strutils, tables]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
# Tag every log statement emitted from this module with the "pubsubpeer"
# topic so the output can be filtered per component.
logScope:
  topics = "pubsubpeer"
# Per-peer / per-topic counters. They are only declared when the build
# defines `libp2p_expensive_metrics`.
when defined(libp2p_expensive_metrics):
  declareCounter(
    libp2p_pubsub_sent_messages,
    "number of messages sent",
    labels = ["id", "topic"])
  declareCounter(
    libp2p_pubsub_received_messages,
    "number of messages received",
    labels = ["id", "topic"])
  declareCounter(
    libp2p_pubsub_skipped_received_messages,
    "number of received skipped messages",
    labels = ["id"])
  declareCounter(
    libp2p_pubsub_skipped_sent_messages,
    "number of sent skipped messages",
    labels = ["id"])
# All four types live in a single `type` section because they refer to each
# other (the observer callbacks and `RPCHandler` take a `PubSubPeer`, which in
# turn stores an `RPCHandler` and the observers).
type
  PubSubObserver* = ref object
    ## Pair of hooks invoked with every RPC message; the message is passed as
    ## `var`, so a hook may modify it in place.
    onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
    onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}

  GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.}
    ## Callback yielding a new connection together with the RPC message that
    ## is written to it first (the "handshake").

  PubSubPeer* = ref object of RootObj
    getConn*: GetConn                   # callback to establish a new send connection
    codec*: string                      # the protocol that this peer joined from
    sendConn: Connection                # cached send connection
    connections*: seq[Connection]       # connections to this peer
    peerId*: PeerID
    handler*: RPCHandler                # awaited with every decoded incoming message
    observers*: ref seq[PubSubObserver] # ref as in smart_ptr
    dialLock: AsyncLock                 # held while a new send connection is obtained

  RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
    ## Asynchronous consumer of a decoded RPC message from `peer`.
func hash*(p: PubSubPeer): Hash =
  ## Hashes the peer by reference identity (the address the ref points to),
  ## so that `PubSubPeer` values can be used as table / set keys.
  # int is either 32/64, so intptr basically, pubsubpeer is a ref
  cast[pointer](p).hash
func shortLog*(p: PubSubPeer): string =
  ## Compact log representation of a peer: the short form of its peer id, or
  ## the fixed text `PubSubPeer(nil)` when the reference itself is nil.
  if not p.isNil:
    return shortLog(p.peerId)
  "PubSubPeer(nil)"
# Have chronicles render `PubSubPeer` log arguments through `shortLog`.
chronicles.formatIt(PubSubPeer):
  shortLog(it)
proc connected*(p: PubSubPeer): bool =
  ## True when a send connection is cached and it is neither closed nor at
  ## EOF.
  let cached = p.sendConn
  if cached.isNil:
    return false
  not(cached.closed) and not(cached.atEof)
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
  ## Runs the `onRecv` hook of every registered observer on `msg`. The
  ## message is passed as `var`, so hooks may alter it.
  if isNil(p.observers) or p.observers[].len == 0:
    return

  for observer in p.observers[]:
    if isNil(observer): # TODO: should never be nil, but...
      continue
    observer.onRecv(p, msg)
proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
  ## Runs the `onSend` hook of every registered observer on `msg`. The
  ## message is passed as `var`, so hooks may alter it.
  if isNil(p.observers) or p.observers[].len == 0:
    return

  for observer in p.observers[]:
    if isNil(observer): # TODO: should never be nil, but...
      continue
    observer.onSend(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
  ## Read loop for `conn`: reads length-prefixed frames until EOF, decodes
  ## each one as an RPC message, runs the receive observers and awaits
  ## `p.handler` with the result.
  ##
  ## The loop stops at the first frame that fails to decode. On exit the
  ## connection is always closed and, if it was the cached send connection,
  ## the cache is cleared. No exception escapes: cancellation and
  ## `CatchableError` are logged and swallowed, because this proc is spawned
  ## as a detached task.
  debug "starting pubsub read loop",
    conn, peer = p, closed = conn.closed
  try:
    try:
      while not conn.atEof:
        trace "waiting for data", conn, peer = p, closed = conn.closed

        # presumably 64 * 1024 is the maximum accepted frame size - confirm
        # against `readLp`
        let data = await conn.readLp(64 * 1024)
        trace "read data from peer",
          conn, peer = p, closed = conn.closed,
          data = data.shortLog

        var rmsg = decodeRpcMsg(data)
        if rmsg.isErr():
          notice "failed to decode msg from peer",
            conn, peer = p, closed = conn.closed,
            err = rmsg.error()
          # `rmsg.get()` below is only valid for a successful decode
          break

        trace "decoded msg from peer",
          conn, peer = p, closed = conn.closed,
          msg = rmsg.get().shortLog
        # trigger hooks
        p.recvObservers(rmsg.get())

        when defined(libp2p_expensive_metrics):
          for m in rmsg.get().messages:
            for t in m.topicIDs:
              # metrics
              libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])

        await p.handler(p, rmsg.get())
    finally:
      # Runs on normal exit, decode failure and exceptions alike
      await conn.close()

      if p.sendConn == conn:
        p.sendConn = nil

  except CancelledError:
    # This is top-level procedure which will work as separate task, so it
    # do not need to propogate CancelledError.
    trace "Unexpected cancellation in PubSubPeer.handle"
  except CatchableError as exc:
    trace "Exception occurred in PubSubPeer.handle",
      conn, peer = p, closed = conn.closed, exc = exc.msg
  finally:
    debug "exiting pubsub read loop",
      conn, peer = p, closed = conn.closed
proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
  ## get a cached send connection or create a new one - will return nil if
  ## getting a new connection fails
  ##
  ## Creating a connection is serialized through `p.dialLock`; the lock is
  ## released in a `finally` so that a failing or cancelled dial cannot leave
  ## it held and block every later caller.

  block: # check if there's an existing connection that can be reused
    let current = p.sendConn

    if not current.isNil:
      if not (current.closed() or current.atEof):
        # The existing send connection looks like it might work - reuse it
        trace "Reusing existing connection", current
        return current

      # Send connection is set but broken - get rid of it
      p.sendConn = nil

      # Careful, p.sendConn might change after here!
      await current.close() # TODO this might be unnecessary

  # Testing has demonstrated that when we perform concurrent meshsub dials
  # and later close one of them, other implementations such as rust-libp2p
  # become deaf to our messages (potentially due to the clean-up associated
  # with closing connections). To prevent this, we use a lock that ensures
  # that only a single dial will be performed for each peer and send the
  # subscription table every time we reconnect.
  #
  # Nevertheless, this approach is still quite problematic because the gossip
  # sends and their respective dials may be started from the mplex read loop.
  # This may cause the read loop to get stuck which ultimately results in a
  # deadlock when the other side tries to send us any other message that must
  # be routed through mplex (it will be stuck on `pushTo`). Such messages
  # naturally arise in the process of dialing itself.
  #
  # See https://github.com/status-im/nim-libp2p/issues/337
  #
  # One possible long-term solution is to avoid "blocking" the mplex read
  # loop by making the gossip send non-blocking through the use of a queue.
  await p.dialLock.acquire()

  try:
    # Another concurrent dial may have populated p.sendConn
    if p.sendConn != nil:
      let current = p.sendConn
      if not (current.closed() or current.atEof):
        # The existing send connection looks like it might work - reuse it
        debug "Reusing existing connection", current
        return current
      else:
        p.sendConn = nil

    # Grab a new send connection
    let (newConn, handshake) = await p.getConn() # ...and here
    if newConn.isNil:
      return nil

    trace "Sending handshake", newConn, handshake = shortLog(handshake)
    await newConn.writeLp(encodeRpcMsg(handshake))

    trace "Caching new send connection", newConn
    p.sendConn = newConn
    # Start a read loop on the new connection.
    # All the errors are handled inside `handle()` procedure.
    asyncSpawn p.handle(newConn)
    return newConn
  finally:
    # Covers every exit above: early returns, exceptions and cancellation
    if p.dialLock.locked:
      p.dialLock.release()
proc connectImpl*(p: PubSubPeer) {.async.} =
  ## Makes sure a send connection exists by requesting one and discarding
  ## the result. A `CatchableError` raised while doing so is logged at debug
  ## level and not propagated.
  try:
    discard await getSendConn(p)
  except CatchableError as exc:
    debug "Could not connect to pubsub peer", err = exc.msg
proc connect*(p: PubSubPeer) =
  ## Starts `connectImpl` as a detached task and returns immediately, in the
  ## same way `send` detaches `sendImpl`.
  asyncCheck connectImpl(p)
proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
  ## Encodes `msg` (after the send observers had a chance to modify a copy of
  ## it) and writes it, length-prefixed, to the peer's send connection.
  ##
  ## Nothing is sent when the encoded message is empty or when no send
  ## connection can be obtained. A `CatchableError` during the send is logged,
  ## the connection is closed and dropped from the cache, and the error is
  ## not propagated.
  doAssert(not isNil(p), "pubsubpeer nil!")

  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

  # trigger send hooks
  var mm = msg # hooks can modify the message
  p.sendObservers(mm)

  let encoded = encodeRpcMsg(mm)
  if encoded.len <= 0:
    info "empty message, skipping"
    return

  var conn: Connection
  try:
    conn = await p.getSendConn()
    if conn == nil:
      trace "Couldn't get send connection, dropping message", peer = p
      # Must not fall through: writing to a nil connection would crash
      return

    trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
    await conn.writeLp(encoded)
    trace "sent pubsub message to remote", conn

    when defined(libp2p_expensive_metrics):
      for x in mm.messages:
        for t in x.topicIDs:
          # metrics
          libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

  except CatchableError as exc:
    # Because we detach the send call from the currently executing task using
    # asyncCheck, no exceptions may leak out of it
    trace "Unable to send to remote", conn, exc = exc.msg
    # Next time sendConn is used, it will be have its close flag set and thus
    # will be recycled
    if not isNil(conn):
      await conn.close() # This will clean up the send connection

    if exc is CancelledError: # TODO not handled
      debug "Send cancelled", peer = p

    # We'll ask for a new send connection whenever possible
    if p.sendConn == conn:
      p.sendConn = nil
proc send*(p: PubSubPeer, msg: RPCMsg) =
  ## Fire-and-forget send: hands `msg` to `sendImpl` as a detached task and
  ## returns without waiting for it.
  asyncCheck(p.sendImpl(msg))
proc `$`*(p: PubSubPeer): string =
  ## String form of a peer: the string form of its peer id.
  $p.peerId
proc newPubSubPeer*(peerId: PeerID,
                    getConn: GetConn,
                    codec: string): PubSubPeer =
  ## Creates a peer for `peerId` that obtains send connections through
  ## `getConn` and records `codec` as its protocol. A fresh dial lock is
  ## allocated; all remaining fields keep their zero values.
  PubSubPeer(
    getConn: getConn,
    codec: codec,
    peerId: peerId,
    dialLock: newAsyncLock())