put expensive metrics under a Nim define (#310)

Ștefan Talpalaru 2020-08-05 01:27:59 +02:00 committed by GitHub
parent cf2b42b914
commit 843d32f8db
6 changed files with 67 additions and 47 deletions

View File

@@ -68,6 +68,7 @@ Please read the [GETTING_STARTED.md](docs/GETTING_STARTED.md) guide.
### Tutorials and Examples
Example code can be found in the [examples folder](/examples).
#### Direct Chat Tutorial
- [Part I](https://our.status.im/nim-libp2p-tutorial-a-peer-to-peer-chat-example-1/): Set up the main function and use multi-thread for processing IO.
- [Part II](https://our.status.im/nim-libp2p-tutorial-a-peer-to-peer-chat-example-2/): Dial remote peer and allow customized user input commands.
@@ -148,6 +149,13 @@ Packages that exist in the original libp2p specs and are under active development
** Note that the current stack reflects the minimal requirements for the upcoming Eth2 implementation.
+### Tips and tricks
+- enable expensive metrics:
+```bash
+nim c -d:libp2p_expensive_metrics some_file.nim
+```
## Contribute
The libp2p implementation in Nim is a work in progress. We welcome contributors to help out! Specifically, you can:
@@ -168,4 +176,5 @@ or
* Apache License, Version 2.0, ([LICENSE-APACHEv2](LICENSE-APACHEv2) or http://www.apache.org/licenses/LICENSE-2.0)
-at your option. This file may not be copied, modified, or distributed except according to those terms.
+at your option. These files may not be copied, modified, or distributed except according to those terms.
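
The tip above documents the new compile-time switch; the file diffs below apply the same guard everywhere: both the declaration of an expensive metric and every update to it are wrapped in `when defined(libp2p_expensive_metrics)`, so the code is simply not compiled unless the flag is passed. A minimal sketch of the pattern, assuming the nim-metrics package is available; the `libp2p_demo_messages` counter and the `recordMessage` proc are illustrative names, not part of the library:

```nim
import metrics  # nim-metrics: provides declareCounter and inc

# The counter only exists in builds compiled with -d:libp2p_expensive_metrics.
when defined(libp2p_expensive_metrics):
  declareCounter(libp2p_demo_messages, "demo messages", labels = ["topic"])

proc recordMessage(topic: string) =
  # Without the define, this whole block is discarded at compile time,
  # so there is no label lookup and no counter update at runtime.
  when defined(libp2p_expensive_metrics):
    libp2p_demo_messages.inc(labelValues = [topic])

when isMainModule:
  recordMessage("demo-topic")
```

Built plainly (`nim c some_file.nim`) the guarded code vanishes; built with `nim c -d:libp2p_expensive_metrics some_file.nim` it behaves exactly as the unguarded version did.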

View File

@@ -23,7 +23,8 @@ export muxer
logScope:
  topics = "mplex"
-declareGauge(libp2p_mplex_channels, "mplex channels", labels = ["initiator", "peer"])
+when defined(libp2p_expensive_metrics):
+  declareGauge(libp2p_mplex_channels, "mplex channels", labels = ["initiator", "peer"])
type
  Mplex* = ref object of Muxer
@@ -76,10 +77,11 @@ proc newStreamInternal*(m: Mplex,
    "channel slot already taken!")
  m.getChannelList(initiator)[id] = result
-  libp2p_mplex_channels.set(
-    m.getChannelList(initiator).len.int64,
-    labelValues = [$initiator,
-      $m.connection.peerInfo])
+  when defined(libp2p_expensive_metrics):
+    libp2p_mplex_channels.set(
+      m.getChannelList(initiator).len.int64,
+      labelValues = [$initiator,
+        $m.connection.peerInfo])
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
  ## remove the local channel from the internal tables
@@ -89,10 +91,11 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
    m.getChannelList(chann.initiator).del(chann.id)
    trace "cleaned up channel", id = chann.id
-    libp2p_mplex_channels.set(
-      m.getChannelList(chann.initiator).len.int64,
-      labelValues = [$chann.initiator,
-        $m.connection.peerInfo])
+    when defined(libp2p_expensive_metrics):
+      libp2p_mplex_channels.set(
+        m.getChannelList(chann.initiator).len.int64,
+        labelValues = [$chann.initiator,
+          $m.connection.peerInfo])
proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
  ## call the muxer stream handler for this channel

View File

@@ -140,7 +140,8 @@ method publish*(f: FloodSub,
  # start the future but do not wait yet
  let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)
-  libp2p_pubsub_messages_published.inc(labelValues = [topic])
+  when defined(libp2p_expensive_metrics):
+    libp2p_pubsub_messages_published.inc(labelValues = [topic])
  trace "published message to peers", peers = published,
    msg = msg.shortLog()

View File

@@ -518,8 +518,9 @@ method publish*(g: GossipSub,
    g.mcache.put(msgId, msg)
  let published = await g.publishHelper(peers, @[msg], timeout)
-  if published > 0:
-    libp2p_pubsub_messages_published.inc(labelValues = [topic])
+  when defined(libp2p_expensive_metrics):
+    if published > 0:
+      libp2p_pubsub_messages_published.inc(labelValues = [topic])
  trace "published message to peers", peers = published,
    msg = msg.shortLog()

View File

@@ -27,7 +27,8 @@ declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
-declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
+when defined(libp2p_expensive_metrics):
+  declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
type
  TopicHandler* = proc(topic: string,

View File

@@ -21,10 +21,11 @@ import rpc/[messages, message, protobuf],
logScope:
  topics = "pubsubpeer"
-declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
-declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
-declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
-declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
+when defined(libp2p_expensive_metrics):
+  declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
+  declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
+  declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
+  declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
const
  DefaultReadTimeout* = 1.minutes
@@ -91,7 +92,8 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
      let digest = $(sha256.digest(data))
      trace "read data from peer", data = data.shortLog
      if digest in p.recvdRpcCache:
-        libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
+        when defined(libp2p_expensive_metrics):
+          libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
        trace "message already received, skipping"
        continue
@@ -106,10 +108,11 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
      # trigger hooks
      p.recvObservers(msg)
-      for m in msg.messages:
-        for t in m.topicIDs:
-          # metrics
-          libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
+      when defined(libp2p_expensive_metrics):
+        for m in msg.messages:
+          for t in m.topicIDs:
+            # metrics
+            libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
      await p.handler(p, @[msg])
      p.recvdRpcCache.put(digest)
@@ -148,7 +151,8 @@ proc send*(
  let digest = $(sha256.digest(encoded))
  if digest in p.sentRpcCache:
    trace "message already sent to peer, skipping"
-    libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
+    when defined(libp2p_expensive_metrics):
+      libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
    return
  proc sendToRemote() {.async.} =
@@ -168,10 +172,11 @@ proc send*(
      p.sentRpcCache.put(digest)
      trace "sent pubsub message to remote"
-      for x in mm.messages:
-        for t in x.topicIDs:
-          # metrics
-          libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
+      when defined(libp2p_expensive_metrics):
+        for x in mm.messages:
+          for t in x.topicIDs:
+            # metrics
+            libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
  let sendFut = sendToRemote()
  try: