Enhancement for integration with waku-simulator

This commit is contained in:
NagyZoltanPeter 2024-06-14 15:44:33 +02:00
parent 5989de88a4
commit 07d7859a69
No known key found for this signature in database
GPG Key ID: 16EADB9673B65368
10 changed files with 486 additions and 48 deletions

View File

@ -0,0 +1,12 @@
NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2
START_PUBLISHING_AFTER=30 # seconds
NUM_MESSAGES=250
DELAY_MESSAGES=200 # gap between messages
MIN_MESSAGE_SIZE=1Kb
MAX_MESSAGE_SIZE=120Kb
# PUBSUB=/waku/2/rs/66/0
# CONTENT_TOPIC=/tester/1/light-pubsub-example/proto

View File

@ -0,0 +1,96 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, strutils, os, sequtils, net, strformat],
chronicles,
chronos,
metrics,
libbacktrace,
system/ansi_c,
libp2p/crypto/crypto,
confutils,
libp2p/wire
import
../../waku/common/logging,
../../waku/factory/waku,
../../waku/factory/external_config,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder,
../../waku/node/peer_manager,
../../waku/waku_lightpush/common,
../../waku/waku_relay,
../../waku/waku_filter_v2,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/admin/client,
./tester_config,
./lightpush_publisher,
./filter_subscriber
logScope:
topics = "diagnose connections"
proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} =
trace "Starting logSelfPeersLoop diagnosys loop"
while true:
let selfLighpushPeers = pm.peerStore.getPeersByProtocol(WakuLightPushCodec)
let selfRelayPeers = pm.peerStore.getPeersByProtocol(WakuRelayCodec)
let selfFilterPeers = pm.peerStore.getPeersByProtocol(WakuFilterSubscribeCodec)
let printable = catch:
"""*------------------------------------------------------------------------------------------*
| Self ({pm.switch.peerInfo}) peers:
*------------------------------------------------------------------------------------------*
| Lightpush peers({selfLighpushPeers.len()}): ${selfLighpushPeers}
*------------------------------------------------------------------------------------------*
| Filter peers({selfFilterPeers.len()}): ${selfFilterPeers}
*------------------------------------------------------------------------------------------*
| Relay peers({selfRelayPeers.len()}): ${selfRelayPeers}
*------------------------------------------------------------------------------------------*""".fmt()
if printable.isErr():
echo "Error while printing statistics: " & printable.error().msg
else:
echo printable.get()
await sleepAsync(interval)
proc logServiceRelayPeers(
pm: PeerManager, codec: string, interval: Duration
) {.async.} =
trace "Starting service node connectivity diagnosys loop"
while true:
echo "*------------------------------------------------------------------------------------------*"
echo "| Service peer connectivity:"
let selfLighpushPeers = pm.selectPeer(codec)
if selfLighpushPeers.isSome():
let ma = selfLighpushPeers.get().addrs[0]
var serviceIp = initTAddress(ma).valueOr:
echo "Error while parsing multiaddress: " & $error
continue
serviceIp.port = Port(8645)
let restClient = newRestHttpClient(initTAddress($serviceIp))
let getPeersRes = await restClient.getPeers()
if getPeersRes.status == 200:
let nrOfPeers = getPeersRes.data.len()
echo "Service node (@" & $ma & ") peers: " & $getPeersRes.data
else:
echo "Error while fetching service node (@" & $ma & ") peers: " &
$getPeersRes.data
else:
echo "No service node peers found"
echo "*------------------------------------------------------------------------------------------*"
await sleepAsync(interval)
proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} =
asyncSpawn logSelfPeersLoop(pm, chronos.seconds(20))
asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20))

View File

@ -0,0 +1,212 @@
version: "3.7"
x-logging: &logging
logging:
driver: json-file
options:
max-size: 1000m
# Environment variable definitions
x-eth-client-address: &eth_client_address ${ETH_CLIENT_ADDRESS:-} # Add your ETH_CLIENT_ADDRESS after the "-"
x-rln-environment: &rln_env
RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4}
RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-"
RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-"
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
PUBSUB: ${PUBSUB:-}
CONTENT_TOPIC: ${CONTENT_TOPIC:-}
MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}
MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb}
START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds
# Services definitions
services:
lightpush-service:
image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8645:8645
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_service_node.sh
- LIGHTPUSH
networks:
- waku-simulator_simulation
publishernode:
image: waku.liteprotocoltester:latest
build:
context: ../..
dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
deploy:
replicas: ${NUM_PUBLISHER_NODES:-3}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8646:8646
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_tester_node.sh:/opt/run_tester_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_tester_node.sh
- SENDER
depends_on:
- lightpush-service
configs:
- source: cfg_tester_node.toml
target: config.toml
networks:
- waku-simulator_simulation
filter-service:
image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
# ports:
# - 30304:30305/tcp
# - 30304:30305/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8645:8645
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_service_node.sh
- FILTER
networks:
- waku-simulator_simulation
receivernode:
image: waku.liteprotocoltester:latest
build:
context: ../..
dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
deploy:
replicas: ${NUM_RECEIVER_NODES:-1}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8647:8647
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_tester_node.sh:/opt/run_tester_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_tester_node.sh
- RECEIVER
depends_on:
- filter-service
- publishernode
configs:
- source: cfg_tester_node.toml
target: config.toml
networks:
- waku-simulator_simulation
## We have prometheus and grafana defined in waku-simulator already
# prometheus:
# image: docker.io/prom/prometheus:latest
# volumes:
# - ./monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:Z
# command:
# - --config.file=/etc/prometheus/prometheus.yml
# ports:
# - 127.0.0.1:9090:9090
# depends_on:
# - servicenode
# grafana:
# image: docker.io/grafana/grafana:latest
# env_file:
# - ./monitoring/configuration/grafana-plugins.env
# volumes:
# - ./monitoring/configuration/grafana.ini:/etc/grafana/grafana.ini:Z
# - ./monitoring/configuration/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:Z
# - ./monitoring/configuration/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:Z
# - ./monitoring/configuration/dashboards:/var/lib/grafana/dashboards/:Z
# - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_icon.svg:Z
# - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_typelogo.svg:Z
# - ./monitoring/configuration/customizations/custom-logo.png:/usr/share/grafana/public/img/fav32.png:Z
# ports:
# - 0.0.0.0:3000:3000
# depends_on:
# - prometheus
configs:
cfg_tester_node.toml:
content: |
max-connections = 100
networks:
waku-simulator_simulation:
external: true

View File

@ -85,12 +85,17 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
stats.addMessage(testerMessage.sender, testerMessage) stats.addMessage(testerMessage.sender, testerMessage)
trace "message received", let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
notice "message received",
index = testerMessage.index, index = testerMessage.index,
count = testerMessage.count, count = testerMessage.count,
startedAt = $testerMessage.startedAt, startedAt = $testerMessage.startedAt,
sinceStart = $testerMessage.sinceStart, sinceStart = $testerMessage.sinceStart,
sincePrev = $testerMessage.sincePrev sincePrev = $testerMessage.sincePrev,
size = $testerMessage.size,
pubsubTopic = pubsubTopic,
hash = msgHash
wakuNode.wakuFilterClient.registerPushHandler(pushHandler) wakuNode.wakuFilterClient.registerPushHandler(pushHandler)

View File

