chore: update lite-protocol-tester for handling shard argument. (#3371)

* chore: replace pubsub topic with shard configuration across the lite protocol tester
* chore: enhance protocol performance - response time - metrics
* fix filter-client double mounting possibility.
This commit is contained in:
NagyZoltanPeter 2025-04-16 17:04:52 +02:00 committed by GitHub
parent 7c59f7c257
commit 2786ef6079
23 changed files with 115 additions and 64 deletions

View File

@ -12,16 +12,16 @@ MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
## for wakusim
#PUBSUB=/waku/2/rs/66/0
#SHARD=0
#CONTENT_TOPIC=/tester/2/light-pubsub-test/wakusim
#CLUSTER_ID=66
## for status.prod
PUBSUB=/waku/2/rs/16/32
#SHARDS=32
CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
CLUSTER_ID=16
## for TWN
#PUBSUB=/waku/2/rs/1/4
#SHARD=4
#CONTENT_TOPIC=/tester/2/light-pubsub-test/twn
#CLUSTER_ID=1

View File

@ -55,6 +55,8 @@ RUN chmod +x /usr/bin/liteprotocoltester
FROM base_lpt AS standalone_lpt
COPY --from=nim-build /app/apps/liteprotocoltester/run_tester_node.sh /usr/bin/
COPY --from=nim-build /app/apps/liteprotocoltester/run_tester_node_on_fleet.sh /usr/bin/
RUN chmod +x /usr/bin/run_tester_node.sh
ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"]

View File

@ -127,7 +127,7 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin
| ---: | :--- | :--- |
| NUM_MESSAGES | Number of message to publish, 0 means infinite | 120 |
| MESSAGE_INTERVAL_MILLIS | Frequency of messages in milliseconds | 1000 |
| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/66/0 |
| SHARD | Used shard for testing | 0 |
| CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto |
| CLUSTER_ID | cluster_id of the network | 16 |
| START_PUBLISHING_AFTER_SECS | Delay in seconds before starting to publish to let service node connected | 5 |
@ -272,7 +272,7 @@ export NUM_MESSAGES=200
export MESSAGE_INTERVAL_MILLIS=1000
export MIN_MESSAGE_SIZE=15Kb
export MAX_MESSAGE_SIZE=145Kb
export PUBSUB=/waku/2/rs/16/32
export SHARD=32
export CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
export CLUSTER_ID=16
@ -307,7 +307,7 @@ export NUM_MESSAGES=300
export MESSAGE_INTERVAL_MILLIS=7000
export MIN_MESSAGE_SIZE=15Kb
export MAX_MESSAGE_SIZE=145Kb
export PUBSUB=/waku/2/rs/1/4
export SHARD=4
export CONTENT_TOPIC=/tester/2/light-pubsub-test/twn
export CLUSTER_ID=1

View File

@ -16,7 +16,7 @@ x-rln-environment: &rln_env
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
MESSAGE_INTERVAL_MILLIS: "${MESSAGE_INTERVAL_MILLIS:-1000}"
PUBSUB: ${PUBSUB:-/waku/2/rs/66/0}
SHARD: ${SHARD:-0}
CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim}
CLUSTER_ID: ${CLUSTER_ID:-66}
MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}

View File

@ -16,7 +16,7 @@ x-rln-environment: &rln_env
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
MESSAGE_INTERVAL_MILLIS: "${MESSAGE_INTERVAL_MILLIS:-1000}"
PUBSUB: ${PUBSUB:-/waku/2/rs/66/0}
SHARD: ${SHARD:-0}
CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim}
CLUSTER_ID: ${CLUSTER_ID:-66}
MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}

View File

