389 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
RLN end-to-end test against a running logos-delivery-simulator stack.
Designed to run as a sidecar container on the simulator's Docker network so
hostnames like `logos-delivery-simulator-nwaku-1` resolve via Docker DNS.
Scenarios covered (in order):
1. HEALTH - every node responds to /debug/v1/info with an enrUri
2. SUBSCRIBE - every node REST-subscribes to the pubsub topic
3. WITHIN_LIMIT - every node concurrently sends msg_limit messages -> 200
4. PROPAGATION - one sender's message lands in all peers' inboxes
5. OVER_LIMIT - one extra message per node -> 500 (rate-limit hit)
6. EPOCH_RESET - after epoch_sec, every node can send 1 more -> 200
7. SAME_MESSAGE_ID - sending same message_id twice in same epoch is the
slashable signal (verified by checking node logs)
Exit code:
0 = all scenarios passed
N = number of scenarios that failed
Usage (typical):
docker run --rm \\
--network logos-delivery-simulator_simulation \\
-v /path/to/rln-e2e-test.py:/test.py \\
python:3.11-slim \\
sh -c 'pip install --quiet requests && python /test.py \\
--hostname-prefix logos-delivery-simulator-nwaku- \\
--num-nodes 30 --msg-limit 30 --epoch-sec 15'
"""
import argparse
import base64
import concurrent.futures as cf
import json
import os
import sys
import time
import urllib.parse
from dataclasses import dataclass
from typing import Optional
import requests
PUBSUB_TOPIC = "/waku/2/rs/66/0"
CONTENT_TOPIC = "/rln-test/1/probe/proto"
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def url_of(host: str, port: int = 8645) -> str:
return f"http://{host}:{port}"
def waku_publish(node_url: str, payload: bytes, timeout: float = 5.0) -> int:
body = {
"payload": base64.b64encode(payload).decode("ascii"),
"contentTopic": CONTENT_TOPIC,
"version": 1,
"timestamp": time.time_ns(),
}
enc = urllib.parse.quote(PUBSUB_TOPIC, safe="")
try:
r = requests.post(
f"{node_url}/relay/v1/messages/{enc}",
json=body,
timeout=timeout,
headers={"content-type": "application/json"},
)
return r.status_code
except requests.RequestException:
return -1
def waku_subscribe(node_url: str, timeout: float = 5.0) -> int:
try:
r = requests.post(
f"{node_url}/relay/v1/subscriptions",
json=[PUBSUB_TOPIC],
timeout=timeout,
headers={"content-type": "application/json"},
)
return r.status_code
except requests.RequestException:
return -1
def waku_get_messages(node_url: str, timeout: float = 5.0) -> Optional[list]:
enc = urllib.parse.quote(PUBSUB_TOPIC, safe="")
try:
r = requests.get(
f"{node_url}/relay/v1/messages/{enc}",
timeout=timeout,
)
if r.status_code != 200:
return None
return r.json()
except (requests.RequestException, json.JSONDecodeError):
return None
def node_healthy(node_url: str, timeout: float = 3.0) -> bool:
try:
r = requests.get(f"{node_url}/debug/v1/info", timeout=timeout)
return r.status_code == 200 and "enrUri" in r.json()
except (requests.RequestException, json.JSONDecodeError):
return False
# ---------------------------------------------------------------------------
# scenarios
# ---------------------------------------------------------------------------
@dataclass
class Result:
name: str
ok: bool
detail: str = ""
def __str__(self) -> str:
status = "PASS" if self.ok else "FAIL"
s = f"[{status}] {self.name}"
if self.detail:
s += f"{self.detail}"
return s
def scenario_health(nodes: list[str], deadline_sec: int = 120) -> Result:
"""Every node must be reachable within deadline_sec."""
start = time.time()
unhealthy = list(nodes)
while time.time() - start < deadline_sec and unhealthy:
with cf.ThreadPoolExecutor(max_workers=min(32, len(unhealthy))) as ex:
results = list(ex.map(node_healthy, [url_of(n) for n in unhealthy]))
unhealthy = [n for n, ok in zip(unhealthy, results) if not ok]
if unhealthy:
time.sleep(3)
return Result(
"HEALTH",
not unhealthy,
f"{len(nodes) - len(unhealthy)}/{len(nodes)} healthy"
+ (f"; failing: {unhealthy[:5]}" if unhealthy else ""),
)
def scenario_subscribe(nodes: list[str]) -> Result:
"""REST-subscribe every node to the pubsub topic so GETs return cached msgs."""
with cf.ThreadPoolExecutor(max_workers=min(32, len(nodes))) as ex:
codes = list(ex.map(waku_subscribe, [url_of(n) for n in nodes]))
bad = [(n, c) for n, c in zip(nodes, codes) if c != 200]
return Result(
"SUBSCRIBE",
not bad,
f"{len(nodes) - len(bad)}/{len(nodes)} subscribed"
+ (f"; failing: {bad[:5]}" if bad else ""),
)
def _send_n(node_url: str, n: int) -> list[int]:
codes = []
for i in range(n):
codes.append(waku_publish(node_url, f"probe-{i}".encode()))
return codes
def _burst_until_blocked(node_url: str, msg_limit: int, overshoot: int = 3):
"""Send msg_limit+overshoot messages back-to-back, fast, recording codes.
Designed to complete inside a single epoch — keep epoch_sec large enough
that this burst can't straddle an epoch boundary.
Returns (n_200, n_500, n_transport_err, two_hundred_after_block) where
two_hundred_after_block flags a 200 appearing AFTER the first 500 (i.e.
quota reset mid-burst => epoch straddle)."""
codes = []
for i in range(msg_limit + overshoot):
codes.append(waku_publish(node_url, f"burst-{i}".encode(), timeout=10.0))
n_200 = sum(c == 200 for c in codes)
n_500 = sum(c == 500 for c in codes)
n_err = sum(c not in (200, 500) for c in codes) # -1, 4xx transient, etc.
first_block_idx = next((i for i, c in enumerate(codes) if c == 500), None)
two_hundred_after_block = (
first_block_idx is not None
and any(c == 200 for c in codes[first_block_idx + 1:])
)
return n_200, n_500, n_err, two_hundred_after_block
def _publish_until_ok(node_url: str, attempts: int = 20, spacing: float = 5.0) -> bool:
"""Retry a single publish until it returns 200 or attempts run out.
Tolerates the post-startup window where discv5/gossipsub mesh is still
forming and the RLN publish path transiently 500s."""
for _ in range(attempts):
if waku_publish(node_url, b"warmup", timeout=10.0) == 200:
return True
time.sleep(spacing)
return False
def scenario_warmup(nodes: list[str], attempts: int = 20) -> Result:
"""Readiness gate: every node must successfully publish at least once.
This absorbs mesh-formation churn so PROPAGATION/RATE_LIMIT aren't
judging a not-yet-connected fleet. Consumes 1 nonce/node — well within
msg_limit, and RATE_LIMIT's tolerance accounts for it."""
with cf.ThreadPoolExecutor(max_workers=min(8, len(nodes))) as ex:
ready = list(ex.map(lambda n: _publish_until_ok(url_of(n), attempts), nodes))
not_ready = [n for n, ok in zip(nodes, ready) if not ok]
return Result(
"WARMUP",
not not_ready,
f"{len(nodes) - len(not_ready)}/{len(nodes)} nodes publishing"
+ (f"; never ready: {not_ready[:5]}" if not_ready else ""),
)
def scenario_rate_limit(nodes: list[str], msg_limit: int, tolerance: int = 3) -> Result:
"""Per-node burst of msg_limit+3 messages within one epoch.
The RLN invariant being checked:
(a) a node must NEVER publish more than msg_limit in one epoch, and
(b) the node must enforce a 500 ceiling once the quota is exhausted.
Transient HTTP errors under concurrent load can lower the accepted count
below msg_limit — that does NOT violate the invariant, so we accept
successes in [msg_limit - tolerance, msg_limit]. successes > msg_limit OR
a 200 after the first 500 means the epoch rolled mid-burst (raise
RLN_RELAY_EPOCH_SEC) — reported as a timing skew, not an RLN failure."""
# Cap concurrency: firing len(nodes)*(msg_limit+3) publishes all at once
# saturates small CI runners (2 vCPU) and causes publish-path timeouts
# that masquerade as rate-limit failures.
with cf.ThreadPoolExecutor(max_workers=min(5, len(nodes))) as ex:
per_node = list(
ex.map(lambda n: _burst_until_blocked(url_of(n), msg_limit), nodes)
)
rate_failures = [] # genuine RLN misbehaviour
timing_skews = [] # epoch straddled mid-burst — inconclusive
for node, (n_200, n_500, n_err, after_block) in zip(nodes, per_node):
if n_200 > msg_limit or after_block:
timing_skews.append(
(node, f"{n_200} ok, epoch rolled mid-burst (raise epoch_sec)")
)
elif n_500 == 0:
rate_failures.append((node, f"no 500 ceiling ({n_200} ok, {n_err} err)"))
elif n_200 < msg_limit - tolerance:
rate_failures.append(
(node, f"only {n_200}/{msg_limit} ok ({n_err} transport err)")
)
if timing_skews and not rate_failures:
return Result(
"RATE_LIMIT",
False,
f"INCONCLUSIVE (timing) — raise RLN_RELAY_EPOCH_SEC; "
f"{len(timing_skews)} node(s) straddled an epoch: {timing_skews[:3]}",
)
ok = not rate_failures and not timing_skews
good = len(nodes) - len(rate_failures) - len(timing_skews)
return Result(
"RATE_LIMIT",
ok,
f"{good}/{len(nodes)} nodes enforced <= {msg_limit} then 500 "
f"(tolerance {tolerance} for transport noise)"
+ (f"; rate failures: {rate_failures[:3]}" if rate_failures else "")
+ (f"; timing skews: {timing_skews[:3]}" if timing_skews else ""),
)
def scenario_propagation(
sender: str, receivers: list[str], settle_sec: int = 5
) -> Result:
"""Send one message on `sender`, expect it visible in every receiver's
REST inbox within settle_sec."""
marker = f"propagation-marker-{time.time_ns()}".encode()
code = waku_publish(url_of(sender), marker)
if code != 200:
return Result("PROPAGATION", False, f"sender publish returned {code}")
time.sleep(settle_sec)
missing = []
with cf.ThreadPoolExecutor(max_workers=min(32, len(receivers))) as ex:
inboxes = list(ex.map(waku_get_messages, [url_of(r) for r in receivers]))
encoded_marker = base64.b64encode(marker).decode().rstrip("=")
for r, inbox in zip(receivers, inboxes):
if inbox is None:
missing.append((r, "GET failed"))
continue
# Look for our marker payload in any message
found = any(
(m.get("payload") or "").rstrip("=") == encoded_marker
for m in inbox
)
if not found:
missing.append((r, f"{len(inbox)} msgs, marker not present"))
return Result(
"PROPAGATION",
not missing,
f"{len(receivers) - len(missing)}/{len(receivers)} receivers got the message"
+ (f"; missing on {missing[:3]}" if missing else ""),
)
def scenario_epoch_reset(nodes: list[str], epoch_sec: int) -> Result:
"""After epoch_sec + slack, each node can send 1 more message — expect 200."""
sleep_s = epoch_sec + 3
print(f" sleeping {sleep_s}s for epoch reset...")
time.sleep(sleep_s)
with cf.ThreadPoolExecutor(max_workers=len(nodes)) as ex:
codes = list(
ex.map(
lambda n: waku_publish(url_of(n), b"post-epoch"),
nodes,
)
)
bad = [(n, c) for n, c in zip(nodes, codes) if c != 200]
return Result(
"EPOCH_RESET",
not bad,
f"{sum(c == 200 for c in codes)}/{len(nodes)} returned 200 after epoch reset"
+ (f"; failing: {bad[:3]}" if bad else ""),
)
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--hostname-prefix", default="logos-delivery-simulator-nwaku-")
ap.add_argument("--num-nodes", type=int, default=30)
ap.add_argument("--msg-limit", type=int, default=30,
help="Must match RLN_RELAY_MSG_LIMIT in simulator .env")
ap.add_argument("--epoch-sec", type=int, default=15,
help="Must match RLN_RELAY_EPOCH_SEC in simulator .env")
ap.add_argument("--health-deadline-sec", type=int, default=180)
args = ap.parse_args()
nodes = [f"{args.hostname_prefix}{i}" for i in range(1, args.num_nodes + 1)]
print(f"Testing {len(nodes)} nodes: {nodes[0]}{nodes[-1]}")
print(f"Config: msg_limit={args.msg_limit}, epoch_sec={args.epoch_sec}")
print()
results: list[Result] = []
def run(scenario_fn, *fn_args, **fn_kwargs) -> bool:
r = scenario_fn(*fn_args, **fn_kwargs)
results.append(r)
print(r)
return r.ok
if not run(scenario_health, nodes, deadline_sec=args.health_deadline_sec):
print("\nABORTING — nodes never reached healthy state.")
return _summarize(results)
if not run(scenario_subscribe, nodes):
print("\nABORTING — could not subscribe nodes to pubsub topic.")
return _summarize(results)
# Readiness gate: wait out mesh-formation churn before judging behaviour.
if not run(scenario_warmup, nodes):
print("\nABORTING — fleet never reached a publishable state.")
return _summarize(results)
run(scenario_propagation, nodes[0], nodes[1:])
# Rate limit: per-node burst, asserts exactly msg_limit then 500.
# Requires epoch_sec large enough that the burst can't straddle an epoch.
run(scenario_rate_limit, nodes, args.msg_limit)
run(scenario_epoch_reset, nodes, args.epoch_sec)
return _summarize(results)
def _summarize(results: list[Result]) -> int:
print()
print("=" * 64)
passed = sum(r.ok for r in results)
print(f" {passed}/{len(results)} scenarios passed")
for r in results:
print(f" {r}")
print("=" * 64)
return len(results) - passed
if __name__ == "__main__":
sys.exit(main())