mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-05-24 03:09:58 +00:00
553 lines
23 KiB
Python
553 lines
23 KiB
Python
import json
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
|
|
import pytest
|
|
from src.steps.common import StepsCommon
|
|
from src.libs.common import to_base64
|
|
from src.env_vars import NODE_1
|
|
from src.node.waku_node import WakuNode
|
|
from src.node.wrappers_manager import WrapperManager
|
|
from src.node.wrapper_helpers import (
|
|
EventCollector,
|
|
assert_event_invariants,
|
|
create_message_bindings,
|
|
get_node_multiaddr,
|
|
wait_for_connected,
|
|
wait_for_propagated,
|
|
wait_for_sent,
|
|
wait_for_error,
|
|
)
|
|
from src.test_data import DEFAULT_CLUSTER_ID
|
|
from tests.wrappers_tests.conftest import build_node_config
|
|
|
|
PROPAGATED_TIMEOUT_S = 30.0
|
|
SENT_AFTER_STORE_TIMEOUT_S = 60.0
|
|
|
|
# Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store
|
|
# peer so it joins the same relay mesh as the wrapper nodes (wrapper config
|
|
# uses numShardsInNetwork=1 => shard 0).
|
|
STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"
|
|
|
|
S01_EXPECTED_ERROR_FRAGMENT = "not initialized"
|
|
S01_SUBPROCESS_TIMEOUT_S = 30
|
|
S01_RESULT_MARKER = "__S01_RESULT__"
|
|
SEND_AFTER_DESTROY_RESULT_MARKER = "__SEND_AFTER_DESTROY_RESULT__"
|
|
SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S = 60
|
|
|
|
# S05: malformed content topics break shard resolution inside
|
|
# SubscriptionManager.subscribe(), forcing the auto-subscribe step to fail.
|
|
# Each case targets a distinct validator branch.
|
|
# Fragment match: the bindings return Err("Failed to auto-subscribe before
|
|
# sending: ..."). We match on the capitalized verb phrase "Failed to
|
|
# auto-subscribe" because the bare word "auto-subscribe" also appears in
|
|
# success-path log strings (e.g. "Auto-subscribing to topic on send") and
|
|
# would produce false positives if we matched on that.
|
|
S05_EXPECTED_ERROR_FRAGMENT = "Failed to auto-subscribe"
|
|
S05_MALFORMED_CONTENT_TOPICS = [
|
|
# No leading slash — parser rejects with "must start with slash".
|
|
("s05-invalid-no-leading-slash", "no-leading-slash"),
|
|
# Empty string — parser rejects empty content topic.
|
|
("", "empty"),
|
|
# Only 3 segments — content topics need /app/version/name/encoding.
|
|
("/app/1/name", "missing-encoding-segment"),
|
|
# Empty middle segment between slashes.
|
|
("/app//name/proto", "empty-middle-segment"),
|
|
]
|
|
|
|
|
|
# Run send() in a subprocess so a missing C-ABI guard (which can SIGSEGV)
|
|
# fails the test cleanly instead of taking the runner down.
|
|
_S01_SUBPROCESS_SCRIPT = textwrap.dedent(
|
|
f"""
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
_project_root = Path({repr(__file__)}).resolve().parents[2]
|
|
_bindings_path = _project_root / "vendor" / "logos-delivery-python-bindings" / "waku"
|
|
if str(_bindings_path) not in sys.path:
|
|
sys.path.insert(0, str(_bindings_path))
|
|
if str(_project_root) not in sys.path:
|
|
sys.path.insert(0, str(_project_root))
|
|
|
|
from wrapper import NodeWrapper, ffi
|
|
from src.node.wrappers_manager import WrapperManager
|
|
from src.node.wrapper_helpers import create_message_bindings
|
|
|
|
sender = WrapperManager(NodeWrapper(ctx=ffi.NULL, config_buffer=None, event_cb_handler=None))
|
|
send_result = sender.send_message(message=create_message_bindings())
|
|
|
|
print({repr(S01_RESULT_MARKER)} + json.dumps({{
|
|
"is_ok": send_result.is_ok(),
|
|
"ok": send_result.ok_value if send_result.is_ok() else None,
|
|
"err": send_result.err() if send_result.is_err() else None,
|
|
}}))
|
|
sys.exit(0)
|
|
"""
|
|
).strip()
|
|
|
|
|
|
# Uses destroy_keep_ctx() so self.ctx stays non-nil after destroy — forces
|
|
# the send call to reach the C side with the original (now-stale) pointer.
|
|
_SEND_AFTER_DESTROY_SUBPROCESS_SCRIPT = textwrap.dedent(
|
|
f"""
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
_project_root = Path({repr(__file__)}).resolve().parents[2]
|
|
_bindings_path = _project_root / "vendor" / "logos-delivery-python-bindings" / "waku"
|
|
if str(_bindings_path) not in sys.path:
|
|
sys.path.insert(0, str(_bindings_path))
|
|
if str(_project_root) not in sys.path:
|
|
sys.path.insert(0, str(_project_root))
|
|
|
|
from src.node.wrappers_manager import WrapperManager
|
|
from src.node.wrapper_helpers import EventCollector, create_message_bindings
|
|
from tests.wrappers_tests.conftest import build_node_config
|
|
|
|
collector = EventCollector()
|
|
|
|
create_result = WrapperManager.create_and_start(
|
|
config=build_node_config(),
|
|
event_cb=collector.event_callback,
|
|
)
|
|
if create_result.is_err():
|
|
print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{
|
|
"stage": "create_and_start",
|
|
"is_ok": False,
|
|
"ok": None,
|
|
"err": create_result.err(),
|
|
"events_after_send": [],
|
|
}}))
|
|
sys.exit(0)
|
|
|
|
sender = create_result.ok_value
|
|
|
|
stop_result = sender.stop_node()
|
|
if stop_result.is_err():
|
|
print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{
|
|
"stage": "stop_node",
|
|
"is_ok": False,
|
|
"ok": None,
|
|
"err": stop_result.err(),
|
|
"events_after_send": [],
|
|
}}))
|
|
sys.exit(0)
|
|
|
|
destroy_result = sender.destroy_keep_ctx()
|
|
if destroy_result.is_err():
|
|
print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{
|
|
"stage": "destroy_keep_ctx",
|
|
"is_ok": False,
|
|
"ok": None,
|
|
"err": destroy_result.err(),
|
|
"events_after_send": [],
|
|
}}))
|
|
sys.exit(0)
|
|
|
|
events_before_send = len(collector.events)
|
|
|
|
envelope = create_message_bindings()
|
|
send_result = sender.send_message(message=envelope)
|
|
|
|
new_events = collector.events[events_before_send:]
|
|
|
|
payload = {{
|
|
"stage": "send_message",
|
|
"is_ok": send_result.is_ok(),
|
|
"ok": send_result.ok_value if send_result.is_ok() else None,
|
|
"err": send_result.err() if send_result.is_err() else None,
|
|
"events_after_send": [str(e) for e in new_events],
|
|
}}
|
|
print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps(payload))
|
|
sys.exit(0)
|
|
"""
|
|
).strip()
|
|
|
|
|
|
class TestS01NilOrUninitializedHandle(StepsCommon):
|
|
"""S01 — send() on a nil/destroyed handle must Err, no events, no crash."""
|
|
|
|
def test_s01_send_on_uninitialized_handle(self):
|
|
completed = subprocess.run(
|
|
[sys.executable, "-c", _S01_SUBPROCESS_SCRIPT],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=S01_SUBPROCESS_TIMEOUT_S,
|
|
)
|
|
|
|
assert completed.returncode == 0, (
|
|
f"send() crashed on a nil handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}"
|
|
)
|
|
|
|
result_line = next(
|
|
(l for l in completed.stdout.splitlines() if l.startswith(S01_RESULT_MARKER)),
|
|
None,
|
|
)
|
|
assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}"
|
|
|
|
result = json.loads(result_line[len(S01_RESULT_MARKER) :])
|
|
|
|
assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})"
|
|
assert S01_EXPECTED_ERROR_FRAGMENT in (
|
|
result["err"] or ""
|
|
), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}"
|
|
|
|
def test_s01_send_on_destroyed_handle(self):
|
|
completed = subprocess.run(
|
|
[sys.executable, "-c", _SEND_AFTER_DESTROY_SUBPROCESS_SCRIPT],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S,
|
|
)
|
|
|
|
assert completed.returncode == 0, (
|
|
f"send() crashed on a destroyed handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}"
|
|
)
|
|
|
|
result_line = next(
|
|
(l for l in completed.stdout.splitlines() if l.startswith(SEND_AFTER_DESTROY_RESULT_MARKER)),
|
|
None,
|
|
)
|
|
assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}"
|
|
|
|
result = json.loads(result_line[len(SEND_AFTER_DESTROY_RESULT_MARKER) :])
|
|
|
|
assert result["stage"] == "send_message", f"setup failed at stage {result['stage']!r}: {result['err']!r}"
|
|
assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})"
|
|
assert S01_EXPECTED_ERROR_FRAGMENT in (
|
|
result["err"] or ""
|
|
), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}"
|
|
assert result["events_after_send"] == [], f"expected no events after send(), got: {result['events_after_send']}"
|
|
|
|
|
|
class TestS03SendOnAlreadySubscribedTopic(StepsCommon):
|
|
"""
|
|
S03 — Send on already-subscribed content topic.
|
|
Sender explicitly calls subscribe_content_topic() before send().
|
|
The send path must behave identically to the auto-subscribe case:
|
|
Propagated arrives, no Sent (store disabled), no Error.
|
|
Topology mirrors S06 (relay-only sender + relay peer, no store).
|
|
Purpose: proves the send path is identical when auto-subscription is skipped.
|
|
"""
|
|
|
|
def test_s03_send_on_already_subscribed_content_topic(self, node_config):
|
|
sender_collector = EventCollector()
|
|
content_topic = "/test/1/s03-already-subscribed/proto"
|
|
|
|
node_config.update(
|
|
{
|
|
"relay": True,
|
|
"store": False,
|
|
"lightpush": False,
|
|
"filter": False,
|
|
"discv5Discovery": False,
|
|
"numShardsInNetwork": 1,
|
|
"reliabilityEnabled": True,
|
|
}
|
|
)
|
|
|
|
sender_result = WrapperManager.create_and_start(
|
|
config=node_config,
|
|
event_cb=sender_collector.event_callback,
|
|
)
|
|
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
|
|
|
|
with sender_result.ok_value as sender:
|
|
peer_config = {
|
|
**node_config,
|
|
"staticnodes": [get_node_multiaddr(sender)],
|
|
"portsShift": 1,
|
|
}
|
|
|
|
peer_result = WrapperManager.create_and_start(config=peer_config)
|
|
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
|
|
|
|
with peer_result.ok_value:
|
|
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
|
|
|
|
# Explicit subscribe before send — this is what S03 is about.
|
|
# The send path must still return Ok(RequestId) and emit the
|
|
# same events as the auto-subscribe topology in S06.
|
|
subscribe_result = sender.subscribe_content_topic(content_topic)
|
|
assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}"
|
|
|
|
message = create_message_bindings(
|
|
payload=to_base64("S03 already-subscribed test payload"),
|
|
contentTopic=content_topic,
|
|
)
|
|
|
|
send_result = sender.send_message(message=message)
|
|
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
|
|
|
|
request_id = send_result.ok_value
|
|
assert request_id, "send() returned an empty RequestId"
|
|
|
|
propagated = wait_for_propagated(
|
|
collector=sender_collector,
|
|
request_id=request_id,
|
|
timeout_s=PROPAGATED_TIMEOUT_S,
|
|
)
|
|
assert propagated is not None, (
|
|
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
|
|
)
|
|
assert propagated["requestId"] == request_id
|
|
|
|
error = wait_for_error(sender_collector, request_id, timeout_s=0)
|
|
assert error is None, f"Unexpected message_error event: {error}"
|
|
|
|
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
|
|
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
|
|
|
|
assert_event_invariants(sender_collector, request_id)
|
|
|
|
|
|
class TestS04UnsubscribeThenSendSameTopic(StepsCommon):
|
|
"""
|
|
S04 — Unsubscribe, then send the same content topic again.
|
|
Sender subscribes to topic A, unsubscribes from A, then sends on A.
|
|
The send path must re-establish topic interest and deliver normally.
|
|
Topology mirrors S06 (relay-only sender + relay peer, no store).
|
|
Expected: send() returns Ok(RequestId), Propagated arrives,
|
|
no Sent (store disabled), no Error.
|
|
Purpose: verifies send() re-establishes topic interest after local unsubscribe.
|
|
"""
|
|
|
|
def test_s04_unsubscribe_then_send_same_content_topic(self, node_config):
|
|
sender_collector = EventCollector()
|
|
content_topic = "/test/1/s04-unsubscribe-resend/proto"
|
|
|
|
node_config.update(
|
|
{
|
|
"relay": True,
|
|
"store": False,
|
|
"lightpush": False,
|
|
"filter": False,
|
|
"discv5Discovery": False,
|
|
"numShardsInNetwork": 1,
|
|
"reliabilityEnabled": True,
|
|
}
|
|
)
|
|
|
|
sender_result = WrapperManager.create_and_start(
|
|
config=node_config,
|
|
event_cb=sender_collector.event_callback,
|
|
)
|
|
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
|
|
|
|
with sender_result.ok_value as sender:
|
|
peer_config = {
|
|
**node_config,
|
|
"staticnodes": [get_node_multiaddr(sender)],
|
|
"portsShift": 1,
|
|
}
|
|
|
|
peer_result = WrapperManager.create_and_start(config=peer_config)
|
|
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
|
|
|
|
with peer_result.ok_value:
|
|
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
|
|
|
|
# subscribe -> unsubscribe -> send: send() must re-establish
|
|
# topic interest internally.
|
|
subscribe_result = sender.subscribe_content_topic(content_topic)
|
|
assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}"
|
|
|
|
unsubscribe_result = sender.unsubscribe_content_topic(content_topic)
|
|
assert unsubscribe_result.is_ok(), f"unsubscribe_content_topic failed: {unsubscribe_result.err()}"
|
|
|
|
message = create_message_bindings(
|
|
payload=to_base64("S04 unsubscribe-then-send test payload"),
|
|
contentTopic=content_topic,
|
|
)
|
|
|
|
send_result = sender.send_message(message=message)
|
|
assert send_result.is_ok(), f"send() failed after unsubscribe: {send_result.err()}"
|
|
|
|
request_id = send_result.ok_value
|
|
assert request_id, "send() returned an empty RequestId"
|
|
|
|
propagated = wait_for_propagated(
|
|
collector=sender_collector,
|
|
request_id=request_id,
|
|
timeout_s=PROPAGATED_TIMEOUT_S,
|
|
)
|
|
assert propagated is not None, (
|
|
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s "
|
|
f"after unsubscribe + send. Collected events: {sender_collector.events}"
|
|
)
|
|
assert propagated["requestId"] == request_id
|
|
|
|
error = wait_for_error(sender_collector, request_id, timeout_s=0)
|
|
assert error is None, f"Unexpected message_error event: {error}"
|
|
|
|
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
|
|
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
|
|
|
|
assert_event_invariants(sender_collector, request_id)
|
|
|
|
|
|
class TestS05AutoSubscribeFailureBeforeTaskCreation(StepsCommon):
|
|
"""
|
|
S05 — Auto-subscribe failure before task creation.
|
|
Sender is initialized but auto-subscription is forced to fail by using a
|
|
malformed content topic that breaks shard resolution inside
|
|
SubscriptionManager.subscribe().
|
|
Expected: send() returns Err with an "auto-subscribe" message, no events.
|
|
Purpose: covers the last synchronous error path before request ID creation,
|
|
across several distinct validator branches.
|
|
"""
|
|
|
|
@pytest.mark.parametrize(
|
|
"content_topic",
|
|
[topic for topic, _ in S05_MALFORMED_CONTENT_TOPICS],
|
|
ids=[case_id for _, case_id in S05_MALFORMED_CONTENT_TOPICS],
|
|
)
|
|
def test_s05_send_fails_when_auto_subscribe_fails(self, node_config, content_topic):
|
|
sender_collector = EventCollector()
|
|
|
|
node_config.update(
|
|
{
|
|
"relay": True,
|
|
"numShardsInNetwork": 1,
|
|
}
|
|
)
|
|
|
|
sender_result = WrapperManager.create_and_start(
|
|
config=node_config,
|
|
event_cb=sender_collector.event_callback,
|
|
)
|
|
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
|
|
|
|
with sender_result.ok_value as sender:
|
|
# Malformed content topic — SubscriptionManager.subscribe() cannot
|
|
# resolve it to a shard, so auto-subscribe inside send() fails.
|
|
message = create_message_bindings(
|
|
payload=to_base64("S05 auto-subscribe failure payload"),
|
|
contentTopic=content_topic,
|
|
)
|
|
|
|
send_result = sender.send_message(message=message)
|
|
|
|
assert send_result.is_err(), (
|
|
f"send() must return Err when auto-subscribe fails for " f"content_topic={content_topic!r}, got Ok({send_result.ok_value!r})"
|
|
)
|
|
|
|
error_message = send_result.err() or ""
|
|
assert S05_EXPECTED_ERROR_FRAGMENT in error_message, (
|
|
f"expected error to mention {S05_EXPECTED_ERROR_FRAGMENT!r} " f"for content_topic={content_topic!r}, got: {error_message!r}"
|
|
)
|
|
|
|
# No request id was created, so no events should be emitted.
|
|
assert sender_collector.events == [] or all(
|
|
event.get("eventType") == "connection_status_change" for event in sender_collector.events
|
|
), f"Unexpected events after a pre-task-creation failure: {sender_collector.events}"
|
|
|
|
|
|
class TestS11EdgeSenderLightpushAndStore(StepsCommon):
|
|
"""
|
|
S11 — Edge sender with lightpush path and store validation.
|
|
Edge sender has no local relay; it publishes via a wrapper lightpush peer
|
|
and validates delivery via a docker store peer. Reliability enabled.
|
|
Topology:
|
|
[LightpushPeer] wrapper, relay=True, lightpush=True, store=False
|
|
[StorePeer] docker WakuNode, relay=true, store=true,
|
|
dials the lightpush peer via add_peers and subscribes
|
|
to the same shard-0 pubsub topic so it joins the
|
|
relay mesh and archives propagated messages.
|
|
[Edge] wrapper, mode="Edge",
|
|
staticnodes=[lightpush_peer],
|
|
storenode=store_peer,
|
|
reliabilityEnabled=True
|
|
Expected: send() returns Ok(RequestId), Propagated arrives, then Sent
|
|
(store validation succeeds), no Error.
|
|
Purpose: edge-mode fully validated success path against a real docker
|
|
store node (cross-implementation check).
|
|
"""
|
|
|
|
def test_s11_edge_lightpush_with_store_validation(self, node_config):
|
|
sender_collector = EventCollector()
|
|
|
|
common = {
|
|
"filter": False,
|
|
"discv5Discovery": True,
|
|
"numShardsInNetwork": 1,
|
|
}
|
|
|
|
lightpush_config = build_node_config(
|
|
relay=True,
|
|
lightpush=True,
|
|
store=False,
|
|
**common,
|
|
)
|
|
|
|
lightpush_result = WrapperManager.create_and_start(config=lightpush_config)
|
|
assert lightpush_result.is_ok(), f"Failed to start lightpush peer: {lightpush_result.err()}"
|
|
|
|
with lightpush_result.ok_value as lightpush_peer:
|
|
lightpush_multiaddr = get_node_multiaddr(lightpush_peer)
|
|
|
|
# Docker store peer — real nwaku node running as the store backend.
|
|
# Dial the lightpush peer via add_peers and subscribe to the same
|
|
# shard-0 pubsub topic so it joins the relay mesh and archives
|
|
# messages propagated by the lightpush peer.
|
|
store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}")
|
|
store_peer.start(relay="true", store="true")
|
|
self.add_node_peer(store_peer, [lightpush_multiaddr])
|
|
store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC])
|
|
store_multiaddr = store_peer.get_multiaddr_with_id()
|
|
|
|
edge_config = build_node_config(
|
|
mode="Edge",
|
|
# assuming disc5v already happened
|
|
staticnodes=[lightpush_multiaddr, store_multiaddr],
|
|
reliabilityEnabled=True,
|
|
**common,
|
|
)
|
|
|
|
edge_result = WrapperManager.create_and_start(
|
|
config=edge_config,
|
|
event_cb=sender_collector.event_callback,
|
|
)
|
|
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
|
|
|
|
with edge_result.ok_value as edge_sender:
|
|
message = create_message_bindings(
|
|
payload=to_base64("S11 edge lightpush + store test payload"),
|
|
contentTopic="/test/1/s11-edge-lightpush-store/proto",
|
|
)
|
|
|
|
send_result = edge_sender.send_message(message=message)
|
|
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
|
|
|
|
request_id = send_result.ok_value
|
|
assert request_id, "send() returned an empty RequestId"
|
|
|
|
propagated = wait_for_propagated(
|
|
collector=sender_collector,
|
|
request_id=request_id,
|
|
timeout_s=PROPAGATED_TIMEOUT_S,
|
|
)
|
|
assert propagated is not None, (
|
|
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
|
|
)
|
|
assert propagated["requestId"] == request_id
|
|
|
|
sent = wait_for_sent(
|
|
collector=sender_collector,
|
|
request_id=request_id,
|
|
timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
|
|
)
|
|
assert sent is not None, (
|
|
f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " f"after propagation. Collected events: {sender_collector.events}"
|
|
)
|
|
assert sent["requestId"] == request_id
|
|
|
|
error = wait_for_error(sender_collector, request_id, timeout_s=0)
|
|
assert error is None, f"Unexpected message_error event: {error}"
|
|
|
|
assert_event_invariants(sender_collector, request_id)
|