@ -1,5 +1,5 @@
import import
std/strformat, std/[strformat, sysrand, random, sequtils],
system/ansi_c, system/ansi_c,
chronicles, chronicles,
chronos, chronos,
@ -12,16 +12,27 @@ import
../../../waku/node/peer_manager, ../../../waku/node/peer_manager,
../../../waku/waku_core, ../../../waku/waku_core,
../../../waku/waku_lightpush/client, ../../../waku/waku_lightpush/client,
../../../waku/common/utils/parse_size_units,
./tester_config, ./tester_config,
./tester_message ./tester_message
randomize()
type SizeRange* = tuple[min: uint64, max: uint64]
var RANDOM_PALYLOAD {.threadvar.}: seq[byte]
RANDOM_PALYLOAD = urandom(1024 * 1024)
# 1MiB of random payload to be used to extend message
proc prepareMessage( proc prepareMessage(
sender: string, sender: string,
messageIndex, numMessages: uint32, messageIndex, numMessages: uint32,
startedAt: TimeStamp, startedAt: TimeStamp,
prevMessageAt: var Timestamp, prevMessageAt: var Timestamp,
contentTopic: ContentTopic, contentTopic: ContentTopic,
): WakuMessage = size: SizeRange,
): (WakuMessage, uint64) =
var renderSize = rand(size.min .. size.max)
let current = getNowInNanosecondTime() let current = getNowInNanosecondTime()
let payload = ProtocolTesterMessage( let payload = ProtocolTesterMessage(
sender: sender, sender: sender,
@ -30,49 +41,72 @@ proc prepareMessage(
startedAt: startedAt, startedAt: startedAt,
sinceStart: current - startedAt, sinceStart: current - startedAt,
sincePrev: current - prevMessageAt, sincePrev: current - prevMessageAt,
size: renderSize,
) )
prevMessageAt = current prevMessageAt = current
let text = js.Json.encode(payload) let text = js.Json.encode(payload)
let contentPayload = toBytes(text & " \0")
if renderSize < len(contentPayload).uint64:
renderSize = len(contentPayload).uint64
let finalPayload = concat(
contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64]
)
let message = WakuMessage( let message = WakuMessage(
payload: toBytes(text), # content of the message payload: finalPayload, # content of the message
contentTopic: contentTopic, # content topic to publish to contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it ephemeral: true, # tell store nodes to not store it
timestamp: current, # current timestamp timestamp: current, # current timestamp
) )
return message return (message, renderSize)
proc publishMessages( proc publishMessages(
wakuNode: WakuNode, wakuNode: WakuNode,
lightpushPubsubTopic: PubsubTopic, lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic, lightpushContentTopic: ContentTopic,
numMessages: uint32, numMessages: uint32,
messageSizeRange: SizeRange,
delayMessages: Duration, delayMessages: Duration,
) {.async.} = ) {.async.} =
let startedAt = getNowInNanosecondTime() let startedAt = getNowInNanosecondTime()
var prevMessageAt = startedAt var prevMessageAt = startedAt
var failedToSendCount: uint32 = 0 var failedToSendCount: uint32 = 0
var renderMsgSize = messageSizeRange
# sets some default of min max message size to avoid conflict with meaningful payload size
renderMsgSize.min = max(1024.uint64, renderMsgSize.min) # do not use less than 1KB
renderMsgSize.max = max(2048.uint64, renderMsgSize.max) # minimum of max is 2KB
renderMsgSize.min = min(renderMsgSize.min, renderMsgSize.max)
renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max)
let selfPeerId = $wakuNode.switch.peerInfo.peerId let selfPeerId = $wakuNode.switch.peerInfo.peerId
var messagesSent: uint32 = 1 var messagesSent: uint32 = 1
while numMessages >= messagesSent: while numMessages >= messagesSent:
let message = prepareMessage( let (message, msgSize) = prepareMessage(
selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt, selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt,
lightpushContentTopic, lightpushContentTopic, renderMsgSize,
) )
let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message) let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message)
let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex
if wlpRes.isOk(): if wlpRes.isOk():
info "published message using lightpush", notice "published message using lightpush",
index = messagesSent, count = numMessages index = messagesSent,
count = numMessages,
size = msgSize,
pubsubTopic = lightpushPubsubTopic,
hash = msgHash
else: else:
error "failed to publish message using lightpush", err = wlpRes.error error "failed to publish message using lightpush",
err = wlpRes.error, hash = msgHash
inc(failedToSendCount) inc(failedToSendCount)
await sleepAsync(delayMessages) # Publish every 5 seconds await sleepAsync(delayMessages)
inc(messagesSent) inc(messagesSent)
let report = catch: let report = catch:
@ -94,8 +128,15 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
return return
# give some time to receiver side to set up # give some time to receiver side to set up
# TODO: this maybe done in more sphisticated way, though. let waitTillStartTesting = conf.startPublishingAfter.seconds
let waitTillStartTesting = 5.seconds
let parsedMinMsgSize = parseMsgSize(conf.minTestMessageSize).valueOr:
error "failed to parse 'min-test-msg-size' param: ", error = error
return
let parsedMaxMsgSize = parseMsgSize(conf.maxTestMessageSize).valueOr:
error "failed to parse 'max-test-msg-size' param: ", error = error
return
info "Sending test messages in", wait = waitTillStartTesting info "Sending test messages in", wait = waitTillStartTesting
waitFor sleepAsync(waitTillStartTesting) waitFor sleepAsync(waitTillStartTesting)
@ -108,5 +149,6 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
conf.pubsubTopics[0], conf.pubsubTopics[0],
conf.contentTopics[0], conf.contentTopics[0],
conf.numMessages, conf.numMessages,
(min: parsedMinMsgSize, max: parsedMaxMsgSize),
conf.delayMessages.milliseconds, conf.delayMessages.milliseconds,
) )

