diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 5740f5d8d..69dc14962 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -303,7 +303,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
       trace "sending control message", msg = shortLog(respControl), peer
       g.send(
         peer,
-        RPCMsg(control: some(respControl), messages: messages))
+        RPCMsg(control: some(respControl), messages: messages), true)
 
 proc validateAndRelay(g: GossipSub,
                       msg: Message,
@@ -370,7 +370,7 @@ proc validateAndRelay(g: GossipSub,
 
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
     trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
     for topic in msg.topicIds:
       if topic notin g.topics: continue
@@ -441,7 +441,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)
 
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
     peer.pingBudget.dec
   for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ ... @@
+proc getAgent*(peer: PubSubPeer): string =
+  return
+    when defined(libp2p_agents_metrics):
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash
 
@@ -227,7 +243,7 @@ template sendMetrics(msg: RPCMsg): untyped =
         # metrics
         libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
 
-proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
   doAssert(not isNil(p), "pubsubpeer nil!")
 
   if msg.len <= 0:
@@ -238,29 +254,18 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
     info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
     return
 
-  if p.sendConn == nil:
-    # Wait for a send conn to be setup. `connectOnce` will
-    # complete this even if the sendConn setup failed
-    await p.connectedFut
-
-  var conn = p.sendConn
-  if conn == nil or conn.closed():
-    debug "No send connection", p, msg = shortLog(msg)
-    return
-
-  trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
-
-  try:
-    await conn.writeLp(msg)
-    trace "sent pubsub message to remote", conn
-  except CatchableError as exc: # never cancelled
-    # Because we detach the send call from the currently executing task using
-    # asyncSpawn, no exceptions may leak out of it
-    trace "Unable to send to remote", conn, msg = exc.msg
-    # Next time sendConn is used, it will be have its close flag set and thus
-    # will be recycled
-
-    await conn.close() # This will clean up the send connection
+  if isHighPriority:
+    await p.rpcmessagequeue.addPriorityMessage(msg)
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
+    else:
+      libp2p_gossipsub_priority_queue_size.inc(labelValues = [p.getAgent()])
+  else:
+    await p.rpcmessagequeue.addNonPriorityMessage(msg)
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+    else:
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [p.getAgent()])
 
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -297,7 +302,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.raises: [].} =
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +322,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
 
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    asyncSpawn p.sendEncoded(encoded, isHighPriority)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +335,55 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false
 
+proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq[byte]]] {.async.} =
+  var m = await rpcMessageQueue.getPriorityMessage()
+  if m.isSome():
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
+    else:
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [p.getAgent()])
+    return m
+  else:
+    m = await rpcMessageQueue.getNonPriorityMessage()
+    if m.isSome():
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+      else:
+        libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [p.getAgent()])
+      return m
+    else:
+      return Opt.none(seq[byte])
+
+proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} =
+  while true:
+    let m = await queue.getMessage(p)
+    m.withValue(msg):
+      if p.sendConn == nil:
+        # Wait for a send conn to be setup. `connectOnce` will
+        # complete this even if the sendConn setup failed
+        await p.connectedFut
+
+      var conn = p.sendConn
+      if conn == nil or conn.closed():
+        debug "No send connection", msg = shortLog(msg)
+        return
+
+      trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)
+
+      try:
+        await conn.writeLp(msg)
+        trace "sent pubsub message to remote", conn
+      except CatchableError as exc: # never cancelled
+        # Because we detach the send call from the currently executing task using
+        # asyncSpawn, no exceptions may leak out of it
+        trace "Unable to send to remote", conn, msg = exc.msg
+        # Next time sendConn is used, it will have its close flag set and thus
+        # will be recycled
+
+        await conn.close() # This will clean up the send connection
+    # Include a delay to prevent this loop from consuming too much CPU
+    await sleepAsync(10) # a 10 ms sleep
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
@@ -346,17 +400,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
+  asyncSpawn result.processMessages(result.rpcmessagequeue)
diff --git a/libp2p/protocols/pubsub/rpcmessagequeue.nim b/libp2p/protocols/pubsub/rpcmessagequeue.nim
new file mode 100644
index 000000000..7f7c52f26
--- /dev/null
+++ b/libp2p/protocols/pubsub/rpcmessagequeue.nim
@@ -0,0 +1,44 @@
+# Nim-LibP2P
+# Copyright (c) 2023 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+# at your option.
+# This file may not be copied, modified, or distributed except according to
+# those terms.
+
+{.push raises: [].}
+
+import chronos, chronicles, stew/results
+import ../../stream/connection
+
+type
+  RpcMessageQueue* = ref object
+    priorityQueue: AsyncQueue[seq[byte]]
+    nonPriorityQueue: AsyncQueue[seq[byte]]
+
+proc addPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} =
+  await aq.priorityQueue.put(msg)
+
+proc addNonPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} =
+  await aq.nonPriorityQueue.put(msg)
+
+proc new*(T: typedesc[RpcMessageQueue]): T =
+  return T(
+    priorityQueue: newAsyncQueue[seq[byte]](),
+    nonPriorityQueue: newAsyncQueue[seq[byte]]()
+  )
+
+proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} =
+  return
+    if not rpcMessageQueue.priorityQueue.empty():
+      Opt.some(rpcMessageQueue.priorityQueue.getNoWait())
+    else:
+      Opt.none(seq[byte])
+
+proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} =
+  return
+    if not rpcMessageQueue.nonPriorityQueue.empty():
+      Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait())
+    else:
+      Opt.none(seq[byte])
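
Not part of the diff: the sketch below illustrates the drain order that the new sendEncoded/processMessages pair implements - anything queued as high priority (control responses, pongs) is flushed before anything in the non-priority queue, regardless of arrival order. It is a minimal standalone example assuming only chronos; the demo proc and the two local queues are illustrative stand-ins for RpcMessageQueue's internal fields, not library API.

import chronos

proc demo() {.async.} =
  let
    priorityQueue = newAsyncQueue[string]()    # stand-in for RpcMessageQueue.priorityQueue
    nonPriorityQueue = newAsyncQueue[string]() # stand-in for RpcMessageQueue.nonPriorityQueue

  # Enqueue in mixed order, as interleaved send()/sendEncoded() calls would.
  await nonPriorityQueue.put("forwarded message 1")
  await priorityQueue.put("control response (e.g. pong)")
  await nonPriorityQueue.put("forwarded message 2")

  # Drain the way getMessage does: always check the priority queue first.
  while not (priorityQueue.empty() and nonPriorityQueue.empty()):
    let msg =
      if not priorityQueue.empty():
        priorityQueue.getNoWait()
      else:
        nonPriorityQueue.getNoWait()
    echo "sending: ", msg
  # Prints the control response first, even though a forward was queued before it.

waitFor demo()

Since both queues are unbounded AsyncQueues, this ordering is best-effort rather than preemptive: a non-priority message already handed to conn.writeLp is not interrupted, but every subsequent dequeue prefers the priority queue.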