Send priority with queue fix (#1051)

Co-authored-by: Diego <diego@status.im>
Jacek Sieka 2024-03-05 16:05:21 +01:00 committed by GitHub
parent 28609597d1
commit ae13a0d583
8 changed files with 240 additions and 83 deletions

libp2p/protocols/pubsub/floodsub.nim

@@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub,
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
     trace "Forwared message to peers", peers = toSendPeers.len
 
   f.updateMetrics(rpcMsg)
@@ -219,7 +219,7 @@ method publish*(f: FloodSub,
     return 0
 
   # Try to send to all peers that are known to be interested
-  f.broadcast(peers, RPCMsg(messages: @[msg]))
+  f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
 
   when defined(libp2p_expensive_metrics):
     libp2p_pubsub_messages_published.inc(labelValues = [topic])

libp2p/protocols/pubsub/gossipsub.nim

@@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
       for topic, info in stats[].topicInfos.mpairs:
         info.firstMessageDeliveries = 0
 
+  pubSubPeer.stopSendNonPriorityTask()
+
   procCall FloodSub(g).unsubscribePeer(peer)
 
 proc handleSubscribe*(g: GossipSub,
@@ -279,21 +281,16 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
   respControl.prune.add(g.handleGraft(peer, control.graft))
   let messages = g.handleIWant(peer, control.iwant)
 
-  if
-    respControl.prune.len > 0 or
-    respControl.iwant.len > 0 or
-    messages.len > 0:
-    # iwant and prunes from here, also messages
-    for smsg in messages:
-      for topic in smsg.topicIds:
-        if g.knownTopics.contains(topic):
-          libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
-        else:
-          libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
+  let
+    isPruneNotEmpty = respControl.prune.len > 0
+    isIWantNotEmpty = respControl.iwant.len > 0
 
-    libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+  if isPruneNotEmpty or isIWantNotEmpty:
+    if isIWantNotEmpty:
+      libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
 
-    for prune in respControl.prune:
-      if g.knownTopics.contains(prune.topicId):
-        libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
+    if isPruneNotEmpty:
+      for prune in respControl.prune:
+        if g.knownTopics.contains(prune.topicId):
+          libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
@@ -303,7 +300,21 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
     trace "sending control message", msg = shortLog(respControl), peer
     g.send(
       peer,
-      RPCMsg(control: some(respControl), messages: messages))
+      RPCMsg(control: some(respControl)), isHighPriority = true)
+
+  if messages.len > 0:
+    for smsg in messages:
+      for topic in smsg.topicIds:
+        if g.knownTopics.contains(topic):
+          libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
+        else:
+          libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
+
+    # iwant replies have lower priority
+    trace "sending iwant reply messages", peer
+    g.send(
+      peer,
+      RPCMsg(messages: messages), isHighPriority = false)
 
 proc validateAndRelay(g: GossipSub,
                       msg: Message,
@@ -356,7 +367,7 @@ proc validateAndRelay(g: GossipSub,
     if msg.data.len > msgId.len * 10:
       g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
         idontwant: @[ControlIWant(messageIds: @[msgId])]
-      ))))
+      ))), isHighPriority = true)
 
     for peer in toSendPeers:
       for heDontWant in peer.heDontWants:
@@ -370,7 +381,7 @@ proc validateAndRelay(g: GossipSub,
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
     trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
 
     for topic in msg.topicIds:
       if topic notin g.topics: continue
@@ -441,7 +452,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)
 
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
     peer.pingBudget.dec
 
   for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
     template sub: untyped = rpcMsg.subscriptions[i]
@@ -551,7 +562,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
           topicID: topic,
          peers: g.peerExchangeList(topic),
           backoff: g.parameters.unsubscribeBackoff.seconds.uint64)])))
-    g.broadcast(mpeers, msg)
+    g.broadcast(mpeers, msg, isHighPriority = true)
 
     for peer in mpeers:
       g.pruned(peer, topic, backoff = some(g.parameters.unsubscribeBackoff))
@@ -655,7 +666,7 @@ method publish*(g: GossipSub,
   g.mcache.put(msgId, msg)
 
-  g.broadcast(peers, RPCMsg(messages: @[msg]))
+  g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
 
   if g.knownTopics.contains(topic):
     libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])

libp2p/protocols/pubsub/gossipsub/behavior.nim

@@ -530,14 +530,14 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
   # Send changes to peers after table updates to avoid stale state
   if grafts.len > 0:
     let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
-    g.broadcast(grafts, graft)
+    g.broadcast(grafts, graft, isHighPriority = true)
 
   if prunes.len > 0:
     let prune = RPCMsg(control: some(ControlMessage(
       prune: @[ControlPrune(
         topicID: topic,
         peers: g.peerExchangeList(topic),
         backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-    g.broadcast(prunes, prune)
+    g.broadcast(prunes, prune, isHighPriority = true)
 
 proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
   # drop peers that we haven't published to in
@@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
             topicID: t,
             peers: g.peerExchangeList(t),
             backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-      g.broadcast(prunes, prune)
+      g.broadcast(prunes, prune, isHighPriority = true)
 
   # pass by ptr in order to both signal we want to update metrics
   # and as well update the struct for each topic during this iteration
@@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
           libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
         else:
           libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
-      g.send(peer, RPCMsg(control: some(control)))
+      g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)
 
   g.mcache.shift() # shift the cache

libp2p/protocols/pubsub/pubsub.nim

@@ -138,18 +138,34 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
-  ## Attempt to send `msg` to remote peer
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
+  ## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
   ##
+  ## Parameters:
+  ## - `p`: The `PubSub` instance.
+  ## - `peer`: An instance of `PubSubPeer` representing the peer to whom the message should be sent.
+  ## - `msg`: The `RPCMsg` instance that contains the message to be sent.
+  ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
+  ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
+  ## priority messages have been sent.
   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  peer.send(msg, p.anonymize)
+  peer.send(msg, p.anonymize, isHighPriority)
 
 proc broadcast*(
   p: PubSub,
   sendPeers: auto, # Iteratble[PubSubPeer]
-  msg: RPCMsg) {.raises: [].} =
-  ## Attempt to send `msg` to the given peers
+  msg: RPCMsg,
+  isHighPriority: bool) {.raises: [].} =
+  ## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
+  ##
+  ## Parameters:
+  ## - `p`: The `PubSub` instance.
+  ## - `sendPeers`: An iterable of `PubSubPeer` instances representing the peers to whom the message should be sent.
+  ## - `msg`: The `RPCMsg` instance that contains the message to be broadcast.
+  ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
+  ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
+  ## priority messages have been sent.
   let npeers = sendPeers.len.int64
   for sub in msg.subscriptions:
@@ -195,19 +211,19 @@ proc broadcast*(
   if anyIt(sendPeers, it.hasObservers):
     for peer in sendPeers:
-      p.send(peer, msg)
+      p.send(peer, msg, isHighPriority)
   else:
     # Fast path that only encodes message once
     let encoded = encodeRpcMsg(msg, p.anonymize)
     for peer in sendPeers:
-      asyncSpawn peer.sendEncoded(encoded)
+      asyncSpawn peer.sendEncoded(encoded, isHighPriority)
 
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
                topics: openArray[string],
                subscribe: bool) =
   ## send subscriptions to remote peer
-  p.send(peer, RPCMsg.withSubs(topics, subscribe))
+  p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true)
 
   for topic in topics:
     if subscribe:
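
Every updated call site in this commit applies the same rule. As a minimal, self-contained sketch of that rule (the names SendKind and isHighPriority below are illustrative only, not part of this commit): messages a node publishes itself, control traffic (GRAFT/PRUNE/IHAVE/IDONTWANT/pong), and subscription updates go out as high priority, while relayed messages and IWANT replies go out as low priority:

  type SendKind = enum
    OwnPublish, Control, Subscription, Forwarded, IWantReply

  func isHighPriority(kind: SendKind): bool =
    # mirrors the isHighPriority arguments chosen at each call site above
    case kind
    of OwnPublish, Control, Subscription: true
    of Forwarded, IWantReply: false

  assert isHighPriority(OwnPublish)
  assert not isHighPriority(IWantReply)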

libp2p/protocols/pubsub/pubsubpeer.nim

@@ -31,6 +31,10 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
 
+when defined(pubsubpeer_queue_metrics):
+  declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
+  declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
+
 type
   PeerRateLimitError* = object of CatchableError
@@ -49,6 +53,14 @@ type
   DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
   OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
 
+  RpcMessageQueue* = ref object
+    # Tracks async tasks for sending high-priority peer-published messages.
+    sendPriorityQueue: Deque[Future[void]]
+    # Queue for lower-priority messages, like "IWANT" replies and relay messages.
+    nonPriorityQueue: AsyncQueue[seq[byte]]
+    # Task for processing non-priority message queue.
+    sendNonPriorityTask: Future[void]
+
   PubSubPeer* = ref object of RootObj
     getConn*: GetConn # callback to establish a new send connection
     onEvent*: OnEvent # Connectivity updates for peer
@@ -70,6 +82,8 @@ type
     behaviourPenalty*: float64 # the eventual penalty score
     overheadRateLimitOpt*: Opt[TokenBucket]
 
+    rpcmessagequeue: RpcMessageQueue
+
   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}
@@ -82,6 +96,16 @@ when defined(libp2p_agents_metrics):
       #so we have to read the parents short agent..
       p.sendConn.getWrapped().shortAgent
 
+proc getAgent*(peer: PubSubPeer): string =
+  return
+    when defined(libp2p_agents_metrics):
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash
@@ -227,17 +251,40 @@ template sendMetrics(msg: RPCMsg): untyped =
         # metrics
         libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
 
-proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
-  doAssert(not isNil(p), "pubsubpeer nil!")
+proc clearSendPriorityQueue(p: PubSubPeer) =
+  if p.rpcmessagequeue.sendPriorityQueue.len == 0:
+    return # fast path
 
-  if msg.len <= 0:
-    debug "empty message, skipping", p, msg = shortLog(msg)
-    return
+  while p.rpcmessagequeue.sendPriorityQueue.len > 0 and
+        p.rpcmessagequeue.sendPriorityQueue[0].finished:
+    discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
 
-  if msg.len > p.maxMessageSize:
-    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
-    return
+  while p.rpcmessagequeue.sendPriorityQueue.len > 0 and
+        p.rpcmessagequeue.sendPriorityQueue[^1].finished:
+    discard p.rpcmessagequeue.sendPriorityQueue.popLast()
+
+  when defined(pubsubpeer_queue_metrics):
+    libp2p_gossipsub_priority_queue_size.set(
+      value = p.rpcmessagequeue.sendPriorityQueue.len.int64,
+      labelValues = [$p.peerId])
+
+proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
+  # Continuation for a pending `sendMsg` future from below
+  try:
+    await msgFut
+    trace "sent pubsub message to remote", conn
+  except CatchableError as exc: # never cancelled
+    # Because we detach the send call from the currently executing task using
+    # asyncSpawn, no exceptions may leak out of it
+    trace "Unable to send to remote", conn, msg = exc.msg
+    # Next time sendConn is used, it will be have its close flag set and thus
+    # will be recycled
+    await conn.close() # This will clean up the send connection
 
+proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
+  # Slow path of `sendMsg` where msg is held in memory while send connection is
+  # being set up
   if p.sendConn == nil:
     # Wait for a send conn to be setup. `connectOnce` will
     # complete this even if the sendConn setup failed
@@ -248,19 +295,53 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
     debug "No send connection", p, msg = shortLog(msg)
     return
 
-  trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
+  trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
+  await sendMsgContinue(conn, conn.writeLp(msg))
 
-  try:
-    await conn.writeLp(msg)
-    trace "sent pubsub message to remote", conn
-  except CatchableError as exc: # never cancelled
-    # Because we detach the send call from the currently executing task using
-    # asyncSpawn, no exceptions may leak out of it
-    trace "Unable to send to remote", conn, msg = exc.msg
-    # Next time sendConn is used, it will be have its close flag set and thus
-    # will be recycled
-    await conn.close() # This will clean up the send connection
+proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
+  if p.sendConn != nil and not p.sendConn.closed():
+    # Fast path that avoids copying msg (which happens for {.async.})
+    let conn = p.sendConn
+
+    trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
+    let f = conn.writeLp(msg)
+    if not f.completed():
+      sendMsgContinue(conn, f)
+    else:
+      f
+  else:
+    sendMsgSlow(p, msg)
+
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
+  ## Asynchronously sends an encoded message to a specified `PubSubPeer`.
+  ##
+  ## Parameters:
+  ## - `p`: The `PubSubPeer` instance to which the message is to be sent.
+  ## - `msg`: The message to be sent, encoded as a sequence of bytes (`seq[byte]`).
+  ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
+  ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
+  ## priority messages have been sent.
+  doAssert(not isNil(p), "pubsubpeer nil!")
+
+  if msg.len <= 0:
+    debug "empty message, skipping", p, msg = shortLog(msg)
+    Future[void].completed()
+  elif msg.len > p.maxMessageSize:
+    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
+    Future[void].completed()
+  elif isHighPriority:
+    p.clearSendPriorityQueue()
+    let f = p.sendMsg(msg)
+    if not f.finished:
+      p.rpcmessagequeue.sendPriorityQueue.addLast(f)
+      when defined(pubsubpeer_queue_metrics):
+        libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
+    f
+  else:
+    let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
+    when defined(pubsubpeer_queue_metrics):
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+    f
 
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
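
The sendMsg fast path above leans on a chronos detail: an {.async.} proc copies its parameters into the future's environment, so a plain proc returning Future[void] avoids copying msg and only attaches the continuation when the write has not completed synchronously. A self-contained sketch of the same shape (illustrative names, assuming only chronos; not code from this commit):

  import chronos

  proc writeNow(): Future[void] =
    # stand-in for conn.writeLp(msg); here it completes synchronously
    let fut = newFuture[void]("writeNow")
    fut.complete()
    fut

  proc sendContinue(fut: Future[void]) {.async.} =
    try:
      await fut                # attach error handling only when needed
    except CatchableError as exc:
      echo "send failed: ", exc.msg

  proc send(): Future[void] =
    let f = writeNow()
    if f.completed():
      f                        # already done: return the future as-is
    else:
      sendContinue(f)          # still pending: chain the async continuation

  when isMainModule:
    waitFor send()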
@@ -297,7 +378,16 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
     else:
       trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} =
+  ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
+  ##
+  ## Parameters:
+  ## - `p`: The `PubSubPeer` instance to which the message is to be sent.
+  ## - `msg`: The `RPCMsg` instance representing the message to be sent.
+  ## - `anonymize`: A boolean flag indicating whether the message should be sent with anonymization.
+  ## - `isHighPriority`: A boolean flag indicating whether the message should be treated as high priority.
+  ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
+  ## priority messages have been sent.
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +407,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} =
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    asyncSpawn p.sendEncoded(encoded, isHighPriority)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +420,45 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false
 
+proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
+  while true:
+    # we send non-priority messages only if there are no pending priority messages
+    let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
+    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
+      p.clearSendPriorityQueue()
+      # waiting for the last future minimizes the number of times we have to
+      # wait for something (each wait = performance cost) -
+      # clearSendPriorityQueue ensures we're not waiting for an already-finished
+      # future
+      if p.rpcmessagequeue.sendPriorityQueue.len > 0:
+        await p.rpcmessagequeue.sendPriorityQueue[^1]
+    when defined(pubsubpeer_queue_metrics):
+      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+    await p.sendMsg(msg)
+
+proc startSendNonPriorityTask(p: PubSubPeer) =
+  debug "starting sendNonPriorityTask", p
+  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
+
+proc stopSendNonPriorityTask*(p: PubSubPeer) =
+  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    debug "stopping sendNonPriorityTask", p
+    p.rpcmessagequeue.sendNonPriorityTask.cancelSoon()
+    p.rpcmessagequeue.sendNonPriorityTask = nil
+    p.rpcmessagequeue.sendPriorityQueue.clear()
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+
+    when defined(pubsubpeer_queue_metrics):
+      libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+      libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
+proc new(T: typedesc[RpcMessageQueue]): T =
+  return T(
+    sendPriorityQueue: initDeque[Future[void]](),
+    nonPriorityQueue: newAsyncQueue[seq[byte]](),
+  )
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
@@ -346,17 +475,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
+  result.startSendNonPriorityTask()

tests/pubsub/testgossipinternal.nim

@@ -780,7 +780,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     checkUntilTimeout: receivedMessages[] == sentMessages
     check receivedMessages[].len == 2
@@ -797,7 +797,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     await sleepAsync(300.milliseconds)
     checkUntilTimeout: receivedMessages[].len == 0
@@ -814,7 +814,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     checkUntilTimeout: receivedMessages[] == sentMessages
     check receivedMessages[].len == 2
@@ -832,7 +832,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     var smallestSet: HashSet[seq[byte]]
     let seqs = toSeq(sentMessages)

tests/pubsub/testgossipsub.nim

@@ -912,7 +912,7 @@ suite "GossipSub":
 
       gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
         idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
-      ))))
+      ))), isHighPriority = true)
       checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
 
       tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
@@ -968,7 +968,10 @@ suite "GossipSub":
     let rateLimitHits = currentRateLimitHits()
     let (nodes, gossip0, gossip1) = await initializeGossipTest()
 
-    gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
+    gossip0.broadcast(
+      gossip0.mesh["foobar"],
+      RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
+      isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits
@@ -976,7 +979,10 @@ suite "GossipSub":
     # Disconnect peer when rate limiting is enabled
     gossip1.parameters.disconnectPeerAboveRateLimit = true
-    gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
+    gossip0.broadcast(
+      gossip0.mesh["foobar"],
+      RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
+      isHighPriority = true)
     await sleepAsync(300.millis)
 
     check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
@@ -990,7 +996,7 @@ suite "GossipSub":
     let (nodes, gossip0, gossip1) = await initializeGossipTest()
 
     # Simulate sending an undecodable message
-    await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
+    await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte), isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits + 1
@@ -998,7 +1004,7 @@ suite "GossipSub":
     # Disconnect peer when rate limiting is enabled
     gossip1.parameters.disconnectPeerAboveRateLimit = true
-    await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))
+    await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte), isHighPriority = true)
 
     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2
@@ -1014,7 +1020,7 @@ suite "GossipSub":
         PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
       ], backoff: 123'u64)
     ])))
-    gossip0.broadcast(gossip0.mesh["foobar"], msg)
+    gossip0.broadcast(gossip0.mesh["foobar"], msg, isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits + 1
@@ -1027,7 +1033,7 @@ suite "GossipSub":
         PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
      ], backoff: 123'u64)
     ])))
-    gossip0.broadcast(gossip0.mesh["foobar"], msg2)
+    gossip0.broadcast(gossip0.mesh["foobar"], msg2, isHighPriority = true)
 
     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2
@@ -1049,7 +1055,7 @@ suite "GossipSub":
 
     let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])
-    gossip0.broadcast(gossip0.mesh[topic], msg)
+    gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits + 1
@@ -1057,7 +1063,10 @@ suite "GossipSub":
     # Disconnect peer when rate limiting is enabled
     gossip1.parameters.disconnectPeerAboveRateLimit = true
-    gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))
+    gossip0.broadcast(
+      gossip0.mesh[topic],
+      RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
+      isHighPriority = true)
 
     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2

tests/testheartbeat.nim

@@ -28,7 +28,7 @@ when not defined(macosx):
       await sleepAsync(500.milliseconds)
       await hb.cancelAndWait()
       check:
-        i in 9..11
+        i in 9..12
 
     asyncTest "change heartbeat period on the fly":
       var i = 0
@@ -46,7 +46,7 @@ when not defined(macosx):
       # (500 ms - 120 ms) / 75ms = 5x 75ms
       # total 9
       check:
-        i in 8..10
+        i in 8..11
 
     asyncTest "catch up on slow heartbeat":
       var i = 0
@@ -63,4 +63,4 @@ when not defined(macosx):
       # 360ms remaining, / 30ms = 12x
      # total 15
       check:
-        i in 14..16
+        i in 14..17