mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-18 07:46:42 +00:00
make forward messages non priority
This commit is contained in:
parent
5594bcb33e
commit
3c8e2a3725
libp2p/protocols/pubsub/gossipsub.nim

@@ -303,7 +303,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
     trace "sending control message", msg = shortLog(respControl), peer
     g.send(
       peer,
-      RPCMsg(control: some(respControl), messages: messages))
+      RPCMsg(control: some(respControl), messages: messages), true)

 proc validateAndRelay(g: GossipSub,
                       msg: Message,
@@ -370,7 +370,7 @@ proc validateAndRelay(g: GossipSub,

    # In theory, if topics are the same in all messages, we could batch - we'd
    # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
    trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
    for topic in msg.topicIds:
      if topic notin g.topics: continue
@@ -441,7 +441,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)

   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
     peer.pingBudget.dec
   for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
     template sub: untyped = rpcMsg.subscriptions[i]
@@ -655,7 +655,7 @@ method publish*(g: GossipSub,

   g.mcache.put(msgId, msg)

-  g.broadcast(peers, RPCMsg(messages: @[msg]))
+  g.broadcast(peers, RPCMsg(messages: @[msg]), true)

   if g.knownTopics.contains(topic):
     libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])

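Read together, the four gossipsub.nim hunks spell out the policy behind the commit title: replies to control messages, pongs, and the node's own published messages pass the new trailing argument as true (high priority), while messages merely forwarded on behalf of other peers in validateAndRelay pass false and may therefore wait behind the node's own traffic.
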
libp2p/protocols/pubsub/pubsub.nim

@@ -138,17 +138,18 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

   libp2p_pubsub_peers.set(p.peers.len.int64)

-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
   ## Attempt to send `msg` to remote peer
   ##

   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  peer.send(msg, p.anonymize)
+  peer.send(msg, p.anonymize, isHighPriority)

 proc broadcast*(
   p: PubSub,
   sendPeers: auto, # Iteratble[PubSubPeer]
-  msg: RPCMsg) {.raises: [].} =
+  msg: RPCMsg,
+  isHighPriority: bool = false) {.raises: [].} =
   ## Attempt to send `msg` to the given peers

   let npeers = sendPeers.len.int64
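Because isHighPriority defaults to false, every existing caller keeps compiling and implicitly lands in the non-priority lane; only updated call sites opt in with true. A minimal sketch of this default-parameter pattern (standalone Nim with made-up names, not the library API):

# Sketch only: `relay` is a hypothetical stand-in for send/broadcast.
proc relay(msg: string, isHighPriority: bool = false) =
  let lane = if isHighPriority: "priority" else: "non-priority"
  echo lane, ": ", msg

relay("forwarded gossip")     # old call site, unchanged -> non-priority
relay("control reply", true)  # updated call site -> high priority
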
@@ -195,12 +196,12 @@ proc broadcast*(

   if anyIt(sendPeers, it.hasObservers):
     for peer in sendPeers:
-      p.send(peer, msg)
+      p.send(peer, msg, isHighPriority)
   else:
     # Fast path that only encodes message once
     let encoded = encodeRpcMsg(msg, p.anonymize)
     for peer in sendPeers:
-      asyncSpawn peer.sendEncoded(encoded)
+      asyncSpawn peer.sendEncoded(encoded, isHighPriority)

 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,

libp2p/protocols/pubsub/pubsubpeer.nim

@@ -20,6 +20,7 @@ import rpc/[messages, message, protobuf],
        ../../crypto/crypto,
        ../../protobuf/minprotobuf,
        ../../utility
+import rpcmessagequeue

 export peerid, connection, deques

@@ -31,6 +32,9 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

+declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id_or_agent"])
+declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id_or_agent"])
+
 type
   PeerRateLimitError* = object of CatchableError

@@ -70,6 +74,8 @@ type
     behaviourPenalty*: float64 # the eventual penalty score
     overheadRateLimitOpt*: Opt[TokenBucket]

+    rpcmessagequeue*: RpcMessageQueue
+
   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}

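With this field, every PubSubPeer owns its own RpcMessageQueue: sendEncoded no longer writes to the connection at call time but enqueues into one of two queues, and a per-peer processMessages loop (spawned in new* further down) drains them in priority order.
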
@@ -82,6 +88,16 @@ when defined(libp2p_agents_metrics):
       #so we have to read the parents short agent..
       p.sendConn.getWrapped().shortAgent

+proc getAgent*(peer: PubSubPeer): string =
+  return
+    when defined(libp2p_agents_metrics):
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash

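This getAgent helper is not new logic: the same proc is deleted from the bottom of the file in the last hunk. It moves above sendEncoded so that the queue-size gauges below can fall back to the peer's agent label when per-peer (libp2p_expensive_metrics) labels are disabled.
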
@@ -227,7 +243,7 @@ template sendMetrics(msg: RPCMsg): untyped =
       # metrics
       libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

-proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
   doAssert(not isNil(p), "pubsubpeer nil!")

   if msg.len <= 0:
@@ -238,29 +254,18 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
     info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
     return

-  if p.sendConn == nil:
-    # Wait for a send conn to be setup. `connectOnce` will
-    # complete this even if the sendConn setup failed
-    await p.connectedFut
-
-  var conn = p.sendConn
-  if conn == nil or conn.closed():
-    debug "No send connection", p, msg = shortLog(msg)
-    return
-
-  trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
-
-  try:
-    await conn.writeLp(msg)
-    trace "sent pubsub message to remote", conn
-  except CatchableError as exc: # never cancelled
-    # Because we detach the send call from the currently executing task using
-    # asyncSpawn, no exceptions may leak out of it
-    trace "Unable to send to remote", conn, msg = exc.msg
-    # Next time sendConn is used, it will be have its close flag set and thus
-    # will be recycled
-
-    await conn.close() # This will clean up the send connection
+  if isHighPriority:
+    await p.rpcmessagequeue.addPriorityMessage(msg)
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
+    else:
+      libp2p_gossipsub_priority_queue_size.inc(labelValues = [p.getAgent()])
+  else:
+    await p.rpcmessagequeue.addNonPriorityMessage(msg)
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+    else:
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [p.getAgent()])

 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
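The enqueue logic above pairs with the dequeue logic added further down (getMessage / processMessages): drain the priority queue first, fall back to the non-priority queue, and idle briefly when both are empty. A self-contained sketch of that discipline using only chronos (queue names and payloads are illustrative, not library code):

import chronos

let
  prio = newAsyncQueue[string]()
  nonPrio = newAsyncQueue[string]()

proc pump(rounds: int) {.async.} =
  for _ in 0 ..< rounds:
    if not prio.empty():
      echo "sent (priority): ", prio.getNoWait()
    elif not nonPrio.empty():
      echo "sent (non-priority): ", nonPrio.getNoWait()
    else:
      await sleepAsync(10.milliseconds)  # idle, mirroring the 10 ms sleep below

waitFor nonPrio.put("forwarded message")
waitFor prio.put("pong")
waitFor pump(3)  # prints "pong" first even though it was queued second
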
@@ -297,7 +302,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.raises: [].} =
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +322,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =

   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    asyncSpawn p.sendEncoded(encoded, isHighPriority)

 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +335,55 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false

+proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Future[Opt[seq[byte]]] {.async.} =
+  var m = await rpcMessageQueue.getPriorityMessage()
+  if m.isSome():
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
+    else:
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [p.getAgent()])
+    return m
+  else:
+    m = await rpcMessageQueue.getNonPriorityMessage()
+    if m.isSome():
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+      else:
+        libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [p.getAgent()])
+      return m
+    else:
+      return Opt.none(seq[byte])
+
+proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} =
+  while true:
+    let m = await queue.getMessage(p)
+    m.withValue(msg):
+      if p.sendConn == nil:
+        # Wait for a send conn to be setup. `connectOnce` will
+        # complete this even if the sendConn setup failed
+        await p.connectedFut
+
+      var conn = p.sendConn
+      if conn == nil or conn.closed():
+        debug "No send connection", msg = shortLog(msg)
+        return
+
+      trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)
+
+      try:
+        await conn.writeLp(msg)
+        trace "sent pubsub message to remote", conn
+      except CatchableError as exc: # never cancelled
+        # Because we detach the send call from the currently executing task using
+        # asyncSpawn, no exceptions may leak out of it
+        trace "Unable to send to remote", conn, msg = exc.msg
+        # Next time sendConn is used, it will be have its close flag set and thus
+        # will be recycled
+
+        await conn.close() # This will clean up the send connection
+    # Include a delay or a mechanism to prevent this loop from consuming too much CPU
+    await sleepAsync(10) # For example, a 10 ms sleep
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
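Note the shape of this loop: getMessage never blocks (both queue reads go through empty-checked getNoWait), so when both queues are empty, processMessages wakes every 10 ms to poll, trading a little latency and idle CPU for simplicity over a blocking two-queue wait. Also worth noting: if the send connection turns out to be closed, the return inside withValue exits the whole loop, discarding the message that was already dequeued.
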
@@ -346,17 +400,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
+  asyncSpawn result.processMessages(result.rpcmessagequeue)
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"

libp2p/protocols/pubsub/rpcmessagequeue.nim (new file, 44 lines)

@@ -0,0 +1,44 @@
+# Nim-LibP2P
+# Copyright (c) 2023 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+# at your option.
+# This file may not be copied, modified, or distributed except according to
+# those terms.
+
+{.push raises: [].}
+
+import chronos, chronicles, stew/results
+import ../../stream/connection
+
+type
+  RpcMessageQueue* = ref object
+    priorityQueue: AsyncQueue[seq[byte]]
+    nonPriorityQueue: AsyncQueue[seq[byte]]
+
+proc addPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} =
+  await aq.priorityQueue.put(msg)
+
+proc addNonPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} =
+  await aq.nonPriorityQueue.put(msg)
+
+proc new*(T: typedesc[RpcMessageQueue]): T =
+  return T(
+    priorityQueue: newAsyncQueue[seq[byte]](),
+    nonPriorityQueue: newAsyncQueue[seq[byte]]()
+  )
+
+proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} =
+  return
+    if not rpcMessageQueue.priorityQueue.empty():
+      Opt.some(rpcMessageQueue.priorityQueue.getNoWait())
+    else:
+      Opt.none(seq[byte])
+
+proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Future[Opt[seq[byte]]] {.async.} =
+  return
+    if not rpcMessageQueue.nonPriorityQueue.empty():
+      Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait())
+    else:
+      Opt.none(seq[byte])
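
For reference, a usage sketch of the new module's API, assuming it is compiled inside a nim-libp2p checkout (the import path below simply mirrors the file's location and is an assumption of this note):

import chronos, stew/results
import libp2p/protocols/pubsub/rpcmessagequeue

let q = RpcMessageQueue.new()
waitFor q.addPriorityMessage(@[1'u8, 2, 3])
waitFor q.addNonPriorityMessage(@[9'u8])

# The getters are non-blocking: they return Opt.none when their queue is empty.
let hi = waitFor q.getPriorityMessage()     # Opt.some(@[1'u8, 2, 3])
let lo = waitFor q.getNonPriorityMessage()  # Opt.some(@[9'u8])
let drained = waitFor q.getPriorityMessage()  # Opt.none: queue already drained
echo hi.isSome(), " ", lo.isSome(), " ", drained.isSome()  # true true false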