diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 517bb98..4e6055c 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -92,6 +92,7 @@ method rpcHandler*(f: FloodSub, for p in toSendPeers: if p in f.peers and f.peers[p].id != peer.id: sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)])) + # wait for all the futures now sent = await allFinished(sent) checkFutures(sent) @@ -133,10 +134,13 @@ method publish*(f: FloodSub, for p in f.floodsub[topic]: trace "publishing message", name = topic, peer = p, data = data.shortLog sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) + # wait for all the futures now sent = await allFinished(sent) checkFutures(sent) + libp2p_pubsub_messages_published.inc(labelValues = [topic]) + method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = await procCall PubSub(f).unsubscribe(topics) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index e68d50d..e3c8eb4 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -480,6 +480,8 @@ method publish*(g: GossipSub, sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) checkFutures(await allFinished(sent)) + libp2p_pubsub_messages_published.inc(labelValues = [topic]) + method start*(g: GossipSub) {.async.} = ## start pubsub ## start long running/repeating procedures diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 0667af9..03fb960 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -26,6 +26,7 @@ declareGauge(libp2p_pubsub_peers, "pubsub peer instances") declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics") declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages") declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages") +declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) declareGauge(libp2p_pubsub_peers_per_topic, "pubsub peers per topic", labels = ["topic"]) type diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 1319870..63025d2 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -12,6 +12,7 @@ import chronos, chronicles, metrics import connection, transports/transport, multistream, + multiaddress, protocols/protocol, protocols/secure/secure, protocols/secure/plaintext, # for plain text