use events to process the queue

This commit is contained in:
Diego 2024-01-18 18:46:13 +01:00
parent 96cd3b3a2b
commit c948eec7a7
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
2 changed files with 55 additions and 103 deletions

View File

@ -20,7 +20,6 @@ import rpc/[messages, message, protobuf],
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
import rpcmessagequeue
export peerid, connection, deques
@ -53,6 +52,12 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
RpcMessageQueue* = ref object
priorityQueue: AsyncQueue[seq[byte]]
nonPriorityQueue: AsyncQueue[seq[byte]]
messageAvailableEvent: AsyncEvent
queueProcessingTask: Future[void]
PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer
@ -75,7 +80,6 @@ type
overheadRateLimitOpt*: Opt[TokenBucket]
rpcmessagequeue: RpcMessageQueue
queueProcessingTask: Future[void]
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
@ -256,13 +260,14 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
return
if isHighPriority:
await p.rpcmessagequeue.addPriorityMessage(msg)
p.rpcmessagequeue.priorityQueue.putNoWait(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.addNonPriorityMessage(msg)
p.rpcmessagequeue.nonPriorityQueue.putNoWait(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
p.rpcmessagequeue.messageAvailableEvent.fire()
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@ -332,62 +337,63 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false
proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]] =
var m = rpcMessageQueue.getPriorityMessage()
if m.isSome():
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
return m
else:
m = rpcMessageQueue.getNonPriorityMessage()
if m.isSome():
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
return m
else:
return Opt.none(seq[byte])
proc processMessages(p: PubSubPeer) {.async.} =
proc sendMsg(msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
await p.connectedFut
var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", msg = shortLog(msg)
return
trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled
await conn.close() # This will clean up the send connection
while true:
let m = p.rpcmessagequeue.getMessage(p)
m.withValue(msg):
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
await p.connectedFut
await p.rpcmessagequeue.messageAvailableEvent.wait() # Wait for an event
p.rpcmessagequeue.messageAvailableEvent.clear() # Reset the event after handling
var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", msg = shortLog(msg)
return
if not p.rpcmessagequeue.priorityQueue.empty(): # Process messages from the priority queue first
let message = await p.rpcmessagequeue.priorityQueue.get()
await sendMsg(message)
elif not p.rpcmessagequeue.nonPriorityQueue.empty(): # Then process messages from the non-priority queue
let message = await p.rpcmessagequeue.nonPriorityQueue.get()
await sendMsg(message)
trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled
await conn.close() # This will clean up the send connection
# Include a delay or a mechanism to prevent this loop from consuming too much CPU
await sleepAsync(10) # For example, a 10 ms sleep
proc startProcessingMessages(p: PubSubPeer) =
if p.queueProcessingTask.isNil:
p.queueProcessingTask = p.processMessages()
if p.rpcmessagequeue.queueProcessingTask.isNil:
p.rpcmessagequeue.queueProcessingTask = p.processMessages()
proc stopProcessingMessages*(p: PubSubPeer) =
if not p.queueProcessingTask.isNil:
p.queueProcessingTask.cancel()
p.rpcmessagequeue.clear()
if not p.rpcmessagequeue.queueProcessingTask.isNil:
p.rpcmessagequeue.queueProcessingTask.cancel()
p.rpcmessagequeue.queueProcessingTask = nil
p.rpcmessagequeue.priorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
proc new(T: typedesc[RpcMessageQueue]): T =
return T(
priorityQueue: newAsyncQueue[seq[byte]](),
nonPriorityQueue: newAsyncQueue[seq[byte]](),
messageAvailableEvent: newAsyncEvent(),
)
proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,

View File

@ -1,54 +0,0 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import chronos, chronicles, stew/results
import ../../stream/connection
type
RpcMessageQueue* = ref object
priorityQueue: AsyncQueue[seq[byte]]
nonPriorityQueue: AsyncQueue[seq[byte]]
proc addPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} =
await aq.priorityQueue.put(msg)
proc addNonPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} =
await aq.nonPriorityQueue.put(msg)
proc new*(T: typedesc[RpcMessageQueue]): T =
return T(
priorityQueue: newAsyncQueue[seq[byte]](),
nonPriorityQueue: newAsyncQueue[seq[byte]]()
)
proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] =
return
if not rpcMessageQueue.priorityQueue.empty():
try:
Opt.some(rpcMessageQueue.priorityQueue.getNoWait())
except AsyncQueueEmptyError:
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] =
return
if not rpcMessageQueue.nonPriorityQueue.empty():
try:
Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait())
except AsyncQueueEmptyError:
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
proc clear*(rpcMessageQueue: RpcMessageQueue) =
rpcMessageQueue.priorityQueue.clear()
rpcMessageQueue.nonPriorityQueue.clear()