diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py index fe631e21f..8710ed556 100644 --- a/src/node/wrapper_helpers.py +++ b/src/node/wrapper_helpers.py @@ -12,6 +12,13 @@ EVENT_PROPAGATED = "message_propagated" EVENT_SENT = "message_sent" EVENT_ERROR = "message_error" +# MaxTimeInCache from send_service.nim. +MAX_TIME_IN_CACHE_S = 60.0 +# Extra slack to cover the background retry loop tick after the window expires. +CACHE_EXPIRY_SLACK_S = 10.0 +ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S +RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" + class EventCollector: """Thread-safe collector for async node events. @@ -84,6 +91,27 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) return wait_for_event(collector, request_id, is_error_event, timeout_s) +def assert_no_error(collector: EventCollector, request_id: str, context: str = "") -> None: + """Assert that no message_error event is currently buffered for `request_id`.""" + event = wait_for_error(collector, request_id, timeout_s=0) + suffix = f" ({context})" if context else "" + assert event is None, f"Unexpected message_error event{suffix}: {event}" + + +def assert_no_sent(collector: EventCollector, request_id: str, context: str = "") -> None: + """Assert that no message_sent event is currently buffered for `request_id`.""" + event = wait_for_sent(collector, request_id, timeout_s=0) + suffix = f" ({context})" if context else "" + assert event is None, f"Unexpected message_sent event{suffix}: {event}" + + +def assert_no_propagated(collector: EventCollector, request_id: str, context: str = "") -> None: + """Assert that no message_propagated event is currently buffered for `request_id`.""" + event = wait_for_propagated(collector, request_id, timeout_s=0) + suffix = f" ({context})" if context else "" + assert event is None, f"Unexpected message_propagated event{suffix}: {event}" + + def wait_for_connected( collector: EventCollector, timeout_s: float = 10.0, @@ -165,3 +193,16 @@ def create_message_bindings(**overrides) -> dict: } envelope.update(overrides) return envelope + + +def assert_no_unknown_request_ids(collector: EventCollector, issued_request_ids) -> None: + """Cross-association guard: every event carrying a requestId must belong + to one of the request ids we issued. Catches events that get attached to + the wrong request id under concurrency. + """ + issued = set(issued_request_ids) + for event in collector.events: + event_request_id = event.get("requestId") + if event_request_id is None: + continue + assert event_request_id in issued, f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" diff --git a/tests/wrappers_tests/test_send_e2e_part1.py b/tests/wrappers_tests/test_send_e2e_part1.py index 7f6a1d8f2..7ef272f0d 100644 --- a/tests/wrappers_tests/test_send_e2e_part1.py +++ b/tests/wrappers_tests/test_send_e2e_part1.py @@ -1,6 +1,4 @@ from concurrent.futures import ThreadPoolExecutor -from time import time_ns - import pytest from src.env_vars import NODE_2 from src.steps.common import StepsCommon @@ -235,6 +233,7 @@ class TestSendBeforeRelay(StepsStore): -> SuccessfullyPropagated -> SuccessfullyValidated """ + sender_collector = EventCollector() store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}") store_node.start( relay="true", diff --git a/tests/wrappers_tests/test_send_e2e_part2.py b/tests/wrappers_tests/test_send_e2e_part2.py index 33f66fb98..14add387d 100644 --- a/tests/wrappers_tests/test_send_e2e_part2.py +++ b/tests/wrappers_tests/test_send_e2e_part2.py @@ -1,5 +1,4 @@ import base64 - import pytest from src.steps.common import StepsCommon from src.libs.common import delay, to_base64 @@ -15,7 +14,6 @@ from src.node.wrapper_helpers import ( wait_for_sent, wait_for_error, ) -from src.steps.store import StepsStore from tests.wrappers_tests.conftest import build_node_config logger = get_custom_logger(__name__)