From 5ab69edd76dd0e6446ef2bb0eb69e2a8cf4bd798 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:04:52 +0200 Subject: [PATCH] chore: update lite-protocol-tester for handling shard argument. (#3371) * chore: replace pubsub topic with shard configuration across the lite protocol tester * chore: enhance protocol performance - response time - metrics * fix filter-client double mounting possibility. --- apps/liteprotocoltester/.env | 6 ++--- .../Dockerfile.liteprotocoltester.compile | 2 ++ apps/liteprotocoltester/README.md | 6 ++--- .../docker-compose-on-simularor.yml | 2 +- apps/liteprotocoltester/docker-compose.yml | 2 +- apps/liteprotocoltester/filter_subscriber.nim | 8 ++++--- apps/liteprotocoltester/infra.env | 2 +- .../lightpush_publisher.nim | 9 +++++++- .../liteprotocoltester/liteprotocoltester.nim | 5 ++--- apps/liteprotocoltester/lpt_metrics.nim | 7 ++++++ apps/liteprotocoltester/run_service_node.sh | 8 +++---- apps/liteprotocoltester/run_tester_node.sh | 16 +++++++++----- .../run_tester_node_at_infra.sh | 16 +++++++++----- .../run_tester_node_on_fleet.sh | 22 ++++++++++++------- .../service_peer_management.nim | 4 ++-- apps/liteprotocoltester/tester_config.nim | 17 +++++--------- waku/common/rate_limit/request_limiter.nim | 6 ++--- waku/common/rate_limit/service_metrics.nim | 10 ++++++++- .../rate_limit/single_token_limiter.nim | 6 ++--- waku/node/waku_node.nim | 12 +++++++--- waku/waku_filter_v2/protocol_metrics.nim | 6 ++++- waku/waku_lightpush/protocol.nim | 3 ++- waku/waku_lightpush_legacy/protocol.nim | 4 +++- 23 files changed, 115 insertions(+), 64 deletions(-) diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env index 4f7c49976..0330284e1 100644 --- a/apps/liteprotocoltester/.env +++ b/apps/liteprotocoltester/.env @@ -12,16 +12,16 @@ MIN_MESSAGE_SIZE=15Kb MAX_MESSAGE_SIZE=145Kb ## for wakusim -#PUBSUB=/waku/2/rs/66/0 +#SHARD=0 
#CONTENT_TOPIC=/tester/2/light-pubsub-test/wakusim #CLUSTER_ID=66 ## for status.prod -PUBSUB=/waku/2/rs/16/32 +SHARD=32 CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet CLUSTER_ID=16 ## for TWN -#PUBSUB=/waku/2/rs/1/4 +#SHARD=4 #CONTENT_TOPIC=/tester/2/light-pubsub-test/twn #CLUSTER_ID=1 diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile index cee1929ce..6d789ebd1 100644 --- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile +++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile @@ -55,6 +55,8 @@ RUN chmod +x /usr/bin/liteprotocoltester FROM base_lpt AS standalone_lpt COPY --from=nim-build /app/apps/liteprotocoltester/run_tester_node.sh /usr/bin/ +COPY --from=nim-build /app/apps/liteprotocoltester/run_tester_node_on_fleet.sh /usr/bin/ + RUN chmod +x /usr/bin/run_tester_node.sh ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"] diff --git a/apps/liteprotocoltester/README.md b/apps/liteprotocoltester/README.md index eff025969..ea02ec1c1 100644 --- a/apps/liteprotocoltester/README.md +++ b/apps/liteprotocoltester/README.md @@ -127,7 +127,7 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin | ---: | :--- | :--- | | NUM_MESSAGES | Number of message to publish, 0 means infinite | 120 | | MESSAGE_INTERVAL_MILLIS | Frequency of messages in milliseconds | 1000 | -| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/66/0 | +| SHARD | Used shard for testing | 0 | | CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto | | CLUSTER_ID | cluster_id of the network | 16 | | START_PUBLISHING_AFTER_SECS | Delay in seconds before starting to publish to let service node connected | 5 | @@ -272,7 +272,7 @@ export NUM_MESSAGES=200 export MESSAGE_INTERVAL_MILLIS=1000 export MIN_MESSAGE_SIZE=15Kb export MAX_MESSAGE_SIZE=145Kb -export PUBSUB=/waku/2/rs/16/32 +export SHARD=32 export 
CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet export CLUSTER_ID=16 @@ -307,7 +307,7 @@ export NUM_MESSAGES=300 export MESSAGE_INTERVAL_MILLIS=7000 export MIN_MESSAGE_SIZE=15Kb export MAX_MESSAGE_SIZE=145Kb -export PUBSUB=/waku/2/rs/1/4 +export SHARD=4 export CONTENT_TOPIC=/tester/2/light-pubsub-test/twn export CLUSTER_ID=1 diff --git a/apps/liteprotocoltester/docker-compose-on-simularor.yml b/apps/liteprotocoltester/docker-compose-on-simularor.yml index c63a294f2..9e899f78f 100644 --- a/apps/liteprotocoltester/docker-compose-on-simularor.yml +++ b/apps/liteprotocoltester/docker-compose-on-simularor.yml @@ -16,7 +16,7 @@ x-rln-environment: &rln_env x-test-running-conditions: &test_running_conditions NUM_MESSAGES: ${NUM_MESSAGES:-120} MESSAGE_INTERVAL_MILLIS: "${MESSAGE_INTERVAL_MILLIS:-1000}" - PUBSUB: ${PUBSUB:-/waku/2/rs/66/0} + SHARD: ${SHARD:-0} CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim} CLUSTER_ID: ${CLUSTER_ID:-66} MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb} diff --git a/apps/liteprotocoltester/docker-compose.yml b/apps/liteprotocoltester/docker-compose.yml index afd2f1e72..0effbf8f0 100644 --- a/apps/liteprotocoltester/docker-compose.yml +++ b/apps/liteprotocoltester/docker-compose.yml @@ -16,7 +16,7 @@ x-rln-environment: &rln_env x-test-running-conditions: &test_running_conditions NUM_MESSAGES: ${NUM_MESSAGES:-120} MESSAGE_INTERVAL_MILLIS: "${MESSAGE_INTERVAL_MILLIS:-1000}" - PUBSUB: ${PUBSUB:-/waku/2/rs/66/0} + SHARD: ${SHARD:-0} CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim} CLUSTER_ID: ${CLUSTER_ID:-66} MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb} diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim index 143e0ca80..fbb11c92e 100644 --- a/apps/liteprotocoltester/filter_subscriber.nim +++ b/apps/liteprotocoltester/filter_subscriber.nim @@ -130,7 +130,9 @@ proc setupAndSubscribe*( var stats: PerPeerStatistics actualFilterPeer = servicePeer - let pushHandler 
= proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = + let pushHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure.} = let payloadStr = string.fromBytes(message.payload) let testerMessage = js.Json.decode(payloadStr, ProtocolTesterMessage) let msgHash = computeMessageHash(pubsubTopic, message).to0xHex @@ -163,7 +165,7 @@ proc setupAndSubscribe*( if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived(maxWaitForLastMessage): - waitFor unsubscribe(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0]) + waitFor unsubscribe(wakuNode, conf.getPubsubTopic(), conf.contentTopics[0]) info "All messages received. Exiting." ## for gracefull shutdown through signal hooks @@ -176,5 +178,5 @@ proc setupAndSubscribe*( # Start maintaining subscription asyncSpawn maintainSubscription( - wakuNode, conf.pubsubTopics[0], conf.contentTopics[0], conf.fixedServicePeer + wakuNode, conf.getPubsubTopic(), conf.contentTopics[0], conf.fixedServicePeer ) diff --git a/apps/liteprotocoltester/infra.env b/apps/liteprotocoltester/infra.env index 6d4542eca..ebf614732 100644 --- a/apps/liteprotocoltester/infra.env +++ b/apps/liteprotocoltester/infra.env @@ -4,7 +4,7 @@ NUM_MESSAGES=300 MESSAGE_INTERVAL_MILLIS=1000 MIN_MESSAGE_SIZE=15Kb MAX_MESSAGE_SIZE=145Kb -PUBSUB=/waku/2/rs/16/32 +SHARD=32 CONTENT_TOPIC=/tester/2/light-pubsub-test-at-infra/status-prod CLUSTER_ID=16 LIGHTPUSH_BOOTSTRAP=enr:-QEKuED9AJm2HGgrRpVaJY2nj68ao_QiPeUT43sK-aRM7sMJ6R4G11OSDOwnvVacgN1sTw-K7soC5dzHDFZgZkHU0u-XAYJpZIJ2NIJpcISnYxMvim11bHRpYWRkcnO4WgAqNiVib290LTAxLmRvLWFtczMuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfACw2JWJvb3QtMDEuZG8tYW1zMy5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaEC3rRtFQSgc24uWewzXaxTY8hDAHB8sgnxr9k8Rjb5GeSDdGNwgnZfg3VkcIIjKIV3YWt1Mg0 diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim index 32f802fe4..d79e68590 100644 --- 
a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/lightpush_publisher.nim @@ -145,13 +145,20 @@ proc publishMessages( lightpushContentTopic, renderMsgSize, ) + + let publishStartTime = Moment.now() + let wlpRes = await wakuNode.legacyLightpushPublish( some(lightpushPubsubTopic), message, actualServicePeer ) + let publishDuration = Moment.now() - publishStartTime + let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex if wlpRes.isOk(): + lpt_publish_duration_seconds.observe(publishDuration.milliseconds.float / 1000) + sentMessages[messagesSent] = (hash: msgHash, relayed: true) notice "published message using lightpush", index = messagesSent + 1, @@ -251,7 +258,7 @@ proc setupAndPublish*( asyncSpawn publishMessages( wakuNode, servicePeer, - conf.pubsubTopics[0], + conf.getPubsubTopic(), conf.contentTopics[0], conf.numMessages, (min: parsedMinMsgSize, max: parsedMaxMsgSize), diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index c23b80e72..ef63e6e7d 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -99,7 +99,7 @@ when isMainModule: wakuConf.dnsAddrs = true wakuConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")] - wakuConf.pubsubTopics = conf.pubsubTopics + wakuConf.shards = @[conf.shard] wakuConf.contentTopics = conf.contentTopics wakuConf.clusterId = conf.clusterId ## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc... @@ -118,6 +118,7 @@ when isMainModule: wakuConf.store = false wakuConf.rest = false + wakuConf.relayServiceRatio = "40:60" # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it # It will always be called from main thread anyway. 
@@ -202,10 +203,8 @@ when isMainModule: var codec = WakuLightPushCodec # mounting relevant client, for PX filter client must be mounted ahead if conf.testFunc == TesterFunctionality.SENDER: - wakuApp.node.mountLegacyLightPushClient() codec = WakuLightPushCodec else: - waitFor wakuApp.node.mountFilterClient() codec = WakuFilterSubscribeCodec var lookForServiceNode = false diff --git a/apps/liteprotocoltester/lpt_metrics.nim b/apps/liteprotocoltester/lpt_metrics.nim index e68164d13..8b30619de 100644 --- a/apps/liteprotocoltester/lpt_metrics.nim +++ b/apps/liteprotocoltester/lpt_metrics.nim @@ -47,3 +47,10 @@ declarePublicGauge lpt_px_peers, declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed", ["agent"] declarePublicGauge lpt_dial_failures, "Number of dial failures by cause", ["agent"] + +declarePublicHistogram lpt_publish_duration_seconds, + "duration to lightpush messages", + buckets = [ + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, + 15.0, 20.0, 30.0, Inf, + ] diff --git a/apps/liteprotocoltester/run_service_node.sh b/apps/liteprotocoltester/run_service_node.sh index 1d36292c1..07fdbe980 100755 --- a/apps/liteprotocoltester/run_service_node.sh +++ b/apps/liteprotocoltester/run_service_node.sh @@ -5,10 +5,10 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/') echo "Service node IP: ${IP}" -if [ -n "${PUBSUB}" ]; then - PUBSUB=--pubsub-topic="${PUBSUB}" +if [ -n "${SHARD}" ]; then + SHARD=--shard="${SHARD}" else - PUBSUB=--pubsub-topic="/waku/2/rs/66/0" + SHARD=--shard="0" fi if [ -n "${CLUSTER_ID}" ]; then @@ -59,5 +59,5 @@ exec /usr/bin/wakunode\ --metrics-server-port=8003\ --metrics-server-address=0.0.0.0\ --nat=extip:${IP}\ - ${PUBSUB}\ + ${SHARD}\ ${CLUSTER_ID} diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh index 8975fba91..4a80ca460 100755 --- a/apps/liteprotocoltester/run_tester_node.sh +++ 
b/apps/liteprotocoltester/run_tester_node.sh @@ -93,10 +93,10 @@ else FULL_NODE=--bootstrap-node="${SERIVCE_NODE_ADDR}" fi -if [ -n "${PUBSUB}" ]; then - PUBSUB=--pubsub-topic="${PUBSUB}" +if [ -n "${SHARD}" ]; then + SHARD=--shard="${SHARD}" else - PUBSUB=--pubsub-topic="/waku/2/rs/66/0" + SHARD=--shard="0" fi if [ -n "${CONTENT_TOPIC}" ]; then @@ -128,19 +128,25 @@ if [ -n "${MESSAGE_INTERVAL_MILLIS}" ]; then MESSAGE_INTERVAL_MILLIS=--message-interval="${MESSAGE_INTERVAL_MILLIS}" fi +if [ -n "${LOG_LEVEL}" ]; then + LOG_LEVEL=--log-level=${LOG_LEVEL} +else + LOG_LEVEL=--log-level=INFO +fi + echo "Running binary: ${BINARY_PATH}" echo "Tester node: ${FUNCTION}" echo "Using service node: ${SERIVCE_NODE_ADDR}" echo "My external IP: ${MY_EXT_IP}" exec "${BINARY_PATH}"\ - --log-level=INFO\ --nat=extip:${MY_EXT_IP}\ --test-peers\ + ${LOG_LEVEL}\ ${FULL_NODE}\ ${MESSAGE_INTERVAL_MILLIS}\ ${NUM_MESSAGES}\ - ${PUBSUB}\ + ${SHARD}\ ${CONTENT_TOPIC}\ ${CLUSTER_ID}\ ${FUNCTION}\ diff --git a/apps/liteprotocoltester/run_tester_node_at_infra.sh b/apps/liteprotocoltester/run_tester_node_at_infra.sh index 6cec4b006..e926875aa 100644 --- a/apps/liteprotocoltester/run_tester_node_at_infra.sh +++ b/apps/liteprotocoltester/run_tester_node_at_infra.sh @@ -48,10 +48,10 @@ fi MY_EXT_IP=$(wget -qO- --no-check-certificate https://api4.ipify.org) -if [ -n "${PUBSUB}" ]; then - PUBSUB=--pubsub-topic="${PUBSUB}" +if [ -n "${SHARD}" ]; then + SHARD=--shard="${SHARD}" else - PUBSUB=--pubsub-topic="/waku/2/rs/66/0" + SHARD=--shard="0" fi if [ -n "${CONTENT_TOPIC}" ]; then @@ -83,19 +83,25 @@ if [ -n "${MESSAGE_INTERVAL_MILLIS}" ]; then MESSAGE_INTERVAL_MILLIS=--message-interval="${MESSAGE_INTERVAL_MILLIS}" fi +if [ -n "${LOG_LEVEL}" ]; then + LOG_LEVEL=--log-level=${LOG_LEVEL} +else + LOG_LEVEL=--log-level=INFO +fi + echo "Running binary: ${BINARY_PATH}" echo "Node function is: ${FUNCTION}" echo "Using service/bootstrap node as: ${NODE_ARG}" echo "My external IP: ${MY_EXT_IP}" exec 
"${BINARY_PATH}"\ - --log-level=INFO\ --nat=extip:${MY_EXT_IP}\ --test-peers\ + ${LOG_LEVEL}\ ${NODE_ARG}\ ${MESSAGE_INTERVAL_MILLIS}\ ${NUM_MESSAGES}\ - ${PUBSUB}\ + ${SHARD}\ ${CONTENT_TOPIC}\ ${CLUSTER_ID}\ ${FUNCTION}\ diff --git a/apps/liteprotocoltester/run_tester_node_on_fleet.sh b/apps/liteprotocoltester/run_tester_node_on_fleet.sh index f0300cef2..538a890e6 100644 --- a/apps/liteprotocoltester/run_tester_node_on_fleet.sh +++ b/apps/liteprotocoltester/run_tester_node_on_fleet.sh @@ -48,10 +48,10 @@ fi MY_EXT_IP=$(wget -qO- --no-check-certificate https://api4.ipify.org) -if [ -n "${PUBSUB}" ]; then - PUBSUB=--pubsub-topic="${PUBSUB}" +if [ -n "${SHARD}" ]; then + SHARD=--shard=${SHARD} else - PUBSUB=--pubsub-topic="/waku/2/rs/66/0" + SHARD=--shard=0 fi if [ -n "${CONTENT_TOPIC}" ]; then @@ -79,8 +79,14 @@ if [ -n "${NUM_MESSAGES}" ]; then NUM_MESSAGES=--num-messages="${NUM_MESSAGES}" fi -if [ -n "${DELAY_MESSAGES}" ]; then - DELAY_MESSAGES=--delay-messages="${DELAY_MESSAGES}" +if [ -n "${MESSAGE_INTERVAL_MILLIS}" ]; then + MESSAGE_INTERVAL_MILLIS=--message-interval="${MESSAGE_INTERVAL_MILLIS}" +fi + +if [ -n "${LOG_LEVEL}" ]; then + LOG_LEVEL=--log-level=${LOG_LEVEL} +else + LOG_LEVEL=--log-level=INFO fi echo "Running binary: ${BINARY_PATH}" @@ -89,12 +95,12 @@ echo "Using service/bootstrap node as: ${NODE_ARG}" echo "My external IP: ${MY_EXT_IP}" exec "${BINARY_PATH}"\ - --log-level=INFO\ --nat=extip:${MY_EXT_IP}\ + ${LOG_LEVEL}\ ${NODE_ARG}\ - ${DELAY_MESSAGES}\ + ${MESSAGE_INTERVAL_MILLIS}\ ${NUM_MESSAGES}\ - ${PUBSUB}\ + ${SHARD}\ ${CONTENT_TOPIC}\ ${CLUSTER_ID}\ ${FUNCTION}\ diff --git a/apps/liteprotocoltester/service_peer_management.nim b/apps/liteprotocoltester/service_peer_management.nim index 83216ae3b..a303c3c58 100644 --- a/apps/liteprotocoltester/service_peer_management.nim +++ b/apps/liteprotocoltester/service_peer_management.nim @@ -189,14 +189,14 @@ proc pxLookupServiceNode*( if conf.testPeers: let peersOpt = - await 
tryCallAllPxPeers(node.peerManager, codec, conf.pubsubTopics[0]) + await tryCallAllPxPeers(node.peerManager, codec, conf.getPubsubTopic()) if peersOpt.isSome(): info "Found service peers for codec", codec = codec, peer_count = peersOpt.get().len() return ok(peersOpt.get().len > 0) else: let peerOpt = - await selectRandomCapablePeer(node.peerManager, codec, conf.pubsubTopics[0]) + await selectRandomCapablePeer(node.peerManager, codec, conf.getPubsubTopic()) if peerOpt.isSome(): info "Found service peer for codec", codec = codec, peer = peerOpt.get() return ok(true) diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index 115686be3..eccaafc06 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -18,6 +18,7 @@ import common/logging, factory/external_config, waku_core, + waku_core/topics/pubsub_topic, ] export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet @@ -95,18 +96,9 @@ type LiteProtocolTesterConf* = object name: "message-interval" .}: uint32 - pubsubTopics* {. - desc: "Default pubsub topic to subscribe to. Argument may be repeated.", - defaultValue: @[LitePubsubTopic], - name: "pubsub-topic" - .}: seq[PubsubTopic] + shard* {.desc: "Shards index to subscribe to. ", defaultValue: 0, name: "shard".}: + uint16 - ## TODO: extend lite protocol tester configuration based on testing needs - # shards* {. - # desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", - # defaultValue: @[], - # name: "shard" - # .}: seq[uint16] contentTopics* {. desc: "Default content topic to subscribe to. 
Argument may be repeated.", defaultValue: @[LiteContentTopic], @@ -195,4 +187,7 @@ proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] = except CatchableError: err(getCurrentExceptionMsg()) +proc getPubsubTopic*(conf: LiteProtocolTesterConf): PubsubTopic = + return $RelayShard(clusterId: conf.clusterId, shardId: conf.shard) + {.pop.} diff --git a/waku/common/rate_limit/request_limiter.nim b/waku/common/rate_limit/request_limiter.nim index 7f33d0348..0ede20be4 100644 --- a/waku/common/rate_limit/request_limiter.nim +++ b/waku/common/rate_limit/request_limiter.nim @@ -78,14 +78,14 @@ template checkUsageLimit*( bodyWithinLimit, bodyRejected: untyped, ) = if t.checkUsage(proto, conn): - let requestStartTime = getTime().toUnixFloat() + let requestStartTime = Moment.now() waku_service_requests.inc(labelValues = [proto, "served"]) bodyWithinLimit - let requestDurationSec = getTime().toUnixFloat() - requestStartTime + let requestDuration = Moment.now() - requestStartTime waku_service_request_handling_duration_seconds.observe( - requestDurationSec, labelValues = [proto] + requestDuration.milliseconds.float / 1000, labelValues = [proto] ) else: waku_service_requests.inc(labelValues = [proto, "rejected"]) diff --git a/waku/common/rate_limit/service_metrics.nim b/waku/common/rate_limit/service_metrics.nim index 7d24d9530..bff91f622 100644 --- a/waku/common/rate_limit/service_metrics.nim +++ b/waku/common/rate_limit/service_metrics.nim @@ -1,8 +1,11 @@ {.push raises: [].} import std/options +import chronos/timer import metrics, setting +export metrics + declarePublicGauge waku_service_requests_limit, "Applied rate limit of non-relay service", ["service"] @@ -19,4 +22,9 @@ proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) = ) declarePublicHistogram waku_service_request_handling_duration_seconds, - "duration of non-relay service handling", ["service"] + "duration of non-relay service handling", + labels = ["service"], + buckets = [ 
+ 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, + 15.0, 20.0, 30.0, Inf, + ] diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim index da01f61bb..50fb2d64c 100644 --- a/waku/common/rate_limit/single_token_limiter.nim +++ b/waku/common/rate_limit/single_token_limiter.nim @@ -45,14 +45,14 @@ template checkUsageLimit*( bodyWithinLimit, bodyRejected: untyped, ) = if t.checkUsage(proto): - let requestStartTime = getTime().toUnixFloat() + let requestStartTime = Moment.now() waku_service_requests.inc(labelValues = [proto, "served"]) bodyWithinLimit - let requestDurationSec = getTime().toUnixFloat() - requestStartTime + let requestDuration = Moment.now() - requestStartTime waku_service_request_handling_duration_seconds.observe( - requestDurationSec, labelValues = [proto] + requestDuration.milliseconds.float / 1000, labelValues = [proto] ) else: waku_service_requests.inc(labelValues = [proto, "rejected"]) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 18986d5c0..a544bdc80 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -513,6 +513,10 @@ proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} = ## rely on node provided cache. - This only applies for v2 filter client info "mounting filter client" + if not node.wakuFilterClient.isNil(): + trace "Filter client already mounted." 
+ return + node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) try: @@ -1021,8 +1025,9 @@ proc mountLegacyLightPush*( proc mountLegacyLightPushClient*(node: WakuNode) = info "mounting legacy light push client" - node.wakuLegacyLightpushClient = - WakuLegacyLightPushClient.new(node.peerManager, node.rng) + if node.wakuLegacyLightpushClient.isNil(): + node.wakuLegacyLightpushClient = + WakuLegacyLightPushClient.new(node.peerManager, node.rng) proc legacyLightpushPublish*( node: WakuNode, @@ -1133,7 +1138,8 @@ proc mountLightPush*( proc mountLightPushClient*(node: WakuNode) = info "mounting light push client" - node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) + if node.wakuLightpushClient.isNil(): + node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) proc lightpushPublishHandler( node: WakuNode, diff --git a/waku/waku_filter_v2/protocol_metrics.nim b/waku/waku_filter_v2/protocol_metrics.nim index b19f612f3..2d9f63c63 100644 --- a/waku/waku_filter_v2/protocol_metrics.nim +++ b/waku/waku_filter_v2/protocol_metrics.nim @@ -11,7 +11,11 @@ declarePublicGauge waku_filter_subscriptions, "number of subscribed filter clien declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"] declarePublicHistogram waku_filter_handle_message_duration_seconds, - "duration to push message to filter subscribers" + "duration to push message to filter subscribers", + buckets = [ + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, + 15.0, 20.0, 30.0, Inf, + ] # Error types (metric label values) const diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index e2b096bc9..57a95e107 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -75,12 +75,13 @@ proc handleRequest*( waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"]) + let msg_hash = 
pubsubTopic.computeMessageHash(pushRequest.message).to0xHex() notice "handling lightpush request", my_peer_id = wl.peerManager.switch.peerInfo.peerId, peer_id = peerId, requestId = pushRequest.requestId, pubsubTopic = pushRequest.pubsubTopic, - msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex(), + msg_hash = msg_hash, receivedTime = getNowInNanosecondTime() let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message) diff --git a/waku/waku_lightpush_legacy/protocol.nim b/waku/waku_lightpush_legacy/protocol.nim index feb6a1320..5de25ead9 100644 --- a/waku/waku_lightpush_legacy/protocol.nim +++ b/waku/waku_lightpush_legacy/protocol.nim @@ -42,12 +42,14 @@ proc handleRequest*( pubSubTopic = request.get().pubSubTopic message = request.get().message + let msg_hash = pubsubTopic.computeMessageHash(message).to0xHex() waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + notice "handling lightpush request", peer_id = peerId, requestId = requestId, pubsubTopic = pubsubTopic, - msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(), + msg_hash = msg_hash, receivedTime = getNowInNanosecondTime() let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)