work on the rest of tests (#134)

* work on the rest of tests

* Add debug tests

* Add rest debug levels tests

* Add rest APIs tests

* Fix non working tests

* Adding more tests

* Add final set of tests
This commit is contained in:
AYAHASSAN287 2025-08-26 14:04:40 +03:00 committed by GitHub
parent 9769730bac
commit 2fd56d4aa3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 422 additions and 5 deletions

View File

@ -184,3 +184,7 @@ class REST(BaseClient):
def get_debug_version(self):
return self.rest_call("get", "debug/v1/version").text.strip()
def get_peer(self, peer_id: str):
resp = self.rest_call("get", f"admin/v1/peer/{peer_id}")
return resp.json()

View File

@ -574,3 +574,6 @@ class WakuNode:
def get_debug_version(self):
return self._api.get_debug_version()
def get_peer_info(self, peer_id: str):
return self._api.get_peer(peer_id)

View File

@ -1,4 +1,4 @@
import pytest
import pytest, time, re, os
from src.env_vars import NODE_1, NODE_2, STRESS_ENABLED
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
@ -17,6 +17,24 @@ These tests make sure thst REST flags related to admin flags acting as expected
class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
TAGS = ["TRC", "DBG", "INF", "NTC", "WRN", "ERR", "FTL"]
LEVEL_RE = re.compile(r'"lvl"\s*:\s*"(TRC|DBG|INF|NTC|WRN|ERR|FTL)"|\b(TRC|DBG|INF|NTC|WRN|ERR|FTL)\b')
def _read_tail_counts(self, path: str, start_size: int) -> dict:
with open(path, "rb") as f:
f.seek(start_size)
text = f.read().decode(errors="ignore")
counts = {t: 0 for t in self.TAGS}
for a, b in self.LEVEL_RE.findall(text):
counts[(a or b)] += 1
return counts
def _trigger(self):
self.node1.info()
self.node1.get_version()
self.node1.get_debug_version()
@pytest.fixture(scope="function", autouse=True)
def nodes(self):
self.node1 = WakuNode(NODE_2, f"node1_{self.test_id}")
@ -24,6 +42,14 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node3_{self.test_id}")
def _tail(self, path, start_size):
with open(path, "rb") as f:
f.seek(start_size)
return f.read().decode(errors="ignore")
def _count_levels(self, text, levels):
return {lvl: len(re.findall(getattr(self, f"{lvl}_RE"), text)) for lvl in levels}
def test_admin_filter_subscriptions_shape(self):
self.node1.start(filter="true", relay="true")
self.node2.start(relay="false", filternode=self.node1.get_multiaddr_with_id(), discv5_bootstrap_node=self.node1.get_enr_uri())
@ -38,12 +64,19 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
assert subs[0]["filterCriteria"][0]["pubsubTopic"] == self.test_pubsub_topic, "pubsub topic doesn't match"
assert subs[0]["filterCriteria"][0]["contentTopic"] == self.test_content_topic, "content topic doesn't match"
def test_admin_peers_stats_shape(self):
self.node1.start(relay="true")
def test_admin_peers_stats_schema(self):
self.node1.start(filter="true", relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
stats = self.node1.get_peer_stats()
logger.debug(f"Node admin peers stats {stats}")
logger.debug(f"Peer stats schema check: {stats}")
assert isinstance(stats, dict), "stats must be a dict"
for k in ("Sum", "Relay peers"):
assert k in stats, f"missing section: {k}"
assert isinstance(stats[k], dict), f"{k} must be a dict"
assert isinstance(stats["Sum"].get("Total peers", 0), int) and stats["Sum"]["Total peers"] >= 0, "Sum.Total peers must be a non-negative int"
assert (
isinstance(stats["Relay peers"].get("Total relay peers", 0), int) and stats["Relay peers"]["Total relay peers"] >= 0
), "Relay peers.Total relay peers must be a non-negative int"
def test_admin_peers_stats_counts(self):
self.node1.start(filter="true", relay="true")
@ -76,3 +109,374 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
assert isinstance(p["agent"], str), "agent not str"
assert isinstance(p["origin"], str), "origin not str"
assert isinstance(p.get("score", 0.0), (int, float)), "score not number"
def test_admin_peer_by_id(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
peer_id = self.node2.get_multiaddr_with_id().rpartition("/p2p/")[2]
info = self.node1.get_peer_info(peer_id)
logger.debug(f"Node-1 /admin/v1/peer/{peer_id}: {info} \n")
logger.debug("Validate response schema")
for k in ("multiaddr", "protocols", "shards", "connected", "agent", "origin"):
assert k in info, f"missing field: {k}"
assert info["multiaddr"] == self.node2.get_multiaddr_with_id(), "multiaddr mismatch"
def test_admin_set_all_log_levels(self):
self.node1.start(relay="true")
self.node1.container()
levels = ["TRACE", "DEBUG", "INFO", "NOTICE", "WARN", "ERROR", "FATAL"]
_levels = ["INFO"]
for lvl in _levels:
resp = self.node1.set_log_level(lvl)
logger.debug(f"Set log level ({lvl})")
self.node2.start(relay="true")
assert resp.status_code == 200, f"failed to set log level {lvl} {resp.text}"
self.node2.info()
self.node2.get_debug_version()
resp = self.node1.set_log_level("TRACE")
logger.debug(f"Restore default log level (TRACE) -> status={resp.status_code}")
assert resp.status_code == 200, f"failed to revert log level: {resp.text}"
@pytest.mark.timeout(120)
def test_log_level_DEBUG_from_TRACE(self):
self.node1.start(relay="true")
path = self.node1._log_path
for _ in range(50):
if os.path.exists(path):
break
time.sleep(0.05)
assert self.node1.set_log_level("TRACE").status_code == 200
assert self.node1.set_log_level("DEBUG").status_code == 200
start = os.path.getsize(path)
self._trigger()
time.sleep(2)
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at DEBUG: {counts}")
assert counts["DBG"] > 0, "expected DEBUG logs at DEBUG level"
assert counts["TRC"] == 0, "TRACE must be filtered at DEBUG"
assert self.node1.set_log_level("TRACE").status_code == 200
@pytest.mark.timeout(120)
def test_log_level_INFO_from_DEBUG(self):
self.node1.start(relay="true")
path = self.node1._log_path
for _ in range(50):
if os.path.exists(path):
break
time.sleep(0.05)
# assert self.node1.set_log_level("DEBUG").status_code == 200
assert self.node1.set_log_level("INFO").status_code == 200
start = os.path.getsize(path)
self._trigger()
time.sleep(2)
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at INFO: {counts}")
assert counts["INF"] > 0, "expected INFO logs at INFO level"
assert counts["DBG"] == 0 and counts["TRC"] == 0, "lower than INFO (DBG/TRC) must be filtered"
assert self.node1.set_log_level("TRACE").status_code == 200
@pytest.mark.timeout(120)
def test_log_level_NOTICE_from_INFO(self):
self.node1.start(relay="true")
path = self.node1._log_path
for _ in range(50):
if os.path.exists(path):
break
time.sleep(0.05)
assert self.node1.set_log_level("INFO").status_code == 200
assert self.node1.set_log_level("NOTICE").status_code == 200
start = os.path.getsize(path)
self._trigger()
time.sleep(2)
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at NOTICE: {counts}")
assert counts["NTC"] > 0, "expected NOTICE logs at NOTICE level"
for lv in ["TRC", "DBG", "INF"]:
assert counts[lv] == 0, f"{lv} must be filtered at NOTICE"
assert self.node1.set_log_level("TRACE").status_code == 200
@pytest.mark.timeout(120)
def test_log_level_WARN_from_NOTICE(self):
self.node1.start(relay="true")
path = self.node1._log_path
for _ in range(50):
if os.path.exists(path):
break
time.sleep(0.05)
assert self.node1.set_log_level("NOTICE").status_code == 200
assert self.node1.set_log_level("WARN").status_code == 200
start = os.path.getsize(path)
self._trigger()
time.sleep(2)
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at WARN: {counts}")
assert counts["WRN"] > 0, "expected WARN logs at WARN level"
for lv in ["TRC", "DBG", "INF", "NTC"]:
assert counts[lv] == 0, f"{lv} must be filtered at WARN"
assert self.node1.set_log_level("TRACE").status_code == 200
@pytest.mark.timeout(120)
def test_log_level_ERROR_from_WARN(self):
self.node1.start(relay="true")
path = self.node1._log_path
for _ in range(50):
if os.path.exists(path):
break
time.sleep(0.05)
assert self.node1.set_log_level("WARN").status_code == 200
assert self.node1.set_log_level("ERROR").status_code == 200
start = os.path.getsize(path)
self._trigger()
time.sleep(2)
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at ERROR: {counts}")
assert counts["ERR"] > 0, "expected ERROR logs at ERROR level"
for lv in ["TRC", "DBG", "INF", "NTC", "WRN"]:
assert counts[lv] == 0, f"{lv} must be filtered at ERROR"
assert self.node1.set_log_level("TRACE").status_code == 200
@pytest.mark.timeout(120)
def test_log_level_FATAL_from_ERROR(self):
self.node1.start(relay="true")
path = self.node1._log_path
for _ in range(50):
if os.path.exists(path):
break
time.sleep(0.05)
assert self.node1.set_log_level("ERROR").status_code == 200
assert self.node1.set_log_level("FATAL").status_code == 200
start = os.path.getsize(path)
self._trigger()
time.sleep(2)
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at FATAL: {counts}")
assert counts["FTL"] > 0, "expected FATAL logs at FATAL level"
for lv in ["TRC", "DBG", "INF", "NTC", "WRN", "ERR"]:
assert counts[lv] == 0, f"{lv} must be filtered at FATAL"
assert self.node1.set_log_level("TRACE").status_code == 200
def test_relay_peers_on_shard_schema(self):
node_shard = "0"
self.node1.start(relay="true", shard=node_shard, dns_discovery="false")
self.node2.start(
relay="true",
shard=node_shard,
dns_discovery="false",
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
time.sleep(1)
resp = self.node1.get_relay_peers_on_shard(node_shard)
logger.debug(f"relay peers on shard=0 (schema): {resp!r}")
assert str(resp["shard"]) == node_shard, "Returned 'shard' must match requested shardId"
for p in resp["peers"]:
assert isinstance(p.get("multiaddr"), str) and p["multiaddr"].strip(), "peer.multiaddr must be a non-empty string"
if "protocols" in p:
assert isinstance(p["protocols"], list) and all(isinstance(x, str) for x in p["protocols"]), "peer.protocols must be list[str]"
if "shards" in p:
assert isinstance(p["shards"], list), "peer.shards must be a list"
if "connected" in p:
assert isinstance(p["connected"], str), "peer.connected must be a string"
if "agent" in p:
assert isinstance(p["agent"], str), "peer.agent must be a string"
if "origin" in p:
assert isinstance(p["origin"], str), "peer.origin must be a string"
if "score" in p:
assert isinstance(p["score"], (int, float)), "peer.score must be a number"
def test_relay_peers_on_shard_contains_connected_peer(self):
self.node1.start(relay="true", shard="0", dns_discovery="false")
self.node2.start(
relay="true",
shard="0",
dns_discovery="false",
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
n2_addr = self.node2.get_multiaddr_with_id()
resp = self.node1.get_relay_peers_on_shard("0")
logger.debug(f"checking shard=0 list: {resp!r}")
assert any(
p.get("multiaddr") == n2_addr for p in resp["peers"]
), f"Expected Node-2 address {n2_addr} in Node-1's /admin/v1/peers/relay/on/0 list"
def test_admin_relay_peers_schema(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node1.add_peers([self.node2.get_multiaddr_with_id()])
self.node2.add_peers([self.node1.get_multiaddr_with_id()])
time.sleep(1)
resp = self.node1.get_relay_peers()
logger.debug(f"/admin/v1/peers/relay (schema): {resp!r} / type={type(resp).__name__}")
groups = resp if isinstance(resp, list) else [resp]
for grp in groups:
peers_list = grp.get("peers")
for peer in peers_list:
ma = peer.get("multiaddr")
assert isinstance(ma, str) and ma.strip(), "multiaddr must be a non-empty string"
protos = peer["protocols"]
all(isinstance(x, str) for x in protos), "protocols must be list[str]"
assert isinstance(peer["score"], (int, float)), "score must be a number"
def test_admin_relay_peers_contains_all_relay_peers(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node3.start(relay="false", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
n2_addr = self.node2.get_multiaddr_with_id()
n3_addr = self.node3.get_multiaddr_with_id()
n4_addr = self.node4.get_multiaddr_with_id()
time.sleep(1)
resp = self.node1.get_relay_peers()
logger.debug(f"/admin/v1/peers/relay {resp!r}")
peer_addrs = {peer["multiaddr"] for group in resp for peer in group["peers"]}
assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}"
assert n3_addr not in peer_addrs, f"Missing Node-3 address {n3_addr}"
assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}"
def test_admin_connected_peers_on_shard_contains_all_three(self):
shard = "0"
self.node1.start(relay="true", shard=shard)
self.node2.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node3.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri())
n2_addr = self.node2.get_multiaddr_with_id()
n3_addr = self.node3.get_multiaddr_with_id()
n4_addr = self.node4.get_multiaddr_with_id()
time.sleep(1)
resp = self.node1.get_connected_peers(shard)
logger.debug(f"/admin/v1/peers/connected/on/{shard} (contains 3): {resp!r}")
peer_addrs = {p["multiaddr"] for p in resp["peers"]}
assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}"
assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}"
assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}"
def test_admin_connected_peers_scalar_types(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
resp = self.node1.get_connected_peers()
logger.debug(f"Response for get connected peers {resp!r}")
for p in resp:
assert isinstance(p["multiaddr"], str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string"
assert isinstance(p["agent"], str), "agent must be a string"
assert isinstance(p["connected"], str), "connected must be a string"
assert isinstance(p["origin"], str), "origin must be a string"
assert isinstance(p["score"], (int, float)), "score must be a number"
assert isinstance(p["latency"], (int, float)), "latency must be a number"
def test_admin_connected_peers_contains_peers_only(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true")
n2_addr = self.node2.get_multiaddr_with_id()
n3_addr = self.node3.get_multiaddr_with_id()
n4_addr = self.node4.get_multiaddr_with_id()
resp = self.node1.get_connected_peers()
logger.debug(f"/admin/v1/peers/connected contains : {resp!r}")
peer_addrs = {p["multiaddr"] for p in resp}
assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}"
assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}"
assert n4_addr not in peer_addrs, f"Missing Node-4 address {n4_addr}"
def test_admin_service_peers_scalar_required_types(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
resp = self.node1.get_service_peers()
logger.debug(f"/admin/v1/peers/service {resp!r}")
for service, peers in resp.items():
assert isinstance(service, str) and service.strip(), "service must be a non-empty string"
for p in peers:
assert isinstance(p.get("multiaddr"), str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string"
assert isinstance(p.get("agent"), str), "agent must be a string"
assert isinstance(p.get("connected"), str), "connected must be a string"
assert isinstance(p.get("origin"), str), "origin must be a string"
assert isinstance(p.get("score"), (int, float)), "score must be a number"
assert isinstance(p.get("latency"), (int, float)), "latency must be a number"
def test_admin_service_peers_schema(self):
n1 = WakuNode(NODE_1, "n1_service_schema")
n2 = WakuNode(NODE_2, "n2_service_schema")
n1.start(relay="true")
n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri())
peers = n1.get_service_peers()
logger.debug("Validate schema of get service peers")
for p in peers:
assert "multiaddr" in p, "missing 'multiaddr'"
assert "protocols" in p, "missing 'protocols'"
assert "shards" in p, "missing 'shards'"
assert "connected" in p, "missing 'connected'"
assert "agent" in p, "missing 'agent'"
assert "origin" in p, "missing 'origin'"
def test_admin_service_peers_contains_expected_addrs_and_protocols(self):
n1 = WakuNode(NODE_1, "n1_service_lookup")
n2 = WakuNode(NODE_2, "n2_service_relay")
n3 = WakuNode(NODE_2, "n3_service_store")
n1.start(relay="true")
n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri())
n3.start(store="true", discv5_bootstrap_node=n1.get_enr_uri())
n1.add_peers([n3.get_multiaddr_with_id()])
resp = n1.get_service_peers()
logger.debug("/admin/v1/peers/service %s", resp)
by_addr = {p["multiaddr"]: p["protocols"] for p in resp}
m2 = n2.get_multiaddr_with_id()
m3 = n3.get_multiaddr_with_id()
assert m2 in by_addr, f"node2 not found"
assert any("/waku/relay/" in s for s in by_addr[m2]), "node2 should advertise a relay protocol"
assert m3 in by_addr, f"node3 not found. got: {list(by_addr.keys())}"
assert any("/waku/store-query/" in s for s in by_addr[m3]), "node3 should advertise a store-query protocol"

View File

@ -33,3 +33,9 @@ class TestDebugFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
logger.debug(f"node info: {info}")
assert info["enrUri"] == self.node1.get_enr_uri(), "node enruri doesn't match"
assert self.node1.get_multiaddr_with_id() in info["listenAddresses"], "node address doesn't match"
def test_get_debug_version_is_string(self):
self.node1.start(relay="true")
dbg_version = self.node1.get_debug_version()
logger.debug(f"debug version returned: {dbg_version}")
assert isinstance(dbg_version, str) and dbg_version.strip() != "", "Expected /debug/v1/version to return a non-empty string"