View File

@ -20,9 +20,12 @@ import
../../waku/node/health_monitor, ../../waku/node/health_monitor,
../../waku/node/waku_metrics, ../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder, ../../waku/waku_api/rest/builder as rest_server_builder,
../../waku/waku_lightpush/common,
../../waku/waku_filter_v2,
./tester_config, ./tester_config,
./lightpush_publisher, ./lightpush_publisher,
./filter_subscriber ./filter_subscriber,
./diagnose_connections
logScope: logScope:
topics = "liteprotocoltester main" topics = "liteprotocoltester main"
@ -84,7 +87,7 @@ when isMainModule:
wakuConf.logFormat = conf.logFormat wakuConf.logFormat = conf.logFormat
wakuConf.staticNodes = @[conf.serviceNode] wakuConf.staticNodes = @[conf.serviceNode]
wakuConf.nat = conf.nat wakuConf.nat = conf.nat
wakuConf.maxConnections = 100 wakuConf.maxConnections = 500
wakuConf.restAddress = conf.restAddress wakuConf.restAddress = conf.restAddress
wakuConf.restPort = conf.restPort wakuConf.restPort = conf.restPort
wakuConf.restAllowOrigin = conf.restAllowOrigin wakuConf.restAllowOrigin = conf.restAllowOrigin
@ -106,6 +109,9 @@ when isMainModule:
wakuConf.rest = true wakuConf.rest = true
wakuConf.metricsServer = true
wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0")
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway. # It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
@ -187,8 +193,12 @@ when isMainModule:
info "Node setup complete" info "Node setup complete"
if conf.testFunc == TesterFunctionality.SENDER: if conf.testFunc == TesterFunctionality.SENDER:
waitFor startPeriodicPeerDiagnostic(wakuApp.node.peerManager, WakuLightPushCodec)
setupAndPublish(wakuApp.node, conf) setupAndPublish(wakuApp.node, conf)
else: else:
waitFor startPeriodicPeerDiagnostic(
wakuApp.node.peerManager, WakuFilterSubscribeCodec
)
setupAndSubscribe(wakuApp.node, conf) setupAndSubscribe(wakuApp.node, conf)
runForever() runForever()

View File

