chore: liteprotocoltester for simulation and for fleets (#2813)

* Added phase 2 - waku-simulatior integration in README.md

* Enhancement on statistics reports, added list of sent messages with hash, fixed latency calculations

* Enable standalonde running liteprotocoltester agains any waku network/fleet

* Fix missing env vars on run_tester_node.sh

* Adjustment on log levels, fix REST initialization

* Added standalon docker image build, fine tune duplicate detection and logging.

* Adjustments for waku-simulator runs

* Extended liteprotocoltester README.md with docker build

* Fix test inside docker service node connectivity failure

* Update apps/liteprotocoltester/README.md

Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Explain minLatency calculation in code comment

---------

Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
NagyZoltanPeter 2024-08-21 14:54:18 +02:00 committed by GitHub
parent d63e34304d
commit f4fa73e961
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 957 additions and 156 deletions

View File

@ -0,0 +1,27 @@
START_PUBLISHING_AFTER=10
# can add some seconds delay before SENDER starts publishing
NUM_MESSAGES=0
# 0 for infinite number of messages
DELAY_MESSAGES=8000
# ms delay between messages
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
## for wakusim
PUBSUB=/waku/2/rs/66/0
CONTENT_TOPIC=/tester/2/light-pubsub-test/wakusim
CLUSTER_ID=66
## for status.prod
#PUBSUB=/waku/2/rs/16/32
#CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
#CLUSTER_ID=16
## for TWN
#PUBSUB=/waku/2/rs/1/4
#CONTENT_TOPIC=/tester/2/light-pubsub-test/twn
#CLUSTER_ID=1

View File

@ -0,0 +1,36 @@
# TESTING IMAGE --------------------------------------------------------------
## NOTICE: This is a short cut build file for ubuntu users who compiles nwaku in ubuntu distro.
## This is used for faster turnaround time for testing the compiled binary.
## Prerequisites: compiled liteprotocoltester binary in build/ directory
FROM ubuntu:noble AS prod
LABEL maintainer="zoltan@status.im"
LABEL source="https://github.com/waku-org/nwaku"
LABEL description="Lite Protocol Tester: Waku light-client"
LABEL commit="unknown"
LABEL version="unknown"
# DevP2P, LibP2P, and JSON RPC ports
EXPOSE 30303 60000 8545
# Referenced in the binary
RUN apt-get update && apt-get install -y --no-install-recommends \
libgcc1 \
libpcre3 \
libpq-dev \
wget \
iproute2 \
&& rm -rf /var/lib/apt/lists/*
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
COPY build/liteprotocoltester /usr/bin/
COPY apps/liteprotocoltester/run_tester_node.sh /usr/bin/
ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"]
# # By default just show help if called without arguments
CMD ["--help"]

View File

@ -4,7 +4,7 @@
ARG NIMFLAGS ARG NIMFLAGS
ARG MAKE_TARGET=liteprotocoltester ARG MAKE_TARGET=liteprotocoltester
ARG NIM_COMMIT ARG NIM_COMMIT
ARG LOG_LEVEL=TRACE ARG LOG_LEVEL=DEBUG
# Get build tools and required header files # Get build tools and required header files
RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq
@ -27,7 +27,7 @@
# PRODUCTION IMAGE ------------------------------------------------------------- # PRODUCTION IMAGE -------------------------------------------------------------
FROM alpine:3.18 as prod FROM alpine:3.18 AS prod
ARG MAKE_TARGET=liteprotocoltester ARG MAKE_TARGET=liteprotocoltester

View File

@ -4,7 +4,7 @@
## This is used for faster turnaround time for testing the compiled binary. ## This is used for faster turnaround time for testing the compiled binary.
## Prerequisites: compiled liteprotocoltester binary in build/ directory ## Prerequisites: compiled liteprotocoltester binary in build/ directory
FROM ubuntu:noble as prod FROM ubuntu:noble AS prod
LABEL maintainer="jakub@status.im" LABEL maintainer="jakub@status.im"
LABEL source="https://github.com/waku-org/nwaku" LABEL source="https://github.com/waku-org/nwaku"

View File

@ -37,6 +37,9 @@ At this stage we can only configure number of messages and fixed frequency of th
### Phase 1 ### Phase 1
> NOTICE: This part is obsolate due integration with waku-simulator.
> It needs some rework to make it work again standalone.
Lite Protocol Tester application is built under name `liteprotocoltester` in apps/liteprotocoltester folder. Lite Protocol Tester application is built under name `liteprotocoltester` in apps/liteprotocoltester folder.
Starting from nwaku repository root: Starting from nwaku repository root:
@ -48,16 +51,102 @@ docker compose up -d
docker compose logs -f receivernode docker compose logs -f receivernode
``` ```
### Phase 2
> Integration with waku-simulator!
- For convenience, integration is done in cooperation with waku-simulator repository, but nothing is tightly coupled.
- waku-simulator must be started separately with its own configuration.
- To enable waku-simulator working without RLN currently a separate branch is to be used.
- When waku-simulator is configured and up and running, lite-protocol-tester composite docker setup can be started.
```bash
# Start waku-simulator
git clone https://github.com/waku-org/waku-simulator.git ../waku-simulator
cd ../waku-simulator
git checkout chore-integrate-liteprotocoltester
# optionally edit .env file
docker compose -f docker-compose-norln.yml up -d
# navigate localhost:30001 to see the waku-simulator dashboard
cd ../{your-repository}
make LOG_LEVEL=DEBUG liteprotocoltester
cd apps/liteprotocoltester
# optionally edit .env file
docker compose -f docker-compose-on-simularor.yml build
docker compose -f docker-compose-on-simularor.yml up -d
docker compose -f docker-compose-on-simularor.yml logs -f receivernode
```
#### Current setup
- waku-simulator is configured to run with 25 full node
- liteprotocoltester is configured to run with 3 publisher and 1 receiver
- liteprotocoltester is configured to run 1 lightpush service and a filter service node
- light clients are connected accordingly
- publishers will send 250 messages in every 200ms with size between 1KiB and 120KiB
- Notice there is a configurable wait before start publishing messages as it is noticed time is needed for the service nodes to get connected to full nodes from simulator
- light clients will print report on their and the connected service node's connectivity to the network in every 20 secs.
### Phase 3
> Run independently on a chosen waku fleet
This option is simple as is just to run the built liteprotocoltester binary with run_tester_node.sh script.
Syntax:
`./run_tester_node.sh <path-to-liteprotocoltester-binary> <SENDER|RECEIVER> <service-node-address>`
How to run from you nwaku repository:
```bash
cd ../{your-repository}
make LOG_LEVEL=DEBUG liteprotocoltester
cd apps/liteprotocoltester
# optionally edit .env file
# run publisher side
./run_tester_node.sh ../../build/liteprotocoltester SENDER [chosen service node address that support lightpush]
# or run receiver side
./run_tester_node.sh ../../build/liteprotocoltester RECEIVER [chosen service node address that support filter service]
```
#### Recommendations
In order to run on any kind of network, it is recommended to deploy the built `liteprotocoltester` binary with the `.env` file and the `run_tester_node.sh` script to the desired machine.
Select a lightpush service node and a filter service node from the targeted network, or you can run your own. Note down the selected peers peer_id.
Run a SENDER role liteprotocoltester and a RECEIVER role one on different terminals. Depending on the test aim, you may want to redirect the output to a file.
> RECEIVER side will periodically print statistics to standard output.
## Configure ## Configure
### Environment variables for docker compose runs ### Environment variables for docker compose runs
| Variable | Description | Default | | Variable | Description | Default |
| ---: | :--- | :--- | | ---: | :--- | :--- |
| NUM_MESSAGES | Number of message to publish | 120 | | NUM_MESSAGES | Number of message to publish, 0 means infinite | 120 |
| DELAY_MESSAGES | Frequency of messages in milliseconds | 1000 | | DELAY_MESSAGES | Frequency of messages in milliseconds | 1000 |
| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/0/0 | | PUBSUB | Used pubsub_topic for testing | /waku/2/rs/66/0 |
| CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto | | CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto |
| CLUSTER_ID | cluster_id of the network | 16 |
| START_PUBLISHING_AFTER | Delay in seconds before starting to publish to let service node connected | 5 |
| MIN_MESSAGE_SIZE | Minimum message size in bytes | 1KiB |
| MAX_MESSAGE_SIZE | Maximum message size in bytes | 120KiB |
### Lite Protocol Tester application cli options ### Lite Protocol Tester application cli options
@ -67,7 +156,10 @@ docker compose logs -f receivernode
| --service-node| Address of the service node to use for lightpush and/or filter service | - | | --service-node| Address of the service node to use for lightpush and/or filter service | - |
| --num-messages | Number of message to publish | 120 | | --num-messages | Number of message to publish | 120 |
| --delay-messages | Frequency of messages in milliseconds | 1000 | | --delay-messages | Frequency of messages in milliseconds | 1000 |
| --pubsub-topic | Used pubsub_topic for testing | /waku/2/rs/0/0 | | --min-message-size | Minimum message size in bytes | 1KiB |
| --max-message-size | Maximum message size in bytes | 120KiB |
| --start-publishing-after | Delay in seconds before starting to publish to let service node connected in seconds | 5 |
| --pubsub-topic | Used pubsub_topic for testing | /waku/2/default-waku/proto |
| --content_topic | content_topic for testing | /tester/1/light-pubsub-example/proto | | --content_topic | content_topic for testing | /tester/1/light-pubsub-example/proto |
| --cluster-id | Cluster id for the test | 0 | | --cluster-id | Cluster id for the test | 0 |
| --config-file | TOML configuration file to fine tune the light waku node<br>Note that some configurations (full node services) are not taken into account | - | | --config-file | TOML configuration file to fine tune the light waku node<br>Note that some configurations (full node services) are not taken into account | - |
@ -82,6 +174,34 @@ docker compose logs -f receivernode
### Docker image notice ### Docker image notice
#### Building for docker compose runs on simulator or standalone
Please note that currently to ease testing and development tester application docker image is based on ubuntu and uses the externally pre-built binary of 'liteprotocoltester'. Please note that currently to ease testing and development tester application docker image is based on ubuntu and uses the externally pre-built binary of 'liteprotocoltester'.
This speeds up image creation. Another dokcer build file is provided for proper build of boundle image. This speeds up image creation. Another dokcer build file is provided for proper build of boundle image.
> `Dockerfile.liteprotocoltester.copy` will create an image with the binary copied from the build directory.
> `Dockerfile.liteprotocoltester.compile` will create an image completely compiled from source. This can be quite slow.
#### Creating standalone runner docker image
To ease the work with lite-proto-tester, a docker image is possible to build.
With that image it is easy to run the application in a container.
> `Dockerfile.liteprotocoltester` will create an ubuntu image with the binary copied from the build directory. You need to pre-build the application.
Here is how to build and run:
```bash
cd <your-repository>
make liteprotocoltester
cd apps/liteprotocoltester
docker build -t liteprotocoltester:latest -f Dockerfile.liteprotocoltester ../..
# alternatively you can push it to a registry
# edit and adjust .env file to your needs and for the network configuration
docker run --env-file .env liteprotocoltester:latest RECEIVER <service-node-ip4-peer-address>
docker run --env-file .env liteprotocoltester:latest SENDER <service-node-ip4-peer-address>
```

View File

@ -0,0 +1,96 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, strutils, os, sequtils, net, strformat],
chronicles,
chronos,
metrics,
libbacktrace,
system/ansi_c,
libp2p/crypto/crypto,
confutils,
libp2p/wire
import
../../waku/common/logging,
../../waku/factory/waku,
../../waku/factory/external_config,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder,
../../waku/node/peer_manager,
../../waku/waku_lightpush/common,
../../waku/waku_relay,
../../waku/waku_filter_v2,
../../waku/waku_api/rest/client,
../../waku/waku_api/rest/admin/client,
./tester_config,
./lightpush_publisher,
./filter_subscriber
logScope:
topics = "diagnose connections"
proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} =
trace "Starting logSelfPeersLoop diagnosis loop"
while true:
let selfLighpushPeers = pm.peerStore.getPeersByProtocol(WakuLightPushCodec)
let selfRelayPeers = pm.peerStore.getPeersByProtocol(WakuRelayCodec)
let selfFilterPeers = pm.peerStore.getPeersByProtocol(WakuFilterSubscribeCodec)
let printable = catch:
"""*------------------------------------------------------------------------------------------*
| Self ({pm.switch.peerInfo}) peers:
*------------------------------------------------------------------------------------------*
| Lightpush peers({selfLighpushPeers.len()}): ${selfLighpushPeers}
*------------------------------------------------------------------------------------------*
| Filter peers({selfFilterPeers.len()}): ${selfFilterPeers}
*------------------------------------------------------------------------------------------*
| Relay peers({selfRelayPeers.len()}): ${selfRelayPeers}
*------------------------------------------------------------------------------------------*""".fmt()
if printable.isErr():
echo "Error while printing statistics: " & printable.error().msg
else:
echo printable.get()
await sleepAsync(interval)
proc logServiceRelayPeers(
pm: PeerManager, codec: string, interval: Duration
) {.async.} =
trace "Starting service node connectivity diagnosys loop"
while true:
echo "*------------------------------------------------------------------------------------------*"
echo "| Service peer connectivity:"
let selfLighpushPeers = pm.selectPeer(codec)
if selfLighpushPeers.isSome():
let ma = selfLighpushPeers.get().addrs[0]
var serviceIp = initTAddress(ma).valueOr:
echo "Error while parsing multiaddress: " & $error
continue
serviceIp.port = Port(8645)
let restClient = newRestHttpClient(initTAddress($serviceIp))
let getPeersRes = await restClient.getPeers()
if getPeersRes.status == 200:
let nrOfPeers = getPeersRes.data.len()
echo "Service node (@" & $ma & ") peers: " & $getPeersRes.data
else:
echo "Error while fetching service node (@" & $ma & ") peers: " &
$getPeersRes.data
else:
echo "No service node peers found"
echo "*------------------------------------------------------------------------------------------*"
await sleepAsync(interval)
proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} =
asyncSpawn logSelfPeersLoop(pm, chronos.seconds(60))
# asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20))

View File

@ -0,0 +1,219 @@
version: "3.7"
x-logging: &logging
logging:
driver: json-file
options:
max-size: 1000m
# Environment variable definitions
x-eth-client-address: &eth_client_address ${ETH_CLIENT_ADDRESS:-} # Add your ETH_CLIENT_ADDRESS after the "-"
x-rln-environment: &rln_env
RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4}
RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-"
RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-"
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
PUBSUB: ${PUBSUB:-/waku/2/rs/66/0}
CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim}
CLUSTER_ID: ${CLUSTER_ID:-66}
MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}
MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb}
START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds
# Services definitions
services:
lightpush-service:
image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest-release}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8645:8645
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_service_node.sh
- LIGHTPUSH
networks:
- waku-simulator_simulation
publishernode:
image: waku.liteprotocoltester:latest
build:
context: ../..
dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
deploy:
replicas: ${NUM_PUBLISHER_NODES:-3}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8646:8646
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_tester_node.sh:/opt/run_tester_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_tester_node.sh
- /usr/bin/liteprotocoltester
- SENDER
- waku-sim
depends_on:
- lightpush-service
configs:
- source: cfg_tester_node.toml
target: config.toml
networks:
- waku-simulator_simulation
filter-service:
image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest-release}
# ports:
# - 30304:30305/tcp
# - 30304:30305/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8645:8645
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_service_node.sh
- FILTER
networks:
- waku-simulator_simulation
receivernode:
image: waku.liteprotocoltester:latest
build:
context: ../..
dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
deploy:
replicas: ${NUM_RECEIVER_NODES:-1}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
# - 127.0.0.1:8647:8647
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_tester_node.sh:/opt/run_tester_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_tester_node.sh
- /usr/bin/liteprotocoltester
- RECEIVER
- waku-sim
depends_on:
- filter-service
- publishernode
configs:
- source: cfg_tester_node.toml
target: config.toml
networks:
- waku-simulator_simulation
## We have prometheus and grafana defined in waku-simulator already
# prometheus:
# image: docker.io/prom/prometheus:latest
# volumes:
# - ./monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:Z
# command:
# - --config.file=/etc/prometheus/prometheus.yml
# ports:
# - 127.0.0.1:9090:9090
# depends_on:
# - servicenode
# grafana:
# image: docker.io/grafana/grafana:latest
# env_file:
# - ./monitoring/configuration/grafana-plugins.env
# volumes:
# - ./monitoring/configuration/grafana.ini:/etc/grafana/grafana.ini:Z
# - ./monitoring/configuration/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:Z
# - ./monitoring/configuration/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:Z
# - ./monitoring/configuration/dashboards:/var/lib/grafana/dashboards/:Z
# - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_icon.svg:Z
# - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_typelogo.svg:Z
# - ./monitoring/configuration/customizations/custom-logo.png:/usr/share/grafana/public/img/fav32.png:Z
# ports:
# - 0.0.0.0:3000:3000
# depends_on:
# - prometheus
configs:
cfg_tester_node.toml:
content: |
max-connections = 100
networks:
waku-simulator_simulation:
external: true

View File

@ -16,13 +16,18 @@ x-rln-environment: &rln_env
x-test-running-conditions: &test_running_conditions x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120} NUM_MESSAGES: ${NUM_MESSAGES:-120}
DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}" DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
PUBSUB: ${PUBSUB:-} PUBSUB: ${PUBSUB:-/waku/2/rs/66/0}
CONTENT_TOPIC: ${CONTENT_TOPIC:-} CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim}
CLUSTER_ID: ${CLUSTER_ID:-66}
MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}
MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb}
START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds
STANDALONE: ${STANDALONE:-1}
# Services definitions # Services definitions
services: services:
servicenode: servicenode:
image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest} image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest-release}
ports: ports:
- 30304:30304/tcp - 30304:30304/tcp
- 30304:30304/udp - 30304:30304/udp
@ -40,6 +45,7 @@ services:
EXTRA_ARGS: ${EXTRA_ARGS} EXTRA_ARGS: ${EXTRA_ARGS}
<<: <<:
- *rln_env - *rln_env
- *test_running_conditions
volumes: volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z - ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
@ -80,7 +86,9 @@ services:
entrypoint: sh entrypoint: sh
command: command:
- /opt/run_tester_node.sh - /opt/run_tester_node.sh
- /usr/bin/liteprotocoltester
- SENDER - SENDER
- servicenode
depends_on: depends_on:
- servicenode - servicenode
configs: configs:
@ -118,7 +126,9 @@ services:
entrypoint: sh entrypoint: sh
command: command:
- /opt/run_tester_node.sh - /opt/run_tester_node.sh
- /usr/bin/liteprotocoltester
- RECEIVER - RECEIVER
- servicenode
depends_on: depends_on:
- servicenode - servicenode
- publishernode - publishernode

View File

@ -45,6 +45,7 @@ proc maintainSubscription(
let pingRes = await wakuNode.wakuFilterClient.ping(filterPeer) let pingRes = await wakuNode.wakuFilterClient.ping(filterPeer)
if pingRes.isErr(): if pingRes.isErr():
# No subscription found. Let's subscribe. # No subscription found. Let's subscribe.
error "ping failed.", err = pingRes.error
trace "no subscription found. Sending subscribe request" trace "no subscription found. Sending subscribe request"
let subscribeRes = await wakuNode.filterSubscribe( let subscribeRes = await wakuNode.filterSubscribe(
@ -52,10 +53,10 @@ proc maintainSubscription(
) )
if subscribeRes.isErr(): if subscribeRes.isErr():
trace "subscribe request failed. Quitting.", err = subscribeRes.error error "subscribe request failed. Quitting.", err = subscribeRes.error
break break
else: else:
trace "subscribe request successful." notice "subscribe request successful."
else: else:
trace "subscription found." trace "subscription found."
@ -78,15 +79,19 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
let pushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = let pushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
let payloadStr = string.fromBytes(message.payload) let payloadStr = string.fromBytes(message.payload)
let testerMessage = js.Json.decode(payloadStr, ProtocolTesterMessage) let testerMessage = js.Json.decode(payloadStr, ProtocolTesterMessage)
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
stats.addMessage(testerMessage.sender, testerMessage) stats.addMessage(testerMessage.sender, testerMessage, msgHash)
trace "message received", notice "message received",
index = testerMessage.index, index = testerMessage.index,
count = testerMessage.count, count = testerMessage.count,
startedAt = $testerMessage.startedAt, startedAt = $testerMessage.startedAt,
sinceStart = $testerMessage.sinceStart, sinceStart = $testerMessage.sinceStart,
sincePrev = $testerMessage.sincePrev sincePrev = $testerMessage.sincePrev,
size = $testerMessage.size,
pubsubTopic = pubsubTopic,
hash = msgHash
wakuNode.wakuFilterClient.registerPushHandler(pushHandler) wakuNode.wakuFilterClient.registerPushHandler(pushHandler)
@ -97,7 +102,7 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
proc(udata: pointer) {.gcsafe.} = proc(udata: pointer) {.gcsafe.} =
stats.echoStats() stats.echoStats()
if stats.checkIfAllMessagesReceived(): if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived():
waitFor unsubscribe( waitFor unsubscribe(
wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0] wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0]
) )

View File

@ -1,23 +1,41 @@
import import
std/strformat, std/[strformat, sysrand, random, sequtils],
system/ansi_c, system/ansi_c,
chronicles, chronicles,
chronos, chronos,
chronos/timer as chtimer,
stew/byteutils, stew/byteutils,
results, results,
json_serialization as js json_serialization as js
import import
waku/[common/logging, waku_node, node/peer_manager, waku_core, waku_lightpush/client], waku/[
common/logging,
waku_node,
node/peer_manager,
waku_core,
waku_lightpush/client,
common/utils/parse_size_units,
],
./tester_config, ./tester_config,
./tester_message ./tester_message
randomize()
type SizeRange* = tuple[min: uint64, max: uint64]
var RANDOM_PALYLOAD {.threadvar.}: seq[byte]
RANDOM_PALYLOAD = urandom(1024 * 1024)
# 1MiB of random payload to be used to extend message
proc prepareMessage( proc prepareMessage(
sender: string, sender: string,
messageIndex, numMessages: uint32, messageIndex, numMessages: uint32,
startedAt: TimeStamp, startedAt: TimeStamp,
prevMessageAt: var Timestamp, prevMessageAt: var Timestamp,
contentTopic: ContentTopic, contentTopic: ContentTopic,
): WakuMessage = size: SizeRange,
): (WakuMessage, uint64) =
var renderSize = rand(size.min .. size.max)
let current = getNowInNanosecondTime() let current = getNowInNanosecondTime()
let payload = ProtocolTesterMessage( let payload = ProtocolTesterMessage(
sender: sender, sender: sender,
@ -26,61 +44,113 @@ proc prepareMessage(
startedAt: startedAt, startedAt: startedAt,
sinceStart: current - startedAt, sinceStart: current - startedAt,
sincePrev: current - prevMessageAt, sincePrev: current - prevMessageAt,
size: renderSize,
) )
prevMessageAt = current prevMessageAt = current
let text = js.Json.encode(payload) let text = js.Json.encode(payload)
let contentPayload = toBytes(text & " \0")
if renderSize < len(contentPayload).uint64:
renderSize = len(contentPayload).uint64
let finalPayload = concat(
contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64]
)
let message = WakuMessage( let message = WakuMessage(
payload: toBytes(text), # content of the message payload: finalPayload, # content of the message
contentTopic: contentTopic, # content topic to publish to contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it ephemeral: true, # tell store nodes to not store it
timestamp: current, # current timestamp timestamp: current, # current timestamp
) )
return message return (message, renderSize)
var sentMessages {.threadvar.}: OrderedTable[uint32, tuple[hash: string, relayed: bool]]
var failedToSendCause {.threadvar.}: Table[string, uint32]
var failedToSendCount {.threadvar.}: uint32
var numMessagesToSend {.threadvar.}: uint32
var messagesSent {.threadvar.}: uint32
proc reportSentMessages() {.async.} =
while true:
await sleepAsync(chtimer.seconds(60))
let report = catch:
"""*----------------------------------------*
| Expected | Sent | Failed |
|{numMessagesToSend+failedToSendCount:>11} |{messagesSent:>11} |{failedToSendCount:>11} |
*----------------------------------------*""".fmt()
if report.isErr:
echo "Error while printing statistics"
else:
echo report.get()
echo "*--------------------------------------------------------------------------------------------------*"
echo "| Failur cause | count |"
for (cause, count) in failedToSendCause.pairs:
echo fmt"|{cause:<87}|{count:>10}|"
echo "*--------------------------------------------------------------------------------------------------*"
echo "*--------------------------------------------------------------------------------------------------*"
echo "| Index | Relayed | Hash |"
for (index, info) in sentMessages.pairs:
echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash:<76}|"
echo "*--------------------------------------------------------------------------------------------------*"
# evere sent message hash should logged once
sentMessages.clear()
proc publishMessages( proc publishMessages(
wakuNode: WakuNode, wakuNode: WakuNode,
lightpushPubsubTopic: PubsubTopic, lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic, lightpushContentTopic: ContentTopic,
numMessages: uint32, numMessages: uint32,
messageSizeRange: SizeRange,
delayMessages: Duration, delayMessages: Duration,
) {.async.} = ) {.async.} =
let startedAt = getNowInNanosecondTime() let startedAt = getNowInNanosecondTime()
var prevMessageAt = startedAt var prevMessageAt = startedAt
var failedToSendCount: uint32 = 0 var renderMsgSize = messageSizeRange
# sets some default of min max message size to avoid conflict with meaningful payload size
renderMsgSize.min = max(1024.uint64, renderMsgSize.min) # do not use less than 1KB
renderMsgSize.max = max(2048.uint64, renderMsgSize.max) # minimum of max is 2KB
renderMsgSize.min = min(renderMsgSize.min, renderMsgSize.max)
renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max)
let selfPeerId = $wakuNode.switch.peerInfo.peerId let selfPeerId = $wakuNode.switch.peerInfo.peerId
failedToSendCount = 0
numMessagesToSend = if numMessages == 0: uint32.high else: numMessages
messagesSent = 1
var messagesSent: uint32 = 1 while numMessagesToSend >= messagesSent:
while numMessages >= messagesSent: let (message, msgSize) = prepareMessage(
let message = prepareMessage( selfPeerId, messagesSent, numMessagesToSend, startedAt, prevMessageAt,
selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt, lightpushContentTopic, renderMsgSize,
lightpushContentTopic,
) )
let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message) let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message)
let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex
if wlpRes.isOk(): if wlpRes.isOk():
info "published message using lightpush", sentMessages[messagesSent] = (hash: msgHash, relayed: true)
index = messagesSent, count = numMessages notice "published message using lightpush",
index = messagesSent,
count = numMessagesToSend,
size = msgSize,
pubsubTopic = lightpushPubsubTopic,
hash = msgHash
inc(messagesSent)
else: else:
error "failed to publish message using lightpush", err = wlpRes.error sentMessages[messagesSent] = (hash: msgHash, relayed: false)
failedToSendCause.mgetOrPut(wlpRes.error, 1).inc()
error "failed to publish message using lightpush",
err = wlpRes.error, hash = msgHash
inc(failedToSendCount) inc(failedToSendCount)
await sleepAsync(delayMessages) # Publish every 5 seconds await sleepAsync(delayMessages)
inc(messagesSent)
let report = catch: waitFor reportSentMessages()
"""*----------------------------------------*
| Expected | Sent | Failed |
|{numMessages:>11} |{messagesSent-failedToSendCount-1:>11} |{failedToSendCount:>11} |
*----------------------------------------*""".fmt()
if report.isErr:
echo "Error while printing statistics"
else:
echo report.get()
discard c_raise(ansi_c.SIGTERM) discard c_raise(ansi_c.SIGTERM)
@ -90,19 +160,30 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
return return
# give some time to receiver side to set up # give some time to receiver side to set up
# TODO: this maybe done in more sphisticated way, though. let waitTillStartTesting = conf.startPublishingAfter.seconds
let waitTillStartTesting = 5.seconds
let parsedMinMsgSize = parseMsgSize(conf.minTestMessageSize).valueOr:
error "failed to parse 'min-test-msg-size' param: ", error = error
return
let parsedMaxMsgSize = parseMsgSize(conf.maxTestMessageSize).valueOr:
error "failed to parse 'max-test-msg-size' param: ", error = error
return
info "Sending test messages in", wait = waitTillStartTesting info "Sending test messages in", wait = waitTillStartTesting
waitFor sleepAsync(waitTillStartTesting) waitFor sleepAsync(waitTillStartTesting)
info "Start sending messages to service node using lightpush" info "Start sending messages to service node using lightpush"
sentMessages.sort(system.cmp)
# Start maintaining subscription # Start maintaining subscription
asyncSpawn publishMessages( asyncSpawn publishMessages(
wakuNode, wakuNode,
conf.pubsubTopics[0], conf.pubsubTopics[0],
conf.contentTopics[0], conf.contentTopics[0],
conf.numMessages, conf.numMessages,
(min: parsedMinMsgSize, max: parsedMaxMsgSize),
conf.delayMessages.milliseconds, conf.delayMessages.milliseconds,
) )
asyncSpawn reportSentMessages()

View File

@ -18,10 +18,13 @@ import
node/health_monitor, node/health_monitor,
node/waku_metrics, node/waku_metrics,
waku_api/rest/builder as rest_server_builder, waku_api/rest/builder as rest_server_builder,
waku_lightpush/common,
waku_filter_v2,
], ],
./tester_config, ./tester_config,
./lightpush_publisher, ./lightpush_publisher,
./filter_subscriber ./filter_subscriber,
./diagnose_connections
logScope: logScope:
topics = "liteprotocoltester main" topics = "liteprotocoltester main"
@ -81,9 +84,9 @@ when isMainModule:
wakuConf.logLevel = conf.logLevel wakuConf.logLevel = conf.logLevel
wakuConf.logFormat = conf.logFormat wakuConf.logFormat = conf.logFormat
wakuConf.staticNodes = @[conf.serviceNode] wakuConf.staticnodes = @[conf.serviceNode]
wakuConf.nat = conf.nat wakuConf.nat = conf.nat
wakuConf.maxConnections = 100 wakuConf.maxConnections = 500
wakuConf.restAddress = conf.restAddress wakuConf.restAddress = conf.restAddress
wakuConf.restPort = conf.restPort wakuConf.restPort = conf.restPort
wakuConf.restAllowOrigin = conf.restAllowOrigin wakuConf.restAllowOrigin = conf.restAllowOrigin
@ -103,7 +106,10 @@ when isMainModule:
wakuConf.lightpush = false wakuConf.lightpush = false
wakuConf.store = false wakuConf.store = false
wakuConf.rest = true wakuConf.rest = false
wakuConf.metricsServer = true
wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0")
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway. # It will always be called from main thread anyway.
@ -186,8 +192,12 @@ when isMainModule:
info "Node setup complete" info "Node setup complete"
if conf.testFunc == TesterFunctionality.SENDER: if conf.testFunc == TesterFunctionality.SENDER:
waitFor startPeriodicPeerDiagnostic(wakuApp.node.peerManager, WakuLightPushCodec)
setupAndPublish(wakuApp.node, conf) setupAndPublish(wakuApp.node, conf)
else: else:
waitFor startPeriodicPeerDiagnostic(
wakuApp.node.peerManager, WakuFilterSubscribeCodec
)
setupAndSubscribe(wakuApp.node, conf) setupAndSubscribe(wakuApp.node, conf)
runForever() runForever()

View File

@ -0,0 +1,4 @@
-d:chronicles_line_numbers
-d:chronicles_runtime_filtering:on
-d:discv5_protocol_id:d5waku
path = "../.."

41
apps/liteprotocoltester/run_service_node.sh Normal file → Executable file
View File

@ -5,6 +5,39 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "Service node IP: ${IP}" echo "Service node IP: ${IP}"
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
fi
if [ -n "${CLUSTER_ID}" ]; then
CLUSTER_ID=--cluster-id="${CLUSTER_ID}"
fi
echo "STANDALONE: ${STANDALONE}"
if [ -z "${STANDALONE}" ]; then
RETRIES=${RETRIES:=10}
while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do
BOOTSTRAP_ENR=$(wget -qO- http://bootstrap:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"enrUri":"\([^"]*\)".*/\1/');
echo "Bootstrap node not ready, retrying (retries left: ${RETRIES})"
sleep 1
RETRIES=$(( $RETRIES - 1 ))
done
if [ -z "${BOOTSTRAP_ENR}" ]; then
echo "Could not get BOOTSTRAP_ENR and none provided. Failing"
exit 1
fi
echo "Using bootstrap node: ${BOOTSTRAP_ENR}"
fi
exec /usr/bin/wakunode\ exec /usr/bin/wakunode\
--relay=true\ --relay=true\
--filter=true\ --filter=true\
@ -20,10 +53,10 @@ exec /usr/bin/wakunode\
--dns-discovery=true\ --dns-discovery=true\
--discv5-discovery=true\ --discv5-discovery=true\
--discv5-enr-auto-update=True\ --discv5-enr-auto-update=True\
--log-level=DEBUG\ --discv5-bootstrap-node=${BOOTSTRAP_ENR}\
--log-level=INFO\
--metrics-server=True\ --metrics-server=True\
--metrics-server-address=0.0.0.0\ --metrics-server-address=0.0.0.0\
--nodekey=e3f5e64568b3a612dee609f6e7c0203c501dab6131662922bdcbcabd474281d5\
--nat=extip:${IP}\ --nat=extip:${IP}\
--pubsub-topic=/waku/2/rs/0/0\ ${PUBSUB}\
--cluster-id=0 ${CLUSTER_ID}

135
apps/liteprotocoltester/run_tester_node.sh Normal file → Executable file
View File

@ -1,5 +1,7 @@
#!/bin/sh #!/bin/sh
#set -x
if test -f .env; then if test -f .env; then
echo "Using .env file" echo "Using .env file"
. $(pwd)/.env . $(pwd)/.env
@ -9,68 +11,119 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "I am a lite-protocol-tester node" echo "I am a lite-protocol-tester node"
# Get an unique node index based on the container's IP BINARY_PATH=$1
FOURTH_OCTET=${IP##*.}
THIRD_OCTET="${IP%.*}"; THIRD_OCTET="${THIRD_OCTET##*.}"
NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET))
echo "NODE_INDEX $NODE_INDEX" if [ ! -x "${BINARY_PATH}" ]; then
echo "Invalid binary path '${BINARY_PATH}'. Failing"
exit 1
fi
RETRIES=${RETRIES:=10} if [ "${2}" = "--help" ]; then
echo "You might want to check nwaku/apps/liteprotocoltester/README.md"
exec "${BINARY_PATH}" --help
exit 0
fi
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do FUNCTION=$2
SERIVCE_NODE_ADDR=$(wget -qO- http://servicenode:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/'); if [ "${FUNCTION}" = "SENDER" ]; then
echo "Service node not ready, retrying (retries left: ${RETRIES})" FUNCTION=--test-func=SENDER
sleep 1 SERVICENAME=lightpush-service
RETRIES=$(( $RETRIES - 1 )) fi
done
if [ "${FUNCTION}" = "RECEIVER" ]; then
FUNCTION=--test-func=RECEIVER
SERVICENAME=filter-service
fi
SERIVCE_NODE_ADDR=$3
if [ -z "${SERIVCE_NODE_ADDR}" ]; then
echo "Service node peer_id provided. Failing"
exit 1
fi
DO_DETECT_SERVICENODE=0
if [ "${SERIVCE_NODE_ADDR}" = "servicenode" ]; then
DO_DETECT_SERVICENODE=1
SERIVCE_NODE_ADDR=""
SERVICENAME=servicenode
fi
if [ "${SERIVCE_NODE_ADDR}" = "waku-sim" ]; then
DO_DETECT_SERVICENODE=1
SERIVCE_NODE_ADDR=""
fi
if [ $DO_DETECT_SERVICENODE -eq 1 ]; then
RETRIES=${RETRIES:=10}
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
SERVICE_DEBUG_INFO=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null);
echo "SERVICE_DEBUG_INFO: ${SERVICE_DEBUG_INFO}"
SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
echo "Service node not ready, retrying (retries left: ${RETRIES})"
sleep 1
RETRIES=$(( $RETRIES - 1 ))
done
fi
if [ -z "${SERIVCE_NODE_ADDR}" ]; then if [ -z "${SERIVCE_NODE_ADDR}" ]; then
echo "Could not get SERIVCE_NODE_ADDR and none provided. Failing" echo "Could not get SERIVCE_NODE_ADDR and none provided. Failing"
exit 1 exit 1
fi fi
if [ -n "${PUBSUB}" ]; then if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}" PUBSUB=--pubsub-topic="${PUBSUB}"
else
PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
fi fi
if [ -n "${CONTENT_TOPIC}" ]; then if [ -n "${CONTENT_TOPIC}" ]; then
CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}" CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}"
fi fi
FUNCTION=$1 if [ -n "${CLUSTER_ID}" ]; then
CLUSTER_ID=--cluster-id="${CLUSTER_ID}"
fi
if [ -n "${START_PUBLISHING_AFTER}" ]; then
START_PUBLISHING_AFTER=--start-publishing-after="${START_PUBLISHING_AFTER}"
fi
if [ -n "${MIN_MESSAGE_SIZE}" ]; then
MIN_MESSAGE_SIZE=--min-test-msg-size="${MIN_MESSAGE_SIZE}"
fi
if [ -n "${MAX_MESSAGE_SIZE}" ]; then
MAX_MESSAGE_SIZE=--max-test-msg-size="${MAX_MESSAGE_SIZE}"
fi
if [ -n "${NUM_MESSAGES}" ]; then
NUM_MESSAGES=--num-messages="${NUM_MESSAGES}"
fi
if [ -n "${DELAY_MESSAGES}" ]; then
DELAY_MESSAGES=--delay-messages="${DELAY_MESSAGES}"
fi
echo "Running binary: ${BINARY_PATH}"
echo "Tester node: ${FUNCTION}" echo "Tester node: ${FUNCTION}"
REST_PORT=--rest-port=8647
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
REST_PORT=--rest-port=8646
fi
if [ "${FUNCTION}" = "RECEIVER" ]; then
FUNCTION=--test-func=RECEIVER
REST_PORT=--rest-port=8647
fi
if [ -z "${FUNCTION}" ]; then
FUNCTION=--test-func=RECEIVER
fi
echo "Using service node: ${SERIVCE_NODE_ADDR}" echo "Using service node: ${SERIVCE_NODE_ADDR}"
exec /usr/bin/liteprotocoltester\
--log-level=DEBUG\ exec "${BINARY_PATH}"\
--log-level=INFO\
--service-node="${SERIVCE_NODE_ADDR}"\ --service-node="${SERIVCE_NODE_ADDR}"\
--pubsub-topic=/waku/2/rs/0/0\ ${DELAY_MESSAGES}\
--cluster-id=0\ ${NUM_MESSAGES}\
--num-messages=${NUM_MESSAGES}\
--delay-messages=${DELAY_MESSAGES}\
--nat=extip:${IP}\
${FUNCTION}\
${PUBSUB}\ ${PUBSUB}\
${CONTENT_TOPIC}\ ${CONTENT_TOPIC}\
${REST_PORT} ${CLUSTER_ID}\
${FUNCTION}\
${START_PUBLISHING_AFTER}\
${MIN_MESSAGE_SIZE}\
${MAX_MESSAGE_SIZE}
# --nat=extip:${IP}\
# --config-file=config.toml\ # --config-file=config.toml\

