cleanup unused lines

This commit is contained in:
Aya Hassan 2026-05-07 12:31:41 +02:00
parent bf1731805b
commit 92aeeaa378
3 changed files with 42 additions and 4 deletions

View File

@ -12,6 +12,13 @@ EVENT_PROPAGATED = "message_propagated"
EVENT_SENT = "message_sent"
EVENT_ERROR = "message_error"
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
class EventCollector:
"""Thread-safe collector for async node events.
@ -84,6 +91,27 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float)
return wait_for_event(collector, request_id, is_error_event, timeout_s)
def assert_no_error(collector: EventCollector, request_id: str, context: str = "") -> None:
"""Assert that no message_error event is currently buffered for `request_id`."""
event = wait_for_error(collector, request_id, timeout_s=0)
suffix = f" ({context})" if context else ""
assert event is None, f"Unexpected message_error event{suffix}: {event}"
def assert_no_sent(collector: EventCollector, request_id: str, context: str = "") -> None:
"""Assert that no message_sent event is currently buffered for `request_id`."""
event = wait_for_sent(collector, request_id, timeout_s=0)
suffix = f" ({context})" if context else ""
assert event is None, f"Unexpected message_sent event{suffix}: {event}"
def assert_no_propagated(collector: EventCollector, request_id: str, context: str = "") -> None:
"""Assert that no message_propagated event is currently buffered for `request_id`."""
event = wait_for_propagated(collector, request_id, timeout_s=0)
suffix = f" ({context})" if context else ""
assert event is None, f"Unexpected message_propagated event{suffix}: {event}"
def wait_for_connected(
collector: EventCollector,
timeout_s: float = 10.0,
@ -165,3 +193,16 @@ def create_message_bindings(**overrides) -> dict:
}
envelope.update(overrides)
return envelope
def assert_no_unknown_request_ids(collector: EventCollector, issued_request_ids) -> None:
"""Cross-association guard: every event carrying a requestId must belong
to one of the request ids we issued. Catches events that get attached to
the wrong request id under concurrency.
"""
issued = set(issued_request_ids)
for event in collector.events:
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"

View File

@ -1,6 +1,4 @@
from concurrent.futures import ThreadPoolExecutor
from time import time_ns
import pytest
from src.env_vars import NODE_2
from src.steps.common import StepsCommon
@ -235,6 +233,7 @@ class TestSendBeforeRelay(StepsStore):
-> SuccessfullyPropagated -> SuccessfullyValidated
"""
sender_collector = EventCollector()
store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}")
store_node.start(
relay="true",

View File

@ -1,5 +1,4 @@
import base64
import pytest
from src.steps.common import StepsCommon
from src.libs.common import delay, to_base64
@ -15,7 +14,6 @@ from src.node.wrapper_helpers import (
wait_for_sent,
wait_for_error,
)
from src.steps.store import StepsStore
from tests.wrappers_tests.conftest import build_node_config
logger = get_custom_logger(__name__)