diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 819161c0c..8809d6fe7 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub,
 
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
     trace "Forwared message to peers", peers = toSendPeers.len
 
   f.updateMetrics(rpcMsg)
@@ -219,7 +219,7 @@ method publish*(f: FloodSub,
     return 0
 
   # Try to send to all peers that are known to be interested
-  f.broadcast(peers, RPCMsg(messages: @[msg]))
+  f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
 
   when defined(libp2p_expensive_metrics):
     libp2p_pubsub_messages_published.inc(labelValues = [topic])
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 5740f5d8d..59ad4376d 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
     for topic, info in stats[].topicInfos.mpairs:
       info.firstMessageDeliveries = 0
 
+  pubSubPeer.stopSendNonPriorityTask()
+
   procCall FloodSub(g).unsubscribePeer(peer)
 
 proc handleSubscribe*(g: GossipSub,
@@ -279,12 +281,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
   respControl.prune.add(g.handleGraft(peer, control.graft))
   let messages = g.handleIWant(peer, control.iwant)
 
-  if
-    respControl.prune.len > 0 or
-    respControl.iwant.len > 0 or
-    messages.len > 0:
-    # iwant and prunes from here, also messages
-
+  let
+    isPruneNotEmpty = respControl.prune.len > 0
+    isIWantNotEmpty = respControl.iwant.len > 0
+
+  if isPruneNotEmpty or isIWantNotEmpty:
+
+    if isIWantNotEmpty:
+      libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+
+    if isPruneNotEmpty:
+      for prune in respControl.prune:
+        if g.knownTopics.contains(prune.topicId):
+          libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
+        else:
+          libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
+
+    trace "sending control message", msg = shortLog(respControl), peer
+    g.send(
+      peer,
+      RPCMsg(control: some(respControl)), isHighPriority = true)
+
+  if messages.len > 0:
     for smsg in messages:
       for topic in smsg.topicIds:
         if g.knownTopics.contains(topic):
@@ -292,18 +310,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
         else:
           libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
 
-    libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
-
-    for prune in respControl.prune:
-      if g.knownTopics.contains(prune.topicId):
-        libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
-      else:
-        libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
-
-    trace "sending control message", msg = shortLog(respControl), peer
+    # iwant replies have lower priority
+    trace "sending iwant reply messages", peer
     g.send(
       peer,
-      RPCMsg(control: some(respControl), messages: messages))
+      RPCMsg(messages: messages), isHighPriority = false)
 
 proc validateAndRelay(g: GossipSub,
                       msg: Message,
@@ -356,7 +367,7 @@ proc validateAndRelay(g: GossipSub,
     if msg.data.len > msgId.len * 10:
       g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
         idontwant: @[ControlIWant(messageIds: @[msgId])]
-      ))))
+      ))), isHighPriority = true)
 
     for peer in toSendPeers:
       for heDontWant in peer.heDontWants:
@@ -370,7 +381,7 @@ proc validateAndRelay(g: GossipSub,
 
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
     trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
     for topic in msg.topicIds:
       if topic notin g.topics: continue
@@ -441,7 +452,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)
 
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
     peer.pingBudget.dec
   for i in 0..
@@
   if grafts.len > 0:
     let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
-    g.broadcast(grafts, graft)
+    g.broadcast(grafts, graft, isHighPriority = true)
 
   if prunes.len > 0:
     let prune = RPCMsg(control: some(ControlMessage(
       prune: @[ControlPrune(
         topicID: topic,
         peers: g.peerExchangeList(topic),
         backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-    g.broadcast(prunes, prune)
+    g.broadcast(prunes, prune, isHighPriority = true)
 
 proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
   # drop peers that we haven't published to in
@@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
             topicID: t,
             peers: g.peerExchangeList(t),
             backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-        g.broadcast(prunes, prune)
+        g.broadcast(prunes, prune, isHighPriority = true)
 
   # pass by ptr in order to both signal we want to update metrics
   # and as well update the struct for each topic during this iteration
@@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
           libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
         else:
           libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
-      g.send(peer, RPCMsg(control: some(control)))
+      g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)
 
   g.mcache.shift() # shift the cache
 
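Taken together, the floodsub and gossipsub call sites above apply one rule: traffic this node originates or needs for mesh upkeep (its own publishes, GRAFT/PRUNE, IHAVE and IWANT requests, IDONTWANT, pong) is flagged high priority, while traffic handled on behalf of other peers (forwarded messages and IWANT replies) is flagged low priority. The fragment below is illustrative only and not part of the patch; `g`, `peers`, `peer`, `ownMsg`, `fwdMsg`, `ctrl` and `iwantReplies` are hypothetical placeholders.

```nim
# Illustrative only - hypothetical placeholder names, not code from this patch.
g.broadcast(peers, RPCMsg(messages: @[ownMsg]), isHighPriority = true)    # own publish
g.send(peer, RPCMsg(control: some(ctrl)), isHighPriority = true)          # GRAFT/PRUNE/IHAVE/IWANT/pong
g.broadcast(peers, RPCMsg(messages: @[fwdMsg]), isHighPriority = false)   # relayed/forwarded message
g.send(peer, RPCMsg(messages: iwantReplies), isHighPriority = false)      # IWANT replies
```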
trace "sending pubsub message to peer", peer, msg = shortLog(msg) - peer.send(msg, p.anonymize) + peer.send(msg, p.anonymize, isHighPriority) proc broadcast*( p: PubSub, sendPeers: auto, # Iteratble[PubSubPeer] - msg: RPCMsg) {.raises: [].} = - ## Attempt to send `msg` to the given peers + msg: RPCMsg, + isHighPriority: bool) {.raises: [].} = + ## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network. + ## + ## Parameters: + ## - `p`: The `PubSub` instance. + ## - `sendPeers`: An iterable of `PubSubPeer` instances representing the peers to whom the message should be sent. + ## - `msg`: The `RPCMsg` instance that contains the message to be broadcast. + ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority. + ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high + ## priority messages have been sent. let npeers = sendPeers.len.int64 for sub in msg.subscriptions: @@ -195,19 +211,19 @@ proc broadcast*( if anyIt(sendPeers, it.hasObservers): for peer in sendPeers: - p.send(peer, msg) + p.send(peer, msg, isHighPriority) else: # Fast path that only encodes message once let encoded = encodeRpcMsg(msg, p.anonymize) for peer in sendPeers: - asyncSpawn peer.sendEncoded(encoded) + asyncSpawn peer.sendEncoded(encoded, isHighPriority) proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: openArray[string], subscribe: bool) = ## send subscriptions to remote peer - p.send(peer, RPCMsg.withSubs(topics, subscribe)) + p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true) for topic in topics: if subscribe: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 75eec5f1c..1de6ea79e 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -31,6 +31,10 @@ when defined(libp2p_expensive_metrics): declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"]) declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) +when defined(pubsubpeer_queue_metrics): + declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"]) + declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"]) + type PeerRateLimitError* = object of CatchableError @@ -49,6 +53,14 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} + RpcMessageQueue* = ref object + # Tracks async tasks for sending high-priority peer-published messages. + sendPriorityQueue: Deque[Future[void]] + # Queue for lower-priority messages, like "IWANT" replies and relay messages. + nonPriorityQueue: AsyncQueue[seq[byte]] + # Task for processing non-priority message queue. 
+ sendNonPriorityTask: Future[void] + PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection onEvent*: OnEvent # Connectivity updates for peer @@ -70,6 +82,8 @@ type behaviourPenalty*: float64 # the eventual penalty score overheadRateLimitOpt*: Opt[TokenBucket] + rpcmessagequeue: RpcMessageQueue + RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -82,6 +96,16 @@ when defined(libp2p_agents_metrics): #so we have to read the parents short agent.. p.sendConn.getWrapped().shortAgent +proc getAgent*(peer: PubSubPeer): string = + return + when defined(libp2p_agents_metrics): + if peer.shortAgent.len > 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" + func hash*(p: PubSubPeer): Hash = p.peerId.hash @@ -227,17 +251,40 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) -proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} = - doAssert(not isNil(p), "pubsubpeer nil!") +proc clearSendPriorityQueue(p: PubSubPeer) = + if p.rpcmessagequeue.sendPriorityQueue.len == 0: + return # fast path - if msg.len <= 0: - debug "empty message, skipping", p, msg = shortLog(msg) - return + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and + p.rpcmessagequeue.sendPriorityQueue[0].finished: + discard p.rpcmessagequeue.sendPriorityQueue.popFirst() - if msg.len > p.maxMessageSize: - info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len - return + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and + p.rpcmessagequeue.sendPriorityQueue[^1].finished: + discard p.rpcmessagequeue.sendPriorityQueue.popLast() + when defined(pubsubpeer_queue_metrics): + libp2p_gossipsub_priority_queue_size.set( + value = p.rpcmessagequeue.sendPriorityQueue.len.int64, + labelValues = [$p.peerId]) + +proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} = + # Continuation for a pending `sendMsg` future from below + try: + await msgFut + trace "sent pubsub message to remote", conn + except CatchableError as exc: # never cancelled + # Because we detach the send call from the currently executing task using + # asyncSpawn, no exceptions may leak out of it + trace "Unable to send to remote", conn, msg = exc.msg + # Next time sendConn is used, it will be have its close flag set and thus + # will be recycled + + await conn.close() # This will clean up the send connection + +proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} = + # Slow path of `sendMsg` where msg is held in memory while send connection is + # being set up if p.sendConn == nil: # Wait for a send conn to be setup. 
`connectOnce` will # complete this even if the sendConn setup failed @@ -248,19 +295,53 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} = debug "No send connection", p, msg = shortLog(msg) return - trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) + trace "sending encoded msg to peer", conn, encoded = shortLog(msg) + await sendMsgContinue(conn, conn.writeLp(msg)) - try: - await conn.writeLp(msg) - trace "sent pubsub message to remote", conn - except CatchableError as exc: # never cancelled - # Because we detach the send call from the currently executing task using - # asyncSpawn, no exceptions may leak out of it - trace "Unable to send to remote", conn, msg = exc.msg - # Next time sendConn is used, it will be have its close flag set and thus - # will be recycled +proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] = + if p.sendConn != nil and not p.sendConn.closed(): + # Fast path that avoids copying msg (which happens for {.async.}) + let conn = p.sendConn - await conn.close() # This will clean up the send connection + trace "sending encoded msg to peer", conn, encoded = shortLog(msg) + let f = conn.writeLp(msg) + if not f.completed(): + sendMsgContinue(conn, f) + else: + f + else: + sendMsgSlow(p, msg) + +proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] = + ## Asynchronously sends an encoded message to a specified `PubSubPeer`. + ## + ## Parameters: + ## - `p`: The `PubSubPeer` instance to which the message is to be sent. + ## - `msg`: The message to be sent, encoded as a sequence of bytes (`seq[byte]`). + ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority. + ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high + ## priority messages have been sent. + doAssert(not isNil(p), "pubsubpeer nil!") + + if msg.len <= 0: + debug "empty message, skipping", p, msg = shortLog(msg) + Future[void].completed() + elif msg.len > p.maxMessageSize: + info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len + Future[void].completed() + elif isHighPriority: + p.clearSendPriorityQueue() + let f = p.sendMsg(msg) + if not f.finished: + p.rpcmessagequeue.sendPriorityQueue.addLast(f) + when defined(pubsubpeer_queue_metrics): + libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) + f + else: + let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + when defined(pubsubpeer_queue_metrics): + libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) + f iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] = ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances. @@ -297,7 +378,16 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: else: trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) -proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} = + ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization. + ## + ## Parameters: + ## - `p`: The `PubSubPeer` instance to which the message is to be sent. + ## - `msg`: The `RPCMsg` instance representing the message to be sent. + ## - `anonymize`: A boolean flag indicating whether the message should be sent with anonymization. 
+  ## - `isHighPriority`: A boolean flag indicating whether the message should be treated as high priority.
+  ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
+  ## priority messages have been sent.
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +407,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
 
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    asyncSpawn p.sendEncoded(encoded, isHighPriority)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +420,45 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false
 
+proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
+  while true:
+    # we send non-priority messages only if there are no pending priority messages
+    let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
+    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
+      p.clearSendPriorityQueue()
+      # waiting for the last future minimizes the number of times we have to
+      # wait for something (each wait = performance cost) -
+      # clearSendPriorityQueue ensures we're not waiting for an already-finished
+      # future
+      if p.rpcmessagequeue.sendPriorityQueue.len > 0:
+        await p.rpcmessagequeue.sendPriorityQueue[^1]
+    when defined(pubsubpeer_queue_metrics):
+      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+    await p.sendMsg(msg)
+
+proc startSendNonPriorityTask(p: PubSubPeer) =
+  debug "starting sendNonPriorityTask", p
+  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
+
+proc stopSendNonPriorityTask*(p: PubSubPeer) =
+  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    debug "stopping sendNonPriorityTask", p
+    p.rpcmessagequeue.sendNonPriorityTask.cancelSoon()
+    p.rpcmessagequeue.sendNonPriorityTask = nil
+    p.rpcmessagequeue.sendPriorityQueue.clear()
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+
+    when defined(pubsubpeer_queue_metrics):
+      libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+      libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
+proc new(T: typedesc[RpcMessageQueue]): T =
+  return T(
+    sendPriorityQueue: initDeque[Future[void]](),
+    nonPriorityQueue: newAsyncQueue[seq[byte]](),
+  )
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
@@ -346,17 +475,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
+  result.startSendNonPriorityTask()
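The queueing discipline added to pubsubpeer.nim above can be modelled in isolation. The sketch below is a self-contained approximation that assumes only chronos and the standard library; `ToyQueue`, `send` and the other names are invented for the example, and connection handling, size checks and metrics are simplified away.

```nim
import std/deques
import chronos

type ToyQueue = ref object
  sendPriorityQueue: Deque[Future[void]]   # futures of in-flight high-priority sends
  nonPriorityQueue: AsyncQueue[string]     # payloads waiting for a quiet moment

proc send(msg: string) {.async.} =
  await sleepAsync(10.milliseconds)        # stands in for conn.writeLp
  echo "sent ", msg

proc sendHighPriority(q: ToyQueue, msg: string) =
  # started immediately; tracked only so low-priority sends can wait for it
  q.sendPriorityQueue.addLast(send(msg))

proc sendLowPriority(q: ToyQueue, msg: string) =
  # merely queued; drained by the background task below
  q.nonPriorityQueue.addLastNoWait(msg)

proc nonPriorityTask(q: ToyQueue) {.async.} =
  while true:
    let msg = await q.nonPriorityQueue.popFirst()
    # wait until every pending high-priority send has finished
    while q.sendPriorityQueue.len > 0:
      while q.sendPriorityQueue.len > 0 and q.sendPriorityQueue[0].finished:
        discard q.sendPriorityQueue.popFirst()
      if q.sendPriorityQueue.len > 0:
        await q.sendPriorityQueue[^1]
    await send(msg)

proc main() {.async.} =
  let q = ToyQueue(
    sendPriorityQueue: initDeque[Future[void]](),
    nonPriorityQueue: newAsyncQueue[string]())
  let task = q.nonPriorityTask()
  q.sendLowPriority("relayed message")     # queued
  q.sendHighPriority("own publish 1")      # written immediately
  q.sendHighPriority("own publish 2")      # written immediately
  await sleepAsync(100.milliseconds)       # both publishes print before the relayed message
  await task.cancelAndWait()

waitFor main()
```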
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index f369d5276..e3a99721f 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -780,7 +780,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     checkUntilTimeout: receivedMessages[] == sentMessages
     check receivedMessages[].len == 2
@@ -797,7 +797,7 @@ suite "GossipSub internal":
 
    gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
      ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     await sleepAsync(300.milliseconds)
     checkUntilTimeout: receivedMessages[].len == 0
@@ -814,7 +814,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     checkUntilTimeout: receivedMessages[] == sentMessages
     check receivedMessages[].len == 2
@@ -832,7 +832,7 @@ suite "GossipSub internal":
 
     gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
-    ))))
+    ))), isHighPriority = false)
 
     var smallestSet: HashSet[seq[byte]]
     let seqs = toSeq(sentMessages)
diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim
index 1081fe255..a843ed51d 100644
--- a/tests/pubsub/testgossipsub.nim
+++ b/tests/pubsub/testgossipsub.nim
@@ -912,7 +912,7 @@ suite "GossipSub":
 
     gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
       idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
-    ))))
+    ))), isHighPriority = true)
     checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
 
     tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
@@ -968,7 +968,10 @@ suite "GossipSub":
     let rateLimitHits = currentRateLimitHits()
     let (nodes, gossip0, gossip1) = await initializeGossipTest()
 
-    gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
+    gossip0.broadcast(
+      gossip0.mesh["foobar"],
+      RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
+      isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits
@@ -976,7 +979,10 @@ suite "GossipSub":
 
     # Disconnect peer when rate limiting is enabled
     gossip1.parameters.disconnectPeerAboveRateLimit = true
-    gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
+    gossip0.broadcast(
+      gossip0.mesh["foobar"],
+      RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
+      isHighPriority = true)
     await sleepAsync(300.millis)
 
     check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
@@ -990,7 +996,7 @@ suite "GossipSub":
     let (nodes, gossip0, gossip1) = await initializeGossipTest()
 
     # Simulate sending an undecodable message
-    await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
+    await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte), isHighPriority = true)
    await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits + 1
@@ -998,7 +1004,7 @@
 
     # Disconnect peer when rate limiting is enabled
     gossip1.parameters.disconnectPeerAboveRateLimit = true
-    await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))
+    await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte), isHighPriority = true)
 
     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2
@@ -1014,7 +1020,7 @@ suite "GossipSub":
         PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
       ], backoff: 123'u64)
     ])))
-    gossip0.broadcast(gossip0.mesh["foobar"], msg)
+    gossip0.broadcast(gossip0.mesh["foobar"], msg, isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits + 1
@@ -1027,7 +1033,7 @@ suite "GossipSub":
         PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
      ], backoff: 123'u64)
     ])))
-    gossip0.broadcast(gossip0.mesh["foobar"], msg2)
+    gossip0.broadcast(gossip0.mesh["foobar"], msg2, isHighPriority = true)
 
     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2
@@ -1049,7 +1055,7 @@ suite "GossipSub":
 
     let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])
-    gossip0.broadcast(gossip0.mesh[topic], msg)
+    gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
     await sleepAsync(300.millis)
 
     check currentRateLimitHits() == rateLimitHits + 1
@@ -1057,7 +1063,10 @@ suite "GossipSub":
 
     # Disconnect peer when rate limiting is enabled
     gossip1.parameters.disconnectPeerAboveRateLimit = true
-    gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))
+    gossip0.broadcast(
+      gossip0.mesh[topic],
+      RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
+      isHighPriority = true)
 
     checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
     check currentRateLimitHits() == rateLimitHits + 2
diff --git a/tests/testheartbeat.nim b/tests/testheartbeat.nim
index 3968daac4..5e89e46a1 100644
--- a/tests/testheartbeat.nim
+++ b/tests/testheartbeat.nim
@@ -28,7 +28,7 @@ when not defined(macosx):
       await sleepAsync(500.milliseconds)
       await hb.cancelAndWait()
       check:
-        i in 9..11
+        i in 9..12
 
     asyncTest "change heartbeat period on the fly":
       var i = 0
@@ -46,7 +46,7 @@ when not defined(macosx):
      # (500 ms - 120 ms) / 75ms = 5x 75ms
      # total 9
      check:
-       i in 8..10
+       i in 8..11
 
     asyncTest "catch up on slow heartbeat":
       var i = 0
@@ -63,4 +63,4 @@ when not defined(macosx):
      # 360ms remaining, / 30ms = 12x
      # total 15
      check:
-       i in 14..16
+       i in 14..17
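The two new gauges in pubsubpeer.nim are only compiled in when `pubsubpeer_queue_metrics` is defined. Assuming the usual Nim define mechanism and nim-metrics being enabled the same way the existing libp2p metrics are, that means passing `-d:pubsubpeer_queue_metrics` at build time; the snippet below merely illustrates the compile-time gate and is not taken from the patch.

```nim
# Hypothetical check of the compile-time switch guarding the new gauges.
when defined(pubsubpeer_queue_metrics):
  echo "per-peer send queue gauges compiled in"
else:
  echo "per-peer send queue gauges compiled out (build with -d:pubsubpeer_queue_metrics)"
```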