mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-05-24 03:09:58 +00:00
Improve wait_for_event loop logic and add assert_event_invariants helper (#178)
- Refactored the `wait_for_event` function for clarity and to ensure proper deadline handling within the loop. - Introduced `assert_event_invariants` to validate per-request event properties, enforcing invariants like correct `requestId`, no duplicate terminal events, and proper timing between `Propagated` and `Sent`. - Added tests for `assert_event_invariants` enforcement in `S14` and `S15` lightpush scenarios. Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>
This commit is contained in:
parent
91a9fc55eb
commit
4d4bd535c4
@ -67,14 +67,14 @@ def wait_for_event(
|
||||
"""
|
||||
deadline = time.monotonic() + timeout_s
|
||||
|
||||
while time.monotonic() < deadline:
|
||||
while True:
|
||||
for event in collector.get_events_for_request(request_id):
|
||||
if predicate(event):
|
||||
return event
|
||||
if time.monotonic() >= deadline:
|
||||
return None
|
||||
time.sleep(poll_interval_s)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]:
|
||||
return wait_for_event(collector, request_id, is_propagated_event, timeout_s)
|
||||
@ -88,6 +88,41 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float)
|
||||
return wait_for_event(collector, request_id, is_error_event, timeout_s)
|
||||
|
||||
|
||||
TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR}
|
||||
|
||||
|
||||
def assert_event_invariants(collector: EventCollector, request_id: str) -> None:
|
||||
"""Check per-request event invariants (issue #163):
|
||||
- All events carry the correct requestId.
|
||||
- No duplicate terminal events (Propagated, Sent, Error).
|
||||
- Sent never appears before Propagated.
|
||||
"""
|
||||
events = collector.get_events_for_request(request_id)
|
||||
assert events, f"No events found for request {request_id}"
|
||||
|
||||
counts: dict[str, int] = {}
|
||||
first_index: dict[str, int] = {}
|
||||
for i, event in enumerate(events):
|
||||
assert event.get("requestId") == request_id, (
|
||||
f"Event at index {i} has wrong requestId: " f"expected {request_id!r}, got {event.get('requestId')!r}"
|
||||
)
|
||||
event_type = event.get("eventType", "")
|
||||
if event_type in TERMINAL_EVENT_TYPES:
|
||||
counts[event_type] = counts.get(event_type, 0) + 1
|
||||
if event_type not in first_index:
|
||||
first_index[event_type] = i
|
||||
|
||||
for event_type, count in counts.items():
|
||||
assert count == 1, f"Duplicate {event_type} events for request {request_id}: " f"got {count}, expected 1. Events: {events}"
|
||||
|
||||
if EVENT_SENT in first_index and EVENT_PROPAGATED in first_index:
|
||||
assert first_index[EVENT_PROPAGATED] < first_index[EVENT_SENT], (
|
||||
f"message_sent (index {first_index[EVENT_SENT]}) arrived before "
|
||||
f"message_propagated (index {first_index[EVENT_PROPAGATED]}) "
|
||||
f"for request {request_id}. Events: {events}"
|
||||
)
|
||||
|
||||
|
||||
def get_node_multiaddr(node) -> str:
|
||||
"""Return the first TCP multiaddr (with peer-id) from a WrapperManager node."""
|
||||
result = node.get_node_info_raw("MyMultiaddresses")
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from time import time_ns
|
||||
import base64
|
||||
|
||||
import pytest
|
||||
from src.env_vars import NODE_2
|
||||
@ -9,6 +9,7 @@ from src.node.waku_node import WakuNode
|
||||
from src.node.wrappers_manager import WrapperManager
|
||||
from src.node.wrapper_helpers import (
|
||||
EventCollector,
|
||||
assert_event_invariants,
|
||||
create_message_bindings,
|
||||
get_node_multiaddr,
|
||||
wait_for_propagated,
|
||||
@ -16,14 +17,17 @@ from src.node.wrapper_helpers import (
|
||||
wait_for_error,
|
||||
)
|
||||
from src.steps.store import StepsStore
|
||||
from tests.wrappers_tests.conftest import build_node_config
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
PROPAGATED_TIMEOUT_S = 30.0
|
||||
SENT_TIMEOUT_S = 10.0
|
||||
NO_SENT_OBSERVATION_S = 5.0
|
||||
SENT_AFTER_STORE_TIMEOUT_S = 60.0
|
||||
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
|
||||
RECOVERY_TIMEOUT_S = 45.0
|
||||
SERVICE_DOWN_SETTLE_S = 3.0
|
||||
|
||||
# MaxTimeInCache from send_service.nim.
|
||||
MAX_TIME_IN_CACHE_S = 60.0
|
||||
@ -371,3 +375,162 @@ class TestS06CoreSenderRelayOnly(StepsCommon):
|
||||
|
||||
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
|
||||
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
|
||||
|
||||
|
||||
class TestS14LightpushNonRetryableError(StepsCommon):
|
||||
"""
|
||||
S14 — Lightpush non-retryable error via oversized message.
|
||||
Edge sender publishes a message exceeding DefaultMaxWakuMessageSize (150KiB)
|
||||
through a lightpush service node. The server validates message size and
|
||||
returns INVALID_MESSAGE (420), a non-retryable error.
|
||||
Expected: send() returns Ok(RequestId), then message_error event.
|
||||
"""
|
||||
|
||||
def test_s14_oversized_message_triggers_error(self):
|
||||
sender_collector = EventCollector()
|
||||
|
||||
common = {
|
||||
"store": False,
|
||||
"filter": False,
|
||||
"discv5Discovery": False,
|
||||
"numShardsInNetwork": 1,
|
||||
}
|
||||
|
||||
service_config = build_node_config(relay=True, lightpush=True, **common)
|
||||
service_result = WrapperManager.create_and_start(config=service_config)
|
||||
assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
|
||||
|
||||
with service_result.ok_value as service:
|
||||
service_multiaddr = get_node_multiaddr(service)
|
||||
|
||||
edge_config = build_node_config(
|
||||
mode="Edge",
|
||||
relay=False,
|
||||
lightpushnode=service_multiaddr,
|
||||
staticnodes=[service_multiaddr],
|
||||
**common,
|
||||
)
|
||||
edge_result = WrapperManager.create_and_start(
|
||||
config=edge_config,
|
||||
event_cb=sender_collector.event_callback,
|
||||
)
|
||||
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
|
||||
|
||||
with edge_result.ok_value as edge_sender:
|
||||
oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode()
|
||||
message = create_message_bindings(
|
||||
payload=oversized_payload,
|
||||
contentTopic="/test/1/s14-oversized/proto",
|
||||
)
|
||||
|
||||
send_result = edge_sender.send_message(message=message)
|
||||
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
|
||||
|
||||
request_id = send_result.ok_value
|
||||
assert request_id, "send() returned an empty RequestId"
|
||||
|
||||
error = wait_for_error(
|
||||
collector=sender_collector,
|
||||
request_id=request_id,
|
||||
timeout_s=PROPAGATED_TIMEOUT_S,
|
||||
)
|
||||
assert error is not None, (
|
||||
f"No message_error event within {PROPAGATED_TIMEOUT_S}s "
|
||||
f"after sending oversized message. "
|
||||
f"Collected events: {sender_collector.events}"
|
||||
)
|
||||
assert error["requestId"] == request_id
|
||||
logger.info(f"S14 received error event: {error}")
|
||||
|
||||
error_msg = error.get("error", "").lower()
|
||||
assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}"
|
||||
|
||||
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
|
||||
assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}"
|
||||
|
||||
assert_event_invariants(sender_collector, request_id)
|
||||
|
||||
|
||||
class TestS15LightpushRetryableErrorRecovery(StepsCommon):
|
||||
"""
|
||||
S15 — Lightpush retryable error + recovery.
|
||||
Edge sender publishes via a lightpush service node that has NO relay peers.
|
||||
The service accepts the lightpush request but returns NO_PEERS_TO_RELAY —
|
||||
a retryable error (explicitly listed in the S15 spec). The message enters
|
||||
the retry loop. A relay peer then joins the service node, and the next
|
||||
retry succeeds.
|
||||
Expected: send() returns Ok(RequestId), then eventually Propagated.
|
||||
"""
|
||||
|
||||
def test_s15_lightpush_retryable_error_then_recovery(self):
|
||||
sender_collector = EventCollector()
|
||||
|
||||
common = {
|
||||
"store": False,
|
||||
"filter": False,
|
||||
"discv5Discovery": False,
|
||||
"numShardsInNetwork": 1,
|
||||
}
|
||||
|
||||
service_config = build_node_config(relay=True, lightpush=True, **common)
|
||||
service_result = WrapperManager.create_and_start(config=service_config)
|
||||
assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
|
||||
|
||||
with service_result.ok_value as service:
|
||||
service_multiaddr = get_node_multiaddr(service)
|
||||
|
||||
edge_config = build_node_config(
|
||||
mode="Edge",
|
||||
relay=False,
|
||||
lightpushnode=service_multiaddr,
|
||||
staticnodes=[service_multiaddr],
|
||||
**common,
|
||||
)
|
||||
edge_result = WrapperManager.create_and_start(
|
||||
config=edge_config,
|
||||
event_cb=sender_collector.event_callback,
|
||||
)
|
||||
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
|
||||
|
||||
with edge_result.ok_value as edge_sender:
|
||||
msg = create_message_bindings(
|
||||
payload=to_base64("S15 retryable error recovery"),
|
||||
contentTopic="/test/1/s15-recovery/proto",
|
||||
)
|
||||
send_result = edge_sender.send_message(message=msg)
|
||||
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
|
||||
request_id = send_result.ok_value
|
||||
assert request_id, "send() returned an empty RequestId"
|
||||
|
||||
delay(SERVICE_DOWN_SETTLE_S)
|
||||
|
||||
early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
|
||||
assert early_propagated is None, (
|
||||
f"message_propagated arrived before relay peer joined — " f"retryable error path was not exercised: {early_propagated}"
|
||||
)
|
||||
|
||||
relay_config = build_node_config(
|
||||
relay=True,
|
||||
staticnodes=[service_multiaddr],
|
||||
**common,
|
||||
)
|
||||
relay_result = WrapperManager.create_and_start(config=relay_config)
|
||||
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
|
||||
|
||||
with relay_result.ok_value:
|
||||
propagated = wait_for_propagated(
|
||||
collector=sender_collector,
|
||||
request_id=request_id,
|
||||
timeout_s=RECOVERY_TIMEOUT_S,
|
||||
)
|
||||
assert propagated is not None, (
|
||||
f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
|
||||
f"after relay peer joined. "
|
||||
f"Collected events: {sender_collector.events}"
|
||||
)
|
||||
assert propagated["requestId"] == request_id
|
||||
|
||||
error = wait_for_error(sender_collector, request_id, timeout_s=0)
|
||||
assert error is None, f"Unexpected message_error after recovery: {error}"
|
||||
|
||||
assert_event_invariants(sender_collector, request_id)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user