@ -5,6 +5,23 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "Service node IP: ${IP}" echo "Service node IP: ${IP}"
RETRIES=${RETRIES:=10}
while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do
BOOTSTRAP_ENR=$(wget -qO- http://bootstrap:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"enrUri":"\([^"]*\)".*/\1/');
echo "Bootstrap node not ready, retrying (retries left: ${RETRIES})"
sleep 1
RETRIES=$(( $RETRIES - 1 ))
done
if [ -z "${BOOTSTRAP_ENR}" ]; then
echo "Could not get BOOTSTRAP_ENR and none provided. Failing"
exit 1
fi
echo "Using bootstrap node: ${BOOTSTRAP_ENR}"
exec /usr/bin/wakunode\ exec /usr/bin/wakunode\
--relay=true\ --relay=true\
--filter=true\ --filter=true\
@ -20,10 +37,10 @@ exec /usr/bin/wakunode\
--dns-discovery=true\ --dns-discovery=true\
--discv5-discovery=true\ --discv5-discovery=true\
--discv5-enr-auto-update=True\ --discv5-enr-auto-update=True\
--log-level=DEBUG\ --discv5-bootstrap-node=${BOOTSTRAP_ENR}\
--log-level=INFO\
--metrics-server=True\ --metrics-server=True\
--metrics-server-address=0.0.0.0\ --metrics-server-address=0.0.0.0\
--nodekey=e3f5e64568b3a612dee609f6e7c0203c501dab6131662922bdcbcabd474281d5\
--nat=extip:${IP}\ --nat=extip:${IP}\
--pubsub-topic=/waku/2/default-waku/proto\ --pubsub-topic=/waku/2/rs/66/0\
--cluster-id=0 --cluster-id=66

View File

@ -1,5 +1,7 @@
#!/bin/sh #!/bin/sh
set -x
if test -f .env; then if test -f .env; then
echo "Using .env file" echo "Using .env file"
. $(pwd)/.env . $(pwd)/.env
@ -16,10 +18,22 @@ NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET))
echo "NODE_INDEX $NODE_INDEX" echo "NODE_INDEX $NODE_INDEX"
FUNCTION=$1
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
SERVICENAME=lightpush-service
fi
if [ "${FUNCTION}" = "RECEIVER" ]; then
FUNCTION=--test-func=RECEIVER
SERVICENAME=filter-service
fi
RETRIES=${RETRIES:=10} RETRIES=${RETRIES:=10}
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
SERIVCE_NODE_ADDR=$(wget -qO- http://servicenode:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/'); SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
echo "Service node not ready, retrying (retries left: ${RETRIES})" echo "Service node not ready, retrying (retries left: ${RETRIES})"
sleep 1 sleep 1
RETRIES=$(( $RETRIES - 1 )) RETRIES=$(( $RETRIES - 1 ))
@ -30,47 +44,43 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then
exit 1 exit 1
fi fi
if [ -n "${PUBSUB}" ]; then if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}" PUBSUB=--pubsub-topic="${PUBSUB}"
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
fi fi
if [ -n "${CONTENT_TOPIC}" ]; then if [ -n "${CONTENT_TOPIC}" ]; then
CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}" CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}"
fi fi
FUNCTION=$1 if [ -n "${START_PUBLISHING_AFTER}" ]; then
START_PUBLISHING_AFTER=--start-publishing-after="${START_PUBLISHING_AFTER}"
fi
if [ -n "${MIN_MESSAGE_SIZE}" ]; then
MIN_MESSAGE_SIZE=--min-test-msg-size="${MIN_MESSAGE_SIZE}"
fi
if [ -n "${MAX_MESSAGE_SIZE}" ]; then
MAX_MESSAGE_SIZE=--max-test-msg-size="${MAX_MESSAGE_SIZE}"
fi
echo "Tester node: ${FUNCTION}" echo "Tester node: ${FUNCTION}"
REST_PORT=--rest-port=8647
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
REST_PORT=--rest-port=8646
fi
if [ "${FUNCTION}" = "RECEIVER" ]; then
FUNCTION=--test-func=RECEIVER
REST_PORT=--rest-port=8647
fi
if [ -z "${FUNCTION}" ]; then
FUNCTION=--test-func=RECEIVER
fi
echo "Using service node: ${SERIVCE_NODE_ADDR}" echo "Using service node: ${SERIVCE_NODE_ADDR}"
exec /usr/bin/liteprotocoltester\ exec /usr/bin/liteprotocoltester\
--log-level=DEBUG\ --log-level=INFO\
--service-node="${SERIVCE_NODE_ADDR}"\ --service-node="${SERIVCE_NODE_ADDR}"\
--pubsub-topic=/waku/2/default-waku/proto\ --cluster-id=66\
--cluster-id=0\
--num-messages=${NUM_MESSAGES}\ --num-messages=${NUM_MESSAGES}\
--delay-messages=${DELAY_MESSAGES}\ --delay-messages=${DELAY_MESSAGES}\
--nat=extip:${IP}\ --nat=extip:${IP}\
${FUNCTION}\
${PUBSUB}\ ${PUBSUB}\
${CONTENT_TOPIC}\ ${CONTENT_TOPIC}\
${REST_PORT} ${FUNCTION}\
${START_PUBLISHING_AFTER}\
${MIN_MESSAGE_SIZE}\
${MAX_MESSAGE_SIZE}
# --config-file=config.toml\ # --config-file=config.toml\

View File