View File

@ -4,27 +4,36 @@ import
std/[sets, tables, strutils, sequtils, options, strformat], std/[sets, tables, strutils, sequtils, options, strformat],
chronos/timer as chtimer, chronos/timer as chtimer,
chronicles, chronicles,
chronos,
results results
import ./tester_message import ./tester_message
type type
ArrivalInfo = object
arrivedAt: Moment
prevArrivedAt: Moment
prevIndex: uint32
MessageInfo = tuple[msg: ProtocolTesterMessage, info: ArrivalInfo]
DupStat = tuple[hash: string, dupCount: int, size: uint64]
StatHelper = object StatHelper = object
prevIndex: uint32 prevIndex: uint32
prevArrivedAt: Moment prevArrivedAt: Moment
lostIndices: HashSet[uint32] lostIndices: HashSet[uint32]
seenIndices: HashSet[uint32] seenIndices: HashSet[uint32]
maxIndex: uint32 maxIndex: uint32
duplicates: OrderedTable[uint32, DupStat]
Statistics* = object Statistics* = object
received: Table[uint32, MessageInfo]
firstReceivedIdx*: uint32
allMessageCount*: uint32 allMessageCount*: uint32
receivedMessages*: uint32 receivedMessages*: uint32
misorderCount*: uint32 misorderCount*: uint32
lateCount*: uint32 lateCount*: uint32
duplicateCount*: uint32 duplicateCount*: uint32
minLatency*: Duration
maxLatency*: Duration
cummulativeLatency: Duration
helper: StatHelper helper: StatHelper
PerPeerStatistics* = Table[string, Statistics] PerPeerStatistics* = Table[string, Statistics]
@ -42,24 +51,33 @@ proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
result.helper.prevIndex = 0 result.helper.prevIndex = 0
result.helper.maxIndex = 0 result.helper.maxIndex = 0
result.helper.seenIndices.init(expectedMessageCount) result.helper.seenIndices.init(expectedMessageCount)
result.minLatency = nanos(0) result.received = initTable[uint32, MessageInfo](expectedMessageCount)
result.maxLatency = nanos(0)
result.cummulativeLatency = nanos(0)
return result return result
proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) = proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage, msgHash: string) =
if self.allMessageCount == 0: if self.allMessageCount == 0:
self.allMessageCount = msg.count self.allMessageCount = msg.count
self.firstReceivedIdx = msg.index
elif self.allMessageCount != msg.count: elif self.allMessageCount != msg.count:
warn "Message count mismatch at message", error "Message count mismatch at message",
index = msg.index, expected = self.allMessageCount, got = msg.count index = msg.index, expected = self.allMessageCount, got = msg.count
if not self.helper.seenIndices.contains(msg.index): let currentArrived: MessageInfo = (
self.helper.seenIndices.incl(msg.index) msg: msg,
else: info: ArrivalInfo(
arrivedAt: Moment.now(),
prevArrivedAt: self.helper.prevArrivedAt,
prevIndex: self.helper.prevIndex,
),
)
if self.received.hasKeyOrPut(msg.index, currentArrived):
inc(self.duplicateCount) inc(self.duplicateCount)
warn "Duplicate message", index = msg.index self.helper.duplicates.mgetOrPut(msg.index, (msgHash, 0, msg.size)).dupCount.inc()
## just do not count into stats warn "Duplicate message",
index = msg.index,
hash = msgHash,
times_duplicated = self.helper.duplicates[msg.index].dupCount
return return
## detect misorder arrival and possible lost messages ## detect misorder arrival and possible lost messages
@ -67,70 +85,111 @@ proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
inc(self.misorderCount) inc(self.misorderCount)
warn "Misordered message arrival", warn "Misordered message arrival",
index = msg.index, expected = self.helper.prevIndex + 1 index = msg.index, expected = self.helper.prevIndex + 1
## collect possible lost message indicies
for idx in self.helper.prevIndex + 1 ..< msg.index:
self.helper.lostIndices.incl(idx)
elif self.helper.prevIndex > msg.index: elif self.helper.prevIndex > msg.index:
inc(self.lateCount) inc(self.lateCount)
warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1 warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1
else:
## may remove late arrival
self.helper.lostIndices.excl(msg.index)
## calculate latency
let currentArrivedAt = Moment.now()
let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt
let expectedDelay: Duration = nanos(msg.sincePrev)
var latency: Duration
# if we have any latency...
if expectedDelay > delaySincePrevArrived:
latency = delaySincePrevArrived - expectedDelay
if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)):
self.minLatency = latency
if latency > self.maxLatency:
self.maxLatency = latency
self.cummulativeLatency += latency
else:
warn "Negative latency detected",
index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived
self.helper.maxIndex = max(self.helper.maxIndex, msg.index) self.helper.maxIndex = max(self.helper.maxIndex, msg.index)
self.helper.prevIndex = msg.index self.helper.prevIndex = msg.index
self.helper.prevArrivedAt = currentArrivedAt self.helper.prevArrivedAt = currentArrived.info.arrivedAt
inc(self.receivedMessages) inc(self.receivedMessages)
proc addMessage*( proc addMessage*(
self: var PerPeerStatistics, peerId: string, msg: ProtocolTesterMessage self: var PerPeerStatistics,
peerId: string,
msg: ProtocolTesterMessage,
msgHash: string,
) = ) =
if not self.contains(peerId): if not self.contains(peerId):
self[peerId] = Statistics.init() self[peerId] = Statistics.init()
discard catch: discard catch:
self[peerId].addMessage(msg) self[peerId].addMessage(msg, msgHash)
proc lossCount*(self: Statistics): uint32 = proc lossCount*(self: Statistics): uint32 =
self.helper.maxIndex - self.receivedMessages self.helper.maxIndex - self.receivedMessages
proc averageLatency*(self: Statistics): Duration = proc calcLatency*(self: Statistics): tuple[min, max, avg: Duration] =
if self.receivedMessages == 0: var
return nanos(0) minLatency = nanos(0)
return self.cummulativeLatency div self.receivedMessages maxLatency = nanos(0)
avgLatency = nanos(0)
if self.receivedMessages > 2:
try:
var prevArrivedAt = self.received[self.firstReceivedIdx].info.arrivedAt
for idx, (msg, arrival) in self.received.pairs:
if idx <= 1:
continue
let expectedDelay = nanos(msg.sincePrev)
## latency will be 0 if arrived in shorter time than expected
var latency = arrival.arrivedAt - arrival.prevArrivedAt - expectedDelay
## will not measure zero latency, it is unlikely to happen but in case happens could
## ditort the min latency calulculation as we want to calculate the feasible minimum.
if latency > nanos(0):
if minLatency == nanos(0):
minLatency = latency
else:
minLatency = min(minLatency, latency)
maxLatency = max(maxLatency, latency)
avgLatency += latency
avgLatency = avgLatency div (self.receivedMessages - 1)
except KeyError:
error "Error while calculating latency: " & getCurrentExceptionMsg()
return (minLatency, maxLatency, avgLatency)
proc missingIndices*(self: Statistics): seq[uint32] =
var missing: seq[uint32] = @[]
for idx in 1 .. self.helper.maxIndex:
if not self.received.hasKey(idx):
missing.add(idx)
return missing
proc distinctDupCount(self: Statistics): int {.inline.} =
return self.helper.duplicates.len()
proc allDuplicates(self: Statistics): int {.inline.} =
var total = 0
for _, (_, dupCount, _) in self.helper.duplicates.pairs:
total += dupCount
return total
proc dupMsgs(self: Statistics): string =
var dupMsgs: string = ""
for idx, (hash, dupCount, size) in self.helper.duplicates.pairs:
dupMsgs.add(
" index: " & $idx & " | hash: " & hash & " | count: " & $dupCount & " | size: " &
$size & "\n"
)
return dupMsgs
proc echoStat*(self: Statistics) = proc echoStat*(self: Statistics) =
let (minL, maxL, avgL) = self.calcLatency()
let printable = catch: let printable = catch:
"""*------------------------------------------------------------------------------------------* """*------------------------------------------------------------------------------------------*
| Expected | Received | Target | Loss | Misorder | Late | Duplicate | | Expected | Received | Target | Loss | Misorder | Late | |
|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} | |{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} | |
*------------------------------------------------------------------------------------------* *------------------------------------------------------------------------------------------*
| Latency stat: | | Latency stat: |
| avg latency: {$self.averageLatency():<73}| | min latency: {$minL:<73}|
| min latency: {$self.maxLatency:<73}| | avg latency: {$avgL:<73}|
| max latency: {$self.minLatency:<73}| | max latency: {$maxL:<73}|
*------------------------------------------------------------------------------------------*
| Duplicate stat: |
| distinct duplicate messages: {$self.distinctDupCount():<57}|
| sum duplicates : {$self.allDuplicates():<57}|
Duplicated messages:
{self.dupMsgs()}
*------------------------------------------------------------------------------------------*
| Lost indices: |
| {self.missingIndices()} |
*------------------------------------------------------------------------------------------*""".fmt() *------------------------------------------------------------------------------------------*""".fmt()
if printable.isErr(): if printable.isErr():
@ -139,6 +198,8 @@ proc echoStat*(self: Statistics) =
echo printable.get() echo printable.get()
proc jsonStat*(self: Statistics): string = proc jsonStat*(self: Statistics): string =
let minL, maxL, avgL = self.calcLatency()
let json = catch: let json = catch:
"""{{"expected":{self.helper.maxIndex}, """{{"expected":{self.helper.maxIndex},
"received": {self.receivedMessages}, "received": {self.receivedMessages},
@ -148,10 +209,11 @@ proc jsonStat*(self: Statistics): string =
"late": {self.lateCount}, "late": {self.lateCount},
"duplicate": {self.duplicateCount}, "duplicate": {self.duplicateCount},
"latency": "latency":
{{"avg": "{self.averageLatency()}", {{"avg": "{avgL}",
"min": "{self.minLatency}", "min": "{minL}",
"max": "{self.maxLatency}" "max": "{maxL}"
}} }},
"lostIndices": {self.missingIndices()}
}}""".fmt() }}""".fmt()
if json.isErr: if json.isErr:
return "{\"result:\": \"" & json.error.msg & "\"}" return "{\"result:\": \"" & json.error.msg & "\"}"
@ -189,14 +251,25 @@ proc jsonStats*(self: PerPeerStatistics): string =
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() & "{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
"\"}" "\"}"
proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool = proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} =
# if there are no peers have sent messages, assume we just have started. # if there are no peers have sent messages, assume we just have started.
if self.len == 0: if self.len == 0:
return false return false
for stat in self.values: for stat in self.values:
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
stat.receivedMessages < stat.allMessageCount: stat.helper.maxIndex < stat.allMessageCount:
return false return false
## Ok, we see last message arrived from all peers,
## lets check if all messages are received
## and if not let's wait another 20 secs to give chance the system will send them.
var shallWait = false
for stat in self.values:
if stat.receivedMessages < stat.allMessageCount:
shallWait = true
if shallWait:
await sleepAsync(chtimer.seconds(20))
return true return true

