LiteProtocolTester application and docker compose bundle setup. (#2706)

Faster image build with copy from pre-built binary.
Set cluster-id to 0.
Added README.md documentation.
This commit is contained in:
NagyZoltanPeter 2024-05-21 23:03:33 +02:00 committed by GitHub
parent 7464684842
commit 58aa5e6895
25 changed files with 7026 additions and 0 deletions

.gitignore
View File

@@ -66,3 +66,9 @@ coverage_html_report/
# Wildcard
*.ignore.*
# Ignore all possible node runner directories
**/keystore/
**/rln_tree/
**/certs/

View File

@@ -214,6 +214,10 @@ chat2bridge: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim chat2bridge $(NIM_PARAMS) waku.nims
liteprotocoltester: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim liteprotocoltester $(NIM_PARAMS) waku.nims
################
## Waku tools ##

View File

@@ -0,0 +1,58 @@
# BUILD NIM APP ----------------------------------------------------------------
FROM rust:1.77.1-alpine3.18 AS nim-build
ARG NIMFLAGS
ARG MAKE_TARGET=liteprotocoltester
ARG NIM_COMMIT
ARG LOG_LEVEL=TRACE
# Get build tools and required header files
RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq
WORKDIR /app
COPY . .
# workaround for alpine issue: https://github.com/alpinelinux/docker-alpine/issues/383
RUN apk update && apk upgrade
# Run separately from 'make' to avoid re-doing
RUN git submodule update --init --recursive
# Slowest build step for the sake of caching layers
RUN make -j$(nproc) deps QUICK_AND_DIRTY_COMPILER=1 ${NIM_COMMIT}
# Build the final node binary
RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="${NIMFLAGS}"
# PRODUCTION IMAGE -------------------------------------------------------------
FROM alpine:3.18 as prod
ARG MAKE_TARGET=liteprotocoltester
LABEL maintainer="jakub@status.im"
LABEL source="https://github.com/waku-org/nwaku"
LABEL description="Lite Protocol Tester: Waku light-client"
LABEL commit="unknown"
LABEL version="unknown"
# DevP2P, LibP2P, and JSON RPC ports
EXPOSE 30303 60000 8545
# Runtime libraries referenced by the binary
RUN apk add --no-cache libgcc pcre-dev libpq-dev
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
# Copy to a separate location to accommodate different MAKE_TARGET values
COPY --from=nim-build /app/build/$MAKE_TARGET /usr/bin/
# Copy migration scripts for DB upgrades
COPY --from=nim-build /app/migrations/ /app/migrations/
ENTRYPOINT ["/usr/bin/liteprotocoltester"]
# By default just show help if called without arguments
CMD ["--help"]

View File

@@ -0,0 +1,35 @@
# TESTING IMAGE --------------------------------------------------------------
## NOTICE: This is a shortcut build file for Ubuntu users who compile nwaku on an Ubuntu distro.
## It is used for a faster turnaround time when testing the compiled binary.
## Prerequisites: compiled liteprotocoltester binary in build/ directory
FROM ubuntu:noble as prod
LABEL maintainer="jakub@status.im"
LABEL source="https://github.com/waku-org/nwaku"
LABEL description="Lite Protocol Tester: Waku light-client"
LABEL commit="unknown"
LABEL version="unknown"
# DevP2P, LibP2P, and JSON RPC ports
EXPOSE 30303 60000 8545
# Runtime libraries referenced by the binary
RUN apt-get update && apt-get install -y --no-install-recommends \
libgcc1 \
libpcre3 \
libpq-dev \
wget \
iproute2 \
&& rm -rf /var/lib/apt/lists/*
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
COPY build/liteprotocoltester /usr/bin/
ENTRYPOINT ["/usr/bin/liteprotocoltester"]
# By default just show help if called without arguments
CMD ["--help"]

View File

@@ -0,0 +1,87 @@
# Waku - Lite Protocol Tester
## Aim
Test the reliability of light client protocols at different scales.
Measure message delivery reliability and latency between lightpush client(s) and filter client node(s).
## Concept of testing
A tester node is configured as either 'publisher' or 'receiver' and connects to a selected service node.
All service protocols are disabled except for the lightpush client or the filter client. This way we simulate
a light client application.
Each publisher pumps messages into the network in a preconfigured way (number of messages, frequency), while on the receiver side
we track and measure message losses, mis-ordered and late arrivals, and latencies.
Ideally the tester nodes will connect to different edges of the network, where we can gather more results from multiple publishers
and multiple receivers.
Publishers fill all message payloads with information about the test message and the sender, helping the receiver side calculate the results.
## Phases of development
### Phase 1
In the first phase we aim to demonstrate the testing concept, all bundled into a docker-compose environment where we run
one service (full) node plus a publisher and a receiver node.
At this stage we can only configure the number of messages and a fixed frequency for the message pump. We do not expect message losses or any significant latency, hence the test setup is very simple.
### Further plans
- Add more configurability (randomized message sizes, usage of more content topics and support for static sharding).
- Extend collected metrics and polish reporting.
- Add test metrics to a Grafana dashboard.
- Support static sharding and auto-sharding to be able to test under different conditions.
- ...
## Usage
### Phase 1
The Lite Protocol Tester application is built under the name `liteprotocoltester` in the apps/liteprotocoltester folder.
Starting from the nwaku repository root:
```bash
make liteprotocoltester
cd apps/liteprotocoltester
docker compose build
docker compose up -d
docker compose logs -f receivernode
```
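The publisher side can be followed the same way, using the service name defined in docker-compose.yml:
```bash
docker compose logs -f publishernode
```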
## Configure
### Environment variables for docker compose runs
| Variable | Description | Default |
| ---: | :--- | :--- |
| NUM_MESSAGES | Number of messages to publish | 120 |
| DELAY_MESSAGES | Delay between messages in milliseconds | 1000 |
| PUBSUB | Pubsub topic used for testing | /waku/2/default-waku/proto |
| CONTENT_TOPIC | Content topic used for testing | /tester/1/light-pubsub-example/proto |
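These variables are read by docker compose and passed to the tester nodes. For example, a run with more messages and a shorter delay can be started like this (the values are illustrative):
```bash
NUM_MESSAGES=300 DELAY_MESSAGES=500 docker compose up -d
```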
### Lite Protocol Tester application CLI options
| Option | Description | Default |
| :--- | :--- | :--- |
| --test-func | Selects SENDER or RECEIVER mode | RECEIVER |
| --service-node | Address of the service node to use for the lightpush and/or filter service | - |
| --num-messages | Number of messages to publish | 120 |
| --delay-messages | Delay between messages in milliseconds | 1000 |
| --pubsub-topic | Pubsub topic used for testing | /waku/2/default-waku/proto |
| --content-topic | Content topic used for testing | /tester/1/light-pubsub-example/proto |
| --cluster-id | Cluster id for the test | 0 |
| --config-file | TOML configuration file to fine-tune the light waku node<br>Note that some configurations (full node services) are not taken into account | - |
| --nat | Same as the wakunode "nat" configuration, appears here to ease test setup | any |
| --rest-address | For convenience, REST configuration can be done here | 127.0.0.1 |
| --rest-port | For convenience, REST configuration can be done here | 8654 |
| --rest-allow-origin | For convenience, REST configuration can be done here | * |
| --log-level | Log level for the application | DEBUG |
| --log-format | Logging output format (TEXT or JSON) | TEXT |
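For reference, a standalone sender could be started roughly as sketched below (outside of docker compose); the service node multiaddr is a placeholder that must be replaced with a real peer address:
```bash
./build/liteprotocoltester \
  --test-func=SENDER \
  --service-node="<service-node-multiaddr>" \
  --num-messages=120 \
  --delay-messages=1000 \
  --cluster-id=0 \
  --log-level=DEBUG
```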
### Docker image notice
Please note that currently, to ease testing and development, the tester application's docker image is based on Ubuntu and uses the externally pre-built binary of `liteprotocoltester`.
This speeds up image creation. Another docker build file is provided for a proper build of the bundled image.
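As a sketch, the fast-path image can also be built manually from the repository root, mirroring what `docker compose build` does (the tag matches the image name used in docker-compose.yml):
```bash
make liteprotocoltester
docker build -f apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy \
  -t waku.liteprotocoltester:latest .
```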

View File

@@ -0,0 +1,160 @@
version: "3.7"
x-logging: &logging
logging:
driver: json-file
options:
max-size: 1000m
# Environment variable definitions
x-eth-client-address: &eth_client_address ${ETH_CLIENT_ADDRESS:-} # Add your ETH_CLIENT_ADDRESS after the "-"
x-rln-environment: &rln_env
RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4}
RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-"
RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-"
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
PUBSUB: ${PUBSUB:-}
CONTENT_TOPIC: ${CONTENT_TOPIC:-}
# Services definitions
services:
servicenode:
image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
ports:
- 30304:30304/tcp
- 30304:30304/udp
- 9005:9005/udp
- 127.0.0.1:8003:8003
- 80:80 #Let's Encrypt
- 8000:8000/tcp #WSS
- 127.0.0.1:8645:8645
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_service_node.sh
publishernode:
image: waku.liteprotocoltester:latest
build:
context: ../..
dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
- 127.0.0.1:8646:8646
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_tester_node.sh:/opt/run_tester_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_tester_node.sh
- SENDER
depends_on:
- servicenode
configs:
- source: cfg_tester_node.toml
target: config.toml
receivernode:
image: waku.liteprotocoltester:latest
build:
context: ../..
dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
ports:
# - 30304:30304/tcp
# - 30304:30304/udp
# - 9005:9005/udp
# - 127.0.0.1:8003:8003
# - 80:80 #Let's Encrypt
# - 8000:8000/tcp #WSS
- 127.0.0.1:8647:8647
<<:
- *logging
environment:
DOMAIN: ${DOMAIN}
RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
ETH_CLIENT_ADDRESS: *eth_client_address
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
- *test_running_conditions
volumes:
- ./run_tester_node.sh:/opt/run_tester_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
- ./rln_tree:/etc/rln_tree/:Z
- ./keystore:/keystore:Z
entrypoint: sh
command:
- /opt/run_tester_node.sh
- RECEIVER
depends_on:
- servicenode
- publishernode
configs:
- source: cfg_tester_node.toml
target: config.toml
prometheus:
image: docker.io/prom/prometheus:latest
volumes:
- ./monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:Z
command:
- --config.file=/etc/prometheus/prometheus.yml
ports:
- 127.0.0.1:9090:9090
depends_on:
- servicenode
grafana:
image: docker.io/grafana/grafana:latest
env_file:
- ./monitoring/configuration/grafana-plugins.env
volumes:
- ./monitoring/configuration/grafana.ini:/etc/grafana/grafana.ini:Z
- ./monitoring/configuration/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:Z
- ./monitoring/configuration/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:Z
- ./monitoring/configuration/dashboards:/var/lib/grafana/dashboards/:Z
- ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_icon.svg:Z
- ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_typelogo.svg:Z
- ./monitoring/configuration/customizations/custom-logo.png:/usr/share/grafana/public/img/fav32.png:Z
ports:
- 0.0.0.0:3000:3000
depends_on:
- prometheus
configs:
cfg_tester_node.toml:
content: |
max-connections = 100

View File

@@ -0,0 +1,121 @@
## Example showing how a resource restricted client may
## subscribe to messages without relay
import
std/options,
system/ansi_c,
chronicles,
chronos,
chronos/timer as chtimer,
stew/byteutils,
stew/results,
serialization,
json_serialization as js,
times
import
../../../waku/common/logging,
../../../waku/node/peer_manager,
../../../waku/waku_node,
../../../waku/waku_core,
../../../waku/waku_filter_v2/client,
./tester_config,
./tester_message,
./statistics
proc unsubscribe(
wakuNode: WakuNode,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic,
) {.async.} =
notice "unsubscribing from filter"
let unsubscribeRes = await wakuNode.wakuFilterClient.unsubscribe(
filterPeer, filterPubsubTopic, @[filterContentTopic]
)
if unsubscribeRes.isErr:
notice "unsubscribe request failed", err = unsubscribeRes.error
else:
notice "unsubscribe request successful"
proc maintainSubscription(
wakuNode: WakuNode,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic,
) {.async.} =
while true:
trace "maintaining subscription"
# First use filter-ping to check if we have an active subscription
let pingRes = await wakuNode.wakuFilterClient.ping(filterPeer)
if pingRes.isErr():
# No subscription found. Let's subscribe.
trace "no subscription found. Sending subscribe request"
let subscribeRes = await wakuNode.filterSubscribe(
some(filterPubsubTopic), filterContentTopic, filterPeer
)
if subscribeRes.isErr():
trace "subscribe request failed. Quitting.", err = subscribeRes.error
break
else:
trace "subscribe request successful."
else:
trace "subscription found."
await sleepAsync(chtimer.seconds(60)) # Subscription maintenance interval
proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
if isNil(wakuNode.wakuFilterClient):
error "WakuFilterClient not initialized"
return
info "Start receiving messages to service node using lightpush",
serviceNode = conf.serviceNode
var stats: PerPeerStatistics
let remotePeer = parsePeerInfo(conf.serviceNode).valueOr:
error "Couldn't parse the peer info properly", error = error
return
let pushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
let payloadStr = string.fromBytes(message.payload)
let testerMessage = js.Json.decode(payloadStr, ProtocolTesterMessage)
stats.addMessage(testerMessage.sender, testerMessage)
trace "message received",
index = testerMessage.index,
count = testerMessage.count,
startedAt = $testerMessage.startedAt,
sinceStart = $testerMessage.sinceStart,
sincePrev = $testerMessage.sincePrev
wakuNode.wakuFilterClient.registerPushHandler(pushHandler)
let interval = millis(20000)
var printStats: CallbackFunc
printStats = CallbackFunc(
proc(udata: pointer) {.gcsafe.} =
stats.echoStats()
if stats.checkIfAllMessagesReceived():
waitFor unsubscribe(
wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0]
)
info "All messages received. Exiting."
## for graceful shutdown through signal hooks
discard c_raise(ansi_c.SIGTERM)
else:
discard setTimer(Moment.fromNow(interval), printStats)
)
discard setTimer(Moment.fromNow(interval), printStats)
# Start maintaining subscription
asyncSpawn maintainSubscription(
wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0]
)

View File

@@ -0,0 +1,112 @@
import
std/strformat,
system/ansi_c,
chronicles,
chronos,
stew/byteutils,
stew/results,
json_serialization as js
import
../../../waku/common/logging,
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../../waku/waku_core,
../../../waku/waku_lightpush/client,
./tester_config,
./tester_message
proc prepareMessage(
sender: string,
messageIndex, numMessages: uint32,
startedAt: Timestamp,
prevMessageAt: var Timestamp,
contentTopic: ContentTopic,
): WakuMessage =
let current = getNowInNanosecondTime()
let payload = ProtocolTesterMessage(
sender: sender,
index: messageIndex,
count: numMessages,
startedAt: startedAt,
sinceStart: current - startedAt,
sincePrev: current - prevMessageAt,
)
prevMessageAt = current
let text = js.Json.encode(payload)
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: current, # current timestamp
)
return message
proc publishMessages(
wakuNode: WakuNode,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic,
numMessages: uint32,
delayMessages: Duration,
) {.async.} =
let startedAt = getNowInNanosecondTime()
var prevMessageAt = startedAt
var failedToSendCount: uint32 = 0
let selfPeerId = $wakuNode.switch.peerInfo.peerId
var messagesSent: uint32 = 1
while numMessages >= messagesSent:
let message = prepareMessage(
selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt,
lightpushContentTopic,
)
let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message)
if wlpRes.isOk():
info "published message using lightpush",
index = messagesSent, count = numMessages
else:
error "failed to publish message using lightpush", err = wlpRes.error
inc(failedToSendCount)
await sleepAsync(delayMessages) # wait the configured delay between messages
inc(messagesSent)
let report = catch:
"""*----------------------------------------*
| Expected | Sent | Failed |
|{numMessages:>11} |{messagesSent-failedToSendCount-1:>11} |{failedToSendCount:>11} |
*----------------------------------------*""".fmt()
if report.isErr:
echo "Error while printing statistics"
else:
echo report.get()
discard c_raise(ansi_c.SIGTERM)
proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
if isNil(wakuNode.wakuLightpushClient):
error "WakuFilterClient not initialized"
return
# give the receiver side some time to set up
# TODO: this may be done in a more sophisticated way, though.
let waitTillStartTesting = 5.seconds
info "Sending test messages in", wait = waitTillStartTesting
waitFor sleepAsync(waitTillStartTesting)
info "Start sending messages to service node using lightpush"
# Start the message publishing loop
asyncSpawn publishMessages(
wakuNode,
conf.pubsubTopics[0],
conf.contentTopics[0],
conf.numMessages,
conf.delayMessages.milliseconds,
)

View File

@@ -0,0 +1,194 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[options, strutils, os, sequtils, net],
chronicles,
chronos,
metrics,
libbacktrace,
system/ansi_c,
libp2p/crypto/crypto,
confutils
import
../../waku/common/logging,
../../waku/factory/waku,
../../waku/factory/external_config,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder,
./tester_config,
./lightpush_publisher,
./filter_subscriber
logScope:
topics = "liteprotocoltester main"
proc logConfig(conf: LiteProtocolTesterConf) =
info "Configuration: Lite protocol tester", conf = $conf
{.pop.}
when isMainModule:
## Node setup happens in 6 phases:
## 1. Set up storage
## 2. Initialize node
## 3. Mount and initialize configured protocols
## 4. Start node and mounted protocols
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks
const versionString = "version / git commit hash: " & waku.git_version
let confRes = LiteProtocolTesterConf.load(version = versionString)
if confRes.isErr():
error "failure while loading the configuration", error = confRes.error
quit(QuitFailure)
var conf = confRes.get()
## Logging setup
logging.setupLog(conf.logLevel, conf.logFormat)
info "Running Lite Protocol Tester node", version = waku.git_version
logConfig(conf)
## Prepare Waku configuration
## - load from config file
## - override according to tester functionality
##
var wakuConf: WakuNodeConf
if conf.configFile.isSome():
try:
var configFile {.threadvar.}: InputFile
configFile = conf.configFile.get()
wakuConf = WakuNodeConf.load(
version = versionString,
printUsage = false,
secondarySources = proc(
wnconf: WakuNodeConf, sources: auto
) {.gcsafe, raises: [ConfigurationError].} =
echo "Loading secondary configuration file into WakuNodeConf"
sources.addConfigFile(Toml, configFile)
,
)
except CatchableError:
error "Loading Waku configuration failed", error = getCurrentExceptionMsg()
quit(QuitFailure)
wakuConf.logLevel = conf.logLevel
wakuConf.logFormat = conf.logFormat
wakuConf.staticNodes = @[conf.serviceNode]
wakuConf.nat = conf.nat
wakuConf.maxConnections = 100
wakuConf.restAddress = conf.restAddress
wakuConf.restPort = conf.restPort
wakuConf.restAllowOrigin = conf.restAllowOrigin
wakuConf.pubsubTopics = conf.pubsubTopics
wakuConf.contentTopics = conf.contentTopics
wakuConf.clusterId = conf.clusterId
## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc...
if conf.testFunc == TesterFunctionality.SENDER:
wakuConf.lightpushnode = conf.serviceNode
else:
wakuConf.filterNode = conf.serviceNode
wakuConf.relay = false
wakuConf.filter = false
wakuConf.lightpush = false
wakuConf.store = false
wakuConf.rest = true
# NOTE: {.threadvar.} is used to make the global variable GC-safe for the closure that uses it
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
let restServer = rest_server_builder.startRestServerEsentials(
nodeHealthMonitor, wakuConf
).valueOr:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)
var wakuApp = Waku.init(wakuConf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
wakuApp.restServer = restServer
nodeHealthMonitor.setNode(wakuApp.node)
(waitFor startWaku(addr wakuApp)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)
rest_server_builder.startRestServerProtocolSupport(
restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)
wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuConf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)
nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
debug "Setting up shutdown hooks"
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.
proc asyncStopper(wakuApp: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await wakuApp.stop()
quit(QuitSuccess)
# Handle Ctrl-C SIGINT
proc handleCtrlC() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
notice "Shutting down after receiving SIGINT"
asyncSpawn asyncStopper(wakuApp)
setControlCHook(handleCtrlC)
# Handle SIGTERM
when defined(posix):
proc handleSigterm(signal: cint) {.noconv.} =
notice "Shutting down after receiving SIGTERM"
asyncSpawn asyncStopper(wakuApp)
c_signal(ansi_c.SIGTERM, handleSigterm)
# Handle SIGSEGV
when defined(posix):
proc handleSigsegv(signal: cint) {.noconv.} =
# Require --debugger:native
fatal "Shutting down after receiving SIGSEGV", stacktrace = getBacktrace()
# Not available in -d:release mode
writeStackTrace()
waitFor wakuApp.stop()
quit(QuitFailure)
c_signal(ansi_c.SIGSEGV, handleSigsegv)
info "Node setup complete"
if conf.testFunc == TesterFunctionality.SENDER:
setupAndPublish(wakuApp.node, conf)
else:
setupAndSubscribe(wakuApp.node, conf)
runForever()

Binary image file not shown (11 KiB)

File diff suppressed because one or more lines are too long

Binary image file not shown (13 KiB)

View File

@@ -0,0 +1,9 @@
apiVersion: 1
providers:
- name: 'Prometheus'
orgId: 1
folder: ''
type: file
options:
path: /var/lib/grafana/dashboards

File diff suppressed because it is too large

View File

@@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
org_id: 1
url: http://prometheus:9090
is_default: true
version: 1
editable: true

View File

@@ -0,0 +1,2 @@
#GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,digrich-bubblechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,jdbranham-diagram-panel,agenty-flowcharting-panel,citilogics-geoloop-panel,savantly-heatmap-panel,mtanda-histogram-panel,pierosavi-imageit-panel,michaeldmoore-multistat-panel,zuburqan-parity-report-panel,natel-plotly-panel,bessler-pictureit-panel,grafana-polystat-panel,corpglory-progresslist-panel,snuids-radar-panel,fzakaria-simple-config.config.annotations-datasource,vonage-status-panel,snuids-trafficlights-panel,pr0ps-trackmap-panel,alexandra-trackmap-panel,btplc-trend-box-panel
GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,pierosavi-imageit-panel,bessler-pictureit-panel,vonage-status-panel

View File

@@ -0,0 +1,51 @@
instance_name = nwaku dashboard
;[dashboards.json]
;enabled = true
;path = /home/git/grafana/grafana-dashboards/dashboards
#################################### Auth ##########################
[auth]
disable_login_form = false
#################################### Anonymous Auth ##########################
[auth.anonymous]
# enable anonymous access
enabled = true
# specify organization name that should be used for unauthenticated users
;org_name = Public
# specify role for unauthenticated users
org_role = Admin
; org_role = Viewer
;[security]
;admin_user = ocr
;admin_password = ocr
;[users]
# disable user signup / registration
;allow_sign_up = false
# Set to true to automatically assign new users to the default organization (id 1)
;auto_assign_org = true
# Default role new users will be automatically assigned (if disabled above is set to true)
;auto_assign_org_role = Viewer
#################################### SMTP / Emailing ##########################
;[smtp]
;enabled = false
;host = localhost:25
;user =
;password =
;cert_file =
;key_file =
;skip_verify = false
;from_address = admin@grafana.localhost
;[emails]
;welcome_email_on_sign_up = false

View File

@@ -0,0 +1,284 @@
pg_replication:
query: "SELECT CASE WHEN NOT pg_is_in_recovery() THEN 0 ELSE GREATEST (0, EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))) END AS lag"
master: true
metrics:
- lag:
usage: "GAUGE"
description: "Replication lag behind master in seconds"
pg_postmaster:
query: "SELECT pg_postmaster_start_time as start_time_seconds from pg_postmaster_start_time()"
master: true
metrics:
- start_time_seconds:
usage: "GAUGE"
description: "Time at which postmaster started"
pg_stat_user_tables:
query: |
SELECT
current_database() datname,
schemaname,
relname,
seq_scan,
seq_tup_read,
idx_scan,
idx_tup_fetch,
n_tup_ins,
n_tup_upd,
n_tup_del,
n_tup_hot_upd,
n_live_tup,
n_dead_tup,
n_mod_since_analyze,
COALESCE(last_vacuum, '1970-01-01Z') as last_vacuum,
COALESCE(last_autovacuum, '1970-01-01Z') as last_autovacuum,
COALESCE(last_analyze, '1970-01-01Z') as last_analyze,
COALESCE(last_autoanalyze, '1970-01-01Z') as last_autoanalyze,
vacuum_count,
autovacuum_count,
analyze_count,
autoanalyze_count
FROM
pg_stat_user_tables
metrics:
- datname:
usage: "LABEL"
description: "Name of current database"
- schemaname:
usage: "LABEL"
description: "Name of the schema that this table is in"
- relname:
usage: "LABEL"
description: "Name of this table"
- seq_scan:
usage: "COUNTER"
description: "Number of sequential scans initiated on this table"
- seq_tup_read:
usage: "COUNTER"
description: "Number of live rows fetched by sequential scans"
- idx_scan:
usage: "COUNTER"
description: "Number of index scans initiated on this table"
- idx_tup_fetch:
usage: "COUNTER"
description: "Number of live rows fetched by index scans"
- n_tup_ins:
usage: "COUNTER"
description: "Number of rows inserted"
- n_tup_upd:
usage: "COUNTER"
description: "Number of rows updated"
- n_tup_del:
usage: "COUNTER"
description: "Number of rows deleted"
- n_tup_hot_upd:
usage: "COUNTER"
description: "Number of rows HOT updated (i.e., with no separate index update required)"
- n_live_tup:
usage: "GAUGE"
description: "Estimated number of live rows"
- n_dead_tup:
usage: "GAUGE"
description: "Estimated number of dead rows"
- n_mod_since_analyze:
usage: "GAUGE"
description: "Estimated number of rows changed since last analyze"
- last_vacuum:
usage: "GAUGE"
description: "Last time at which this table was manually vacuumed (not counting VACUUM FULL)"
- last_autovacuum:
usage: "GAUGE"
description: "Last time at which this table was vacuumed by the autovacuum daemon"
- last_analyze:
usage: "GAUGE"
description: "Last time at which this table was manually analyzed"
- last_autoanalyze:
usage: "GAUGE"
description: "Last time at which this table was analyzed by the autovacuum daemon"
- vacuum_count:
usage: "COUNTER"
description: "Number of times this table has been manually vacuumed (not counting VACUUM FULL)"
- autovacuum_count:
usage: "COUNTER"
description: "Number of times this table has been vacuumed by the autovacuum daemon"
- analyze_count:
usage: "COUNTER"
description: "Number of times this table has been manually analyzed"
- autoanalyze_count:
usage: "COUNTER"
description: "Number of times this table has been analyzed by the autovacuum daemon"
pg_statio_user_tables:
query: "SELECT current_database() datname, schemaname, relname, heap_blks_read, heap_blks_hit, idx_blks_read, idx_blks_hit, toast_blks_read, toast_blks_hit, tidx_blks_read, tidx_blks_hit FROM pg_statio_user_tables"
metrics:
- datname:
usage: "LABEL"
description: "Name of current database"
- schemaname:
usage: "LABEL"
description: "Name of the schema that this table is in"
- relname:
usage: "LABEL"
description: "Name of this table"
- heap_blks_read:
usage: "COUNTER"
description: "Number of disk blocks read from this table"
- heap_blks_hit:
usage: "COUNTER"
description: "Number of buffer hits in this table"
- idx_blks_read:
usage: "COUNTER"
description: "Number of disk blocks read from all indexes on this table"
- idx_blks_hit:
usage: "COUNTER"
description: "Number of buffer hits in all indexes on this table"
- toast_blks_read:
usage: "COUNTER"
description: "Number of disk blocks read from this table's TOAST table (if any)"
- toast_blks_hit:
usage: "COUNTER"
description: "Number of buffer hits in this table's TOAST table (if any)"
- tidx_blks_read:
usage: "COUNTER"
description: "Number of disk blocks read from this table's TOAST table indexes (if any)"
- tidx_blks_hit:
usage: "COUNTER"
description: "Number of buffer hits in this table's TOAST table indexes (if any)"
# WARNING: This set of metrics can be very expensive on a busy server as every unique query executed will create an additional time series
pg_stat_statements:
query: "SELECT t2.rolname, t3.datname, queryid, calls, ( total_plan_time + total_exec_time ) / 1000 as total_time_seconds, ( min_plan_time + min_exec_time ) / 1000 as min_time_seconds, ( max_plan_time + max_exec_time ) / 1000 as max_time_seconds, ( mean_plan_time + mean_exec_time ) / 1000 as mean_time_seconds, ( stddev_plan_time + stddev_exec_time ) / 1000 as stddev_time_seconds, rows, shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, temp_blks_read, temp_blks_written, blk_read_time / 1000 as blk_read_time_seconds, blk_write_time / 1000 as blk_write_time_seconds FROM pg_stat_statements t1 JOIN pg_roles t2 ON (t1.userid=t2.oid) JOIN pg_database t3 ON (t1.dbid=t3.oid) WHERE t2.rolname != 'rdsadmin' AND queryid IS NOT NULL"
master: true
metrics:
- rolname:
usage: "LABEL"
description: "Name of user"
- datname:
usage: "LABEL"
description: "Name of database"
- queryid:
usage: "LABEL"
description: "Query ID"
- calls:
usage: "COUNTER"
description: "Number of times executed"
- total_time_seconds:
usage: "COUNTER"
description: "Total time spent in the statement, in seconds"
- min_time_seconds:
usage: "GAUGE"
description: "Minimum time spent in the statement, in seconds"
- max_time_seconds:
usage: "GAUGE"
description: "Maximum time spent in the statement, in seconds"
- mean_time_seconds:
usage: "GAUGE"
description: "Mean time spent in the statement, in seconds"
- stddev_time_seconds:
usage: "GAUGE"
description: "Population standard deviation of time spent in the statement, in seconds"
- rows:
usage: "COUNTER"
description: "Total number of rows retrieved or affected by the statement"
- shared_blks_hit:
usage: "COUNTER"
description: "Total number of shared block cache hits by the statement"
- shared_blks_read:
usage: "COUNTER"
description: "Total number of shared blocks read by the statement"
- shared_blks_dirtied:
usage: "COUNTER"
description: "Total number of shared blocks dirtied by the statement"
- shared_blks_written:
usage: "COUNTER"
description: "Total number of shared blocks written by the statement"
- local_blks_hit:
usage: "COUNTER"
description: "Total number of local block cache hits by the statement"
- local_blks_read:
usage: "COUNTER"
description: "Total number of local blocks read by the statement"
- local_blks_dirtied:
usage: "COUNTER"
description: "Total number of local blocks dirtied by the statement"
- local_blks_written:
usage: "COUNTER"
description: "Total number of local blocks written by the statement"
- temp_blks_read:
usage: "COUNTER"
description: "Total number of temp blocks read by the statement"
- temp_blks_written:
usage: "COUNTER"
description: "Total number of temp blocks written by the statement"
- blk_read_time_seconds:
usage: "COUNTER"
description: "Total time the statement spent reading blocks, in seconds (if track_io_timing is enabled, otherwise zero)"
- blk_write_time_seconds:
usage: "COUNTER"
description: "Total time the statement spent writing blocks, in seconds (if track_io_timing is enabled, otherwise zero)"
pg_process_idle:
query: |
WITH
metrics AS (
SELECT
application_name,
SUM(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change))::bigint)::float AS process_idle_seconds_sum,
COUNT(*) AS process_idle_seconds_count
FROM pg_stat_activity
WHERE state = 'idle'
GROUP BY application_name
),
buckets AS (
SELECT
application_name,
le,
SUM(
CASE WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - state_change)) <= le
THEN 1
ELSE 0
END
)::bigint AS bucket
FROM
pg_stat_activity,
UNNEST(ARRAY[1, 2, 5, 15, 30, 60, 90, 120, 300]) AS le
GROUP BY application_name, le
ORDER BY application_name, le
)
SELECT
application_name,
process_idle_seconds_sum as seconds_sum,
process_idle_seconds_count as seconds_count,
ARRAY_AGG(le) AS seconds,
ARRAY_AGG(bucket) AS seconds_bucket
FROM metrics JOIN buckets USING (application_name)
GROUP BY 1, 2, 3
metrics:
- application_name:
usage: "LABEL"
description: "Application Name"
- seconds:
usage: "HISTOGRAM"
description: "Idle time of server processes"
pg_tb_stats:
query: |
select pubsubtopic, count(*) AS messages FROM (SELECT id, array_agg(pubsubtopic ORDER BY pubsubtopic) AS pubsubtopic FROM messages GROUP BY id) sub GROUP BY pubsubtopic ORDER BY pubsubtopic;
metrics:
- pubsubtopic:
usage: "LABEL"
description: "pubsubtopic"
- messages:
usage: "GAUGE"
description: "Number of messages for the given pubsub topic"
pg_tb_messages:
query: |
SELECT
COUNT(ID)
FROM messages
metrics:
- count:
usage: "GAUGE"
description: "Row count in `messages` table"

View File

@@ -0,0 +1,9 @@
auth_modules:
mypostgres:
type: userpass
userpass:
username: postgres
password: ${POSTGRES_PASSWORD}
options:
# options become key=value parameters of the DSN
sslmode: disable

View File

@@ -0,0 +1,10 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
monitor: "Monitoring"
scrape_configs:
- job_name: "nwaku"
static_configs:
- targets: ["nwaku:8003"]

View File

@@ -0,0 +1,29 @@
#!/bin/sh
echo "I am a service node"
IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "Service node IP: ${IP}"
exec /usr/bin/wakunode\
--relay=true\
--filter=true\
--lightpush=true\
--store=false\
--rest=true\
--rest-admin=true\
--rest-private=true\
--rest-address=0.0.0.0\
--rest-allow-origin="*"\
--keep-alive=true\
--max-connections=300\
--dns-discovery=true\
--discv5-discovery=true\
--discv5-enr-auto-update=True\
--log-level=DEBUG\
--metrics-server=True\
--metrics-server-address=0.0.0.0\
--nodekey=e3f5e64568b3a612dee609f6e7c0203c501dab6131662922bdcbcabd474281d5\
--nat=extip:${IP}\
--pubsub-topic=/waku/2/default-waku/proto\
--cluster-id=0

View File

@@ -0,0 +1,76 @@
#!/bin/sh
if test -f .env; then
echo "Using .env file"
. $(pwd)/.env
fi
IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "I am a lite-protocol-tester node"
# Get a unique node index based on the container's IP
FOURTH_OCTET=${IP##*.}
THIRD_OCTET="${IP%.*}"; THIRD_OCTET="${THIRD_OCTET##*.}"
NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET))
echo "NODE_INDEX $NODE_INDEX"
RETRIES=${RETRIES:=10}
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
SERIVCE_NODE_ADDR=$(wget -qO- http://servicenode:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
echo "Service node not ready, retrying (retries left: ${RETRIES})"
sleep 1
RETRIES=$(( $RETRIES - 1 ))
done
if [ -z "${SERIVCE_NODE_ADDR}" ]; then
echo "Could not get SERIVCE_NODE_ADDR and none provided. Failing"
exit 1
fi
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
fi
if [ -n "${CONTENT_TOPIC}" ]; then
CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}"
fi
FUNCTION=$1
echo "Tester node: ${FUNCTION}"
REST_PORT=--rest-port=8647
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
REST_PORT=--rest-port=8646
fi
if [ "${FUNCTION}" = "RECEIVER" ]; then
FUNCTION=--test-func=RECEIVER
REST_PORT=--rest-port=8647
fi
if [ -z "${FUNCTION}" ]; then
FUNCTION=--test-func=RECEIVER
fi
echo "Using service node: ${SERIVCE_NODE_ADDR}"
exec /usr/bin/liteprotocoltester\
--log-level=DEBUG\
--service-node="${SERIVCE_NODE_ADDR}"\
--pubsub-topic=/waku/2/default-waku/proto\
--cluster-id=0\
--num-messages=${NUM_MESSAGES}\
--delay-messages=${DELAY_MESSAGES}\
--nat=extip:${IP}\
${FUNCTION}\
${PUBSUB}\
${CONTENT_TOPIC}\
${REST_PORT}
# --config-file=config.toml\

View File

@@ -0,0 +1,201 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[sets, tables, strutils, sequtils, options, strformat],
chronos/timer as chtimer,
chronicles,
results
import ./tester_message
type
StatHelper = object
prevIndex: uint32
prevArrivedAt: Moment
lostIndices: HashSet[uint32]
seenIndices: HashSet[uint32]
Statistics* = object
allMessageCount*: uint32
receivedMessages*: uint32
misorderCount*: uint32
lateCount*: uint32
duplicateCount*: uint32
minLatency*: Duration
maxLatency*: Duration
cummulativeLatency: Duration
helper: StatHelper
PerPeerStatistics* = Table[string, Statistics]
func `$`*(a: Duration): string {.inline.} =
## Original stringify implementation from chronos/timer.nim is not capable of printing 0ns
## Returns string representation of Duration ``a`` as nanoseconds value.
if a.isZero:
return "0ns"
return chtimer.`$`(a)
proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
result.helper.prevIndex = 0
result.helper.seenIndices.init(expectedMessageCount)
result.minLatency = nanos(0)
result.maxLatency = nanos(0)
result.cummulativeLatency = nanos(0)
return result
proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
if self.allMessageCount == 0:
self.allMessageCount = msg.count
elif self.allMessageCount != msg.count:
warn "Message count mismatch at message",
index = msg.index, expected = self.allMessageCount, got = msg.count
if not self.helper.seenIndices.contains(msg.index):
self.helper.seenIndices.incl(msg.index)
else:
inc(self.duplicateCount)
warn "Duplicate message", index = msg.index
## just do not count into stats
return
## detect misorder arrival and possible lost messages
if self.helper.prevIndex + 1 < msg.index:
inc(self.misorderCount)
warn "Misordered message arrival",
index = msg.index, expected = self.helper.prevIndex + 1
## collect possible lost message indices
for idx in self.helper.prevIndex + 1 ..< msg.index:
self.helper.lostIndices.incl(idx)
elif self.helper.prevIndex > msg.index:
inc(self.lateCount)
warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1
else:
## may remove late arrival
self.helper.lostIndices.excl(msg.index)
## calculate latency
let currentArrivedAt = Moment.now()
let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt
let expectedDelay: Duration = nanos(msg.sincePrev)
var latency: Duration
# if the actual arrival gap exceeds the expected gap, the difference is the latency
if delaySincePrevArrived > expectedDelay:
latency = delaySincePrevArrived - expectedDelay
if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)):
self.minLatency = latency
if latency > self.maxLatency:
self.maxLatency = latency
self.cummulativeLatency += latency
else:
warn "Negative latency detected",
index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived
self.helper.prevIndex = msg.index
self.helper.prevArrivedAt = currentArrivedAt
inc(self.receivedMessages)
proc addMessage*(
self: var PerPeerStatistics, peerId: string, msg: ProtocolTesterMessage
) =
if not self.contains(peerId):
self[peerId] = Statistics.init()
discard catch:
self[peerId].addMessage(msg)
proc lossCount*(self: Statistics): uint32 =
self.allMessageCount - self.receivedMessages
proc averageLatency*(self: Statistics): Duration =
if self.receivedMessages == 0:
return nanos(0)
return self.cummulativeLatency div self.receivedMessages
proc echoStat*(self: Statistics) =
let printable = catch:
"""*-----------------------------------------------------------------------------*
| Expected | Received | Loss | Misorder | Late | Duplicate |
|{self.allMessageCount:>11} |{self.receivedMessages:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} |
*-----------------------------------------------------------------------------*
| Latency stat: |
| avg latency: {$self.averageLatency():<60}|
| min latency: {$self.minLatency:<60}|
| max latency: {$self.maxLatency:<60}|
*-----------------------------------------------------------------------------*""".fmt()
if printable.isErr():
echo "Error while printing statistics: " & printable.error().msg
else:
echo printable.get()
proc jsonStat*(self: Statistics): string =
let json = catch:
"""{{"expected":{self.allMessageCount},
"received": {self.receivedMessages},
"loss": {self.lossCount()},
"misorder": {self.misorderCount},
"late": {self.lateCount},
"duplicate": {self.duplicateCount},
"latency":
{{"avg": "{self.averageLatency()}",
"min": "{self.minLatency}",
"max": "{self.maxLatency}"
}}
}}""".fmt()
if json.isErr:
return "{\"result:\": \"" & json.error.msg & "\"}"
return json.get()
proc echoStats*(self: var PerPeerStatistics) =
for peerId, stats in self.pairs:
let peerLine = catch:
"Receiver statistics from peer {peerId}".fmt()
if peerLine.isErr:
echo "Error while printing statistics"
else:
echo peerLine.get()
stats.echoStat()
proc jsonStats*(self: PerPeerStatistics): string =
try:
#!fmt: off
var json = "{\"statistics\": ["
var first = true
for peerId, stats in self.pairs:
if first:
first = false
else:
json.add(", ")
json.add("{{\"sender\": \"{peerId}\", \"stat\":".fmt())
json.add(stats.jsonStat())
json.add("}")
json.add("]}")
return json
#!fmt: on
except CatchableError:
return
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
"\"}"
proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool =
# if no peers have sent messages yet, assume we have just started.
if self.len == 0:
return false
for stat in self.values:
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
stat.receivedMessages < stat.allMessageCount:
return false
return true

View File

@@ -0,0 +1,146 @@
import
std/[strutils, strformat],
stew/results,
chronos,
regex,
confutils,
confutils/defs,
confutils/std/net,
confutils/toml/defs as confTomlDefs,
confutils/toml/std/net as confTomlNet,
libp2p/crypto/crypto,
libp2p/crypto/secp,
libp2p/multiaddress,
nimcrypto/utils,
secp256k1,
json
import
../../waku/common/confutils/envvar/defs as confEnvvarDefs,
../../waku/common/confutils/envvar/std/net as confEnvvarNet,
../../waku/common/logging,
../../waku/factory/external_config,
../../waku/waku_core
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
const
LitePubsubTopic* = PubsubTopic("/waku/2/default-waku/proto")
LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto")
type TesterFunctionality* = enum
SENDER # pumps messages to the network
RECEIVER # gather and analyze messages from the network
type LiteProtocolTesterConf* = object
configFile* {.
desc:
"Loads configuration from a TOML file (cmd-line parameters take precedence) for the light waku node",
name: "config-file"
.}: Option[InputFile]
## Log configuration
logLevel* {.
desc:
"Sets the log level for process. Supported levels: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL",
defaultValue: logging.LogLevel.DEBUG,
name: "log-level"
.}: logging.LogLevel
logFormat* {.
desc:
"Specifies what kind of logs should be written to stdout. Suported formats: TEXT, JSON",
defaultValue: logging.LogFormat.TEXT,
name: "log-format"
.}: logging.LogFormat
## Test configuration
servicenode* {.desc: "Peer multiaddr of the service node.", name: "service-node".}:
string
nat* {.
desc:
"Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>.",
defaultValue: "any"
.}: string
testFunc* {.
desc: "Specifies the lite protocol tester side. Supported values: sender, receiver.",
defaultValue: TesterFunctionality.RECEIVER,
name: "test-func"
.}: TesterFunctionality
numMessages* {.
desc: "Number of messages to send.", defaultValue: 120, name: "num-messages"
.}: uint32
delayMessages* {.
desc: "Delay between messages in milliseconds.",
defaultValue: 1000,
name: "delay-messages"
.}: uint32
pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
defaultValue: @[LitePubsubTopic],
name: "pubsub-topic"
.}: seq[PubsubTopic]
## TODO: extend lite protocol tester configuration based on testing needs
# shards* {.
# desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
# defaultValue: @[],
# name: "shard"
# .}: seq[ShardIdx]
contentTopics* {.
desc: "Default content topic to subscribe to. Argument may be repeated.",
defaultValue: @[LiteContentTopic],
name: "content-topic"
.}: seq[ContentTopic]
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0,
name: "cluster-id"
.}: uint32
## Tester REST service configuration
restAddress* {.
desc: "Listening address of the REST HTTP server.",
defaultValue: parseIpAddress("127.0.0.1"),
name: "rest-address"
.}: IpAddress
restPort* {.
desc: "Listening port of the REST HTTP server.",
defaultValue: 8654,
name: "rest-port"
.}: uint16
restAllowOrigin* {.
desc:
"Allow cross-origin requests from the specified origin." &
"Argument may be repeated." & "Wildcards: * or ? allowed." &
"Ex.: \"localhost:*\" or \"127.0.0.1:8080\"",
defaultValue: @["*"],
name: "rest-allow-origin"
.}: seq[string]
{.push warning[ProveInit]: off.}
proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] =
try:
let conf = LiteProtocolTesterConf.load(
version = version,
secondarySources = proc(
conf: LiteProtocolTesterConf, sources: auto
) {.gcsafe, raises: [ConfigurationError].} =
sources.addConfigFile(Envvar, InputFile("liteprotocoltester"))
,
)
ok(conf)
except CatchableError:
err(getCurrentExceptionMsg())
{.pop.}

View File

@@ -0,0 +1,111 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronicles,
json_serialization,
json_serialization/std/options,
json_serialization/lexer
import ../../waku/waku_api/rest/serdes
type ProtocolTesterMessage* = object
sender*: string
index*: uint32
count*: uint32
startedAt*: int64
sinceStart*: int64
sincePrev*: int64
proc writeValue*(
writer: var JsonWriter[RestJson], value: ProtocolTesterMessage
) {.raises: [IOError].} =
writer.beginRecord()
writer.writeField("sender", value.sender)
writer.writeField("index", value.index)
writer.writeField("count", value.count)
writer.writeField("startedAt", value.startedAt)
writer.writeField("sinceStart", value.sinceStart)
writer.writeField("sincePrev", value.sincePrev)
writer.endRecord()
proc readValue*(
reader: var JsonReader[RestJson], value: var ProtocolTesterMessage
) {.gcsafe, raises: [SerializationError, IOError].} =
var
sender: Option[string]
index: Option[uint32]
count: Option[uint32]
startedAt: Option[int64]
sinceStart: Option[int64]
sincePrev: Option[int64]
for fieldName in readObjectFields(reader):
case fieldName
of "sender":
if sender.isSome():
reader.raiseUnexpectedField(
"Multiple `sender` fields found", "ProtocolTesterMessage"
)
sender = some(reader.readValue(string))
of "index":
if index.isSome():
reader.raiseUnexpectedField(
"Multiple `index` fields found", "ProtocolTesterMessage"
)
index = some(reader.readValue(uint32))
of "count":
if count.isSome():
reader.raiseUnexpectedField(
"Multiple `count` fields found", "ProtocolTesterMessage"
)
count = some(reader.readValue(uint32))
of "startedAt":
if startedAt.isSome():
reader.raiseUnexpectedField(
"Multiple `startedAt` fields found", "ProtocolTesterMessage"
)
startedAt = some(reader.readValue(int64))
of "sinceStart":
if sinceStart.isSome():
reader.raiseUnexpectedField(
"Multiple `sinceStart` fields found", "ProtocolTesterMessage"
)
sinceStart = some(reader.readValue(int64))
of "sincePrev":
if sincePrev.isSome():
reader.raiseUnexpectedField(
"Multiple `sincePrev` fields found", "ProtocolTesterMessage"
)
sincePrev = some(reader.readValue(int64))
else:
unrecognizedFieldWarning()
if sender.isNone():
reader.raiseUnexpectedValue("Field `sender` is missing")
if index.isNone():
reader.raiseUnexpectedValue("Field `index` is missing")
if count.isNone():
reader.raiseUnexpectedValue("Field `count` is missing")
if startedAt.isNone():
reader.raiseUnexpectedValue("Field `startedAt` is missing")
if sinceStart.isNone():
reader.raiseUnexpectedValue("Field `sinceStart` is missing")
if sincePrev.isNone():
reader.raiseUnexpectedValue("Field `sincePrev` is missing")
value = ProtocolTesterMessage(
sender: sender.get(),
index: index.get(),
count: count.get(),
startedAt: startedAt.get(),
sinceStart: sinceStart.get(),
sincePrev: sincePrev.get(),
)

View File

@@ -102,6 +102,10 @@ task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"
buildBinary name, "apps/chat2bridge/"
task liteprotocoltester, "Build liteprotocoltester":
let name = "liteprotocoltester"
buildBinary name, "apps/liteprotocoltester/"
### C Bindings
task libwakuStatic, "Build the cbindings waku node library":
let name = "libwaku"