count published messages (#224)
This commit is contained in:
parent
224c2613e3
commit
fe828d87d8
|
@ -92,6 +92,7 @@ method rpcHandler*(f: FloodSub,
|
||||||
for p in toSendPeers:
|
for p in toSendPeers:
|
||||||
if p in f.peers and f.peers[p].id != peer.id:
|
if p in f.peers and f.peers[p].id != peer.id:
|
||||||
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
|
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
|
||||||
|
|
||||||
# wait for all the futures now
|
# wait for all the futures now
|
||||||
sent = await allFinished(sent)
|
sent = await allFinished(sent)
|
||||||
checkFutures(sent)
|
checkFutures(sent)
|
||||||
|
@ -133,10 +134,13 @@ method publish*(f: FloodSub,
|
||||||
for p in f.floodsub[topic]:
|
for p in f.floodsub[topic]:
|
||||||
trace "publishing message", name = topic, peer = p, data = data.shortLog
|
trace "publishing message", name = topic, peer = p, data = data.shortLog
|
||||||
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
|
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
|
||||||
|
|
||||||
# wait for all the futures now
|
# wait for all the futures now
|
||||||
sent = await allFinished(sent)
|
sent = await allFinished(sent)
|
||||||
checkFutures(sent)
|
checkFutures(sent)
|
||||||
|
|
||||||
|
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||||
|
|
||||||
method unsubscribe*(f: FloodSub,
|
method unsubscribe*(f: FloodSub,
|
||||||
topics: seq[TopicPair]) {.async.} =
|
topics: seq[TopicPair]) {.async.} =
|
||||||
await procCall PubSub(f).unsubscribe(topics)
|
await procCall PubSub(f).unsubscribe(topics)
|
||||||
|
|
|
@ -480,6 +480,8 @@ method publish*(g: GossipSub,
|
||||||
sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
|
sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
|
||||||
checkFutures(await allFinished(sent))
|
checkFutures(await allFinished(sent))
|
||||||
|
|
||||||
|
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||||
|
|
||||||
method start*(g: GossipSub) {.async.} =
|
method start*(g: GossipSub) {.async.} =
|
||||||
## start pubsub
|
## start pubsub
|
||||||
## start long running/repeating procedures
|
## start long running/repeating procedures
|
||||||
|
|
|
@ -26,6 +26,7 @@ declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
|
||||||
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
|
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
|
||||||
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
|
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
|
||||||
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
|
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
|
||||||
|
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
|
||||||
declareGauge(libp2p_pubsub_peers_per_topic, "pubsub peers per topic", labels = ["topic"])
|
declareGauge(libp2p_pubsub_peers_per_topic, "pubsub peers per topic", labels = ["topic"])
|
||||||
|
|
||||||
type
|
type
|
||||||
|
|
|
@ -12,6 +12,7 @@ import chronos, chronicles, metrics
|
||||||
import connection,
|
import connection,
|
||||||
transports/transport,
|
transports/transport,
|
||||||
multistream,
|
multistream,
|
||||||
|
multiaddress,
|
||||||
protocols/protocol,
|
protocols/protocol,
|
||||||
protocols/secure/secure,
|
protocols/secure/secure,
|
||||||
protocols/secure/plaintext, # for plain text
|
protocols/secure/plaintext, # for plain text
|
||||||
|
|
Loading…
Reference in New Issue