make maxDurationInNonPriorityQueue configurable and none by default
This commit is contained in:
parent
5d9478b0ec
commit
a04f8d2757
|
@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
|
|||
enablePX: false,
|
||||
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
|
||||
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
|
||||
disconnectPeerAboveRateLimit: false
|
||||
disconnectPeerAboveRateLimit: false,
|
||||
maxDurationInNonPriorityQueue: Opt.none(Duration),
|
||||
)
|
||||
|
||||
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
|
||||
|
@ -751,4 +752,5 @@ method getOrCreatePeer*(
|
|||
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
|
||||
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
|
||||
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
|
||||
peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue
|
||||
return peer
|
||||
|
|
|
@ -147,6 +147,10 @@ type
|
|||
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
|
||||
disconnectPeerAboveRateLimit*: bool
|
||||
|
||||
# The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded
|
||||
# as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration.
|
||||
maxDurationInNonPriorityQueue*: Opt[Duration]
|
||||
|
||||
BackoffTable* = Table[string, Table[PeerId, Moment]]
|
||||
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ type
|
|||
# Task for processing non-priority message queue.
|
||||
sendNonPriorityTask: Future[void]
|
||||
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
|
||||
maxDurationInNonPriorityQueue: Duration
|
||||
maxDurationInNonPriorityQueue*: Opt[Duration]
|
||||
|
||||
PubSubPeer* = ref object of RootObj
|
||||
getConn*: GetConn # callback to establish a new send connection
|
||||
|
@ -90,7 +90,7 @@ type
|
|||
behaviourPenalty*: float64 # the eventual penalty score
|
||||
overheadRateLimitOpt*: Opt[TokenBucket]
|
||||
|
||||
rpcmessagequeue: RpcMessageQueue
|
||||
rpcmessagequeue*: RpcMessageQueue
|
||||
|
||||
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
|
||||
{.gcsafe, raises: [].}
|
||||
|
@ -385,7 +385,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
|
|||
proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
|
||||
while true:
|
||||
# we send non-priority messages only if there are no pending priority messages
|
||||
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
|
||||
let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
|
||||
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
|
||||
p.clearSendPriorityQueue()
|
||||
# this minimizes the number of times we have to wait for something (each wait = performance cost)
|
||||
|
@ -395,11 +395,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
|
|||
await p.rpcmessagequeue.sendPriorityQueue[^1]
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
|
||||
if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
|
||||
continue
|
||||
await p.sendMsg(ttlMsg.msg)
|
||||
p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
|
||||
if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue:
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
|
||||
continue
|
||||
await p.sendMsg(queuedMsg.msg)
|
||||
|
||||
proc startSendNonPriorityTask(p: PubSubPeer) =
|
||||
debug "starting sendNonPriorityTask", p
|
||||
|
@ -417,7 +418,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
|
|||
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
|
||||
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
|
||||
|
||||
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T =
|
||||
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
|
||||
return T(
|
||||
sendPriorityQueue: initDeque[Future[void]](),
|
||||
nonPriorityQueue: newAsyncQueue[QueuedMessage](),
|
||||
|
@ -431,7 +432,8 @@ proc new*(
|
|||
onEvent: OnEvent,
|
||||
codec: string,
|
||||
maxMessageSize: int,
|
||||
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
|
||||
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
|
||||
maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
|
||||
|
||||
result = T(
|
||||
getConn: getConn,
|
||||
|
@ -441,7 +443,7 @@ proc new*(
|
|||
connectedFut: newFuture[void](),
|
||||
maxMessageSize: maxMessageSize,
|
||||
overheadRateLimitOpt: overheadRateLimitOpt,
|
||||
rpcmessagequeue: RpcMessageQueue.new(),
|
||||
rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
|
||||
)
|
||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
result.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||
|
|
Loading…
Reference in New Issue