From 11197db624b4adb4659a07799ae76a81a2379c78 Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Mon, 11 May 2026 16:53:18 +0300 Subject: [PATCH] e2e_part2 (#179) * add test s17 * Add temp changes * Add s17 positive / negative scenarios * add S19 * Add S06 relay-only test and fix wrapper helpers (#173) * - Add S06 relay-only test case for testing message propagation without a store. - Update `wrapper_helpers` for clearer event type handling and type annotations (`Optional[...]` usage). - Simplify `get_node_multiaddr` to retrieve addresses via `get_node_info_raw`. - Refactor `wrappers_manager` to adjust bindings path to `vendor` directory and add `get_node_info_raw` method. - Update `.gitignore` to exclude `store.sqlite3*`. * Refactor S06 relay-only test: replace try-finally blocks with context managers for clarity and conciseness. * Migrate S06 relay-only test to `test_send_e2e.py` and refactor with `StepsCommon` for reusability. --------- Co-authored-by: Egor Rachkovskii * Modify S19 test * Adding S21 * Fix review comments * Adding S22/S23 * Adding S24 * Add S26 * Add S30 * Add S31 * Improve `wait_for_event` loop logic and add `assert_event_invariants` helper (#178) - Refactored the `wait_for_event` function for clarity and to ensure proper deadline handling within the loop. - Introduced `assert_event_invariants` to validate per-request event properties, enforcing invariants like correct `requestId`, no duplicate terminal events, and proper timing between `Propagated` and `Sent`. - Added tests for `assert_event_invariants` enforcement in `S14` and `S15` lightpush scenarios. Co-authored-by: Egor Rachkovskii * Add S07 and S10 send API tests with event invariants helper (#176) * Add `assert_event_invariants` to enforce per-request event constraints and integrate into relevant tests * Integrate `assert_event_invariants` into edge and store tests * Remove redundant comments from `test_send_e2e.py` --------- Co-authored-by: Egor Rachkovskii * Fix some tests * Add S02/S12 send API tests and PR CI pipeline (#174) * Add tests for auto-subscribe on first send and isolated sender with no peers * Add PR CI workflow with tiered test strategy - pr_tests.yml: build job with cache, wrapper-tests, smoke-tests, and label-triggered full-suite - test_common.yml: add deploy_allure/send_discord inputs so PR runs skip reporting side effects - Add docker_required marker to S19 (needs Docker, excluded from wrapper-only CI job) - Register docker_required marker in pytest.ini * Document PR CI test workflows in README * Refine PR CI test strategy: - Exclude `docker_required` tests from smoke set in `pr_tests.yml`. - Add `wait_for_connected` helper for connection state checks. - Update S19 test to dynamically create and clean up the store node setup. - General simplifications and improved test stability. * Add `wait_for_connected` assertion to ensure sender connection state before propagation test * Refine tests and CI workflows: - Replace `ERROR_TIMEOUT_S` with `ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S` in `test_send_e2e.py`. - Adjust timeout assertion for better clarity and accuracy. - Update `pr_tests.yml` to add retries (`--reruns`) and ignore wrapper tests in smoke tests. - Change `test_common.yml` default Discord reporting to `false`. * Normalize `portsshift` to `portsShift` in `test_send_e2e.py` configuration definitions. --------- Co-authored-by: Egor Rachkovskii * Add relay-to-lightpush fallback integration tests (S08/S09) (#180) Co-authored-by: Egor Rachkovskii * Ignore S19 * fix s26 * Ignore s20 / s31 for errors * Change image name * fix xfail syntax error * rename test file * FIx flaky tests * comment the skipped tests * Fix review comments * revert tag in yml in latest * commenting lightpush * Modify the PR * Fix the ports conflict * Modify S20 * fix portsshift option * remove the /true from yml to allow errors to exist * Modify the yml to continue on error * First set of review comments * adding xfail mark for failed tests * address review comments about xfail * cleanup unused lines * event collector fix * Address review comment about delay constant * fix the timeout review comment * Add assert_event_invariants * enhance comment on S26 test * mark the waku tests as docker_required * Mark `test_s10_edge_lightpush_propagation` as xfail due to broken lightpush peer discovery. * Mark `test_s15_lightpush_retryable_error_then_recovery` as xfail due to broken lightpush peer discovery. --------- Co-authored-by: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Co-authored-by: Egor Rachkovskii --- .github/workflows/pr_tests.yml | 256 +++++ .github/workflows/test_common.yml | 16 +- .gitignore | 3 +- README.md | 10 + pytest.ini | 1 + src/node/waku_node.py | 58 +- src/node/wrapper_helpers.py | 218 ++++ src/node/wrappers_manager.py | 23 +- src/steps/store.py | 56 + tests/wrappers_tests/conftest.py | 9 +- tests/wrappers_tests/test_send_e2e_part1.py | 1020 +++++++++++++++++++ tests/wrappers_tests/test_send_e2e_part2.py | 700 +++++++++++++ 12 files changed, 2309 insertions(+), 61 deletions(-) create mode 100644 .github/workflows/pr_tests.yml create mode 100644 src/node/wrapper_helpers.py create mode 100644 tests/wrappers_tests/test_send_e2e_part1.py create mode 100644 tests/wrappers_tests/test_send_e2e_part2.py diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml new file mode 100644 index 000000000..1ac65b970 --- /dev/null +++ b/.github/workflows/pr_tests.yml @@ -0,0 +1,256 @@ +name: PR Tests + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +on: + pull_request: + types: [opened, synchronize, reopened, labeled] + paths: + - "src/**" + - "tests/**" + - "vendor/**" + - "requirements.txt" + - "pytest.ini" + - ".github/workflows/pr_tests.yml" + push: + branches: [master] + paths: + - "vendor/**" + workflow_dispatch: + inputs: + run_full_suite: + description: "Run the full test suite (18 shards)" + required: false + default: false + type: boolean + +jobs: + build: + name: Build liblogosdelivery + runs-on: ubuntu-latest + timeout-minutes: 45 + if: >- + github.event.action != 'labeled' || + github.event.label.name == 'full-test' + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Compute cache key + id: cache-key + run: | + BINDINGS_HASH=$(git rev-parse HEAD:vendor/logos-delivery-python-bindings) + DELIVERY_HASH=$(git -C vendor/logos-delivery-python-bindings rev-parse HEAD:vendor/logos-delivery) + echo "key=liblogosdelivery-${{ runner.os }}-nim2.2.4-${BINDINGS_HASH}-${DELIVERY_HASH}" >> "$GITHUB_OUTPUT" + + - name: Cache liblogosdelivery.so + id: cache-lib + uses: actions/cache@v4 + with: + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + key: ${{ steps.cache-key.outputs.key }} + + - name: Remove unwanted software + if: steps.cache-lib.outputs.cache-hit != 'true' + uses: ./.github/actions/prune-vm + + - name: Install system deps + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + sudo apt-get update + sudo apt-get install -y \ + util-linux \ + iproute2 \ + sudo \ + ca-certificates \ + curl \ + make \ + gcc \ + g++ + + - name: Install Nim 2.2.4 + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y + echo "$HOME/.nimble/bin" >> "$GITHUB_PATH" + export PATH="$HOME/.nimble/bin:$PATH" + choosenim 2.2.4 + nim --version + nimble --version + + - name: Build liblogosdelivery.so + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + + export PATH="$HOME/.nimble/bin:$PATH" + + BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings" + DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery" + + mkdir -p "$BINDINGS_DIR/lib" + + cd "$DELIVERY_DIR" + + ln -sf waku.nimble waku.nims + + nimble install -y + + make setup + + make liblogosdelivery + + SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)" + + if [ -z "$SO_PATH" ]; then + echo "liblogosdelivery.so was not built" + exit 1 + fi + + cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so" + + echo "Built library:" + ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so" + + - name: Upload library artifact + uses: actions/upload-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + retention-days: 1 + + wrapper-tests: + name: Wrapper Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 45 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run wrapper tests - basic life cycle + continue-on-error: true + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest tests/wrappers_tests/test_basic_life_cycle.py \ + -m "not docker_required" \ + --reruns 2 \ + --junit-xml=wrapper-results-basic.xml + + - name: Run wrapper tests - send e2e part 1 + continue-on-error: true + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest tests/wrappers_tests/test_send_e2e_part1.py \ + -m "not docker_required" \ + --reruns 2 \ + --junit-xml=wrapper-results-send-part1.xml + + - name: Run wrapper tests - send e2e part 2 + continue-on-error: true + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest tests/wrappers_tests/test_send_e2e_part2.py \ + -m "not docker_required" \ + --reruns 2 \ + --junit-xml=wrapper-results-send-part2.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Wrapper Test Results + path: wrapper-results-*.xml + reporter: java-junit + use-actions-summary: "true" + + smoke-tests: + name: Smoke Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 30 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + env: + NODE_1: "wakuorg/nwaku:latest" + NODE_2: "wakuorg/nwaku:latest" + ADDITIONAL_NODES: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run smoke tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest -m "smoke and not docker_required" \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --ignore=tests/wrappers_tests \ + --reruns 1 \ + -n 4 \ + --dist=loadgroup \ + --junit-xml=smoke-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Smoke Test Results + path: smoke-results.xml + reporter: java-junit + use-actions-summary: "true" + + full-suite: + name: Full Suite + if: >- + github.event_name != 'push' && + (contains(github.event.pull_request.labels.*.name, 'full-test') || + github.event.inputs.run_full_suite == 'true') + uses: ./.github/workflows/test_common.yml + secrets: inherit + with: + node1: "wakuorg/nwaku:latest" + node2: "wakuorg/nwaku:latest" + additional_nodes: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + caller: "pr" + deploy_allure: false + send_discord: false \ No newline at end of file diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 397d8fbf0..079be507c 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -27,6 +27,16 @@ on: description: "Run fleet tests only" type: boolean default: false + deploy_allure: + required: false + description: "Deploy allure report to gh-pages" + type: boolean + default: true + send_discord: + required: false + description: "Send test results to Discord" + type: boolean + default: false env: FORCE_COLOR: "1" @@ -42,7 +52,7 @@ jobs: strategy: fail-fast: false matrix: - shard: ${{ inputs.fleet_tests && fromJSON('[0]') || fromJSON('[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]') }} + shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] # total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed # command for sharding : # pytest --shard-id= --num-shards= @@ -223,7 +233,7 @@ jobs: aggregate-reports: runs-on: ubuntu-latest needs: [tests] - if: always() + if: always() && inputs.deploy_allure steps: - name: Download all allure results @@ -365,7 +375,7 @@ jobs: - name: Send report to Discord uses: rjstone/discord-webhook-notify@v1 - if: always() && env.CALLER != 'manual' + if: always() && env.CALLER != 'manual' && inputs.send_discord with: severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }} username: ${{ github.workflow }} diff --git a/.gitignore b/.gitignore index e64e42ff9..7d7cfc8b6 100644 --- a/.gitignore +++ b/.gitignore @@ -105,4 +105,5 @@ dmypy.json # Pyre type checker .pyre/ -third_party/logos-delivery-python-bindings +# Waku node runtime artifacts +store.sqlite3* diff --git a/README.md b/README.md index d731ab6a1..1b6903fbf 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,16 @@ To launch it manually: 2. Click **► Run workflow**. 3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**. +### PR tests + +Every push to a pull request triggers **pr\_tests.yml** which runs: + +1. **Build** — compiles `liblogosdelivery.so` (cached by submodule commit hash). +2. **Wrapper tests** — all tests under `tests/wrappers_tests/` that don't require Docker (~5 min). +3. **Smoke tests** — `pytest -m smoke` with Docker nodes (~10 min). + +To run the **full test suite** (18 shards, same as daily) on a PR, add the label **`full-test`** to the pull request. The full suite will start automatically. + ### On‑demand matrix against custom *logos-messaging-nim* versions Use **interop\_tests.yml** when you need to test a PR or a historical image: diff --git a/pytest.ini b/pytest.ini index f945bd8a0..9c02caf43 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,5 +12,6 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s timeout = 300 markers = smoke: marks tests as smoke test (deselect with '-m "not smoke"') + docker_required: test requires Docker nodes (WakuNode) waku_test_fleet: marks tests that run against a live Waku test fleet store2000: marks tests that use 2000 store messages diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 1fa71dd2e..7afe881c6 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -17,7 +17,6 @@ from src.node.docker_mananger import DockerManager from src.env_vars import DOCKER_LOG_DIR from src.data_storage import DS from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS -from src.node.wrappers_manager import WrapperManager logger = get_custom_logger(__name__) @@ -106,25 +105,16 @@ class WakuNode: self._container = None self.rln_membership_index = None self.start_args = {} - self._wrapper_node = None self._rln_creds_set = False logger.debug(f"WakuNode instance initialized with log path {self._log_path}") - @property - def _is_wrapper(self) -> bool: - return self._wrapper_node is not None - @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): if WakuNode._pre_start_hook is not None: kwargs = WakuNode._pre_start_hook(self, kwargs) logger.debug("Starting Node...") default_args, remove_container = self._prepare_start_context(**kwargs) - - if use_wrapper: - self._start_wrapper(default_args, wait_for_node_sec) - else: - self._start_docker(default_args, remove_container, wait_for_node_sec) + self._start_docker(default_args, remove_container, wait_for_node_sec) def _prepare_start_context(self, **kwargs): self._docker_manager.create_network() @@ -285,7 +275,7 @@ class WakuNode: }, } - @retry(stop=stop_after_attempt(1), wait=wait_fixed(0.1), reraise=True) + @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True) def register_rln(self, **kwargs): logger.debug("Registering RLN credentials...") self._docker_manager.create_network() @@ -338,12 +328,6 @@ class WakuNode: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): - if self._is_wrapper: - self._stop_wrapper() - else: - self._stop_docker() - - def _stop_docker(self): if self._container: logger.debug(f"Stopping container with id {self._container.short_id}") try: @@ -359,14 +343,6 @@ class WakuNode: self._container = None logger.debug("Container stopped.") - def _stop_wrapper(self): - logger.debug("Stopping wrapper node") - result = self._wrapper_node.stop_and_destroy() - if result.is_err(): - logger.error(f"Failed to stop wrapper node: {result.err()}") - self._wrapper_node = None - logger.debug("Wrapper node stopped and destroyed.") - @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): if self._container: @@ -450,32 +426,14 @@ class WakuNode: def get_tcp_address(self): return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}" - def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): - if self._is_wrapper: - result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s) - if result.is_err(): - raise RuntimeError(f"subscribe_content_topic failed: {result.err()}") - return result.ok_value - else: - return self._api.set_relay_auto_subscriptions([content_topic]) + def subscribe_content_topic(self, content_topic: str): + return self._api.set_relay_auto_subscriptions([content_topic]) - def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): - if self._is_wrapper: - result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) - if result.is_err(): - raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}") - return result.ok_value - else: - return self._api.delete_relay_auto_subscriptions([content_topic]) + def unsubscribe_content_topic(self, content_topic: str): + return self._api.delete_relay_auto_subscriptions([content_topic]) - def send_message(self, message: dict, *, timeout_s: float = 20.0): - if self._is_wrapper: - result = self._wrapper_node.send_message(message, timeout_s=timeout_s) - if result.is_err(): - raise RuntimeError(f"send_message failed: {result.err()}") - return result.ok_value - else: - return self._api.send_relay_auto_message(message) + def send_message(self, message: dict): + return self._api.send_relay_auto_message(message) def info(self): return self._api.info() diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py new file mode 100644 index 000000000..7c9dd1a2d --- /dev/null +++ b/src/node/wrapper_helpers.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import json +import threading +import time +from typing import Optional +from src.libs.common import to_base64 + +DEFAULT_CONTENT_TOPIC = "/test/1/default/proto" +DEFAULT_PAYLOAD = to_base64("test payload") +EVENT_PROPAGATED = "message_propagated" +EVENT_SENT = "message_sent" +EVENT_ERROR = "message_error" + +# MaxTimeInCache from send_service.nim. +MAX_TIME_IN_CACHE_S = 60.0 +# Extra slack to cover the background retry loop tick after the window expires. +CACHE_EXPIRY_SLACK_S = 10.0 +ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S +RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" + + +class EventCollector: + """Thread-safe collector for async node events. + + Pass `collector.event_callback` as the `event_cb` argument to + WrapperManager.create_and_start(). Every event fired by the library + is decoded from JSON and appended to `self.events`. + """ + + def __init__(self): + self._lock = threading.Lock() + self.events: list[dict] = [] + + def event_callback(self, ret: int, raw: bytes) -> None: + try: + payload = json.loads(raw.decode("utf-8")) + except Exception: + payload = {"_raw": raw.decode("utf-8", errors="replace"), "_ret": ret} + + with self._lock: + self.events.append(payload) + + def get_events_for_request(self, request_id: str) -> list[dict]: + with self._lock: + return [e for e in self.events if e.get("requestId") == request_id] + + def snapshot(self) -> list[dict]: + """Return a thread-safe copy of all collected events. + + Use this whenever you need to iterate over every event (rather than + events for a single request_id). Iterating `self.events` directly is + unsafe because `event_callback` appends from the wrapper's event + thread. + """ + with self._lock: + return list(self.events) + + +def is_propagated_event(event: dict) -> bool: + return event.get("eventType") == EVENT_PROPAGATED + + +def is_sent_event(event: dict) -> bool: + return event.get("eventType") == EVENT_SENT + + +def is_error_event(event: dict) -> bool: + return event.get("eventType") == EVENT_ERROR + + +def wait_for_event( + collector: EventCollector, + request_id: str, + predicate, + timeout_s: float, + poll_interval_s: float = 0.5, +) -> Optional[dict]: + """Poll until an event matching `predicate` arrives for `request_id`, + or until `timeout_s` elapses. Returns the matching event or None. + """ + deadline = time.monotonic() + timeout_s + + while True: + for event in collector.get_events_for_request(request_id): + if predicate(event): + return event + if time.monotonic() >= deadline: + return None + time.sleep(poll_interval_s) + + +def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: + return wait_for_event(collector, request_id, is_propagated_event, timeout_s) + + +def wait_for_sent(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: + return wait_for_event(collector, request_id, is_sent_event, timeout_s) + + +def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: + return wait_for_event(collector, request_id, is_error_event, timeout_s) + + +def assert_no_error(collector: EventCollector, request_id: str, context: str = "") -> None: + """Assert that no message_error event is currently buffered for `request_id`.""" + event = wait_for_error(collector, request_id, timeout_s=0) + suffix = f" ({context})" if context else "" + assert event is None, f"Unexpected message_error event{suffix}: {event}" + + +def assert_no_sent(collector: EventCollector, request_id: str, context: str = "") -> None: + """Assert that no message_sent event is currently buffered for `request_id`.""" + event = wait_for_sent(collector, request_id, timeout_s=0) + suffix = f" ({context})" if context else "" + assert event is None, f"Unexpected message_sent event{suffix}: {event}" + + +def assert_no_propagated(collector: EventCollector, request_id: str, context: str = "") -> None: + """Assert that no message_propagated event is currently buffered for `request_id`.""" + event = wait_for_propagated(collector, request_id, timeout_s=0) + suffix = f" ({context})" if context else "" + assert event is None, f"Unexpected message_propagated event{suffix}: {event}" + + +def wait_for_connected( + collector: EventCollector, + timeout_s: float = 10.0, + poll_interval_s: float = 0.3, +) -> Optional[dict]: + """Wait until a connection_status_change event with PartiallyConnected or Connected arrives.""" + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + for event in collector.snapshot(): + if event.get("eventType") == "connection_status_change" and event.get("connectionStatus") in ("PartiallyConnected", "Connected"): + return event + time.sleep(poll_interval_s) + return None + + +TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR} + + +def assert_event_invariants(collector: EventCollector, request_id: str) -> None: + """Check per-request event invariants (issue #163): + - All events carry the correct requestId. + - No duplicate terminal events (Propagated, Sent, Error). + - Sent never appears before Propagated. + """ + events = collector.get_events_for_request(request_id) + assert events, f"No events found for request {request_id}" + + counts: dict[str, int] = {} + first_index: dict[str, int] = {} + for i, event in enumerate(events): + assert event.get("requestId") == request_id, ( + f"Event at index {i} has wrong requestId: " f"expected {request_id!r}, got {event.get('requestId')!r}" + ) + event_type = event.get("eventType", "") + if event_type in TERMINAL_EVENT_TYPES: + counts[event_type] = counts.get(event_type, 0) + 1 + if event_type not in first_index: + first_index[event_type] = i + + for event_type, count in counts.items(): + assert count == 1, f"Duplicate {event_type} events for request {request_id}: " f"got {count}, expected 1. Events: {events}" + + if EVENT_SENT in first_index and EVENT_PROPAGATED in first_index: + assert first_index[EVENT_PROPAGATED] < first_index[EVENT_SENT], ( + f"message_sent (index {first_index[EVENT_SENT]}) arrived before " + f"message_propagated (index {first_index[EVENT_PROPAGATED]}) " + f"for request {request_id}. Events: {events}" + ) + + +def get_node_multiaddr(node) -> str: + """Return the TCP multiaddr (with peer-id) from a WrapperManager node. + + Asserts that the wrapper returned exactly one address. If the wrapper ever + starts returning multiple addresses (newline/comma-separated or a JSON + list), this fails loudly instead of silently passing a malformed string + downstream to staticnodes / add_peers. + """ + result = node.get_node_info_raw("MyMultiaddresses") + if result.is_err(): + raise RuntimeError(f"get_node_info_raw failed: {result.err()}") + + addr = result.ok_value.strip() + if not addr or not addr.startswith("/"): + raise RuntimeError(f"Unexpected multiaddr format: {addr!r}") + + if "\n" in addr or "," in addr or addr.startswith("["): + raise AssertionError(f"Expected a single multiaddr from MyMultiaddresses, got multiple: {addr!r}") + + return addr + + +def create_message_bindings(**overrides) -> dict: + envelope = { + "contentTopic": DEFAULT_CONTENT_TOPIC, + "payload": DEFAULT_PAYLOAD, + "ephemeral": False, + } + envelope.update(overrides) + return envelope + + +def assert_no_unknown_request_ids(collector: EventCollector, issued_request_ids) -> None: + """Cross-association guard: every event carrying a requestId must belong + to one of the request ids we issued. Catches events that get attached to + the wrong request id under concurrency. + """ + issued = set(issued_request_ids) + for event in collector.snapshot(): + event_request_id = event.get("requestId") + if event_request_id is None: + continue + assert event_request_id in issued, f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py index 41845077d..a0d20e0d6 100644 --- a/src/node/wrappers_manager.py +++ b/src/node/wrappers_manager.py @@ -2,9 +2,9 @@ import sys from pathlib import Path from result import Result, Ok, Err -_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" -if str(_THIRD_PARTY) not in sys.path: - sys.path.insert(0, str(_THIRD_PARTY)) +_BINDINGS_PATH = Path(__file__).resolve().parents[2] / "vendor" / "logos-delivery-python-bindings" / "waku" +if str(_BINDINGS_PATH) not in sys.path: + sys.path.insert(0, str(_BINDINGS_PATH)) from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import] @@ -77,5 +77,22 @@ class WrapperManager: def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_node_info(node_info_id, timeout_s=timeout_s) + def get_node_info_raw(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[str, str]: + """Like get_node_info but returns the raw string without JSON parsing.""" + from wrapper import lib, ffi, _new_cb_state, _wait_cb_raw # type: ignore[import] + + state = _new_cb_state() + cb = self._node._make_waiting_cb(state) + rc = lib.logosdelivery_get_node_info(self._node.ctx, cb, ffi.NULL, node_info_id.encode("utf-8")) + if rc != 0: + return Err(f"get_node_info_raw: immediate call failed (ret={rc})") + wait_result = _wait_cb_raw(state, "get_node_info_raw", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_node_info_raw: callback failed (ret={cb_ret})") + return Ok(cb_msg.decode("utf-8") if cb_msg else "") + def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_available_configs(timeout_s=timeout_s) diff --git a/src/steps/store.py b/src/steps/store.py index 543db8c4e..311932ae5 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -250,6 +250,62 @@ class StepsStore(StepsCommon): expected_hash == actual_hash ), f"Message hash at index {idx} returned by store doesn't match the computed message hash {expected_hash}. Actual hash: {actual_hash}" + @allure.step + def check_sent_message_is_stored( + self, + expected_hashes, + store_node=None, + peer_addr=None, + include_data=None, + pubsub_topic=None, + content_topics=None, + start_time=None, + end_time=None, + hashes=None, + cursor=None, + page_size=None, + ascending=None, + store_v="v3", + **kwargs, + ): + """Verify that messages with the given hashes are present in the store.""" + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if isinstance(expected_hashes, str): + expected_hashes = [expected_hashes] + if store_node is None: + store_node = self.store_nodes + elif not isinstance(store_node, list): + store_node = [store_node] + + for node in store_node: + logger.debug(f"Checking that peer {node.image} can find the stored messages by hash") + self.store_response = self.get_messages_from_store( + node=node, + peer_addr=peer_addr, + include_data=include_data, + pubsub_topic=pubsub_topic, + content_topics=content_topics, + start_time=start_time, + end_time=end_time, + hashes=hashes, + cursor=cursor, + page_size=page_size, + ascending=ascending, + store_v=store_v, + **kwargs, + ) + + logger.debug(f"messages length is {len(self.store_response.messages)}") + assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. " f"Actual response: {self.store_response.resp_json}" + assert len(self.store_response.messages) >= len(expected_hashes), ( + f"Expected at least {len(expected_hashes)} messages " f"but got {len(self.store_response.messages)}" + ) + + actual_hashes = [self.store_response.message_hash(i) for i in range(len(self.store_response.messages))] + for expected_hash in expected_hashes: + assert expected_hash in actual_hashes, f"Expected hash {expected_hash} not found in store. " f"Actual hashes: {actual_hashes}" + @allure.step def check_store_returns_empty_response(self, pubsub_topic=None): if not pubsub_topic: diff --git a/tests/wrappers_tests/conftest.py b/tests/wrappers_tests/conftest.py index 7e96b6d80..3d2e19e38 100644 --- a/tests/wrappers_tests/conftest.py +++ b/tests/wrappers_tests/conftest.py @@ -3,7 +3,8 @@ import pytest from src.test_data import DEFAULT_CLUSTER_ID -def _free_port(): +def free_port(): + """Return a currently-unbound TCP/UDP port from the OS.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) return s.getsockname()[1] @@ -13,9 +14,9 @@ def build_node_config(**overrides): config = { "logLevel": "DEBUG", "listenAddress": "0.0.0.0", - "tcpPort": _free_port(), - "discv5UdpPort": _free_port(), - "restPort": _free_port(), + "tcpPort": free_port(), + "discv5UdpPort": free_port(), + "restPort": free_port(), "restAddress": "0.0.0.0", "clusterId": DEFAULT_CLUSTER_ID, "relay": True, diff --git a/tests/wrappers_tests/test_send_e2e_part1.py b/tests/wrappers_tests/test_send_e2e_part1.py new file mode 100644 index 000000000..6a94adf81 --- /dev/null +++ b/tests/wrappers_tests/test_send_e2e_part1.py @@ -0,0 +1,1020 @@ +from concurrent.futures import ThreadPoolExecutor + +import pytest +from src.env_vars import NODE_2 +from src.steps.common import StepsCommon +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.node.waku_node import WakuNode +from src.node.wrappers_manager import WrapperManager +from src.node.wrapper_helpers import ( + EventCollector, + assert_event_invariants, + create_message_bindings, + get_node_multiaddr, + wait_for_connected, + wait_for_propagated, + wait_for_sent, + wait_for_error, +) +from src.steps.store import StepsStore +from tests.wrappers_tests.conftest import free_port + +logger = get_custom_logger(__name__) + +## max time to wait after sending the message +PROPAGATED_TIMEOUT_S = 30.0 +SENT_TIMEOUT_S = 10.0 +NO_SENT_OBSERVATION_S = 5.0 +SENT_AFTER_STORE_TIMEOUT_S = 60.0 +NO_STORE_OBSERVATION_S = 60.0 +RECOVERY_TIMEOUT_S = 45.0 + +# S20 stabilization delays for gossipsub mesh formation. +MESH_STABILIZATION_S = 10 +STORE_JOIN_STABILIZATION_S = 10 + +# MaxTimeInCache from send_service.nim. +MAX_TIME_IN_CACHE_S = 60.0 +# Extra slack to cover the background retry loop tick after the window expires. +CACHE_EXPIRY_SLACK_S = 10.0 +ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S +RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" + +# S30: concurrent sends on the same content topic during initial auto-subscribe. +S30_CONCURRENT_SENDS = 5 +S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto" + +# S31: concurrent sends across mixed topics during peer churn. +S31_BURST_SIZE = 8 +S31_CONTENT_TOPICS = [ + "/test/1/s31-topic-a/proto", + "/test/1/s31-topic-b/proto", + "/test/1/s31-topic-c/proto", + "/test/1/s31-topic-d/proto", + "/test/1/s31-topic-e/proto", + "/test/1/s31-topic-f/proto", + "/test/1/s31-topic-g/proto", + "/test/1/s31-topic-h/proto", +] + + +class TestSendBeforeRelay(StepsStore): + def test_s17_send_before_relay_peers_joins(self, node_config): + """ + S17: sender starts isolated, calls send() + - send() returns Ok(RequestId) immediately + - Propagated event eventually arrives + """ + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + message = create_message_bindings() + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + # Step 2: start a relay peer with store enabled. + relay_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsShift": 1, + "store": True, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + # Match the gating part2's tests use: wait until the sender + # actually reports Connected/PartiallyConnected before asserting + # on propagation. Without this, the wait_for_propagated poll can + # miss the event because the sender's mesh hasn't formed yet. + assert wait_for_connected(sender_collector) is not None, ( + f"Sender did not reach Connected/PartiallyConnected after " f"relay peer joined. Collected events: {sender_collector.events}" + ) + + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s " + f"after relay peer joined. Collected events: {sender_collector.events}" + ) + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_TIMEOUT_S, + ) + assert sent_event is not None, ( + f"No MessageSentEvent received within {SENT_TIMEOUT_S}s " + f"from a store-enabled relay peer. Collected events: {sender_collector.events}" + ) + + assert_event_invariants(sender_collector, request_id) + + @pytest.mark.docker_required + @pytest.mark.xfail(reason="fails to republish after store peer joins mesh see https://github.com/logos-messaging/logos-delivery/issues/3848") + def test_s19_store_peer_appears_after_propagation(self, node_config): + """ + S19: a store peer comes online later. + - send() returns Ok(RequestId) immediately + - Propagated --- relay peer + - Sent when store peer is reachable + """ + sender_collector = EventCollector() + + node_config.update({"relay": True, "store": False, "discv5Discovery": False, "numShardsInNetwork": 1, "reliabilityEnabled": True}) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + # relay peer + relay_config = { + **node_config, + "tcpPort": free_port(), + "discv5UdpPort": free_port(), + "restPort": free_port(), + "staticnodes": [get_node_multiaddr(sender_node)], + "store": False, + "reliabilityEnabled": True, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value as relay_peer: + # Wait until the sender actually reports a connection before + # sending. Without this, send() can race the static-peer + # dial on slower runners (same gate S17 uses). + assert wait_for_connected(sender_collector) is not None, ( + f"Sender did not reach Connected/PartiallyConnected after " f"relay peer joined. Collected events: {sender_collector.events}" + ) + message = create_message_bindings() + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + # Propagated should arrive via the relay peer. + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + + early_sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=NO_SENT_OBSERVATION_S, + ) + assert early_sent_event is None, f"MessageSentEvent arrived before any store peer was reachable. " f"Event: {early_sent_event}" + + # Store peer + store_node = WakuNode(NODE_2, f"store_node") + store_node.start(relay="true", store="true", discv5_discovery="false", cluster_id=node_config["clusterId"], shard=0) + store_node.set_relay_subscriptions([self.test_pubsub_topic]) + relay_multiaddr = get_node_multiaddr(relay_peer) + sender_multiaddr = get_node_multiaddr(sender_node) + store_node.add_peers([relay_multiaddr, sender_multiaddr]) + self.wait_for_autoconnection([store_node], hard_wait=40) + delay(3) + + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + + assert sent_event is not None, ( + f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after store peer joined. Collected events: {sender_collector.events}" + ) + + self.check_published_message_is_stored( + store_node=store_node, + pubsub_topic=self.test_pubsub_topic, + messages_to_check=[message], + page_size=5, + ascending="true", + ) + + assert_event_invariants(sender_collector, request_id) + + @pytest.mark.docker_required + @pytest.mark.skip(reason="Forcing the miss store round not possible") + def test_s20_store_misses_initially_then_retry_succeeds(self, node_config): + """ + S20: relay propagation succeeds, the first store query misses + (the store peer is reachable but does not yet have the message), + a later retry republishes through the relay mesh, and the store + peer then archives it. + + Covers state flow: + SuccessfullyPropagated -> NextRoundRetry + -> SuccessfullyPropagated -> SuccessfullyValidated + + """ + sender_collector = EventCollector() + store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}") + store_node.start( + relay="true", + store="true", + discv5_discovery="false", + cluster_id=node_config["clusterId"], + shard=0, + ) + store_multiaddr = store_node.get_multiaddr_with_id() + + node_config.update( + { + "relay": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + "storenode": store_multiaddr, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + relay_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsShift": 1, + "store": False, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value as relay_peer: + # Wait for the sender to see the relay peer before publishing. + assert wait_for_connected(sender_collector) is not None, ( + f"Sender did not reach Connected/PartiallyConnected. " f"Collected events: {sender_collector.events}" + ) + + # Let the gossipsub mesh form between sender and relay peer. + delay(MESH_STABILIZATION_S) + + message = create_message_bindings(ephemeral=False) + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + # Round 1: propagation succeeds via the relay peer. + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + + # The store peer is reachable for queries but never received + # the message via gossipsub, so the first store query must + # miss and Sent must NOT arrive yet. + early_sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=NO_SENT_OBSERVATION_S, + ) + assert early_sent_event is None, ( + f"MessageSentEvent arrived before the store could have the message. " + f"Initial store query should have missed. Event: {early_sent_event}" + ) + + # Now subscribe the store to the test topic and wire it into + # the relay mesh so the next retry round's republish reaches + # the store via gossipsub. + store_node.set_relay_subscriptions([self.test_pubsub_topic]) + store_node.add_peers([get_node_multiaddr(sender_node), get_node_multiaddr(relay_peer)]) + self.wait_for_autoconnection([store_node], hard_wait=10) + delay(STORE_JOIN_STABILIZATION_S) + + # Round 2: retry republishes, store archives, next query hits. + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent_event is not None, ( + f"No MessageSentEvent within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after the store joined the relay mesh. The retry round " + f"should have republished and the store should have archived. " + f"Collected events: {sender_collector.events}" + ) + + self.check_published_message_is_stored( + store_node=store_node, + pubsub_topic=self.test_pubsub_topic, + messages_to_check=[message], + page_size=5, + ascending="true", + ) + + assert_event_invariants(sender_collector, request_id) + + def test_s21_error_when_retry_window_expires(self, node_config): + """ + S21: delivery retry window expires before any valid path recovers. + """ + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + message = create_message_bindings() + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + # No peer + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S, + ) + assert error_event is not None, ( + f"No MessageErrorEvent received within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s " + f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack). " + f"Collected events: {sender_collector.events}" + ) + logger.info(f"S21 received error event: {error_event}") + + assert error_event.get("error") == RETRY_WINDOW_EXPIRED_MSG, ( + f"Unexpected error message in message_error event.\n" + f"Expected: {RETRY_WINDOW_EXPIRED_MSG!r}\n" + f"Got: {error_event.get('error')!r}\n" + f"Full event: {error_event}" + ) + + assert_event_invariants(sender_collector, request_id) + + def test_s22_non_ephemeral_message_with_reliability_disabled(self, node_config): + """ + S22: non-ephemeral message with reliabilityEnabled disabled. + - propagation path exists ,reliabilityEnabled = false. + - Expected: Ok(RequestId), Propagated event only, no Sent event. + Note: S17 already covers the positive path of this test with reliabilityEnabled=True. + """ + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": False, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + relay_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsShift": 1, + "store": True, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + # Wait for the sender to actually establish the mesh before + # publishing, matching part2's pattern. Otherwise the publish + # races with mesh formation and message_propagated may not fire. + assert wait_for_connected(sender_collector) is not None, ( + f"Sender did not reach Connected/PartiallyConnected. " f"Collected events: {sender_collector.events}" + ) + + message = create_message_bindings(ephemeral=False) + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=NO_SENT_OBSERVATION_S, + ) + assert sent_event is None, ( + f"Unexpected MessageSentEvent received when reliabilityEnabled is disabled.\n" + f"Sent event: {sent_event}\n" + f"Collected events: {sender_collector.events}" + ) + + assert_event_invariants(sender_collector, request_id) + + def test_s23_no_sent_event_when_relay_has_no_store(self, node_config): + """ + S23: non-ephemeral message, reliability enabled, no store peer ever reachable. + - Expected: Ok(RequestId), Propagated event only, no Sent and no terminal error. + """ + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + message = create_message_bindings(ephemeral=False) + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + relay_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsShift": 1, + "store": False, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s " + f"after relay peer joined. Collected events: {sender_collector.events}" + ) + + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=NO_STORE_OBSERVATION_S, + ) + assert sent_event is None, ( + f"Unexpected MessageSentEvent within {NO_STORE_OBSERVATION_S}s " + f"when relay peer has store=false.\n" + f"Sent event: {sent_event}\n" + f"Collected events: {sender_collector.events}" + ) + + # Regression guard: current behavior must NOT convert "no store + # reachable" into an immediate terminal error. If a future change + # starts emitting one, this assertion will catch it. + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=0, + ) + assert error_event is None, ( + f"Unexpected terminal error event when no store peer is reachable. " + f"S23 expects silent behavior (Propagated only).\n" + f"Error event: {error_event}\n" + f"Collected events: {sender_collector.events}" + ) + + assert_event_invariants(sender_collector, request_id) + + def test_s24_ephemeral_message_with_reachable_store(self, node_config): + """ + S24: ephemeral message, reliability enabled, reachable store peer. + - Setup: propagation path exists, relay peer has store=True (reachable), + - Expected: Ok(RequestId), Propagated event only, no Sent event. + """ + + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + relay_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsShift": 1, + "store": True, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + message = create_message_bindings(ephemeral=True) + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=NO_STORE_OBSERVATION_S, + ) + assert sent_event is None, ( + f"Unexpected MessageSentEvent for an ephemeral message. " + f"Ephemeral messages must never be store-validated.\n" + f"Sent event: {sent_event}\n" + f"Collected events: {sender_collector.events}" + ) + + assert_event_invariants(sender_collector, request_id) + + def test_s26_lightpush_peer_churn_alternate_remains(self, node_config): + """ + S26: multiple lightpush peers, the selected one disappears, + an alternate remains. + + Topology (3 peers + sender): + - peer1: relay + lightpush. The lightpush server initially selected + by the sender. Stopped mid-test to simulate churn. + - relay_peer: relay-only. Kept alive throughout the test as a + stable gossipsub mesh neighbour, so that after peer1 disappears + peer2 still has a relay path to propagate the message. + - peer2: relay + lightpush. The surviving lightpush server that + must take over once peer1 is gone. + - sender: edge node with peer1 and peer2 as static lightpush peers. + """ + sender_collector = EventCollector() + peer1_config = { + **node_config, + "relay": True, + "lightpush": True, + "store": False, + "filter": False, + "discv5Discovery": True, + "numShardsInNetwork": 1, + "portsShift": 1, + "discv5UdpPort": free_port(), + } + peer1_result = WrapperManager.create_and_start(config=peer1_config) + assert peer1_result.is_ok(), f"Failed to start lightpush peer1: {peer1_result.err()}" + peer1 = peer1_result.ok_value + + relay_config = { + **node_config, + "relay": True, + "lightpush": False, + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "portsShift": 4, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value as relay_peer: + peer2_config = { + **peer1_config, + "staticnodes": [ + get_node_multiaddr(peer1), + get_node_multiaddr(relay_peer), + ], + "portsShift": 2, + "discv5UdpPort": free_port(), + } + + peer2_result = WrapperManager.create_and_start(config=peer2_config) + assert peer2_result.is_ok(), f"Failed to start lightpush peer2: {peer2_result.err()}" + + with peer2_result.ok_value as peer2: + sender_config = { + **node_config, + "mode": "Edge", + "relay": True, + "lightpush": True, + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "portsShift": 3, + "staticnodes": [ + get_node_multiaddr(peer1), + get_node_multiaddr(peer2), + ], + } + + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + delay(2) + stop_result = peer1.stop_and_destroy() + assert stop_result.is_ok(), f"Failed to stop peer1: {stop_result.err()}" + delay(2) + + message = create_message_bindings() + send_result = sender_node.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) during peer churn, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + # Expect Propagated via the surviving lightpush peer (peer2). + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s " + f"after the selected lightpush peer disappeared. " + f"Collected events: {sender_collector.events}" + ) + + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=0, + ) + assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}" + + assert_event_invariants(sender_collector, request_id) + + def test_s30_concurrent_sends_during_auto_subscribe(self, node_config): + """ + S30: concurrent sends on the same content topic during initial auto-subscribe. + - Sender starts unsubscribed to the target topic. + - Several send() calls are issued at nearly the same time. + - Each call must return Ok(RequestId) with a unique id. + - Each request id must get its own propagated event, + with no dropped or cross-associated events. + """ + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + # Relay peer so the sender has a propagation path. + relay_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsShift": 1, + } + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + # Build one message per send, with distinct payloads so we can + # detect any cross-association between request ids and events. + messages = [ + create_message_bindings( + contentTopic=S30_CONTENT_TOPIC, + payload=to_base64(f"s30-concurrent-{i}"), + ) + for i in range(S30_CONCURRENT_SENDS) + ] + + # Fire all sends concurrently. The sender is not yet subscribed + # to S30_CONTENT_TOPIC, so this exercises the auto-subscribe path + # under contention. + with ThreadPoolExecutor(max_workers=S30_CONCURRENT_SENDS) as pool: + send_results = list(pool.map(sender_node.send_message, messages)) + + # Every send must return Ok(RequestId). + request_ids = [] + for i, send_result in enumerate(send_results): + assert send_result.is_ok(), f"Concurrent send #{i} failed: {send_result.err()}" + request_id = send_result.ok_value + assert request_id, f"Concurrent send #{i} returned an empty RequestId" + request_ids.append(request_id) + + # Request ids must be unique across concurrent sends. + assert len(set(request_ids)) == len(request_ids), f"Duplicate RequestIds returned by concurrent sends: {request_ids}" + + # Each request id must get its own propagated event and no error. + for request_id in request_ids: + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent for request_id={request_id} " + f"within {PROPAGATED_TIMEOUT_S}s. " + f"Collected events: {sender_collector.events}" + ) + + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=0, + ) + assert error_event is None, f"Unexpected message_error for request_id={request_id}: {error_event}" + + # Cross-association guard: every event with a requestId must + # belong to exactly one of the request ids we issued. + issued = set(request_ids) + for event in sender_collector.snapshot(): + event_request_id = event.get("requestId") + if event_request_id is None: + continue + assert event_request_id in issued, ( + f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" + ) + + # Per-request invariants apply to every concurrent send + # (correct requestId, no duplicate terminal events, + # Sent never before Propagated). + for request_id in request_ids: + assert_event_invariants(sender_collector, request_id) + + @pytest.mark.docker_required + def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config): + """ + S31: concurrent sends across mixed content topics during peer churn. + """ + sender_collector = EventCollector() + + relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}") + relay_peer.start(relay="true", discv5_discovery="false") + relay_peer.set_relay_subscriptions([self.test_pubsub_topic]) + + lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}") + lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false") + lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic]) + + store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}") + store_peer.start(relay="true", store="true", discv5_discovery="false") + store_peer.set_relay_subscriptions([self.test_pubsub_topic]) + + churn_peers = [relay_peer, lightpush_peer, store_peer] + + # Mesh docker peers so a lightpushed message can fan out to the store peer. + peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers] + for peer in churn_peers: + others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()] + peer.add_peers(others) + + node_config.update( + { + "mode": "Edge", + "relay": True, + "lightpush": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "lightpushnode": lightpush_peer.get_multiaddr_with_id(), + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + sender_multiaddr = get_node_multiaddr(sender_node) + for peer in churn_peers: + peer.add_peers([sender_multiaddr]) + delay(3) # let docker peers connect to the sender + + all_request_ids: list[str] = [] + phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1") + all_request_ids.extend(phase1_ids) + + for peer in churn_peers: + peer.restart() + delay(1) # small window so the restart is actually in-flight + phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2") + all_request_ids.extend(phase2_ids) + + # Wait for all peers to be ready again and re-attach the sender. + for peer in churn_peers: + peer.ensure_ready(timeout_duration=20) + peer.add_peers([sender_multiaddr]) + + peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers] + for peer in churn_peers: + others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()] + peer.add_peers(others) + delay(3) + + phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3") + all_request_ids.extend(phase3_ids) + + assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}" + + # Phase 1 ran before any churn, so the mesh was stable — standard timeout. + # Phase 3 ran right after restart + re-attach, so the mesh needed to + # re-stabilize — use the recovery timeout to avoid CI flakiness. + phase_timeouts = [ + (phase1_ids, PROPAGATED_TIMEOUT_S), + (phase3_ids, RECOVERY_TIMEOUT_S), + ] + for request_ids, timeout_s in phase_timeouts: + for request_id in request_ids: + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=timeout_s, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent for stable-phase " + f"request_id={request_id} within {timeout_s}s. " + f"Collected events: {sender_collector.events}" + ) + + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=0, + ) + assert error_event is None, f"Unexpected message_error event for stable-phase " f"request_id={request_id}: {error_event}" + + for request_id in phase2_ids: + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=0, + ) + assert error_event is None, f"Unexpected terminal message_error for phase-2 " f"request_id={request_id} after recovery: {error_event}" + + issued = set(all_request_ids) + for event in sender_collector.snapshot(): + event_request_id = event.get("requestId") + if event_request_id is None: + continue + assert event_request_id in issued, ( + f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" + ) + + # Use the hash the wrapper emitted on message_sent so the store + # lookup matches the exact bytes that were actually published. + phase3_hashes = [] + for request_id in phase3_ids: + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=RECOVERY_TIMEOUT_S, + ) + assert sent_event is not None, ( + f"No message_sent event for phase-3 request_id={request_id} " + f"within {RECOVERY_TIMEOUT_S}s. Collected events: {sender_collector.events}" + ) + msg_hash = sent_event.get("messageHash") + assert msg_hash, f"message_sent event missing messageHash: {sent_event}" + phase3_hashes.append(msg_hash) + + # 3 phases × S31_BURST_SIZE messages, so the page must fit them all, + # otherwise phase-3 hashes (which sort last in ascending order) get cut off. + self.check_sent_message_is_stored( + expected_hashes=phase3_hashes, + store_node=store_peer, + pubsub_topic=self.test_pubsub_topic, + page_size=S31_BURST_SIZE * 3, + ascending="true", + ) + + # Per-request invariants apply across all phases, including the + # retry-path bursts (phase 2). If retries ever emit duplicate + # Propagated events or reorder Sent before Propagated, this catches it. + for request_id in all_request_ids: + assert_event_invariants(sender_collector, request_id) + + def _s31_fire_burst(self, sender_node, *, phase_label: str) -> list[str]: + """Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS. + Returns the list of RequestIds. Asserts every send returned Ok.""" + messages = [ + self.create_message( + contentTopic=S31_CONTENT_TOPICS[i], + payload=to_base64(f"s31-{phase_label}-{i}"), + ) + for i in range(S31_BURST_SIZE) + ] + + with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as pool: + send_results = list(pool.map(sender_node.send_message, messages)) + + request_ids = [] + for i, send_result in enumerate(send_results): + assert send_result.is_ok(), f"{phase_label}: concurrent send #{i} failed: {send_result.err()}" + request_id = send_result.ok_value + assert request_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId" + request_ids.append(request_id) + + return request_ids diff --git a/tests/wrappers_tests/test_send_e2e_part2.py b/tests/wrappers_tests/test_send_e2e_part2.py new file mode 100644 index 000000000..f94a89829 --- /dev/null +++ b/tests/wrappers_tests/test_send_e2e_part2.py @@ -0,0 +1,700 @@ +import base64 +import pytest +from src.steps.common import StepsCommon +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.node.wrappers_manager import WrapperManager +from src.node.wrapper_helpers import ( + EventCollector, + assert_event_invariants, + create_message_bindings, + get_node_multiaddr, + wait_for_connected, + wait_for_propagated, + wait_for_sent, + wait_for_error, +) +from tests.wrappers_tests.conftest import build_node_config + +logger = get_custom_logger(__name__) + +PROPAGATED_TIMEOUT_S = 30.0 +SENT_TIMEOUT_S = 10.0 +NO_SENT_OBSERVATION_S = 5.0 +SENT_AFTER_STORE_TIMEOUT_S = 60.0 +OVERSIZED_PAYLOAD_BYTES = 200 * 1024 +RECOVERY_TIMEOUT_S = 45.0 +SERVICE_DOWN_SETTLE_S = 3.0 + +# MaxTimeInCache from send_service.nim. +MAX_TIME_IN_CACHE_S = 60.0 +# Extra slack to cover the background retry loop tick after the window expires. +CACHE_EXPIRY_SLACK_S = 10.0 +ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S +RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" + + +class TestS02AutoSubscribeOnFirstSend(StepsCommon): + """ + S02 — Auto-subscribe on first send. + Sender never calls subscribe_content_topic() before send(). + The send API must auto-subscribe to the content topic used in the message. + Expected: send() returns Ok(RequestId), message_propagated arrives. + """ + + def test_s02_send_without_explicit_subscribe(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( + payload=to_base64("S02 auto-subscribe test payload"), + contentTopic="/test/1/s02-auto-subscribe/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + +class TestS06CoreSenderRelayOnly(StepsCommon): + """ + S06 — Core sender with relay peers only, no store. + Sender has local relay enabled and is connected to one relay peer. + Expected: send() returns Ok(RequestId), message_propagated event arrives, + no message_sent (store disabled), no message_error. + """ + + def test_s06_relay_propagation_without_store(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( + payload=to_base64("S06 relay-only test payload"), + contentTopic="/test/1/s06-relay-only/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS07CoreSenderRelayAndStore(StepsCommon): + """ + S07 — Core sender with relay peers and store peer, reliability enabled. + Sender relays message to a store-capable peer; delivery service validates + the message reached the store via p2p reliability check. + Expected: Propagated, then Sent. + """ + + def test_s07_relay_propagation_with_store_validation(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + "store": True, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start store peer: {peer_result.err()}" + + with peer_result.ok_value: + message = create_message_bindings( + payload=to_base64("S07 relay+store test payload"), + contentTopic="/test/1/s07-relay-store/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestRelayToLightpushFallback(StepsCommon): + """S08/S09 — Relay-to-lightpush fallback. + + Sender has relay enabled but zero gossipsub relay peers. + A lightpush peer is reachable via lightpushnode (no staticnodes). + Relay fails with NO_PEERS_TO_RELAY, lightpush fallback succeeds + in the same processing pass. + + Topology: + [Service] relay=True, lightpush=True + [RelayPeer] relay=True, staticnodes=[service] (gives service gossipsub mesh) + [Sender] relay=True, lightpush=True, lightpushnode=service + (no staticnodes → zero gossipsub relay peers → fallback) + """ + + @pytest.mark.xfail(reason="the test fail without lightpushnode, see https://github.com/logos-messaging/logos-delivery/issues/3847") + def test_s08_relay_fallback_to_lightpush(self, node_config): + """S08: no store peer → Propagated only.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + # "lightpushnode": service_addr, #this comment currently raise issue + "portsShift": 2, + "discv5Discovery": True, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + assert_event_invariants(sender_collector, request_id) + + def test_s09_relay_fallback_to_lightpush_with_store_validation(self, node_config): + """S09: S08 + store peer + reliability → Propagated, then Sent.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": True, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "store": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = {**node_config, "reliabilityEnabled": True, "storenode": service_addr, "portsShift": 2, "store": False} + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after propagation. Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS10EdgeSenderLightpushOnly(StepsCommon): + """ + S10 — Edge sender with lightpush path only, no store peer. + Edge sender has no local relay; it publishes via a lightpush service node. + Expected: Propagated only (no Sent, no Error). + """ + + @pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847") + def test_s10_edge_lightpush_propagation(self, node_config): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service node: {service_result.err()}" + + with service_result.ok_value as service_node: + service_multiaddr = get_node_multiaddr(service_node) + + relay_config = build_node_config( + relay=True, + staticnodes=[service_multiaddr], + **common, + ) + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + edge_config = build_node_config( + mode="Edge", + staticnodes=[service_multiaddr], + **common, + ) + + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + message = create_message_bindings( + payload=to_base64("S10 edge lightpush test payload"), + contentTopic="/test/1/s10-edge-lightpush/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent(sender_collector, request_id, timeout_s=NO_SENT_OBSERVATION_S) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS12IsolatedSenderNoPeers(StepsCommon): + """ + S12 — Isolated sender, no peers. + Sender has relay enabled but zero relay peers and zero lightpush peers. + Expected: send() returns Ok(RequestId), but eventually a message_error + event arrives (no route to propagate). + """ + + def test_s12_send_with_no_peers_produces_error(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings( + payload=to_base64("S12 isolated sender payload"), + contentTopic="/test/1/s12-isolated/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s " + f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}" + + +class TestS14LightpushNonRetryableError(StepsCommon): + """ + S14 — Lightpush non-retryable error via oversized message. + Edge sender publishes a message exceeding DefaultMaxWakuMessageSize (150KiB) + through a lightpush service node. The server validates message size and + returns INVALID_MESSAGE (420), a non-retryable error. + Expected: send() returns Ok(RequestId), then message_error event. + """ + + def test_s14_oversized_message_triggers_error(self): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_multiaddr = get_node_multiaddr(service) + + edge_config = build_node_config( + mode="Edge", + staticnodes=[service_multiaddr], + **common, + ) + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode() + message = create_message_bindings( + payload=oversized_payload, + contentTopic="/test/1/s14-oversized/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {PROPAGATED_TIMEOUT_S}s " + f"after sending oversized message. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + logger.info(f"S14 received error event: {error}") + + error_msg = error.get("error", "").lower() + assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}" + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS15LightpushRetryableErrorRecovery(StepsCommon): + """ + S15 — Lightpush retryable error + recovery. + Edge sender publishes via a lightpush service node that has NO relay peers. + The service accepts the lightpush request but returns NO_PEERS_TO_RELAY — + a retryable error (explicitly listed in the S15 spec). The message enters + the retry loop. A relay peer then joins the service node, and the next + retry succeeds. + Expected: send() returns Ok(RequestId), then eventually Propagated. + """ + + @pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847") + def test_s15_lightpush_retryable_error_then_recovery(self): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_multiaddr = get_node_multiaddr(service) + + edge_config = build_node_config( + mode="Edge", + staticnodes=[service_multiaddr], + **common, + ) + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + msg = create_message_bindings( + payload=to_base64("S15 retryable error recovery"), + contentTopic="/test/1/s15-recovery/proto", + ) + send_result = edge_sender.send_message(message=msg) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + delay(SERVICE_DOWN_SETTLE_S) + + early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert early_propagated is None, ( + f"message_propagated arrived before relay peer joined — " f"retryable error path was not exercised: {early_propagated}" + ) + + relay_config = build_node_config( + relay=True, + staticnodes=[service_multiaddr], + **common, + ) + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=RECOVERY_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated within {RECOVERY_TIMEOUT_S}s " + f"after relay peer joined. " + f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error after recovery: {error}" + + assert_event_invariants(sender_collector, request_id)