@ -24,8 +24,10 @@ import
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
const const
LitePubsubTopic* = PubsubTopic("/waku/2/default-waku/proto") LitePubsubTopic* = PubsubTopic("/waku/2/rs/66/0")
LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto") LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto")
DefaultMinTestMessageSizeStr* = "1KiB"
DefaultMaxTestMessageSizeStr* = "150KiB"
type TesterFunctionality* = enum type TesterFunctionality* = enum
SENDER # pumps messages to the network SENDER # pumps messages to the network
@ -74,6 +76,12 @@ type LiteProtocolTesterConf* = object
desc: "Number of messages to send.", defaultValue: 120, name: "num-messages" desc: "Number of messages to send.", defaultValue: 120, name: "num-messages"
.}: uint32 .}: uint32
startPublishingAfter* {.
desc: "Wait number of seconds before start publishing messages.",
defaultValue: 5,
name: "start-publishing-after"
.}: uint32
delayMessages* {. delayMessages* {.
desc: "Delay between messages in milliseconds.", desc: "Delay between messages in milliseconds.",
defaultValue: 1000, defaultValue: 1000,
@ -103,8 +111,21 @@ type LiteProtocolTesterConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.", "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0, defaultValue: 0,
name: "cluster-id" name: "cluster-id"
.}: uint32 .}: uint16
minTestMessageSize* {.
desc:
"Minimum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
defaultValue: DefaultMinTestMessageSizeStr,
name: "min-test-msg-size"
.}: string
maxTestMessageSize* {.
desc:
"Maximum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
defaultValue: DefaultMaxTestMessageSizeStr,
name: "max-test-msg-size"
.}: string
## Tester REST service configuration ## Tester REST service configuration
restAddress* {. restAddress* {.
desc: "Listening address of the REST HTTP server.", desc: "Listening address of the REST HTTP server.",

View File

@ -18,6 +18,7 @@ type ProtocolTesterMessage* = object
startedAt*: int64 startedAt*: int64
sinceStart*: int64 sinceStart*: int64
sincePrev*: int64 sincePrev*: int64
size*: uint64
proc writeValue*( proc writeValue*(
writer: var JsonWriter[RestJson], value: ProtocolTesterMessage writer: var JsonWriter[RestJson], value: ProtocolTesterMessage
@ -29,6 +30,7 @@ proc writeValue*(
writer.writeField("startedAt", value.startedAt) writer.writeField("startedAt", value.startedAt)
writer.writeField("sinceStart", value.sinceStart) writer.writeField("sinceStart", value.sinceStart)
writer.writeField("sincePrev", value.sincePrev) writer.writeField("sincePrev", value.sincePrev)
writer.writeField("size", value.size)
writer.endRecord() writer.endRecord()
proc readValue*( proc readValue*(
@ -41,6 +43,7 @@ proc readValue*(
startedAt: Option[int64] startedAt: Option[int64]
sinceStart: Option[int64] sinceStart: Option[int64]
sincePrev: Option[int64] sincePrev: Option[int64]
size: Option[uint64]
for fieldName in readObjectFields(reader): for fieldName in readObjectFields(reader):
case fieldName case fieldName
@ -80,6 +83,12 @@ proc readValue*(
"Multiple `sincePrev` fields found", "ProtocolTesterMessage" "Multiple `sincePrev` fields found", "ProtocolTesterMessage"
) )
sincePrev = some(reader.readValue(int64)) sincePrev = some(reader.readValue(int64))
of "size":
if size.isSome():
reader.raiseUnexpectedField(
"Multiple `size` fields found", "ProtocolTesterMessage"
)
size = some(reader.readValue(uint64))
else: else:
unrecognizedFieldWarning() unrecognizedFieldWarning()
@ -101,6 +110,9 @@ proc readValue*(
if sincePrev.isNone(): if sincePrev.isNone():
reader.raiseUnexpectedValue("Field `sincePrev` is missing") reader.raiseUnexpectedValue("Field `sincePrev` is missing")
if size.isNone():
reader.raiseUnexpectedValue("Field `size` is missing")
value = ProtocolTesterMessage( value = ProtocolTesterMessage(
sender: sender.get(), sender: sender.get(),
index: index.get(), index: index.get(),
@ -108,4 +120,5 @@ proc readValue*(
startedAt: startedAt.get(), startedAt: startedAt.get(),
sinceStart: sinceStart.get(), sinceStart: sinceStart.get(),
sincePrev: sincePrev.get(), sincePrev: sincePrev.get(),
size: size.get(),
) )