mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
deploy: a74d2309f2325b2ee23b22dbc9cad8766e87e841
This commit is contained in:
parent
9edcea0cee
commit
d8e4a5258f
@ -87,7 +87,7 @@ prometheus with this config, e.g.:
|
|||||||
|
|
||||||
```bash
|
```bash
|
||||||
cd ./metrics/prometheus
|
cd ./metrics/prometheus
|
||||||
prometheus
|
prometheus --config.file=prometheus.yml
|
||||||
```
|
```
|
||||||
|
|
||||||
A Grafana dashboard containing the example dashboard for each simulation node
|
A Grafana dashboard containing the example dashboard for each simulation node
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
import
|
import
|
||||||
std/[options, tables, strutils, sequtils],
|
std/[options, tables, strutils, sequtils],
|
||||||
chronos, chronicles, stew/shims/net as stewNet,
|
chronos, chronicles, metrics, stew/shims/net as stewNet,
|
||||||
# TODO: Why do we need eth keys?
|
# TODO: Why do we need eth keys?
|
||||||
eth/keys,
|
eth/keys,
|
||||||
libp2p/multiaddress,
|
libp2p/multiaddress,
|
||||||
@ -18,6 +18,10 @@ import
|
|||||||
./message_store/message_store,
|
./message_store/message_store,
|
||||||
../utils/requests
|
../utils/requests
|
||||||
|
|
||||||
|
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||||
|
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
|
||||||
|
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode"
|
topics = "wakunode"
|
||||||
|
|
||||||
@ -181,6 +185,8 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
|
|||||||
id = await node.wakuFilter.subscribe(request)
|
id = await node.wakuFilter.subscribe(request)
|
||||||
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
|
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
|
||||||
|
|
||||||
|
waku_node_filters.set(node.filters.len.int64)
|
||||||
|
|
||||||
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
|
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
|
||||||
## Unsubscribes a handler from a PubSub topic.
|
## Unsubscribes a handler from a PubSub topic.
|
||||||
##
|
##
|
||||||
@ -213,6 +219,8 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
|
|||||||
await node.wakuFilter.unsubscribe(request)
|
await node.wakuFilter.unsubscribe(request)
|
||||||
node.filters.removeContentFilters(request.contentFilters)
|
node.filters.removeContentFilters(request.contentFilters)
|
||||||
|
|
||||||
|
waku_node_filters.set(node.filters.len.int64)
|
||||||
|
|
||||||
|
|
||||||
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
|
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
|
||||||
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
|
||||||
@ -263,6 +271,7 @@ proc mountFilter*(node: WakuNode) =
|
|||||||
info "push received"
|
info "push received"
|
||||||
for message in msg.messages:
|
for message in msg.messages:
|
||||||
node.filters.notify(message, requestId)
|
node.filters.notify(message, requestId)
|
||||||
|
waku_node_messages.inc(labelValues = ["filter"])
|
||||||
|
|
||||||
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
|
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
|
||||||
node.switch.mount(node.wakuFilter)
|
node.switch.mount(node.wakuFilter)
|
||||||
@ -317,6 +326,7 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
|
|||||||
if msg.isOk():
|
if msg.isOk():
|
||||||
node.filters.notify(msg.value(), "")
|
node.filters.notify(msg.value(), "")
|
||||||
await node.subscriptions.notify(topic, msg.value())
|
await node.subscriptions.notify(topic, msg.value())
|
||||||
|
waku_node_messages.inc(labelValues = ["relay"])
|
||||||
|
|
||||||
await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
|
await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
|
||||||
|
|
||||||
@ -433,8 +443,11 @@ when isMainModule:
|
|||||||
{.gcsafe.}:
|
{.gcsafe.}:
|
||||||
# TODO: libp2p_pubsub_peers is not public, so we need to make this either
|
# TODO: libp2p_pubsub_peers is not public, so we need to make this either
|
||||||
# public in libp2p or do our own peer counting after all.
|
# public in libp2p or do our own peer counting after all.
|
||||||
let
|
var
|
||||||
totalMessages = total_messages.value
|
totalMessages = 0.float64
|
||||||
|
|
||||||
|
for key in waku_node_messages.metrics.keys():
|
||||||
|
totalMessages = totalMessages + waku_node_messages.value(key)
|
||||||
|
|
||||||
info "Node metrics", totalMessages
|
info "Node metrics", totalMessages
|
||||||
discard setTimer(Moment.fromNow(2.seconds), logMetrics)
|
discard setTimer(Moment.fromNow(2.seconds), logMetrics)
|
||||||
@ -467,10 +480,12 @@ when isMainModule:
|
|||||||
let dbRes = SqliteDatabase.init(conf.dbpath)
|
let dbRes = SqliteDatabase.init(conf.dbpath)
|
||||||
if dbRes.isErr:
|
if dbRes.isErr:
|
||||||
warn "failed to init database", err = dbRes.error
|
warn "failed to init database", err = dbRes.error
|
||||||
|
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
||||||
|
|
||||||
let res = WakuMessageStore.init(dbRes.value)
|
let res = WakuMessageStore.init(dbRes.value)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
warn "failed to init WakuMessageStore", err = res.error
|
warn "failed to init WakuMessageStore", err = res.error
|
||||||
|
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||||
else:
|
else:
|
||||||
store = res.value
|
store = res.value
|
||||||
|
|
||||||
|
|||||||
@ -20,6 +20,10 @@ import
|
|||||||
|
|
||||||
export waku_filter_types
|
export waku_filter_types
|
||||||
|
|
||||||
|
declarePublicGauge waku_filter_peers, "number of filter peers"
|
||||||
|
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
|
||||||
|
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakufilter"
|
topics = "wakufilter"
|
||||||
|
|
||||||
@ -162,6 +166,7 @@ method init*(wf: WakuFilter) =
|
|||||||
var res = FilterRPC.init(message)
|
var res = FilterRPC.init(message)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "failed to decode rpc"
|
error "failed to decode rpc"
|
||||||
|
waku_filter_errors.inc(labelValues = ["decode_rpc_failure"])
|
||||||
return
|
return
|
||||||
|
|
||||||
info "filter message received"
|
info "filter message received"
|
||||||
@ -175,6 +180,8 @@ method init*(wf: WakuFilter) =
|
|||||||
else:
|
else:
|
||||||
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
|
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
|
||||||
|
|
||||||
|
waku_filter_subscribers.set(wf.subscribers.len.int64)
|
||||||
|
|
||||||
wf.handler = handle
|
wf.handler = handle
|
||||||
wf.codec = WakuFilterCodec
|
wf.codec = WakuFilterCodec
|
||||||
|
|
||||||
@ -188,6 +195,7 @@ proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handl
|
|||||||
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
||||||
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
|
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
|
||||||
wf.peers.add(FilterPeer(peerInfo: peer))
|
wf.peers.add(FilterPeer(peerInfo: peer))
|
||||||
|
waku_filter_peers.inc()
|
||||||
|
|
||||||
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||||
## Returns a Filter for the specific protocol
|
## Returns a Filter for the specific protocol
|
||||||
|
|||||||
@ -9,8 +9,6 @@ import
|
|||||||
libp2p/protocols/pubsub/rpc/messages,
|
libp2p/protocols/pubsub/rpc/messages,
|
||||||
libp2p/stream/connection
|
libp2p/stream/connection
|
||||||
|
|
||||||
declarePublicGauge total_messages, "number of messages received"
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakurelay"
|
topics = "wakurelay"
|
||||||
|
|
||||||
|
|||||||
@ -19,6 +19,10 @@ import
|
|||||||
|
|
||||||
export waku_store_types
|
export waku_store_types
|
||||||
|
|
||||||
|
declarePublicGauge waku_store_messages, "number of historical messages"
|
||||||
|
declarePublicGauge waku_store_peers, "number of store peers"
|
||||||
|
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakustore"
|
topics = "wakustore"
|
||||||
|
|
||||||
@ -303,6 +307,7 @@ method init*(ws: WakuStore) =
|
|||||||
var res = HistoryRPC.init(message)
|
var res = HistoryRPC.init(message)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "failed to decode rpc"
|
error "failed to decode rpc"
|
||||||
|
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
|
||||||
return
|
return
|
||||||
|
|
||||||
info "received query"
|
info "received query"
|
||||||
@ -336,6 +341,9 @@ method init*(ws: WakuStore) =
|
|||||||
let res = ws.store.getAll(onData)
|
let res = ws.store.getAll(onData)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
warn "failed to load messages from store", err = res.error
|
warn "failed to load messages from store", err = res.error
|
||||||
|
waku_store_errors.inc(labelValues = ["store_load_failure"])
|
||||||
|
|
||||||
|
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
|
||||||
|
|
||||||
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
|
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
|
||||||
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
|
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
|
||||||
@ -349,6 +357,7 @@ proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
|
|||||||
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
||||||
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
|
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
|
||||||
ws.peers.add(HistoryPeer(peerInfo: peer))
|
ws.peers.add(HistoryPeer(peerInfo: peer))
|
||||||
|
waku_store_peers.inc()
|
||||||
|
|
||||||
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
|
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
|
||||||
## The filter function returns the pubsub filter for the node.
|
## The filter function returns the pubsub filter for the node.
|
||||||
@ -358,12 +367,14 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
|
|||||||
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
||||||
let index = msg.computeIndex()
|
let index = msg.computeIndex()
|
||||||
proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
|
proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
|
||||||
|
waku_store_messages.inc(labelValues = ["stored"])
|
||||||
if proto.store.isNil:
|
if proto.store.isNil:
|
||||||
return
|
return
|
||||||
|
|
||||||
let res = proto.store.put(index, msg)
|
let res = proto.store.put(index, msg)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
warn "failed to store messages", err = res.error
|
warn "failed to store messages", err = res.error
|
||||||
|
waku_store_errors.inc(labelValues = ["store_failure"])
|
||||||
|
|
||||||
MessageNotificationSubscription.init(@[], handle)
|
MessageNotificationSubscription.init(@[], handle)
|
||||||
|
|
||||||
@ -386,8 +397,11 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
|
|||||||
|
|
||||||
if response.isErr:
|
if response.isErr:
|
||||||
error "failed to decode response"
|
error "failed to decode response"
|
||||||
|
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
|
||||||
return
|
return
|
||||||
|
|
||||||
|
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
|
||||||
|
|
||||||
handler(response.value.response)
|
handler(response.value.response)
|
||||||
|
|
||||||
# NOTE: Experimental, maybe incorporate as part of query call
|
# NOTE: Experimental, maybe incorporate as part of query call
|
||||||
@ -410,6 +424,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
|
|||||||
|
|
||||||
if response.isErr:
|
if response.isErr:
|
||||||
error "failed to decode response"
|
error "failed to decode response"
|
||||||
|
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
|
||||||
return
|
return
|
||||||
|
|
||||||
# NOTE Perform accounting operation
|
# NOTE Perform accounting operation
|
||||||
@ -418,4 +433,6 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
|
|||||||
let messages = response.value.response.messages
|
let messages = response.value.response.messages
|
||||||
ws.wakuSwap.debit(peerId, messages.len)
|
ws.wakuSwap.debit(peerId, messages.len)
|
||||||
|
|
||||||
|
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
|
||||||
|
|
||||||
handler(response.value.response)
|
handler(response.value.response)
|
||||||
|
|||||||
@ -35,6 +35,9 @@ import
|
|||||||
|
|
||||||
export waku_swap_types
|
export waku_swap_types
|
||||||
|
|
||||||
|
declarePublicGauge waku_swap_peers, "number of swap peers"
|
||||||
|
declarePublicGauge waku_swap_errors, "number of swap protocol errors", ["type"]
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakuswap"
|
topics = "wakuswap"
|
||||||
|
|
||||||
@ -115,6 +118,7 @@ proc init*(wakuSwap: WakuSwap) =
|
|||||||
var res = Cheque.init(message)
|
var res = Cheque.init(message)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
error "failed to decode rpc"
|
error "failed to decode rpc"
|
||||||
|
waku_swap_errors.inc(labelValues = ["decode_rpc_failure"])
|
||||||
return
|
return
|
||||||
|
|
||||||
info "received cheque", value=res.value
|
info "received cheque", value=res.value
|
||||||
@ -171,6 +175,7 @@ proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T =
|
|||||||
|
|
||||||
proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
|
proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
|
||||||
ws.peers.add(SwapPeer(peerInfo: peer))
|
ws.peers.add(SwapPeer(peerInfo: peer))
|
||||||
|
waku_swap_peers.inc()
|
||||||
|
|
||||||
# TODO End to end communication
|
# TODO End to end communication
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user