mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
Added protocol message counters (#804)
* Added protocol message counters * Fix how stored messages are counted
This commit is contained in:
parent
26479e0a9b
commit
4f1e53d828
@ -9,6 +9,7 @@ This release contains the following:
|
|||||||
### Changes
|
### Changes
|
||||||
- The WakuInfo Object field of `listenStr` is deprecated and is now replaced with `listenAddresses`
|
- The WakuInfo Object field of `listenStr` is deprecated and is now replaced with `listenAddresses`
|
||||||
which is a sequence of string.
|
which is a sequence of string.
|
||||||
|
- Metrics: added counters for protocol messages
|
||||||
|
|
||||||
## 2021-11-05 v0.6
|
## 2021-11-05 v0.6
|
||||||
|
|
||||||
|
|||||||
@ -24,6 +24,7 @@ export waku_filter_types
|
|||||||
declarePublicGauge waku_filter_peers, "number of filter peers"
|
declarePublicGauge waku_filter_peers, "number of filter peers"
|
||||||
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
|
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
|
||||||
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
|
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakufilter"
|
topics = "wakufilter"
|
||||||
@ -177,8 +178,10 @@ method init*(wf: WakuFilter) =
|
|||||||
|
|
||||||
let value = res.value
|
let value = res.value
|
||||||
if value.push != MessagePush():
|
if value.push != MessagePush():
|
||||||
|
waku_filter_messages.inc(labelValues = ["MessagePush"])
|
||||||
wf.pushHandler(value.requestId, value.push)
|
wf.pushHandler(value.requestId, value.push)
|
||||||
if value.request != FilterRequest():
|
if value.request != FilterRequest():
|
||||||
|
waku_filter_messages.inc(labelValues = ["FilterRequest"])
|
||||||
if value.request.subscribe:
|
if value.request.subscribe:
|
||||||
wf.subscribers.add(Subscriber(peer: conn.peerId, requestId: value.requestId, filter: value.request))
|
wf.subscribers.add(Subscriber(peer: conn.peerId, requestId: value.requestId, filter: value.request))
|
||||||
else:
|
else:
|
||||||
|
|||||||
@ -20,6 +20,7 @@ export waku_lightpush_types
|
|||||||
|
|
||||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||||
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
declarePublicGauge waku_lightpush_errors, "number of lightpush protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_lightpush_messages, "number of lightpush messages received", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakulightpush"
|
topics = "wakulightpush"
|
||||||
@ -136,6 +137,7 @@ method init*(wlp: WakuLightPush) =
|
|||||||
let value = res.value
|
let value = res.value
|
||||||
if value.request != PushRequest():
|
if value.request != PushRequest():
|
||||||
info "lightpush push request"
|
info "lightpush push request"
|
||||||
|
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
|
||||||
let
|
let
|
||||||
pubSubTopic = value.request.pubSubTopic
|
pubSubTopic = value.request.pubSubTopic
|
||||||
message = value.request.message
|
message = value.request.message
|
||||||
@ -154,6 +156,7 @@ method init*(wlp: WakuLightPush) =
|
|||||||
response: response).encode().buffer)
|
response: response).encode().buffer)
|
||||||
#wlp.requestHandler(value.requestId, value.request)
|
#wlp.requestHandler(value.requestId, value.request)
|
||||||
if value.response != PushResponse():
|
if value.response != PushResponse():
|
||||||
|
waku_lightpush_messages.inc(labelValues = ["PushResponse"])
|
||||||
if value.response.isSuccess:
|
if value.response.isSuccess:
|
||||||
info "lightpush message success"
|
info "lightpush message success"
|
||||||
else:
|
else:
|
||||||
|
|||||||
@ -38,6 +38,7 @@ export
|
|||||||
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
|
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
|
||||||
declarePublicGauge waku_store_peers, "number of store peers"
|
declarePublicGauge waku_store_peers, "number of store peers"
|
||||||
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_store_queries, "number of store queries received"
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakustore"
|
topics = "wakustore"
|
||||||
@ -422,6 +423,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
|
|||||||
|
|
||||||
# TODO Print more info here
|
# TODO Print more info here
|
||||||
info "received query"
|
info "received query"
|
||||||
|
waku_store_queries.inc()
|
||||||
|
|
||||||
let value = res.value
|
let value = res.value
|
||||||
let response = ws.findMessages(res.value.query)
|
let response = ws.findMessages(res.value.query)
|
||||||
@ -489,7 +491,7 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
|||||||
|
|
||||||
let index = msg.computeIndex()
|
let index = msg.computeIndex()
|
||||||
w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
||||||
waku_store_messages.inc(labelValues = ["stored"])
|
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
||||||
if w.store.isNil:
|
if w.store.isNil:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -671,9 +673,9 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
ws.messages.add(indexedWakuMsg)
|
ws.messages.add(indexedWakuMsg)
|
||||||
waku_store_messages.inc(labelValues = ["stored"])
|
|
||||||
|
|
||||||
added = added + 1
|
added = added + 1
|
||||||
|
|
||||||
|
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
|
||||||
|
|
||||||
debug "number of duplicate messages found in resume", dismissed=dismissed
|
debug "number of duplicate messages found in resume", dismissed=dismissed
|
||||||
debug "number of messages added via resume", added=added
|
debug "number of messages added via resume", added=added
|
||||||
|
|||||||
@ -50,6 +50,7 @@ const swapAccountBalanceBuckets = [-Inf, -200.0, -150.0, -100.0, -50.0, 0.0, 50.
|
|||||||
|
|
||||||
declarePublicGauge waku_swap_peers_count, "number of swap peers"
|
declarePublicGauge waku_swap_peers_count, "number of swap peers"
|
||||||
declarePublicGauge waku_swap_errors, "number of swap protocol errors", ["type"]
|
declarePublicGauge waku_swap_errors, "number of swap protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_swap_messages, "number of swap messages received", ["type"]
|
||||||
declarePublicHistogram waku_peer_swap_account_balance, "Swap Account Balance for waku peers, aggregated into buckets based on threshold limits", buckets = swapAccountBalanceBuckets
|
declarePublicHistogram waku_peer_swap_account_balance, "Swap Account Balance for waku peers, aggregated into buckets based on threshold limits", buckets = swapAccountBalanceBuckets
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
@ -224,6 +225,7 @@ proc init*(wakuSwap: WakuSwap) =
|
|||||||
return
|
return
|
||||||
|
|
||||||
info "received cheque", value=res.value
|
info "received cheque", value=res.value
|
||||||
|
waku_swap_messages.inc(labelValues = ["Cheque"])
|
||||||
wakuSwap.handleCheque(res.value, conn.peerId)
|
wakuSwap.handleCheque(res.value, conn.peerId)
|
||||||
|
|
||||||
proc credit(peerId: PeerID, n: int)
|
proc credit(peerId: PeerID, n: int)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user