mirror of
https://github.com/codex-storage/nim-libp2p.git
synced 2025-02-23 08:08:22 +00:00
This change modifies how the backpressure algorithm in bufferstream works - in particular, instead of working byte-by-byte, it will now work seq-by-seq. When data arrives, it usually does so in packets - in the current bufferstream, the packet is read then split into bytes which are fed one by one to the bufferstream. On the reading side, the bytes are popped of the bufferstream, again byte by byte, to satisfy `readOnce` requests - this introduces a lot of synchronization traffic because the checks for full buffer and for async event handling must be done for every byte. In this PR, a queue of length 1 is used instead - this means there will at most exist one "packet" in `pushTo`, one in the queue and one in the slush buffer that is used to store incomplete reads. * avoid byte-by-byte copy to buffer, with synchronization in-between * reuse AsyncQueue synchronization logic instead of rolling own * avoid writeHandler callback - implement `write` method instead * simplify EOF signalling by only setting EOF flag in queue reader (and reset) * remove BufferStream pipes (unused) * fixes drainBuffer deadlock when drain is called from within read loop and thus blocks draining * fix lpchannel init order
265 lines
9.1 KiB
Nim
265 lines
9.1 KiB
Nim
## Nim-LibP2P
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
import std/[hashes, options, strutils, tables]
|
|
import chronos, chronicles, nimcrypto/sha2, metrics
|
|
import rpc/[messages, message, protobuf],
|
|
../../peerid,
|
|
../../peerinfo,
|
|
../../stream/connection,
|
|
../../crypto/crypto,
|
|
../../protobuf/minprotobuf,
|
|
../../utility
|
|
|
|
logScope:
|
|
topics = "pubsubpeer"
|
|
|
|
when defined(libp2p_expensive_metrics):
|
|
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
|
|
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
|
|
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
|
|
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
|
|
|
|
type
|
|
PubSubObserver* = ref object
|
|
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
|
|
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
|
|
|
|
GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.}
|
|
|
|
PubSubPeer* = ref object of RootObj
|
|
getConn*: GetConn # callback to establish a new send connection
|
|
codec*: string # the protocol that this peer joined from
|
|
sendConn: Connection # cached send connection
|
|
connections*: seq[Connection] # connections to this peer
|
|
peerId*: PeerID
|
|
handler*: RPCHandler
|
|
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
|
|
dialLock: AsyncLock
|
|
|
|
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
|
|
|
|
func hash*(p: PubSubPeer): Hash =
|
|
# int is either 32/64, so intptr basically, pubsubpeer is a ref
|
|
cast[pointer](p).hash
|
|
|
|
func shortLog*(p: PubSubPeer): string =
|
|
if p.isNil: "PubSubPeer(nil)"
|
|
else: shortLog(p.peerId)
|
|
chronicles.formatIt(PubSubPeer): shortLog(it)
|
|
|
|
proc connected*(p: PubSubPeer): bool =
|
|
not p.sendConn.isNil and not
|
|
(p.sendConn.closed or p.sendConn.atEof)
|
|
|
|
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
|
|
# trigger hooks
|
|
if not(isNil(p.observers)) and p.observers[].len > 0:
|
|
for obs in p.observers[]:
|
|
if not(isNil(obs)): # TODO: should never be nil, but...
|
|
obs.onRecv(p, msg)
|
|
|
|
proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
|
|
# trigger hooks
|
|
if not(isNil(p.observers)) and p.observers[].len > 0:
|
|
for obs in p.observers[]:
|
|
if not(isNil(obs)): # TODO: should never be nil, but...
|
|
obs.onSend(p, msg)
|
|
|
|
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
|
|
|
debug "starting pubsub read loop",
|
|
conn, peer = p, closed = conn.closed
|
|
try:
|
|
try:
|
|
while not conn.atEof:
|
|
trace "waiting for data", conn, peer = p, closed = conn.closed
|
|
|
|
let data = await conn.readLp(64 * 1024)
|
|
trace "read data from peer",
|
|
conn, peer = p, closed = conn.closed,
|
|
data = data.shortLog
|
|
|
|
var rmsg = decodeRpcMsg(data)
|
|
if rmsg.isErr():
|
|
notice "failed to decode msg from peer",
|
|
conn, peer = p, closed = conn.closed,
|
|
err = rmsg.error()
|
|
break
|
|
|
|
trace "decoded msg from peer",
|
|
conn, peer = p, closed = conn.closed,
|
|
msg = rmsg.get().shortLog
|
|
# trigger hooks
|
|
p.recvObservers(rmsg.get())
|
|
|
|
when defined(libp2p_expensive_metrics):
|
|
for m in rmsg.get().messages:
|
|
for t in m.topicIDs:
|
|
# metrics
|
|
libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t])
|
|
|
|
await p.handler(p, rmsg.get())
|
|
finally:
|
|
await conn.close()
|
|
|
|
if p.sendConn == conn:
|
|
p.sendConn = nil
|
|
|
|
except CancelledError:
|
|
# This is top-level procedure which will work as separate task, so it
|
|
# do not need to propogate CancelledError.
|
|
trace "Unexpected cancellation in PubSubPeer.handle"
|
|
except CatchableError as exc:
|
|
trace "Exception occurred in PubSubPeer.handle",
|
|
conn, peer = p, closed = conn.closed, exc = exc.msg
|
|
finally:
|
|
debug "exiting pubsub read loop",
|
|
conn, peer = p, closed = conn.closed
|
|
|
|
proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
|
|
## get a cached send connection or create a new one - will return nil if
|
|
## getting a new connection fails
|
|
##
|
|
|
|
block: # check if there's an existing connection that can be reused
|
|
let current = p.sendConn
|
|
|
|
if not current.isNil:
|
|
if not (current.closed() or current.atEof):
|
|
# The existing send connection looks like it might work - reuse it
|
|
trace "Reusing existing connection", current
|
|
return current
|
|
|
|
# Send connection is set but broken - get rid of it
|
|
p.sendConn = nil
|
|
|
|
# Careful, p.sendConn might change after here!
|
|
await current.close() # TODO this might be unnecessary
|
|
|
|
try:
|
|
# Testing has demonstrated that when we perform concurrent meshsub dials
|
|
# and later close one of them, other implementations such as rust-libp2p
|
|
# become deaf to our messages (potentially due to the clean-up associated
|
|
# with closing connections). To prevent this, we use a lock that ensures
|
|
# that only a single dial will be performed for each peer and send the
|
|
# subscription table every time we reconnect.
|
|
#
|
|
# Nevertheless, this approach is still quite problematic because the gossip
|
|
# sends and their respective dials may be started from the mplex read loop.
|
|
# This may cause the read loop to get stuck which ultimately results in a
|
|
# deadlock when the other side tries to send us any other message that must
|
|
# be routed through mplex (it will be stuck on `pushTo`). Such messages
|
|
# naturally arise in the process of dialing itself.
|
|
#
|
|
# See https://github.com/status-im/nim-libp2p/issues/337
|
|
#
|
|
# One possible long-term solution is to avoid "blocking" the mplex read
|
|
# loop by making the gossip send non-blocking through the use of a queue.
|
|
await p.dialLock.acquire()
|
|
|
|
# Another concurrent dial may have populated p.sendConn
|
|
if p.sendConn != nil:
|
|
let current = p.sendConn
|
|
if not (current.closed() or current.atEof):
|
|
# The existing send connection looks like it might work - reuse it
|
|
debug "Reusing existing connection", current
|
|
return current
|
|
else:
|
|
p.sendConn = nil
|
|
|
|
# Grab a new send connection
|
|
let (newConn, handshake) = await p.getConn() # ...and here
|
|
if newConn.isNil:
|
|
return nil
|
|
|
|
trace "Sending handshake", newConn, handshake = shortLog(handshake)
|
|
await newConn.writeLp(encodeRpcMsg(handshake))
|
|
|
|
trace "Caching new send connection", newConn
|
|
p.sendConn = newConn
|
|
# Start a read loop on the new connection.
|
|
# All the errors are handled inside `handle()` procedure.
|
|
asyncSpawn p.handle(newConn)
|
|
return newConn
|
|
finally:
|
|
if p.dialLock.locked:
|
|
p.dialLock.release()
|
|
|
|
proc connectImpl*(p: PubSubPeer) {.async.} =
|
|
try:
|
|
discard await getSendConn(p)
|
|
except CatchableError as exc:
|
|
debug "Could not connect to pubsub peer", err = exc.msg
|
|
|
|
proc connect*(p: PubSubPeer) =
|
|
asyncCheck(connectImpl(p))
|
|
|
|
proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
|
|
doAssert(not isNil(p), "pubsubpeer nil!")
|
|
|
|
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
|
|
|
|
# trigger send hooks
|
|
var mm = msg # hooks can modify the message
|
|
p.sendObservers(mm)
|
|
|
|
let encoded = encodeRpcMsg(mm)
|
|
if encoded.len <= 0:
|
|
info "empty message, skipping"
|
|
return
|
|
|
|
var conn: Connection
|
|
try:
|
|
conn = await p.getSendConn()
|
|
if conn == nil:
|
|
trace "Couldn't get send connection, dropping message", peer = p
|
|
return
|
|
|
|
trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
|
|
await conn.writeLp(encoded)
|
|
trace "sent pubsub message to remote", conn
|
|
|
|
when defined(libp2p_expensive_metrics):
|
|
for x in mm.messages:
|
|
for t in x.topicIDs:
|
|
# metrics
|
|
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
|
|
|
|
except CatchableError as exc:
|
|
# Because we detach the send call from the currently executing task using
|
|
# asyncCheck, no exceptions may leak out of it
|
|
trace "Unable to send to remote", conn, exc = exc.msg
|
|
# Next time sendConn is used, it will be have its close flag set and thus
|
|
# will be recycled
|
|
if not isNil(conn):
|
|
await conn.close() # This will clean up the send connection
|
|
|
|
if exc is CancelledError: # TODO not handled
|
|
debug "Send cancelled", peer = p
|
|
|
|
# We'll ask for a new send connection whenever possible
|
|
if p.sendConn == conn:
|
|
p.sendConn = nil
|
|
|
|
proc send*(p: PubSubPeer, msg: RPCMsg) =
|
|
asyncCheck sendImpl(p, msg)
|
|
|
|
proc `$`*(p: PubSubPeer): string =
|
|
$p.peerId
|
|
|
|
proc newPubSubPeer*(peerId: PeerID,
|
|
getConn: GetConn,
|
|
codec: string): PubSubPeer =
|
|
new result
|
|
result.getConn = getConn
|
|
result.codec = codec
|
|
result.peerId = peerId
|
|
result.dialLock = newAsyncLock()
|