From 8644151be3a60121104f39446adf937d16c875b9 Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Fri, 10 Apr 2026 15:26:03 +0530 Subject: [PATCH] fix: resolved flaky peer management tests. (#170) --- src/test_data.py | 35 --------- .../test_peer_store.py | 72 +++++++++++++------ 2 files changed, 52 insertions(+), 55 deletions(-) diff --git a/src/test_data.py b/src/test_data.py index b5e425201..4166c30ae 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -277,40 +277,6 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [ 'waku_archive_query_duration_seconds_bucket{le="7.5"}', 'waku_archive_query_duration_seconds_bucket{le="10.0"}', 'waku_archive_query_duration_seconds_bucket{le="+Inf"}', - "waku_legacy_archive_insert_duration_seconds_sum", - "waku_legacy_archive_insert_duration_seconds_count", - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.005"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.01"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.025"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.05"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.075"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.1"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.25"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.5"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.75"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="1.0"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="2.5"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="5.0"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="7.5"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="10.0"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="+Inf"}', - "waku_legacy_archive_query_duration_seconds_sum", - "waku_legacy_archive_query_duration_seconds_count", - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.005"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.01"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.025"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.05"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.075"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.1"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.25"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.5"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.75"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="1.0"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="2.5"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="5.0"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="7.5"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="10.0"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="+Inf"}', "waku_filter_subscriptions", "waku_filter_handle_message_duration_seconds_sum", "waku_filter_handle_message_duration_seconds_count", @@ -383,7 +349,6 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [ 'waku_filter_handle_message_duration_seconds_bucket{le="20.0"}', 'waku_filter_handle_message_duration_seconds_bucket{le="30.0"}', "total_messages_cached", - "waku_legacy_store_queries_total", "waku_store_queries_total", "mix_pool_size", "libp2p_gossipsub_imreceiving_saved_messages_total", diff --git a/tests/peer_connection_management/test_peer_store.py b/tests/peer_connection_management/test_peer_store.py index fb265db93..0766684d1 100644 --- a/tests/peer_connection_management/test_peer_store.py +++ b/tests/peer_connection_management/test_peer_store.py @@ -1,4 +1,5 @@ import pytest +from tenacity import retry, stop_after_delay, wait_fixed from src.env_vars import NODE_1, NODE_2 from src.libs.common import delay @@ -25,14 +26,20 @@ class TestPeerStore(StepsRelay, StepsStore): logger.debug(f"Node {i} peer ID {node_id}") ids.append(node_id) - for i in range(5): - others = [] - for peer_info in nodes[i].get_peers(): - logger.debug(f"Node {i} peer info {peer_info}") - peer_id = peer_info2id(peer_info, nodes[i].is_nwaku()) - others.append(peer_id) + # Discv5 discovery is eventually-consistent; poll until every node's peer + # store reflects the expected mesh, or fail after the timeout. + @retry(stop=stop_after_delay(60), wait=wait_fixed(2), reraise=True) + def check_peer_stores(): + for i in range(5): + others = [] + for peer_info in nodes[i].get_peers(): + logger.debug(f"Node {i} peer info {peer_info}") + peer_id = peer_info2id(peer_info, nodes[i].is_nwaku()) + others.append(peer_id) - assert (i == 0 and len(others) == 4) or (i > 0 and len(others) >= 1), f"Some nodes missing in the peer store of Node ID {ids[i]}" + assert (i == 0 and len(others) == 4) or (i > 0 and len(others) >= 1), f"Some nodes missing in the peer store of Node ID {ids[i]}" + + check_peer_stores() def test_add_peers(self): self.setup_main_nodes() @@ -41,27 +48,52 @@ class TestPeerStore(StepsRelay, StepsStore): nodes = [self.node1, self.node2] nodes.extend(self.optional_nodes) delay(10) - peers_multiaddr = set() - for i in range(2): - for peer_info in nodes[i].get_peers(): - multiaddr = peer_info2multiaddr(peer_info, nodes[i].is_nwaku()) - peers_multiaddr.add(multiaddr) - assert len(peers_multiaddr) >= 5, "At least 5 multi addresses are expected" + # Discv5 discovery is eventually-consistent; poll node1 and node2's peer + # stores until the union contains >=5 multiaddrs, or fail after the timeout. + @retry(stop=stop_after_delay(60), wait=wait_fixed(2), reraise=True) + def collect_multiaddrs(): + peers_multiaddr = set() + for i in range(2): + for peer_info in nodes[i].get_peers(): + multiaddr = peer_info2multiaddr(peer_info, nodes[i].is_nwaku()) + peers_multiaddr.add(multiaddr) - # Add peers one by one excluding self for Nodes 2-5 + assert len(peers_multiaddr) >= 5, f"At least 5 multi addresses are expected, got {len(peers_multiaddr)}: {peers_multiaddr}" + return peers_multiaddr + + peers_multiaddr = collect_multiaddrs() + + # Group multiaddrs by peer ID. libp2p identify can leak observed (ephemeral + # source) addresses into the peer store alongside the real listen address; + # those observed addrs are unreachable when dialed back, so we try every + # known address for a peer until one succeeds. + addrs_by_peer = {} + for peer in peers_multiaddr: + addrs_by_peer.setdefault(multiaddr2id(peer), []).append(peer) + + # For each of nodes 2-5, add every other peer via the add_peers API. for i in range(1, 5): - for peer in list(peers_multiaddr): - if nodes[i].get_id() != multiaddr2id(peer): + self_id = nodes[i].get_id() + for peer_id, addrs in addrs_by_peer.items(): + if peer_id == self_id: + continue + last_err = None + for addr in addrs: try: if nodes[i].is_nwaku(): - nodes[i].add_peers([peer]) + nodes[i].add_peers([addr]) else: - peer_info = {"multiaddr": peer, "shards": [0], "protocols": ["/vac/waku/relay/2.0.0"]} + peer_info = {"multiaddr": addr, "shards": [0], "protocols": ["/vac/waku/relay/2.0.0"]} nodes[i].add_peers(peer_info) + last_err = None + break except Exception as ex: - logger.error(f"Failed to add peer to Node {i} peer store: {ex}") - raise + last_err = ex + logger.warning(f"Node {i} failed to add peer {peer_id} via {addr}: {ex}") + if last_err is not None: + logger.error(f"Node {i} could not add peer {peer_id} via any known address") + raise last_err @pytest.mark.skip(reason="waiting for https://github.com/waku-org/nwaku/issues/1549 resolution") def test_get_peers_two_protocols(self):