This commit is contained in:
Aya Hassan 2026-04-27 10:46:47 +02:00
parent f6a558b8e7
commit 8c7979ce67

View File

@ -38,6 +38,16 @@ RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
S30_CONCURRENT_SENDS = 5
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto"
# S31: concurrent sends across mixed topics during peer churn.
S31_BURST_SIZE = 4
S31_CONTENT_TOPICS = [
"/test/1/s31-topic-a/proto",
"/test/1/s31-topic-b/proto",
"/test/1/s31-topic-c/proto",
"/test/1/s31-topic-d/proto",
]
S31_PROPAGATED_TIMEOUT_S = 30.0
class TestSendBeforeRelay(StepsStore):
def test_s17_send_before_relay_peers_joins(self, node_config):
@ -546,8 +556,11 @@ class TestSendBeforeRelay(StepsStore):
def test_s30_concurrent_sends_during_auto_subscribe(self, node_config):
"""
S30: concurrent sends on the same content topic during initial auto-subscribe.
- Sender starts unsubscribed to the target topic.
- Several send() calls are issued at nearly the same time.
- Each call must return Ok(RequestId) with a unique id.
- Each request id must get its own propagated event,
with no dropped or cross-associated events.
"""
sender_collector = EventCollector()
@ -636,6 +649,124 @@ class TestSendBeforeRelay(StepsStore):
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config):
"""
S31: concurrent sends across mixed content topics during peer churn.
"""
sender_collector = EventCollector()
# Three docker peers, started first so the sender can discover them.
relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}")
relay_peer.start(relay="true", discv5_discovery="false")
relay_peer.set_relay_subscriptions([self.test_pubsub_topic])
lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}")
lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false")
lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic])
store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}")
store_peer.start(relay="true", store="true", discv5_discovery="false")
store_peer.set_relay_subscriptions([self.test_pubsub_topic])
churn_peers = [relay_peer, lightpush_peer, store_peer]
# Sender: wrapper node with relay + lightpush enabled as a client.
node_config.update(
{
"relay": True,
"lightpush": True,
"store": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"lightpushnode": lightpush_peer.get_multiaddr_with_id(),
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
# Connect every docker peer to the sender.
sender_multiaddr = get_node_multiaddr(sender_node)
for peer in churn_peers:
peer.add_peers([sender_multiaddr])
delay(3) # let docker peers connect to the sender
all_request_ids: list[str] = []
# ---- Phase 1: burst BEFORE churn (full topology). ----
phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1")
all_request_ids.extend(phase1_ids)
# ---- Phase 2: restart all docker peers, burst DURING churn. ----
for peer in churn_peers:
peer.restart()
phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2")
all_request_ids.extend(phase2_ids)
# Wait for all peers to be ready again and re-attach the sender.
for peer in churn_peers:
peer.ensure_ready(timeout_duration=20)
peer.add_peers([sender_multiaddr])
delay(3)
# ---- Phase 3: burst AFTER churn (full topology restored). ----
phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3")
all_request_ids.extend(phase3_ids)
# All request ids across all phases must be globally unique.
assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}"
# Stable-topology phases must each get a propagated event
# per request id. We do not assert this for phase 2.
for request_id in phase1_ids + phase3_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=S31_PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for stable-phase "
f"request_id={request_id} within {S31_PROPAGATED_TIMEOUT_S}s. "
f"Collected events: {sender_collector.events}"
)
issued = set(all_request_ids)
for event in sender_collector.events:
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
@staticmethod
def _s31_fire_burst(sender_node, *, phase_label: str) -> list[str]:
"""Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS.
Returns the list of request ids. Asserts every send returned Ok."""
messages = [
create_message_bindings(
contentTopic=S31_CONTENT_TOPICS[i],
payload=to_base64(f"s31-{phase_label}-{i}"),
)
for i in range(S31_BURST_SIZE)
]
with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as pool:
send_results = list(pool.map(sender_node.send_message, messages))
request_ids = []
for i, send_result in enumerate(send_results):
assert send_result.is_ok(), f"{phase_label}: concurrent send #{i} failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId"
request_ids.append(request_id)
return request_ids
class TestS06CoreSenderRelayOnly(StepsCommon):
"""