@ -130,7 +130,9 @@ proc setupAndSubscribe*(
var stats: PerPeerStatistics
actualFilterPeer = servicePeer
let pushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
let pushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[void] {.async, closure.} =
let payloadStr = string.fromBytes(message.payload)
let testerMessage = js.Json.decode(payloadStr, ProtocolTesterMessage)
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
@ -163,7 +165,7 @@ proc setupAndSubscribe*(
if conf.numMessages > 0 and
waitFor stats.checkIfAllMessagesReceived(maxWaitForLastMessage):
waitFor unsubscribe(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0])
waitFor unsubscribe(wakuNode, conf.getPubsubTopic(), conf.contentTopics[0])
info "All messages received. Exiting."
## for gracefull shutdown through signal hooks
@ -176,5 +178,5 @@ proc setupAndSubscribe*(
# Start maintaining subscription
asyncSpawn maintainSubscription(
wakuNode, conf.pubsubTopics[0], conf.contentTopics[0], conf.fixedServicePeer
wakuNode, conf.getPubsubTopic(), conf.contentTopics[0], conf.fixedServicePeer
)

View File

@ -4,7 +4,7 @@ NUM_MESSAGES=300
MESSAGE_INTERVAL_MILLIS=1000
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
PUBSUB=/waku/2/rs/16/32
SHARD=32
CONTENT_TOPIC=/tester/2/light-pubsub-test-at-infra/status-prod
CLUSTER_ID=16
LIGHTPUSH_BOOTSTRAP=enr:-QEKuED9AJm2HGgrRpVaJY2nj68ao_QiPeUT43sK-aRM7sMJ6R4G11OSDOwnvVacgN1sTw-K7soC5dzHDFZgZkHU0u-XAYJpZIJ2NIJpcISnYxMvim11bHRpYWRkcnO4WgAqNiVib290LTAxLmRvLWFtczMuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfACw2JWJvb3QtMDEuZG8tYW1zMy5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaEC3rRtFQSgc24uWewzXaxTY8hDAHB8sgnxr9k8Rjb5GeSDdGNwgnZfg3VkcIIjKIV3YWt1Mg0

View File

@ -145,13 +145,20 @@ proc publishMessages(
lightpushContentTopic,
renderMsgSize,
)
let publishStartTime = Moment.now()
let wlpRes = await wakuNode.legacyLightpushPublish(
some(lightpushPubsubTopic), message, actualServicePeer
)
let publishDuration = Moment.now() - publishStartTime
let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex
if wlpRes.isOk():
lpt_publish_duration_seconds.observe(publishDuration.milliseconds.float / 1000)
sentMessages[messagesSent] = (hash: msgHash, relayed: true)
notice "published message using lightpush",
index = messagesSent + 1,
@ -251,7 +258,7 @@ proc setupAndPublish*(
asyncSpawn publishMessages(
wakuNode,
servicePeer,
conf.pubsubTopics[0],
conf.getPubsubTopic(),
conf.contentTopics[0],
conf.numMessages,
(min: parsedMinMsgSize, max: parsedMaxMsgSize),

View File

@ -99,7 +99,7 @@ when isMainModule:
wakuConf.dnsAddrs = true
wakuConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]
wakuConf.pubsubTopics = conf.pubsubTopics
wakuConf.shards = @[conf.shard]
wakuConf.contentTopics = conf.contentTopics
wakuConf.clusterId = conf.clusterId
## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc...
@ -118,6 +118,7 @@ when isMainModule:
wakuConf.store = false
wakuConf.rest = false
wakuConf.relayServiceRatio = "40:60"
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway.
@ -202,10 +203,8 @@ when isMainModule:
var codec = WakuLightPushCodec
# mounting relevant client, for PX filter client must be mounted ahead
if conf.testFunc == TesterFunctionality.SENDER:
wakuApp.node.mountLegacyLightPushClient()
codec = WakuLightPushCodec
else:
waitFor wakuApp.node.mountFilterClient()
codec = WakuFilterSubscribeCodec
var lookForServiceNode = false

View File

@ -47,3 +47,10 @@ declarePublicGauge lpt_px_peers,
declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed", ["agent"]
declarePublicGauge lpt_dial_failures, "Number of dial failures by cause", ["agent"]
declarePublicHistogram lpt_publish_duration_seconds,
"duration to lightpush messages",
buckets = [
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
15.0, 20.0, 30.0, Inf,
]

View File

@ -5,10 +5,10 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "Service node IP: ${IP}"
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
if [ -n "${SHARD}" ]; then
SHARD=--shard="${SHARD}"
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
SHARD=--shard="0"
fi
if [ -n "${CLUSTER_ID}" ]; then
@ -59,5 +59,5 @@ exec /usr/bin/wakunode\
--metrics-server-port=8003\
--metrics-server-address=0.0.0.0\
--nat=extip:${IP}\
${PUBSUB}\
${SHARD}\
${CLUSTER_ID}

View File

@ -93,10 +93,10 @@ else
FULL_NODE=--bootstrap-node="${SERIVCE_NODE_ADDR}"
fi
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
if [ -n "${SHARD}" ]; then
SHARD=--shard="${SHARD}"
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
SHARD=--shard="0"
fi
if [ -n "${CONTENT_TOPIC}" ]; then
@ -128,19 +128,25 @@ if [ -n "${MESSAGE_INTERVAL_MILLIS}" ]; then
MESSAGE_INTERVAL_MILLIS=--message-interval="${MESSAGE_INTERVAL_MILLIS}"
fi
if [ -n "${LOG_LEVEL}" ]; then
LOG_LEVEL=--log-level=${LOG_LEVEL}
else
LOG_LEVEL=--log-level=INFO
fi
echo "Running binary: ${BINARY_PATH}"
echo "Tester node: ${FUNCTION}"
echo "Using service node: ${SERIVCE_NODE_ADDR}"
echo "My external IP: ${MY_EXT_IP}"
exec "${BINARY_PATH}"\
--log-level=INFO\
--nat=extip:${MY_EXT_IP}\
--test-peers\
${LOG_LEVEL}\
${FULL_NODE}\
${MESSAGE_INTERVAL_MILLIS}\
${NUM_MESSAGES}\
${PUBSUB}\
${SHARD}\
${CONTENT_TOPIC}\
${CLUSTER_ID}\
${FUNCTION}\

View File

@ -48,10 +48,10 @@ fi
MY_EXT_IP=$(wget -qO- --no-check-certificate https://api4.ipify.org)
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
if [ -n "${SHARD}" ]; then
SHARD=--shard="${SHARD}"
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
SHARD=--shard="0"
fi
if [ -n "${CONTENT_TOPIC}" ]; then
@ -83,19 +83,25 @@ if [ -n "${MESSAGE_INTERVAL_MILLIS}" ]; then
MESSAGE_INTERVAL_MILLIS=--message-interval="${MESSAGE_INTERVAL_MILLIS}"
fi
if [ -n "${LOG_LEVEL}" ]; then
LOG_LEVEL=--log-level=${LOG_LEVEL}
else
LOG_LEVEL=--log-level=INFO
fi
echo "Running binary: ${BINARY_PATH}"
echo "Node function is: ${FUNCTION}"
echo "Using service/bootstrap node as: ${NODE_ARG}"
echo "My external IP: ${MY_EXT_IP}"
exec "${BINARY_PATH}"\
--log-level=INFO\
--nat=extip:${MY_EXT_IP}\
--test-peers\
${LOG_LEVEL}\
${NODE_ARG}\
${MESSAGE_INTERVAL_MILLIS}\
${NUM_MESSAGES}\
${PUBSUB}\
${SHARD}\
${CONTENT_TOPIC}\
${CLUSTER_ID}\
${FUNCTION}\

View File

@ -48,10 +48,10 @@ fi
MY_EXT_IP=$(wget -qO- --no-check-certificate https://api4.ipify.org)
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
if [ -n "${SHARD}" ]; then
SHARD=--shard=${SHARD}
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
SHARD=--shard=0
fi
if [ -n "${CONTENT_TOPIC}" ]; then
@ -79,8 +79,14 @@ if [ -n "${NUM_MESSAGES}" ]; then
NUM_MESSAGES=--num-messages="${NUM_MESSAGES}"
fi
if [ -n "${DELAY_MESSAGES}" ]; then
DELAY_MESSAGES=--delay-messages="${DELAY_MESSAGES}"
if [ -n "${MESSAGE_INTERVAL_MILLIS}" ]; then
MESSAGE_INTERVAL_MILLIS=--message-interval="${MESSAGE_INTERVAL_MILLIS}"
fi
if [ -n "${LOG_LEVEL}" ]; then
LOG_LEVEL=--log-level=${LOG_LEVEL}
else
LOG_LEVEL=--log-level=INFO
fi
echo "Running binary: ${BINARY_PATH}"
@ -89,12 +95,12 @@ echo "Using service/bootstrap node as: ${NODE_ARG}"
echo "My external IP: ${MY_EXT_IP}"
exec "${BINARY_PATH}"\
--log-level=INFO\
--nat=extip:${MY_EXT_IP}\
${LOG_LEVEL}\
${NODE_ARG}\
${DELAY_MESSAGES}\
${MESSAGE_INTERVAL_MILLIS}\
${NUM_MESSAGES}\
${PUBSUB}\
${SHARD}\
${CONTENT_TOPIC}\
${CLUSTER_ID}\
${FUNCTION}\

View File

@ -189,14 +189,14 @@ proc pxLookupServiceNode*(
if conf.testPeers:
let peersOpt =
await tryCallAllPxPeers(node.peerManager, codec, conf.pubsubTopics[0])
await tryCallAllPxPeers(node.peerManager, codec, conf.getPubsubTopic())
if peersOpt.isSome():
info "Found service peers for codec",
codec = codec, peer_count = peersOpt.get().len()
return ok(peersOpt.get().len > 0)
else:
let peerOpt =
await selectRandomCapablePeer(node.peerManager, codec, conf.pubsubTopics[0])
await selectRandomCapablePeer(node.peerManager, codec, conf.getPubsubTopic())
if peerOpt.isSome():
info "Found service peer for codec", codec = codec, peer = peerOpt.get()
return ok(true)

View File

@ -18,6 +18,7 @@ import
common/logging,
factory/external_config,
waku_core,
waku_core/topics/pubsub_topic,
]
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
@ -95,18 +96,9 @@ type LiteProtocolTesterConf* = object
name: "message-interval"
.}: uint32
pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
defaultValue: @[LitePubsubTopic],
name: "pubsub-topic"
.}: seq[PubsubTopic]
shard* {.desc: "Shards index to subscribe to. ", defaultValue: 0, name: "shard".}:
uint16
## TODO: extend lite protocol tester configuration based on testing needs
# shards* {.
# desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
# defaultValue: @[],
# name: "shard"
# .}: seq[uint16]
contentTopics* {.
desc: "Default content topic to subscribe to. Argument may be repeated.",
defaultValue: @[LiteContentTopic],
@ -195,4 +187,7 @@ proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] =
except CatchableError:
err(getCurrentExceptionMsg())
proc getPubsubTopic*(conf: LiteProtocolTesterConf): PubsubTopic =
return $RelayShard(clusterId: conf.clusterId, shardId: conf.shard)
{.pop.}

View File

@ -78,14 +78,14 @@ template checkUsageLimit*(
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto, conn):
let requestStartTime = getTime().toUnixFloat()
let requestStartTime = Moment.now()
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
let requestDurationSec = getTime().toUnixFloat() - requestStartTime
let requestDuration = Moment.now() - requestStartTime
waku_service_request_handling_duration_seconds.observe(
requestDurationSec, labelValues = [proto]
requestDuration.milliseconds.float / 1000, labelValues = [proto]
)
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])

View File

@ -1,8 +1,11 @@
{.push raises: [].}
import std/options
import chronos/timer
import metrics, setting
export metrics
declarePublicGauge waku_service_requests_limit,
"Applied rate limit of non-relay service", ["service"]
@ -19,4 +22,9 @@ proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) =
)
declarePublicHistogram waku_service_request_handling_duration_seconds,
"duration of non-relay service handling", ["service"]
"duration of non-relay service handling",
labels = ["service"],
buckets = [
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
15.0, 20.0, 30.0, Inf,
]

