From 1a707e126464ba9cefd00916f428245149072181 Mon Sep 17 00:00:00 2001
From: diegomrsantos
Date: Mon, 25 Mar 2024 22:00:11 +0100
Subject: [PATCH] feat: add max number of elements to non-prio queue (#1077)

---
 libp2p/protocols/pubsub/gossipsub.nim       | 12 ++--
 libp2p/protocols/pubsub/gossipsub/types.nim |  3 +
 libp2p/protocols/pubsub/pubsub.nim          |  7 +-
 libp2p/protocols/pubsub/pubsubpeer.nim      | 74 ++++++++++++++-------
 4 files changed, 65 insertions(+), 31 deletions(-)

diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 0382279a8..f23f116e2 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
     enablePX: false,
     bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
     overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
-    disconnectPeerAboveRateLimit: false
+    disconnectPeerAboveRateLimit: false,
+    maxNumElementsInNonPriorityQueue: DefaultMaxNumElementsInNonPriorityQueue
   )
 
 proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -172,10 +173,10 @@ method onNewPeer*(g: GossipSub, peer: PubSubPeer) =
 
 method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
   case event.kind
-  of PubSubPeerEventKind.Connected:
+  of PubSubPeerEventKind.StreamOpened:
     discard
-  of PubSubPeerEventKind.Disconnected:
-    # If a send connection is lost, it's better to remove peer from the mesh -
+  of PubSubPeerEventKind.StreamClosed:
+    # If a send stream is lost, it's better to remove peer from the mesh -
     # if it gets reestablished, the peer will be readded to the mesh, and if it
     # doesn't, well.. then we hope the peer is going away!
     for topic, peers in p.mesh.mpairs():
@@ -183,6 +184,8 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent
       peers.excl(peer)
     for _, peers in p.fanout.mpairs():
       peers.excl(peer)
+  of PubSubPeerEventKind.DisconnectionRequested:
+    asyncSpawn p.disconnectPeer(peer) # this should unsubscribePeer the peer too
 
   procCall FloodSub(p).onPubSubPeerEvent(peer, event)
 
@@ -750,4 +753,5 @@ method getOrCreatePeer*(
   let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
   g.parameters.overheadRateLimit.withValue(overheadRateLimit):
     peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
+  peer.maxNumElementsInNonPriorityQueue = g.parameters.maxNumElementsInNonPriorityQueue
   return peer
diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim
index 06fa55eb3..e75abfd51 100644
--- a/libp2p/protocols/pubsub/gossipsub/types.nim
+++ b/libp2p/protocols/pubsub/gossipsub/types.nim
@@ -147,6 +147,9 @@ type
     overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
     disconnectPeerAboveRateLimit*: bool
 
+    # Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected.
+    maxNumElementsInNonPriorityQueue*: int
+
   BackoffTable* = Table[string, Table[PeerId, Moment]]
   ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
 
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 07ccfd0b7..800565d44 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -287,11 +287,14 @@ method onNewPeer(p: PubSub, peer: PubSubPeer) {.base, gcsafe.} = discard
 method onPubSubPeerEvent*(p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent) {.base, gcsafe.} =
   # Peer event is raised for the send connection in particular
   case event.kind
-  of PubSubPeerEventKind.Connected:
+  of PubSubPeerEventKind.StreamOpened:
     if p.topics.len > 0:
       p.sendSubs(peer, toSeq(p.topics.keys), true)
-  of PubSubPeerEventKind.Disconnected:
+  of PubSubPeerEventKind.StreamClosed:
     discard
+  of PubSubPeerEventKind.DisconnectionRequested:
+    discard
+
 
 method getOrCreatePeer*(
   p: PubSub,
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 64e7cd26f..916f01b1b 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -35,6 +35,11 @@ when defined(pubsubpeer_queue_metrics):
   declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
   declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
 
+declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")
+
+const
+  DefaultMaxNumElementsInNonPriorityQueue* = 1024
+
 type
   PeerRateLimitError* = object of CatchableError
 
@@ -43,8 +48,9 @@ type
     onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
 
   PubSubPeerEventKind* {.pure.} = enum
-    Connected
-    Disconnected
+    StreamOpened
+    StreamClosed
+    DisconnectionRequested # tells gossipsub that the transport connection to the peer should be closed
 
   PubSubPeerEvent* = object
     kind*: PubSubPeerEventKind
@@ -83,6 +89,8 @@ type
     overheadRateLimitOpt*: Opt[TokenBucket]
 
     rpcmessagequeue: RpcMessageQueue
+    maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
+    disconnected: bool
 
   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}
@@ -181,6 +189,24 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   debug "exiting pubsub read loop",
     conn, peer = p, closed = conn.closed
 
+proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
+  if p.sendConn != nil:
+    trace "Removing send connection", p, conn = p.sendConn
+    await p.sendConn.close()
+    p.sendConn = nil
+
+  if not p.connectedFut.finished:
+    p.connectedFut.complete()
+
+  try:
+    if p.onEvent != nil:
+      p.onEvent(p, PubSubPeerEvent(kind: event))
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    debug "Errors during diconnection events", error = exc.msg
+  # don't cleanup p.address else we leak some gossip stat table
+
 proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
   try:
     if p.connectedFut.finished:
@@ -203,27 +229,11 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
     p.address = if p.sendConn.observedAddr.isSome: some(p.sendConn.observedAddr.get) else: none(MultiAddress)
 
     if p.onEvent != nil:
-      p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Connected))
+      p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.StreamOpened))
 
     await handle(p, newConn)
   finally:
-    if p.sendConn != nil:
-      trace "Removing send connection", p, conn = p.sendConn
-      await p.sendConn.close()
-      p.sendConn = nil
-
-    if not p.connectedFut.finished:
-      p.connectedFut.complete()
-
-    try:
-      if p.onEvent != nil:
-        p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
-    except CancelledError as exc:
-      raise exc
-    except CatchableError as exc:
-      debug "Errors during diconnection events", error = exc.msg
-
-    # don't cleanup p.address else we leak some gossip stat table
+    await p.closeSendConn(PubSubPeerEventKind.StreamClosed)
 
 proc connectImpl(p: PubSubPeer) {.async.} =
   try:
@@ -231,6 +241,10 @@ proc connectImpl(p: PubSubPeer) {.async.} =
     # send connection might get disconnected due to a timeout or an unrelated
     # issue so we try to get a new on
     while true:
+      if p.disconnected:
+        if not p.connectedFut.finished:
+          p.connectedFut.complete()
+        return
       await connectOnce(p)
   except CatchableError as exc: # never cancelled
     debug "Could not establish send connection", msg = exc.msg
@@ -337,10 +351,18 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
         libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
     f
   else:
-    let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
-    when defined(pubsubpeer_queue_metrics):
-      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
-    f
+    if len(p.rpcmessagequeue.nonPriorityQueue) >= p.maxNumElementsInNonPriorityQueue:
+      if not p.disconnected:
+        p.disconnected = true
+        libp2p_pubsub_disconnects_over_non_priority_queue_limit.inc()
+        p.closeSendConn(PubSubPeerEventKind.DisconnectionRequested)
+      else:
+        Future[void].completed()
+    else:
+      let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
+      when defined(pubsubpeer_queue_metrics):
+        libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+      f
 
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -457,7 +479,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
 proc new(T: typedesc[RpcMessageQueue]): T =
   return T(
     sendPriorityQueue: initDeque[Future[void]](),
-    nonPriorityQueue: newAsyncQueue[seq[byte]](),
+    nonPriorityQueue: newAsyncQueue[seq[byte]]()
   )
 
 proc new*(
@@ -467,6 +489,7 @@ proc new*(
   onEvent: OnEvent,
   codec: string,
   maxMessageSize: int,
+  maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue,
   overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
 
   result = T(
@@ -478,6 +501,7 @@ proc new*(
     maxMessageSize: maxMessageSize,
     overheadRateLimitOpt: overheadRateLimitOpt,
     rpcmessagequeue: RpcMessageQueue.new(),
+    maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
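
Usage note: the new limit is exposed through GossipSubParams and copied onto each
PubSubPeer in getOrCreatePeer, so applications only need to set the parameter. The
sketch below is illustrative only: GossipSubParams.init, validateParameters,
maxNumElementsInNonPriorityQueue and DefaultMaxNumElementsInNonPriorityQueue come
from the diff above, while the import path and the example value 256 are
assumptions, and constructing the GossipSub instance itself is out of scope here.

  import libp2p/protocols/pubsub/gossipsub

  # Start from the defaults built by GossipSubParams.init(); the new field
  # defaults to DefaultMaxNumElementsInNonPriorityQueue (1024).
  var params = GossipSubParams.init()

  # Allow at most 256 queued non-priority messages per peer. Once a peer's
  # non-priority queue reaches this size, sendEncoded marks the peer as
  # disconnected, bumps libp2p_pubsub_disconnects_over_non_priority_queue_limit
  # and emits PubSubPeerEventKind.DisconnectionRequested, which gossipsub
  # handles by disconnecting the peer.
  params.maxNumElementsInNonPriorityQueue = 256

  # The existing sanity check, validateParameters(params), can still be run on
  # the resulting params before they are handed to the GossipSub instance at
  # construction time (not shown here).

Bounding the queue and disconnecting, rather than buffering indefinitely, keeps
per-peer memory for slow or unresponsive peers bounded, similar in spirit to the
existing disconnectPeerAboveRateLimit option.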