E2e s13 s16 (#183)

* Add S13

* Adding S16

* Add error message check for S13

* Mark S16 as xfail
This commit is contained in:
AYAHASSAN287 2026-05-18 17:04:16 +03:00 committed by GitHub
parent 4ba30a8aff
commit 938e761947
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 169 additions and 0 deletions

View File

@ -1,3 +1,4 @@
import base64
from concurrent.futures import ThreadPoolExecutor
import pytest
@ -13,6 +14,7 @@ from src.node.wrapper_helpers import (
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
@ -30,6 +32,12 @@ CACHE_EXPIRY_SLACK_S = 10.0
# Wait budget derived from the cache lifetime plus a slack margin.
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
# Payload above DefaultMaxWakuMessageSize (150KiB), so the relay publish
# rejects it instead of failing with NO_PEERS_TO_RELAY.
# This is the raw byte count; S13 base64-encodes the payload before sending.
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
# S13: seconds to wait for the message_error event after send() returns.
ERROR_TIMEOUT_S = 30.0
# S13: substring expected in the "error" field of the message_error event.
MESSAGE_SIZE_EXCEEDED_MSG = "Message size exceeded"
# S30: concurrent sends on the same content topic during initial auto-subscribe.
S30_CONCURRENT_SENDS = 5
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto"
@ -160,6 +168,79 @@ class TestS21ErrorWhenRetryWindowExpires(StepsCommon):
assert_event_invariants(sender_collector, request_id)
class TestS13RelayHardFailureWithoutFallback(StepsCommon):
    """
    S13: the relay publish fails hard and there is no lightpush fallback.

    A relay peer is connected to the sender, so the publish does not stop at
    NO_PEERS_TO_RELAY. The message carries an oversized payload, which makes
    the relay publish itself fail.

    - Expected: send() returns Ok(RequestId), then a message_error event is
      emitted for that request and no message_propagated event appears.
    """

    def test_s13_relay_hard_failure_without_fallback(self, node_config):
        collector = EventCollector()

        # Both nodes run relay on a single-shard network.
        node_config.update(relay=True, numShardsInNetwork=1)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=collector.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender_node:
            # The relay peer reuses the sender's settings, dials the sender
            # as a static node, and shifts its ports to avoid a clash.
            sender_addr = get_node_multiaddr(sender_node)
            peer_config = dict(node_config)
            peer_config["staticnodes"] = [sender_addr]
            peer_config["portsShift"] = 1

            peer_start = WrapperManager.create_and_start(config=peer_config)
            assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

            with peer_start.ok_value:
                # Wait for a connection first, so the publish is not
                # short-circuited by NO_PEERS_TO_RELAY.
                connected = wait_for_connected(collector)
                assert connected is not None, (
                    "Sender did not reach Connected/PartiallyConnected. "
                    f"Collected events: {collector.events}"
                )

                raw_payload = b"x" * OVERSIZED_PAYLOAD_BYTES
                message = create_message_bindings(
                    payload=base64.b64encode(raw_payload).decode(),
                    contentTopic="/test/1/s13-relay-hard-failure/proto",
                )

                sent = sender_node.send_message(message=message)
                assert sent.is_ok(), f"send() must return Ok(RequestId), got: {sent.err()}"

                request_id = sent.ok_value
                assert request_id, "send() returned an empty RequestId"

                failure = wait_for_error(
                    collector=collector,
                    request_id=request_id,
                    timeout_s=ERROR_TIMEOUT_S,
                )
                assert failure is not None, (
                    f"No message_error event within {ERROR_TIMEOUT_S}s from the "
                    f"relay processor. Collected events: {collector.events}"
                )
                assert failure["requestId"] == request_id

                reported = failure.get("error") or ""
                assert MESSAGE_SIZE_EXCEEDED_MSG in reported, (
                    f"Expected error to contain {MESSAGE_SIZE_EXCEEDED_MSG!r}.\n"
                    f"Got: {failure.get('error')!r}\n"
                    f"Full event: {failure}"
                )

                # Zero timeout: only inspect what has already been collected.
                propagated = wait_for_propagated(collector, request_id, timeout_s=0)
                assert (
                    propagated is None
                ), f"Unexpected message_propagated event for a failed relay publish: {propagated}"

                assert_event_invariants(collector, request_id)
class TestS30ConcurrentSendsDuringAutoSubscribe(StepsCommon):
"""
S30: concurrent sends on the same content topic during initial auto-subscribe.

View File

@ -532,6 +532,94 @@ class TestS15LightpushRetryableErrorRecovery(StepsCommon):
assert_event_invariants(sender_collector, request_id)
class TestS16LightpushPeerAppearsLater(StepsCommon):
    """
    S16: no delivery peers at T0; the lightpush peer appears later.

    The edge sender lists the lightpush service in its staticnodes, but the
    service is stopped before the sender starts, so send() is called while
    no delivery peer is reachable. The service is then started again during
    the retry window, and a later retry is expected to deliver the message.

    Expected: send() returns Ok(RequestId), then eventually Propagated.
    """

    @pytest.mark.xfail(reason="binding cannot restart a node or add peers at runtime")
    def test_s16_lightpush_peer_appears_later(self):
        collector = EventCollector()
        shared_opts = dict(
            store=False,
            filter=False,
            discv5Discovery=False,
            numShardsInNetwork=1,
        )

        # The service is started once only to learn its multiaddr, then
        # stopped so the sender has nobody to talk to at T0. The very same
        # node object is started again later, which keeps the address valid.
        service_start = WrapperManager.create_and_start(
            config=build_node_config(relay=True, lightpush=True, **shared_opts),
        )
        assert service_start.is_ok(), f"Failed to start lightpush peer: {service_start.err()}"

        with service_start.ok_value as service:
            service_addr = get_node_multiaddr(service)

            stopped = service.stop_node()
            assert stopped.is_ok(), f"Failed to stop lightpush peer: {stopped.err()}"
            delay(SERVICE_DOWN_SETTLE_S)

            # The edge sender is a lightpush client whose only known peer is
            # the service that is currently down.
            edge_start = WrapperManager.create_and_start(
                config=build_node_config(
                    mode="Edge",
                    lightpush=True,
                    staticnodes=[service_addr],
                    **shared_opts,
                ),
                event_cb=collector.event_callback,
            )
            assert edge_start.is_ok(), f"Failed to start edge sender: {edge_start.err()}"

            with edge_start.ok_value as edge_sender:
                # send() happens while the service is still unreachable.
                message = create_message_bindings(
                    payload=to_base64("S16 lightpush peer appears later"),
                    contentTopic="/test/1/s16-late-lightpush/proto",
                )
                sent = edge_sender.send_message(message=message)
                assert sent.is_ok(), f"send() failed: {sent.err()}"

                request_id = sent.ok_value
                assert request_id, "send() returned an empty RequestId"

                # Let some time pass with the peer down, then confirm that
                # nothing has been reported as propagated so far.
                delay(SERVICE_DOWN_SETTLE_S)
                premature = wait_for_propagated(collector, request_id, timeout_s=0)
                assert (
                    premature is None
                ), f"message_propagated arrived before the lightpush peer was reachable: {premature}"

                # Bring the lightpush peer back inside the retry window.
                restarted = service.start_node()
                assert restarted.is_ok(), f"Failed to restart lightpush peer: {restarted.err()}"

                delivered = wait_for_propagated(
                    collector=collector,
                    request_id=request_id,
                    timeout_s=RECOVERY_TIMEOUT_S,
                )
                assert delivered is not None, (
                    f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
                    "after the lightpush peer joined. "
                    f"Collected events: {collector.events}"
                )
                assert delivered["requestId"] == request_id

                # Zero timeout: only inspect what has already been collected.
                late_error = wait_for_error(collector, request_id, timeout_s=0)
                assert late_error is None, f"Unexpected message_error after recovery: {late_error}"

                assert_event_invariants(collector, request_id)
class TestS26LightpushPeerChurn(StepsCommon):
"""
S26: multiple lightpush peers, the selected one disappears,