View File

@ -45,14 +45,14 @@ template checkUsageLimit*(
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto):
let requestStartTime = getTime().toUnixFloat()
let requestStartTime = Moment.now()
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
let requestDurationSec = getTime().toUnixFloat() - requestStartTime
let requestDuration = Moment.now() - requestStartTime
waku_service_request_handling_duration_seconds.observe(
requestDurationSec, labelValues = [proto]
requestDuration.milliseconds.float / 1000, labelValues = [proto]
)
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])

View File

@ -513,6 +513,10 @@ proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} =
## rely on node provided cache. - This only applies for v2 filter client
info "mounting filter client"
if not node.wakuFilterClient.isNil():
trace "Filter client already mounted."
return
node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
try:
@ -1021,8 +1025,9 @@ proc mountLegacyLightPush*(
proc mountLegacyLightPushClient*(node: WakuNode) =
info "mounting legacy light push client"
node.wakuLegacyLightpushClient =
WakuLegacyLightPushClient.new(node.peerManager, node.rng)
if node.wakuLegacyLightpushClient.isNil():
node.wakuLegacyLightpushClient =
WakuLegacyLightPushClient.new(node.peerManager, node.rng)
proc legacyLightpushPublish*(
node: WakuNode,
@ -1133,7 +1138,8 @@ proc mountLightPush*(
proc mountLightPushClient*(node: WakuNode) =
info "mounting light push client"
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
if node.wakuLightpushClient.isNil():
node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng)
proc lightpushPublishHandler(
node: WakuNode,

View File

@ -11,7 +11,11 @@ declarePublicGauge waku_filter_subscriptions, "number of subscribed filter clien
declarePublicHistogram waku_filter_request_duration_seconds,
"duration of filter subscribe requests", ["type"]
declarePublicHistogram waku_filter_handle_message_duration_seconds,
"duration to push message to filter subscribers"
"duration to push message to filter subscribers",
buckets = [
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
15.0, 20.0, 30.0, Inf,
]
# Error types (metric label values)
const

View File

@ -75,12 +75,13 @@ proc handleRequest*(
waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"])
let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex()
notice "handling lightpush request",
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
peer_id = peerId,
requestId = pushRequest.requestId,
pubsubTopic = pushRequest.pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex(),
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)

View File

@ -42,12 +42,14 @@ proc handleRequest*(
pubSubTopic = request.get().pubSubTopic
message = request.get().message
let msg_hash = pubsubTopic.computeMessageHash(message).to0xHex()
waku_lightpush_messages.inc(labelValues = ["PushRequest"])
notice "handling lightpush request",
peer_id = peerId,
requestId = requestId,
pubsubTopic = pubsubTopic,
msg_hash = pubsubTopic.computeMessageHash(message).to0xHex(),
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime()
let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)