mem usage cleanups for pubsub (#564)

In `async` functions, a closure environment is created for variables that
cross an await boundary. This environment is kept alive for the lifetime of
the associated future, so even variables that are no longer used keep
occupying memory for as long as the future exists.
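
A rough sketch of the effect (chronos-style code; the names are made up for
illustration): `big` is referenced after the `await`, so it is hoisted into
the future's hidden environment and stays allocated until that future is
collected, even though it is only needed up front.

  import chronos

  proc longWait() {.async.} =
    await sleepAsync(10.seconds)

  proc leaky() {.async.} =
    # `big` crosses the await below, so it lives in the future's closure
    # environment for the whole wait, not just for the final echo
    var big = newSeq[byte](1_000_000)
    await longWait()
    echo big.len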

In Nimbus, message validation is processed in batches, meaning the future
of an incoming gossip message stays around for quite a while - this leads
to memory consumption peaks of 100-200 MB when there are many attestations
in the pipeline.

To avoid excessive memory usage, it's generally better to move non-async
code into plain procs so that the variables therein can be released earlier
- this includes the many hidden variables introduced by macro and template
expansion (e.g. chronicles, which does expensive exception handling).
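
A minimal sketch of that pattern, using the same illustrative names as
above: the allocation is confined to a plain proc, so only a small integer
crosses the await boundary.

  import chronos

  proc longWait() {.async.} =
    await sleepAsync(10.seconds)

  proc summarize(): int =
    # plain (non-async) proc: `big` becomes collectable as soon as this returns
    var big = newSeq[byte](1_000_000)
    big.len

  proc tight() {.async.} =
    let n = summarize()  # only `n` (an int) is kept across the await
    await longWait()
    echo n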

* move seen table salt to floodsub, use there as well (see the sketch after
this list)
* shorten seen table salt to size of hash
* avoid unnecessary memory allocations and copies in a few places
* factor out message scoring
* avoid reencoding outgoing message for every peer
* keep checking validators until reject (in case there's both reject and
ignore)
* `readOnce` avoids `readExactly` overhead for single-byte reads
* genericAssign -> assign2
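
A self-contained sketch of the seen-table salting from the first bullet
(the `SeenCache` type and `std/sysrand` randomness are stand-ins for the
library's `TimedCache` and bearssl-based RNG): because the salt is random
per node, a remote peer cannot craft message ids that all land in the same
hash bucket of the table.

  import std/[hashes, sets, sysrand]

  type SeenCache = object
    salt: seq[byte]          # per-node random salt, sized like a Hash
    ids: HashSet[seq[byte]]  # stand-in for TimedCache[MessageID]

  proc initSeenCache(): SeenCache =
    SeenCache(
      salt: urandom(sizeof(Hash)),   # the real code draws from bearssl's HMAC-DRBG
      ids: initHashSet[seq[byte]]())

  proc addSeen(c: var SeenCache, msgId: seq[byte]): bool =
    # true if the salted id was already present
    c.ids.containsOrIncl(c.salt & msgId)

  when isMainModule:
    var seen = initSeenCache()
    doAssert not seen.addSeen(@[1'u8, 2, 3])
    doAssert seen.addSeen(@[1'u8, 2, 3])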
Jacek Sieka 2021-04-18 10:08:33 +02:00 committed by GitHub
parent 6b930ae7e6
commit e285d8bbf4
10 changed files with 179 additions and 148 deletions

View File

@@ -7,8 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
-import std/[sequtils, sets, tables]
-import chronos, chronicles, metrics
+import std/[sequtils, sets, hashes, tables]
+import chronos, chronicles, metrics, bearssl
 import ./pubsub,
        ./pubsubpeer,
        ./timedcache,
@@ -27,7 +27,17 @@ const FloodSubCodec* = "/floodsub/1.0.0"
 type
   FloodSub* = ref object of PubSub
     floodsub*: PeerTable # topic to remote peer map
-    seen*: TimedCache[MessageID] # list of messages forwarded to peers
+    seen*: TimedCache[MessageID] # message id:s already seen on the network
+    seenSalt*: seq[byte]
+
+proc hasSeen*(f: FloodSub, msgId: MessageID): bool =
+  f.seenSalt & msgId in f.seen
+
+proc addSeen*(f: FloodSub, msgId: MessageID): bool =
+  # Salting the seen hash helps avoid attacks against the hash function used
+  # in the nim hash table
+  # Return true if the message has already been seen
+  f.seen.put(f.seenSalt & msgId)

 method subscribeTopic*(f: FloodSub,
                        topic: string,
@@ -88,7 +98,7 @@ method rpcHandler*(f: FloodSub,
  for msg in rpcMsg.messages: # for every message
    let msgId = f.msgIdProvider(msg)

-   if f.seen.put(msgId):
+   if f.addSeen(msgId):
      trace "Dropping already-seen message", msgId, peer
      continue
@@ -118,13 +128,15 @@ method rpcHandler*(f: FloodSub,
    var toSendPeers = initHashSet[PubSubPeer]()
    for t in msg.topicIDs: # for every topic in the message
+     if t notin f.topics:
+       continue
      f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])

      await handleData(f, t, msg.data)

    # In theory, if topics are the same in all messages, we could batch - we'd
    # also have to be careful to only include validated messages
-   f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
+   f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
    trace "Forwared message to peers", peers = toSendPeers.len

 method init*(f: FloodSub) =
@@ -157,7 +169,7 @@ method publish*(f: FloodSub,
    debug "Empty topic, skipping publish", topic
    return 0

- let peers = toSeq(f.floodsub.getOrDefault(topic))
+ let peers = f.floodsub.getOrDefault(topic)
  if peers.len == 0:
    debug "No peers for topic, skipping publish", topic
@@ -175,7 +187,7 @@ method publish*(f: FloodSub,
  trace "Created new message",
    msg = shortLog(msg), peers = peers.len, topic, msgId

- if f.seen.put(msgId):
+ if f.addSeen(msgId):
    # custom msgid providers might cause this
    trace "Dropping already-seen message", msgId, topic
    return 0
@@ -206,4 +218,8 @@ method unsubscribeAll*(f: FloodSub, topic: string) =
 method initPubSub*(f: FloodSub) =
  procCall PubSub(f).initPubSub()
  f.seen = TimedCache[MessageID].init(2.minutes)
+ var rng = newRng()
+ f.seenSalt = newSeqUninitialized[byte](sizeof(Hash))
+ brHmacDrbgGenerate(rng[], f.seenSalt)

  f.init()

View File

@@ -8,7 +8,7 @@
 ## those terms.

 import std/[tables, sets, options, sequtils, random]
-import chronos, chronicles, metrics, bearssl
+import chronos, chronicles, metrics
 import ./pubsub,
        ./floodsub,
        ./pubsubpeer,
@@ -98,27 +98,6 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
   else:
     ok()

-proc init*(_: type[TopicParams]): TopicParams =
-  TopicParams(
-    topicWeight: 0.0, # disabled by default
-    timeInMeshWeight: 0.01,
-    timeInMeshQuantum: 1.seconds,
-    timeInMeshCap: 10.0,
-    firstMessageDeliveriesWeight: 1.0,
-    firstMessageDeliveriesDecay: 0.5,
-    firstMessageDeliveriesCap: 10.0,
-    meshMessageDeliveriesWeight: -1.0,
-    meshMessageDeliveriesDecay: 0.5,
-    meshMessageDeliveriesCap: 10,
-    meshMessageDeliveriesThreshold: 1,
-    meshMessageDeliveriesWindow: 5.milliseconds,
-    meshMessageDeliveriesActivation: 10.seconds,
-    meshFailurePenaltyWeight: -1.0,
-    meshFailurePenaltyDecay: 0.5,
-    invalidMessageDeliveriesWeight: -1.0,
-    invalidMessageDeliveriesDecay: 0.5
-  )
-
 proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
   if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0:
     err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value")
@@ -262,6 +241,37 @@ method subscribeTopic*(g: GossipSub,
   trace "gossip peers", peers = g.gossipsub.peers(topic), topic

+proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) =
+  if rpcMsg.control.isSome:
+    let control = rpcMsg.control.get()
+    g.handlePrune(peer, control.prune)
+
+    var respControl: ControlMessage
+    respControl.iwant.add(g.handleIHave(peer, control.ihave))
+    respControl.prune.add(g.handleGraft(peer, control.graft))
+    let messages = g.handleIWant(peer, control.iwant)
+
+    if respControl.graft.len > 0 or respControl.prune.len > 0 or
+        respControl.ihave.len > 0 or messages.len > 0:
+      # iwant and prunes from here, also messages
+
+      for smsg in messages:
+        for topic in smsg.topicIDs:
+          if g.knownTopics.contains(topic):
+            libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
+          else:
+            libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
+
+      libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+
+      for prune in respControl.prune:
+        if g.knownTopics.contains(prune.topicID):
+          libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
+        else:
+          libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
+
+      trace "sending control message", msg = shortLog(respControl), peer
+      g.send(
+        peer,
+        RPCMsg(control: some(respControl), messages: messages))
+
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
@@ -283,26 +293,12 @@ method rpcHandler*(g: GossipSub,
    # avoid the remote peer from controlling the seen table hashing
    # by adding random bytes to the ID we ensure we randomize the IDs
    # we do only for seen as this is the great filter from the external world
-   if g.seen.put(msgId & g.randomBytes):
+   if g.addSeen(msgId):
      trace "Dropping already-seen message", msgId = shortLog(msgId), peer

      # make sure to update score tho before continuing
-     for t in msg.topicIDs: # for every topic in the message
-       if t notin g.topics:
-         continue
-
-       let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
-       # if in mesh add more delivery score
-       g.withPeerStats(peer.peerId) do (stats: var PeerStats):
-         stats.topicInfos.withValue(t, tstats):
-           if tstats[].inMesh:
-             # TODO: take into account meshMessageDeliveriesWindow
-             # score only if messages are not too old.
-             tstats[].meshMessageDeliveries += 1
-             if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
-               tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
-         do: # make sure we don't loose this information
-           stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
+     # TODO: take into account meshMessageDeliveriesWindow
+     # score only if messages are not too old.
+     g.rewardDelivered(peer, msg.topicIDs, false)

      # onto the next message
      continue
@@ -346,28 +342,13 @@ method rpcHandler*(g: GossipSub,
    # store in cache only after validation
    g.mcache.put(msgId, msg)

+   g.rewardDelivered(peer, msg.topicIDs, true)
+
    var toSendPeers = initHashSet[PubSubPeer]()
    for t in msg.topicIDs: # for every topic in the message
      if t notin g.topics:
        continue

-     let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
-
-     g.withPeerStats(peer.peerId) do(stats: var PeerStats):
-       stats.topicInfos.withValue(t, tstats):
-         # contribute to peer score first delivery
-         tstats[].firstMessageDeliveries += 1
-         if tstats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
-           tstats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
-
-         # if in mesh add more delivery score
-         if tstats[].inMesh:
-           tstats[].meshMessageDeliveries += 1
-           if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
-             tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
-       do: # make sure we don't loose this information
-         stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
-
      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
@@ -375,44 +356,15 @@ method rpcHandler*(g: GossipSub,
    # In theory, if topics are the same in all messages, we could batch - we'd
    # also have to be careful to only include validated messages
-   let sendingTo = toSeq(toSendPeers)
-   g.broadcast(sendingTo, RPCMsg(messages: @[msg]))
-   trace "forwared message to peers", peers = sendingTo.len, msgId, peer
+   g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+   trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
    for topic in msg.topicIDs:
      if g.knownTopics.contains(topic):
-       libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic])
+       libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
      else:
-       libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"])
+       libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])

- if rpcMsg.control.isSome:
-   let control = rpcMsg.control.get()
-   g.handlePrune(peer, control.prune)
-
-   var respControl: ControlMessage
-   respControl.iwant.add(g.handleIHave(peer, control.ihave))
-   respControl.prune.add(g.handleGraft(peer, control.graft))
-   let messages = g.handleIWant(peer, control.iwant)
-
-   if respControl.graft.len > 0 or respControl.prune.len > 0 or
-       respControl.ihave.len > 0 or messages.len > 0:
-     # iwant and prunes from here, also messages
-
-     for smsg in messages:
-       for topic in smsg.topicIDs:
-         if g.knownTopics.contains(topic):
-           libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
-         else:
-           libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
-
-     libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
-
-     for prune in respControl.prune:
-       if g.knownTopics.contains(prune.topicID):
-         libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
-       else:
-         libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
-
-     trace "sending control message", msg = shortLog(respControl), peer
-     g.send(
-       peer,
-       RPCMsg(control: some(respControl), messages: messages))
+ g.handleControl(peer, rpcMsg)

 method subscribe*(g: GossipSub,
                   topic: string,
@@ -437,7 +389,7 @@ proc unsubscribe*(g: GossipSub, topic: string) =
      # remove mesh peers from gpeers, we send 2 different messages
      gpeers = gpeers - mpeers
      # send to peers NOT in mesh first
-     g.broadcast(toSeq(gpeers), msg)
+     g.broadcast(gpeers, msg)
      for peer in mpeers:
        trace "pruning unsubscribeAll call peer", peer, score = peer.score
@@ -452,9 +404,9 @@ proc unsubscribe*(g: GossipSub, topic: string) =
            backoff: g.parameters.pruneBackoff.seconds.uint64)]))

      # send to peers IN mesh now
-     g.broadcast(toSeq(mpeers), msg)
+     g.broadcast(mpeers, msg)
    else:
-     g.broadcast(toSeq(gpeers), msg)
+     g.broadcast(gpeers, msg)

    g.topicParams.del(topic)
@@ -540,19 +492,19 @@ method publish*(g: GossipSub,
  trace "Created new message", msg = shortLog(msg), peers = peers.len

- if g.seen.put(msgId & g.randomBytes):
+ if g.addSeen(msgId):
    # custom msgid providers might cause this
    trace "Dropping already-seen message"
    return 0

  g.mcache.put(msgId, msg)

- let peerSeq = toSeq(peers)
- g.broadcast(peerSeq, RPCMsg(messages: @[msg]))
+ g.broadcast(peers, RPCMsg(messages: @[msg]))
  if g.knownTopics.contains(topic):
-   libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic])
+   libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
  else:
-   libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"])
+   libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])

  trace "Published message to peers"
@@ -618,6 +570,3 @@ method initPubSub*(g: GossipSub) =
  # init gossip stuff
  g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
-
- var rng = newRng()
- g.randomBytes = newSeqUninitialized[byte](32)
- brHmacDrbgGenerate(rng[], g.randomBytes)

View File

@@ -13,7 +13,7 @@ import std/[tables, sequtils, sets, algorithm]
 import random # for shuffle
 import chronos, chronicles, metrics
 import "."/[types, scoring]
-import ".."/[pubsubpeer, peertable, timedcache, mcache, pubsub]
+import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub]
 import "../rpc"/[messages]
 import "../../.."/[peerid, multiaddress, utility, switch]
@@ -198,11 +198,10 @@ proc handleIHave*(g: GossipSub,
      if ihave.topicID in g.mesh:
        # also avoid duplicates here!
        let deIhavesMsgs = ihave.messageIDs.deduplicate()
-       for m in deIhavesMsgs:
-         let msgId = m & g.randomBytes
-         if msgId notin g.seen:
+       for msgId in deIhavesMsgs:
+         if not g.hasSeen(msgId):
            if peer.iHaveBudget > 0:
-             result.messageIDs.add(m)
+             result.messageIDs.add(msgId)
              dec peer.iHaveBudget
            else:
              return

View File

@@ -25,6 +25,27 @@ declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring
 declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
 declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])

+proc init*(_: type[TopicParams]): TopicParams =
+  TopicParams(
+    topicWeight: 0.0, # disabled by default
+    timeInMeshWeight: 0.01,
+    timeInMeshQuantum: 1.seconds,
+    timeInMeshCap: 10.0,
+    firstMessageDeliveriesWeight: 1.0,
+    firstMessageDeliveriesDecay: 0.5,
+    firstMessageDeliveriesCap: 10.0,
+    meshMessageDeliveriesWeight: -1.0,
+    meshMessageDeliveriesDecay: 0.5,
+    meshMessageDeliveriesCap: 10,
+    meshMessageDeliveriesThreshold: 1,
+    meshMessageDeliveriesWindow: 5.milliseconds,
+    meshMessageDeliveriesActivation: 10.seconds,
+    meshFailurePenaltyWeight: -1.0,
+    meshFailurePenaltyDecay: 0.5,
+    invalidMessageDeliveriesWeight: -1.0,
+    invalidMessageDeliveriesDecay: 0.5
+  )
+
 proc withPeerStats*(
     g: GossipSub, peerId: PeerId,
     action: proc (stats: var PeerStats) {.gcsafe, raises: [Defect].}) =
@@ -274,3 +295,26 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string])
    # update stats
    g.withPeerStats(peer.peerId) do (stats: var PeerStats):
      stats.topicInfos.mgetOrPut(t, TopicInfo()).invalidMessageDeliveries += 1
+
+proc addCapped*[T](stat: var T, diff, cap: T) =
+  stat += min(diff, cap - stat)
+
+proc rewardDelivered*(
+    g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool) =
+  for t in topics:
+    if t notin g.topics:
+      continue
+    let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
+    # if in mesh add more delivery score
+
+    g.withPeerStats(peer.peerId) do (stats: var PeerStats):
+      stats.topicInfos.withValue(t, tstats):
+        if tstats[].inMesh:
+          if first:
+            tstats[].firstMessageDeliveries.addCapped(
+              1, topicParams.firstMessageDeliveriesCap)
+
+          tstats[].meshMessageDeliveries.addCapped(
+            1, topicParams.meshMessageDeliveriesCap)
+      do: # make sure we don't loose this information
+        stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)

View File

@@ -150,8 +150,6 @@
     heartbeatEvents*: seq[AsyncEvent]

-    randomBytes*: seq[byte]
-
   MeshMetrics* = object
     # scratch buffers for metrics
     otherPeersPerTopicMesh*: int64

View File

@@ -9,8 +9,8 @@

 import std/[tables, sequtils, sets, strutils]
 import chronos, chronicles, metrics
-import pubsubpeer,
-       rpc/[message, messages],
+import ./pubsubpeer,
+       ./rpc/[message, messages, protobuf],
        ../../switch,
        ../protocol,
        ../../stream/connection,
@@ -128,7 +128,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} =
 proc broadcast*(
   p: PubSub,
-  sendPeers: openArray[PubSubPeer],
+  sendPeers: auto, # Iteratble[PubSubPeer]
   msg: RPCMsg) {.raises: [Defect].} =
   ## Attempt to send `msg` to the given peers
@@ -174,8 +174,15 @@ proc broadcast*(
   trace "broadcasting messages to peers",
     peers = sendPeers.len, msg = shortLog(msg)

-  for peer in sendPeers:
-    p.send(peer, msg)
+  if anyIt(sendPeers, it.hasObservers):
+    for peer in sendPeers:
+      p.send(peer, msg)
+  else:
+    # Fast path that only encodes message once
+    let encoded = encodeRpcMsg(msg, p.anonymize)
+    for peer in sendPeers:
+      peer.sendEncoded(encoded)

 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
@@ -205,7 +212,7 @@ method subscribeTopic*(p: PubSub,
 method rpcHandler*(p: PubSub,
                    peer: PubSubPeer,
-                   rpcMsg: RPCMsg) {.async, base.} =
+                   rpcMsg: RPCMsg): Future[void] {.base.} =
   ## handle rpc messages
   trace "processing RPC message", msg = rpcMsg.shortLog, peer
   for i in 0..<min(rpcMsg.subscriptions.len, p.topicsHigh):
@@ -253,6 +260,12 @@ method rpcHandler*(p: PubSub,
      else:
        libp2p_pubsub_received_prune.inc(labelValues = ["generic"])

+  # Avoid async transformation to avoid copying of rpcMsg into closure - this
+  # is an unnecessary hotspot in gossip
+  var res = newFuture[void]("PubSub.rpcHandler")
+  res.complete()
+  return res
+
 method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard

 method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {.base, gcsafe.} =
@@ -306,6 +319,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.asyn
   # gather all futures without yielding to scheduler
   var futs = p.topics[topic].handler.mapIt(it(topic, data))
+  if futs.len() == 0: return # No handlers

   try:
     futs = await allFinished(futs)
@@ -488,8 +502,8 @@ method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async,
     registered = toSeq(p.validators.keys)
   if topic in p.validators:
     trace "running validators for topic", topicID = topic
-    # TODO: add timeout to validator
-    pending.add(p.validators[topic].mapIt(it(topic, message)))
+    for validator in p.validators[topic]:
+      pending.add(validator(topic, message))

   result = ValidationResult.Accept
   let futs = await allFinished(pending)
@@ -500,7 +514,8 @@ method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async,
       let res = fut.read()
       if res != ValidationResult.Accept:
         result = res
-        break
+        if res == ValidationResult.Reject:
+          break

   case result
   of ValidationResult.Accept:

View File

@@ -81,7 +81,7 @@ proc connected*(p: PubSubPeer): bool =
   not p.sendConn.isNil and not
     (p.sendConn.closed or p.sendConn.atEof)

-proc hasObservers(p: PubSubPeer): bool =
+proc hasObservers*(p: PubSubPeer): bool =
   p.observers != nil and anyIt(p.observers[], it != nil)

 func outbound*(p: PubSubPeer): bool =
@@ -226,14 +226,26 @@ template sendMetrics(msg: RPCMsg): untyped =
       # metrics
       libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
   doAssert(not isNil(p), "pubsubpeer nil!")

+  if msg.len <= 0:
+    debug "empty message, skipping", p, msg = shortLog(msg)
+    return
+
   let conn = p.sendConn
   if conn == nil or conn.closed():
-    trace "No send connection, skipping message", p, msg
+    trace "No send connection, skipping message", p, msg = shortLog(msg)
     return

+  # To limit the size of the closure, we only pass the encoded message and
+  # connection to the spawned send task
+  asyncSpawn(try:
+    sendImpl(conn, msg)
+  except Exception as exc: # TODO chronos Exception
+    raiseAssert exc.msg)
+
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
   trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

   # When sending messages, we take care to re-encode them with the right
@@ -253,16 +265,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
       sendMetrics(msg)
       encodeRpcMsg(msg, anonymize)

-  if encoded.len <= 0:
-    debug "empty message, skipping", p, msg
-    return
-
-  # To limit the size of the closure, we only pass the encoded message and
-  # connection to the spawned send task
-  asyncSpawn(try:
-    sendImpl(conn, encoded)
-  except Exception as exc: # TODO chronos Exception
-    raiseAssert exc.msg)
+  p.sendEncoded(encoded)

 proc newPubSubPeer*(peerId: PeerID,
                     getConn: GetConn,

View File

@@ -8,6 +8,7 @@
 ## those terms.

 import options
+import stew/assign2
 import chronicles
 import messages,
        ../../../peerid,
@@ -116,7 +117,7 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
   when defined(libp2p_protobuf_metrics):
     libp2p_pubsub_rpc_bytes_write.inc(pb.getLen().int64, labelValues = ["message"])

   pb.buffer

 proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =
@@ -320,8 +321,8 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
 proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
   trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
   var pb = initProtoBuffer(msg)
-  var rpcMsg: RPCMsg
-  rpcMsg.messages = ? pb.decodeMessages()
-  rpcMsg.subscriptions = ? pb.decodeSubscriptions()
-  rpcMsg.control = ? pb.decodeControl()
-  ok(rpcMsg)
+  var rpcMsg = ok(RPCMsg())
+  assign(rpcMsg.get().messages, ? pb.decodeMessages())
+  assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions())
+  assign(rpcMsg.get().control, ? pb.decodeControl())
+  rpcMsg

View File

@@ -182,7 +182,8 @@ proc readLine*(s: LPStream,
     while true:
       var ch: char
-      await readExactly(s, addr ch, 1)
+      if (await readOnce(s, addr ch, 1)) == 0:
+        raise newLPStreamEOFError()

       if sep[state] == ch:
         inc(state)
@@ -202,12 +203,15 @@
 proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
   var
-    varint: uint64
-    length: int
     buffer: array[10, byte]

   for i in 0..<len(buffer):
-    await conn.readExactly(addr buffer[i], 1)
+    if (await conn.readOnce(addr buffer[i], 1)) == 0:
+      raise newLPStreamEOFError()
+
+    var
+      varint: uint64
+      length: int
     let res = PB.getUVarint(buffer.toOpenArray(0, i), length, varint)
     if res.isOk():
       return varint

View File

@@ -25,6 +25,8 @@ import utils, ../../libp2p/[errors,
                             protocols/pubsub/rpc/messages]
 import ../helpers

+proc `$`(peer: PubSubPeer): string = shortLog(peer)
+
 proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
   if sender == receiver:
     return