diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index 464f5de7..c0bd2f7f 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -184,3 +184,7 @@ class REST(BaseClient): def get_debug_version(self): return self.rest_call("get", "debug/v1/version").text.strip() + + def get_peer(self, peer_id: str): + resp = self.rest_call("get", f"admin/v1/peer/{peer_id}") + return resp.json() diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 8d11c65d..9a8d2e35 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -574,3 +574,6 @@ class WakuNode: def get_debug_version(self): return self._api.get_debug_version() + + def get_peer_info(self, peer_id: str): + return self._api.get_peer(peer_id) diff --git a/tests/rest_flags/admin_flags.py b/tests/rest_flags/admin_flags.py index 7a0d3b3e..409cd928 100644 --- a/tests/rest_flags/admin_flags.py +++ b/tests/rest_flags/admin_flags.py @@ -1,4 +1,4 @@ -import pytest +import pytest, time, re, os from src.env_vars import NODE_1, NODE_2, STRESS_ENABLED from src.libs.common import delay from src.libs.custom_logger import get_custom_logger @@ -17,6 +17,24 @@ These tests make sure thst REST flags related to admin flags acting as expected class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): + TAGS = ["TRC", "DBG", "INF", "NTC", "WRN", "ERR", "FTL"] + + LEVEL_RE = re.compile(r'"lvl"\s*:\s*"(TRC|DBG|INF|NTC|WRN|ERR|FTL)"|\b(TRC|DBG|INF|NTC|WRN|ERR|FTL)\b') + + def _read_tail_counts(self, path: str, start_size: int) -> dict: + with open(path, "rb") as f: + f.seek(start_size) + text = f.read().decode(errors="ignore") + counts = {t: 0 for t in self.TAGS} + for a, b in self.LEVEL_RE.findall(text): + counts[(a or b)] += 1 + return counts + + def _trigger(self): + self.node1.info() + self.node1.get_version() + self.node1.get_debug_version() + @pytest.fixture(scope="function", autouse=True) def nodes(self): self.node1 = WakuNode(NODE_2, f"node1_{self.test_id}") @@ -24,6 +42,14 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") self.node4 = WakuNode(NODE_2, f"node3_{self.test_id}") + def _tail(self, path, start_size): + with open(path, "rb") as f: + f.seek(start_size) + return f.read().decode(errors="ignore") + + def _count_levels(self, text, levels): + return {lvl: len(re.findall(getattr(self, f"{lvl}_RE"), text)) for lvl in levels} + def test_admin_filter_subscriptions_shape(self): self.node1.start(filter="true", relay="true") self.node2.start(relay="false", filternode=self.node1.get_multiaddr_with_id(), discv5_bootstrap_node=self.node1.get_enr_uri()) @@ -38,12 +64,19 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): assert subs[0]["filterCriteria"][0]["pubsubTopic"] == self.test_pubsub_topic, "pubsub topic doesn't match" assert subs[0]["filterCriteria"][0]["contentTopic"] == self.test_content_topic, "content topic doesn't match" - def test_admin_peers_stats_shape(self): - self.node1.start(relay="true") + def test_admin_peers_stats_schema(self): + self.node1.start(filter="true", relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - stats = self.node1.get_peer_stats() - logger.debug(f"Node admin peers stats {stats}") + logger.debug(f"Peer stats schema check: {stats}") + assert isinstance(stats, dict), "stats must be a dict" + for k in ("Sum", "Relay peers"): + assert k in stats, f"missing section: {k}" + assert isinstance(stats[k], dict), f"{k} must be a dict" + assert isinstance(stats["Sum"].get("Total peers", 0), int) and stats["Sum"]["Total peers"] >= 0, "Sum.Total peers must be a non-negative int" + assert ( + isinstance(stats["Relay peers"].get("Total relay peers", 0), int) and stats["Relay peers"]["Total relay peers"] >= 0 + ), "Relay peers.Total relay peers must be a non-negative int" def test_admin_peers_stats_counts(self): self.node1.start(filter="true", relay="true") @@ -76,3 +109,374 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): assert isinstance(p["agent"], str), "agent not str" assert isinstance(p["origin"], str), "origin not str" assert isinstance(p.get("score", 0.0), (int, float)), "score not number" + + def test_admin_peer_by_id(self): + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + peer_id = self.node2.get_multiaddr_with_id().rpartition("/p2p/")[2] + info = self.node1.get_peer_info(peer_id) + logger.debug(f"Node-1 /admin/v1/peer/{peer_id}: {info} \n") + logger.debug("Validate response schema") + for k in ("multiaddr", "protocols", "shards", "connected", "agent", "origin"): + assert k in info, f"missing field: {k}" + assert info["multiaddr"] == self.node2.get_multiaddr_with_id(), "multiaddr mismatch" + + def test_admin_set_all_log_levels(self): + self.node1.start(relay="true") + self.node1.container() + levels = ["TRACE", "DEBUG", "INFO", "NOTICE", "WARN", "ERROR", "FATAL"] + _levels = ["INFO"] + for lvl in _levels: + resp = self.node1.set_log_level(lvl) + logger.debug(f"Set log level ({lvl})") + self.node2.start(relay="true") + assert resp.status_code == 200, f"failed to set log level {lvl} {resp.text}" + self.node2.info() + self.node2.get_debug_version() + + resp = self.node1.set_log_level("TRACE") + logger.debug(f"Restore default log level (TRACE) -> status={resp.status_code}") + assert resp.status_code == 200, f"failed to revert log level: {resp.text}" + + @pytest.mark.timeout(120) + def test_log_level_DEBUG_from_TRACE(self): + self.node1.start(relay="true") + path = self.node1._log_path + for _ in range(50): + if os.path.exists(path): + break + time.sleep(0.05) + + assert self.node1.set_log_level("TRACE").status_code == 200 + assert self.node1.set_log_level("DEBUG").status_code == 200 + + start = os.path.getsize(path) + self._trigger() + time.sleep(2) + + counts = self._read_tail_counts(path, start) + logger.debug(f"counts at DEBUG: {counts}") + + assert counts["DBG"] > 0, "expected DEBUG logs at DEBUG level" + assert counts["TRC"] == 0, "TRACE must be filtered at DEBUG" + + assert self.node1.set_log_level("TRACE").status_code == 200 + + @pytest.mark.timeout(120) + def test_log_level_INFO_from_DEBUG(self): + self.node1.start(relay="true") + path = self.node1._log_path + for _ in range(50): + if os.path.exists(path): + break + time.sleep(0.05) + + # assert self.node1.set_log_level("DEBUG").status_code == 200 + assert self.node1.set_log_level("INFO").status_code == 200 + + start = os.path.getsize(path) + self._trigger() + time.sleep(2) + + counts = self._read_tail_counts(path, start) + logger.debug(f"counts at INFO: {counts}") + + assert counts["INF"] > 0, "expected INFO logs at INFO level" + assert counts["DBG"] == 0 and counts["TRC"] == 0, "lower than INFO (DBG/TRC) must be filtered" + + assert self.node1.set_log_level("TRACE").status_code == 200 + + @pytest.mark.timeout(120) + def test_log_level_NOTICE_from_INFO(self): + self.node1.start(relay="true") + path = self.node1._log_path + for _ in range(50): + if os.path.exists(path): + break + time.sleep(0.05) + + assert self.node1.set_log_level("INFO").status_code == 200 + assert self.node1.set_log_level("NOTICE").status_code == 200 + + start = os.path.getsize(path) + self._trigger() + time.sleep(2) + + counts = self._read_tail_counts(path, start) + logger.debug(f"counts at NOTICE: {counts}") + + assert counts["NTC"] > 0, "expected NOTICE logs at NOTICE level" + for lv in ["TRC", "DBG", "INF"]: + assert counts[lv] == 0, f"{lv} must be filtered at NOTICE" + + assert self.node1.set_log_level("TRACE").status_code == 200 + + @pytest.mark.timeout(120) + def test_log_level_WARN_from_NOTICE(self): + self.node1.start(relay="true") + path = self.node1._log_path + for _ in range(50): + if os.path.exists(path): + break + time.sleep(0.05) + + assert self.node1.set_log_level("NOTICE").status_code == 200 + assert self.node1.set_log_level("WARN").status_code == 200 + + start = os.path.getsize(path) + self._trigger() + time.sleep(2) + + counts = self._read_tail_counts(path, start) + logger.debug(f"counts at WARN: {counts}") + + assert counts["WRN"] > 0, "expected WARN logs at WARN level" + for lv in ["TRC", "DBG", "INF", "NTC"]: + assert counts[lv] == 0, f"{lv} must be filtered at WARN" + + assert self.node1.set_log_level("TRACE").status_code == 200 + + @pytest.mark.timeout(120) + def test_log_level_ERROR_from_WARN(self): + self.node1.start(relay="true") + path = self.node1._log_path + for _ in range(50): + if os.path.exists(path): + break + time.sleep(0.05) + + assert self.node1.set_log_level("WARN").status_code == 200 + assert self.node1.set_log_level("ERROR").status_code == 200 + + start = os.path.getsize(path) + self._trigger() + time.sleep(2) + + counts = self._read_tail_counts(path, start) + logger.debug(f"counts at ERROR: {counts}") + + assert counts["ERR"] > 0, "expected ERROR logs at ERROR level" + for lv in ["TRC", "DBG", "INF", "NTC", "WRN"]: + assert counts[lv] == 0, f"{lv} must be filtered at ERROR" + + assert self.node1.set_log_level("TRACE").status_code == 200 + + @pytest.mark.timeout(120) + def test_log_level_FATAL_from_ERROR(self): + self.node1.start(relay="true") + path = self.node1._log_path + for _ in range(50): + if os.path.exists(path): + break + time.sleep(0.05) + + assert self.node1.set_log_level("ERROR").status_code == 200 + assert self.node1.set_log_level("FATAL").status_code == 200 + + start = os.path.getsize(path) + self._trigger() + time.sleep(2) + + counts = self._read_tail_counts(path, start) + logger.debug(f"counts at FATAL: {counts}") + + assert counts["FTL"] > 0, "expected FATAL logs at FATAL level" + for lv in ["TRC", "DBG", "INF", "NTC", "WRN", "ERR"]: + assert counts[lv] == 0, f"{lv} must be filtered at FATAL" + + assert self.node1.set_log_level("TRACE").status_code == 200 + + def test_relay_peers_on_shard_schema(self): + node_shard = "0" + self.node1.start(relay="true", shard=node_shard, dns_discovery="false") + self.node2.start( + relay="true", + shard=node_shard, + dns_discovery="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + time.sleep(1) + resp = self.node1.get_relay_peers_on_shard(node_shard) + logger.debug(f"relay peers on shard=0 (schema): {resp!r}") + assert str(resp["shard"]) == node_shard, "Returned 'shard' must match requested shardId" + for p in resp["peers"]: + assert isinstance(p.get("multiaddr"), str) and p["multiaddr"].strip(), "peer.multiaddr must be a non-empty string" + if "protocols" in p: + assert isinstance(p["protocols"], list) and all(isinstance(x, str) for x in p["protocols"]), "peer.protocols must be list[str]" + if "shards" in p: + assert isinstance(p["shards"], list), "peer.shards must be a list" + if "connected" in p: + assert isinstance(p["connected"], str), "peer.connected must be a string" + if "agent" in p: + assert isinstance(p["agent"], str), "peer.agent must be a string" + if "origin" in p: + assert isinstance(p["origin"], str), "peer.origin must be a string" + if "score" in p: + assert isinstance(p["score"], (int, float)), "peer.score must be a number" + + def test_relay_peers_on_shard_contains_connected_peer(self): + self.node1.start(relay="true", shard="0", dns_discovery="false") + self.node2.start( + relay="true", + shard="0", + dns_discovery="false", + discv5_bootstrap_node=self.node1.get_enr_uri(), + ) + n2_addr = self.node2.get_multiaddr_with_id() + resp = self.node1.get_relay_peers_on_shard("0") + logger.debug(f"checking shard=0 list: {resp!r}") + assert any( + p.get("multiaddr") == n2_addr for p in resp["peers"] + ), f"Expected Node-2 address {n2_addr} in Node-1's /admin/v1/peers/relay/on/0 list" + + def test_admin_relay_peers_schema(self): + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node1.add_peers([self.node2.get_multiaddr_with_id()]) + self.node2.add_peers([self.node1.get_multiaddr_with_id()]) + time.sleep(1) + + resp = self.node1.get_relay_peers() + logger.debug(f"/admin/v1/peers/relay (schema): {resp!r} / type={type(resp).__name__}") + + groups = resp if isinstance(resp, list) else [resp] + for grp in groups: + peers_list = grp.get("peers") + for peer in peers_list: + ma = peer.get("multiaddr") + assert isinstance(ma, str) and ma.strip(), "multiaddr must be a non-empty string" + + protos = peer["protocols"] + all(isinstance(x, str) for x in protos), "protocols must be list[str]" + + assert isinstance(peer["score"], (int, float)), "score must be a number" + + def test_admin_relay_peers_contains_all_relay_peers(self): + self.node1.start(relay="true") + + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node3.start(relay="false", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + n2_addr = self.node2.get_multiaddr_with_id() + n3_addr = self.node3.get_multiaddr_with_id() + n4_addr = self.node4.get_multiaddr_with_id() + time.sleep(1) + + resp = self.node1.get_relay_peers() + logger.debug(f"/admin/v1/peers/relay {resp!r}") + + peer_addrs = {peer["multiaddr"] for group in resp for peer in group["peers"]} + assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}" + assert n3_addr not in peer_addrs, f"Missing Node-3 address {n3_addr}" + assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}" + + def test_admin_connected_peers_on_shard_contains_all_three(self): + shard = "0" + self.node1.start(relay="true", shard=shard) + self.node2.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node3.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node4.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri()) + + n2_addr = self.node2.get_multiaddr_with_id() + n3_addr = self.node3.get_multiaddr_with_id() + n4_addr = self.node4.get_multiaddr_with_id() + time.sleep(1) + + resp = self.node1.get_connected_peers(shard) + logger.debug(f"/admin/v1/peers/connected/on/{shard} (contains 3): {resp!r}") + + peer_addrs = {p["multiaddr"] for p in resp["peers"]} + assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}" + assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}" + assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}" + + def test_admin_connected_peers_scalar_types(self): + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + resp = self.node1.get_connected_peers() + logger.debug(f"Response for get connected peers {resp!r}") + + for p in resp: + assert isinstance(p["multiaddr"], str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string" + assert isinstance(p["agent"], str), "agent must be a string" + assert isinstance(p["connected"], str), "connected must be a string" + assert isinstance(p["origin"], str), "origin must be a string" + assert isinstance(p["score"], (int, float)), "score must be a number" + assert isinstance(p["latency"], (int, float)), "latency must be a number" + + def test_admin_connected_peers_contains_peers_only(self): + self.node1.start(relay="true") + + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node4.start(relay="true") + + n2_addr = self.node2.get_multiaddr_with_id() + n3_addr = self.node3.get_multiaddr_with_id() + n4_addr = self.node4.get_multiaddr_with_id() + resp = self.node1.get_connected_peers() + logger.debug(f"/admin/v1/peers/connected contains : {resp!r}") + + peer_addrs = {p["multiaddr"] for p in resp} + assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}" + assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}" + assert n4_addr not in peer_addrs, f"Missing Node-4 address {n4_addr}" + + def test_admin_service_peers_scalar_required_types(self): + self.node1.start(relay="true") + + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + resp = self.node1.get_service_peers() + logger.debug(f"/admin/v1/peers/service {resp!r}") + + for service, peers in resp.items(): + assert isinstance(service, str) and service.strip(), "service must be a non-empty string" + for p in peers: + assert isinstance(p.get("multiaddr"), str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string" + assert isinstance(p.get("agent"), str), "agent must be a string" + assert isinstance(p.get("connected"), str), "connected must be a string" + assert isinstance(p.get("origin"), str), "origin must be a string" + assert isinstance(p.get("score"), (int, float)), "score must be a number" + assert isinstance(p.get("latency"), (int, float)), "latency must be a number" + + def test_admin_service_peers_schema(self): + n1 = WakuNode(NODE_1, "n1_service_schema") + n2 = WakuNode(NODE_2, "n2_service_schema") + n1.start(relay="true") + n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri()) + peers = n1.get_service_peers() + logger.debug("Validate schema of get service peers") + for p in peers: + assert "multiaddr" in p, "missing 'multiaddr'" + assert "protocols" in p, "missing 'protocols'" + assert "shards" in p, "missing 'shards'" + assert "connected" in p, "missing 'connected'" + assert "agent" in p, "missing 'agent'" + assert "origin" in p, "missing 'origin'" + + def test_admin_service_peers_contains_expected_addrs_and_protocols(self): + n1 = WakuNode(NODE_1, "n1_service_lookup") + n2 = WakuNode(NODE_2, "n2_service_relay") + n3 = WakuNode(NODE_2, "n3_service_store") + + n1.start(relay="true") + n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri()) + n3.start(store="true", discv5_bootstrap_node=n1.get_enr_uri()) + n1.add_peers([n3.get_multiaddr_with_id()]) + resp = n1.get_service_peers() + logger.debug("/admin/v1/peers/service %s", resp) + by_addr = {p["multiaddr"]: p["protocols"] for p in resp} + + m2 = n2.get_multiaddr_with_id() + m3 = n3.get_multiaddr_with_id() + assert m2 in by_addr, f"node2 not found" + assert any("/waku/relay/" in s for s in by_addr[m2]), "node2 should advertise a relay protocol" + assert m3 in by_addr, f"node3 not found. got: {list(by_addr.keys())}" + assert any("/waku/store-query/" in s for s in by_addr[m3]), "node3 should advertise a store-query protocol" diff --git a/tests/rest_flags/debug_flags.py b/tests/rest_flags/debug_flags.py index a424fdaf..d0dfe870 100644 --- a/tests/rest_flags/debug_flags.py +++ b/tests/rest_flags/debug_flags.py @@ -33,3 +33,9 @@ class TestDebugFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): logger.debug(f"node info: {info}") assert info["enrUri"] == self.node1.get_enr_uri(), "node enruri doesn't match" assert self.node1.get_multiaddr_with_id() in info["listenAddresses"], "node address doesn't match" + + def test_get_debug_version_is_string(self): + self.node1.start(relay="true") + dbg_version = self.node1.get_debug_version() + logger.debug(f"debug version returned: {dbg_version}") + assert isinstance(dbg_version, str) and dbg_version.strip() != "", "Expected /debug/v1/version to return a non-empty string"