View File

@ -20,14 +20,16 @@ import
common/confutils/envvar/std/net as confEnvvarNet, common/confutils/envvar/std/net as confEnvvarNet,
common/logging, common/logging,
factory/external_config, factory/external_config,
waku/waku_core, waku_core,
] ]
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
const const
LitePubsubTopic* = PubsubTopic("/waku/2/rs/0/0") LitePubsubTopic* = PubsubTopic("/waku/2/rs/66/0")
LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto") LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto")
DefaultMinTestMessageSizeStr* = "1KiB"
DefaultMaxTestMessageSizeStr* = "150KiB"
type TesterFunctionality* = enum type TesterFunctionality* = enum
SENDER # pumps messages to the network SENDER # pumps messages to the network
@ -76,6 +78,12 @@ type LiteProtocolTesterConf* = object
desc: "Number of messages to send.", defaultValue: 120, name: "num-messages" desc: "Number of messages to send.", defaultValue: 120, name: "num-messages"
.}: uint32 .}: uint32
startPublishingAfter* {.
desc: "Wait number of seconds before start publishing messages.",
defaultValue: 5,
name: "start-publishing-after"
.}: uint32
delayMessages* {. delayMessages* {.
desc: "Delay between messages in milliseconds.", desc: "Delay between messages in milliseconds.",
defaultValue: 1000, defaultValue: 1000,
@ -105,8 +113,21 @@ type LiteProtocolTesterConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.", "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0, defaultValue: 0,
name: "cluster-id" name: "cluster-id"
.}: uint32 .}: uint16
minTestMessageSize* {.
desc:
"Minimum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
defaultValue: DefaultMinTestMessageSizeStr,
name: "min-test-msg-size"
.}: string
maxTestMessageSize* {.
desc:
"Maximum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
defaultValue: DefaultMaxTestMessageSizeStr,
name: "max-test-msg-size"
.}: string
## Tester REST service configuration ## Tester REST service configuration
restAddress* {. restAddress* {.
desc: "Listening address of the REST HTTP server.", desc: "Listening address of the REST HTTP server.",

View File

@ -15,6 +15,7 @@ type ProtocolTesterMessage* = object
startedAt*: int64 startedAt*: int64
sinceStart*: int64 sinceStart*: int64
sincePrev*: int64 sincePrev*: int64
size*: uint64
proc writeValue*( proc writeValue*(
writer: var JsonWriter[RestJson], value: ProtocolTesterMessage writer: var JsonWriter[RestJson], value: ProtocolTesterMessage
@ -26,6 +27,7 @@ proc writeValue*(
writer.writeField("startedAt", value.startedAt) writer.writeField("startedAt", value.startedAt)
writer.writeField("sinceStart", value.sinceStart) writer.writeField("sinceStart", value.sinceStart)
writer.writeField("sincePrev", value.sincePrev) writer.writeField("sincePrev", value.sincePrev)
writer.writeField("size", value.size)
writer.endRecord() writer.endRecord()
proc readValue*( proc readValue*(
@ -38,6 +40,7 @@ proc readValue*(
startedAt: Option[int64] startedAt: Option[int64]
sinceStart: Option[int64] sinceStart: Option[int64]
sincePrev: Option[int64] sincePrev: Option[int64]
size: Option[uint64]
for fieldName in readObjectFields(reader): for fieldName in readObjectFields(reader):
case fieldName case fieldName
@ -77,6 +80,12 @@ proc readValue*(
"Multiple `sincePrev` fields found", "ProtocolTesterMessage" "Multiple `sincePrev` fields found", "ProtocolTesterMessage"
) )
sincePrev = some(reader.readValue(int64)) sincePrev = some(reader.readValue(int64))
of "size":
if size.isSome():
reader.raiseUnexpectedField(
"Multiple `size` fields found", "ProtocolTesterMessage"
)
size = some(reader.readValue(uint64))
else: else:
unrecognizedFieldWarning() unrecognizedFieldWarning()
@ -98,6 +107,9 @@ proc readValue*(
if sincePrev.isNone(): if sincePrev.isNone():
reader.raiseUnexpectedValue("Field `sincePrev` is missing") reader.raiseUnexpectedValue("Field `sincePrev` is missing")
if size.isNone():
reader.raiseUnexpectedValue("Field `size` is missing")
value = ProtocolTesterMessage( value = ProtocolTesterMessage(
sender: sender.get(), sender: sender.get(),
index: index.get(), index: index.get(),
@ -105,4 +117,5 @@ proc readValue*(
startedAt: startedAt.get(), startedAt: startedAt.get(),
sinceStart: sinceStart.get(), sinceStart: sinceStart.get(),
sincePrev: sincePrev.get(), sincePrev: sincePrev.get(),
size: size.get(),
) )

View File

@ -33,7 +33,7 @@ proc startRestServerEsentials*(
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
): Result[WakuRestServerRef, string] = ): Result[WakuRestServerRef, string] =
if not conf.rest: if not conf.rest:
return return ok(nil)
let requestErrorHandler: RestRequestErrorHandler = proc( let requestErrorHandler: RestRequestErrorHandler = proc(
error: RestRequestError, request: HttpRequestRef error: RestRequestError, request: HttpRequestRef
@ -113,7 +113,7 @@ proc startRestServerProtocolSupport*(
conf: WakuNodeConf, conf: WakuNodeConf,
): Result[void, string] = ): Result[void, string] =
if not conf.rest: if not conf.rest:
return return ok()
var router = restServer.router var router = restServer.router
## Admin REST API ## Admin REST API