From ccba9ce7087648581ec938c32aff7d0e6b14fa33 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 7 May 2026 13:43:29 +0200 Subject: [PATCH] Add assert_event_invariants --- tests/wrappers_tests/test_send_e2e_part1.py | 29 +++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/wrappers_tests/test_send_e2e_part1.py b/tests/wrappers_tests/test_send_e2e_part1.py index 51e845078..de49a984f 100644 --- a/tests/wrappers_tests/test_send_e2e_part1.py +++ b/tests/wrappers_tests/test_send_e2e_part1.py @@ -9,6 +9,7 @@ from src.node.waku_node import WakuNode from src.node.wrappers_manager import WrapperManager from src.node.wrapper_helpers import ( EventCollector, + assert_event_invariants, create_message_bindings, get_node_multiaddr, wait_for_connected, @@ -129,6 +130,8 @@ class TestSendBeforeRelay(StepsStore): f"from a store-enabled relay peer. Collected events: {sender_collector.events}" ) + assert_event_invariants(sender_collector, request_id) + @pytest.mark.xfail(reason="fails to republish after store peer joins mesh see https://github.com/logos-messaging/logos-delivery/issues/3848") def test_s19_store_peer_appears_after_propagation(self, node_config): """ @@ -222,6 +225,8 @@ class TestSendBeforeRelay(StepsStore): ascending="true", ) + assert_event_invariants(sender_collector, request_id) + @pytest.mark.skip(reason="Forcing the miss store round not possible") def test_s20_store_misses_initially_then_retry_succeeds(self, node_config): """ @@ -341,6 +346,8 @@ class TestSendBeforeRelay(StepsStore): ascending="true", ) + assert_event_invariants(sender_collector, request_id) + def test_s21_error_when_retry_window_expires(self, node_config): """ S21: delivery retry window expires before any valid path recovers. @@ -392,6 +399,8 @@ class TestSendBeforeRelay(StepsStore): f"Full event: {error_event}" ) + assert_event_invariants(sender_collector, request_id) + def test_s22_non_ephemeral_message_with_reliability_disabled(self, node_config): """ S22: non-ephemeral message with reliabilityEnabled disabled. @@ -463,6 +472,8 @@ class TestSendBeforeRelay(StepsStore): f"Collected events: {sender_collector.events}" ) + assert_event_invariants(sender_collector, request_id) + def test_s23_no_sent_event_when_relay_has_no_store(self, node_config): """ S23: non-ephemeral message, reliability enabled, no store peer ever reachable. @@ -542,6 +553,8 @@ class TestSendBeforeRelay(StepsStore): f"Collected events: {sender_collector.events}" ) + assert_event_invariants(sender_collector, request_id) + def test_s24_ephemeral_message_with_reachable_store(self, node_config): """ S24: ephemeral message, reliability enabled, reachable store peer. @@ -607,6 +620,8 @@ class TestSendBeforeRelay(StepsStore): f"Collected events: {sender_collector.events}" ) + assert_event_invariants(sender_collector, request_id) + def test_s26_lightpush_peer_churn_alternate_remains(self, node_config): """ S26: multiple lightpush peers, the selected one disappears, @@ -719,6 +734,8 @@ class TestSendBeforeRelay(StepsStore): ) assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}" + assert_event_invariants(sender_collector, request_id) + def test_s30_concurrent_sends_during_auto_subscribe(self, node_config): """ S30: concurrent sends on the same content topic during initial auto-subscribe. @@ -815,6 +832,12 @@ class TestSendBeforeRelay(StepsStore): f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" ) + # Per-request invariants apply to every concurrent send + # (correct requestId, no duplicate terminal events, + # Sent never before Propagated). + for request_id in request_ids: + assert_event_invariants(sender_collector, request_id) + def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config): """ S31: concurrent sends across mixed content topics during peer churn. @@ -962,6 +985,12 @@ class TestSendBeforeRelay(StepsStore): ascending="true", ) + # Per-request invariants apply across all phases, including the + # retry-path bursts (phase 2). If retries ever emit duplicate + # Propagated events or reorder Sent before Propagated, this catches it. + for request_id in all_request_ids: + assert_event_invariants(sender_collector, request_id) + def _s31_fire_burst(self, sender_node, *, phase_label: str) -> list[str]: """Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS. Returns the list of RequestIds. Asserts every send returned Ok."""