limit write queue length (#376)

To break a potential read/write deadlock, gossipsub uses an unbounded
queue for writes - when peers are too slow to process this queue, it may
grow without bound, causing high memory usage.

Here, we introduce a maximum write queue length after which the peer is
disconnected - the limit is generous enough that any "normal" usage
should be fine. Writes that are `await`:ed are not affected, only
writes launched in an `asyncSpawn` task or similar.
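
As a rough illustration of the pattern (a minimal, self-contained sketch with a toy FakeChannel type standing in for LPChannel - not the library code): every detached write bumps an in-flight counter that is released when the write finishes, and the peer is dropped once the counter passes the limit.

import chronos

const MaxWrites = 1024   # mirrors the constant introduced below

type FakeChannel = ref object
  writes: int       # writes currently in flight
  dropped: bool     # set where the real code would close the connection

proc write(ch: FakeChannel, msg: seq[byte]) {.async.} =
  if ch.writes >= MaxWrites:
    ch.dropped = true                 # real code: await s.conn.close()
    return
  inc ch.writes
  try:
    await sleepAsync(1.milliseconds)  # stand-in for the actual network write
  finally:
    dec ch.writes

proc demo() {.async.} =
  let ch = FakeChannel()
  # An `await`:ed write never accumulates - at most one is in flight.
  await ch.write(@[1'u8, 2, 3])
  # Fire-and-forget writes pile up; this is what the limit protects against.
  for i in 0 ..< 2000:
    asyncSpawn ch.write(@[byte(i mod 256)])
  await sleepAsync(10.milliseconds)
  echo "dropped: ", ch.dropped        # true - the spawned writes exceeded MaxWrites

waitFor demo()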

* avoid unnecessary copy of message when there are no send observers
* release message memory earlier in gossipsub
* simplify pubsubpeer logging
Jacek Sieka 2020-09-24 18:43:20 +02:00 committed by GitHub
parent 58f26d4164
commit 17e00e642a
5 changed files with 67 additions and 38 deletions


@@ -31,6 +31,10 @@ logScope:
## directions are closed and when the reader of the channel has read the
## EOF marker
const
MaxWrites = 1024 ##\
## Maximum number of in-flight writes - after this, we disconnect the peer
type
LPChannel* = ref object of BufferStream
id*: uint64 # channel id
@@ -42,6 +46,7 @@ type
msgCode*: MessageType # cached in/out message code
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
proc open*(s: LPChannel) {.async, gcsafe.}
@@ -149,6 +154,10 @@ method readOnce*(s: LPChannel,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
## Mplex relies on reading being done regularly from every channel, or all
## channels are blocked - in particular, this means that reading from one
## channel must not be done from within a callback / read handler of another
## or the reads will lock each other.
try:
let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
trace "readOnce", s, bytes
@@ -160,12 +169,21 @@ method readOnce*(s: LPChannel,
raise exc
method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
## Write to mplex channel - there may be up to MaxWrites concurrent writes
## pending, after which the peer is disconnected
if s.closedLocal or s.conn.closed:
raise newLPStreamClosedError()
if msg.len == 0:
return
if s.writes >= MaxWrites:
debug "Closing connection, too many in-flight writes on channel",
s, conn = s.conn, writes = s.writes
await s.conn.close()
return
s.writes += 1
try:
if not s.isOpen:
await s.open()
@@ -179,6 +197,8 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
trace "exception in lpchannel write handler", s, msg = exc.msg
await s.conn.close()
raise exc
finally:
s.writes -= 1
proc init*(
L: type LPChannel,
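
The readOnce doc comment above states the constraint this file works around; here is a toy sketch of that discipline (AsyncQueue stand-ins, not mplex itself): a handler that waits on another channel's data must be detached, otherwise the shared read loop stalls for every channel.

import chronos

# Two toy "channels" backed by async queues - stand-ins, not mplex channels.
let
  chanA = newAsyncQueue[int]()
  chanB = newAsyncQueue[int]()

proc handleA() {.async.} =
  # Waits on data that only arrives via the other channel - run inside the
  # shared read loop, this would stall reads for every channel.
  let x = await chanB.get()
  echo "A got ", x

proc handleB() {.async.} =
  let x = await chanA.get()
  echo "B got ", x

proc demo() {.async.} =
  # Detaching the handlers keeps this "read loop" responsive even though each
  # handler blocks until the other channel delivers data.
  asyncSpawn handleA()
  asyncSpawn handleB()
  await chanA.put(1)
  await chanB.put(2)
  await sleepAsync(10.milliseconds)

waitFor demo()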


@@ -530,7 +530,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
if g.mesh.addPeer(topic, peer):
g.grafted(peer, topic)
grafting &= peer
trace "opportunistic grafting", peer = $peer
trace "opportunistic grafting", peer
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
@@ -977,9 +977,9 @@ proc handleIHave(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant =
if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer = $peer, score = peer.score
trace "ihave: ignoring low score peer", peer, score = peer.score
elif peer.iHaveBudget == 0:
trace "ihave: ignoring out of budget peer", peer = $peer, score = peer.score
trace "ihave: ignoring out of budget peer", peer, score = peer.score
else:
dec peer.iHaveBudget
for ihave in ihaves:
@@ -995,7 +995,7 @@ proc handleIWant(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] =
if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer = $peer, score = peer.score
trace "iwant: ignoring low score peer", peer, score = peer.score
else:
for iwant in iwants:
for mid in iwant.messageIDs:
@@ -1178,7 +1178,7 @@ method publish*(g: GossipSub,
# but a peer's own messages will always be published to all known peers in the topic.
for peer in g.gossipsub.getOrDefault(topic):
if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer = $peer
trace "publish: including flood/high score peer", peer
peers.incl(peer)
# add always direct peers
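
The `peer = $peer` to `peer` log changes above lean on the `chronicles.formatIt(PubSubPeer)` formatter visible in the pubsubpeer diff below. A minimal sketch of that mechanism with a toy type (not the library code):

import chronicles

type Peer = object
  id: string

# Register how a Peer renders in log output; call sites can then pass the
# value directly instead of stringifying it with `$` every time.
chronicles.formatIt(Peer): it.id

proc demo() =
  let peer = Peer(id: "peer-1")
  # The identifier doubles as the log property name; the library code uses
  # `trace`, which may be compiled out depending on chronicles settings.
  info "opportunistic grafting", peer

demo()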


@@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[sequtils, strutils, tables, hashes, sets]
import std/[sequtils, strutils, tables, hashes]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
../../peerid,
@@ -17,6 +17,8 @@ import rpc/[messages, message, protobuf],
../../protobuf/minprotobuf,
../../utility
export peerid, connection
logScope:
topics = "pubsubpeer"
@@ -60,8 +62,6 @@ type
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
chronicles.formatIt(PubSubPeer): $it.peerId
func hash*(p: PubSubPeer): Hash =
# int is either 32/64, so intptr basically, pubsubpeer is a ref
cast[pointer](p).hash
@@ -75,6 +75,9 @@ proc connected*(p: PubSubPeer): bool =
not p.sendConn.isNil and not
(p.sendConn.closed or p.sendConn.atEof)
proc hasObservers(p: PubSubPeer): bool =
p.observers != nil and anyIt(p.observers[], it != nil)
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0:
@@ -90,7 +93,6 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
obs.onSend(p, msg)
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "starting pubsub read loop",
conn, peer = p, closed = conn.closed
try:
@@ -176,50 +178,54 @@ proc connectImpl(p: PubSubPeer) {.async.} =
proc connect*(p: PubSubPeer) =
asyncSpawn connectImpl(p)
proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")
let conn = p.sendConn
if conn == nil:
trace "No send connection, skipping message", p, msg
return
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
# trigger send hooks
var mm = msg # hooks can modify the message
p.sendObservers(mm)
let encoded = encodeRpcMsg(mm)
if encoded.len <= 0:
info "empty message, skipping"
return
proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} =
try:
trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
await conn.writeLp(encoded)
trace "sent pubsub message to remote", conn
when defined(libp2p_expensive_metrics):
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
except CatchableError as exc:
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, exc = exc.msg
trace "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will have its close flag set and thus
# will be recycled
await conn.close() # This will clean up the send connection
proc send*(p: PubSubPeer, msg: RPCMsg) =
asyncSpawn sendImpl(p, msg)
doAssert(not isNil(p), "pubsubpeer nil!")
proc `$`*(p: PubSubPeer): string =
$p.peerId
let conn = p.sendConn
if conn == nil or conn.closed():
trace "No send connection, skipping message", p, msg
return
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
let encoded = if p.hasObservers():
# trigger send hooks
var mm = msg # hooks can modify the message
p.sendObservers(mm)
encodeRpcMsg(mm)
else:
# If there are no send hooks, we redundantly re-encode the message to
# protobuf for every peer - this could easily be improved!
encodeRpcMsg(msg)
if encoded.len <= 0:
debug "empty message, skipping", p, msg
return
# To limit the size of the closure, we only pass the encoded message and
# connection to the spawned send task
asyncSpawn sendImpl(conn, encoded)
when defined(libp2p_expensive_metrics):
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
proc newPubSubPeer*(peerId: PeerID,
getConn: GetConn,
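
A condensed sketch of the new send-path shape (toy ToyConn/ToyMsg types and stand-in writeLp/encode procs - not the actual pubsubpeer API): encode once before detaching, keep the spawned closure down to the connection and the encoded bytes, and swallow errors inside the detached task.

import chronos

type
  ToyConn = ref object
    closed: bool
  ToyMsg = object
    data: string

proc writeLp(conn: ToyConn, bytes: seq[byte]) {.async.} =
  # Stand-in for the real length-prefixed write on a Connection.
  await sleepAsync(1.milliseconds)

proc encode(msg: ToyMsg): seq[byte] =
  # Toy encoding; the real code calls encodeRpcMsg.
  for c in msg.data:
    result.add byte(c)

proc sendTask(conn: ToyConn, encoded: seq[byte]) {.async.} =
  # Only the connection and the encoded bytes are captured by the spawned
  # task, so the full RPC message is released as soon as send() returns.
  try:
    await conn.writeLp(encoded)
  except CatchableError:
    # Detached tasks must not leak exceptions; flag the connection instead.
    conn.closed = true

proc send(conn: ToyConn, msg: ToyMsg) =
  let encoded = encode(msg)   # encode once, before detaching
  if encoded.len == 0:
    return
  asyncSpawn sendTask(conn, encoded)

let conn = ToyConn()
send(conn, ToyMsg(data: "hello"))
waitFor sleepAsync(10.milliseconds)  # let the detached send run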


@@ -14,6 +14,8 @@ import messages,
../../../utility,
../../../protobuf/minprotobuf
{.push raises: [Defect].}
logScope:
topics = "pubsubprotobuf"


@@ -165,6 +165,7 @@ method readOnce*(s: BufferStream,
if buf.len == 0 or s.isEof: # Another task might have set EOF!
# No more data will arrive on read queue
trace "EOF", s
s.isEof = true
else:
let remaining = min(buf.len, nbytes - rbytes)