commit 890ea07b851a4de10f106b18bb3b81f7cc84a737 Author: Egor Rachkovskii Date: Thu Apr 30 14:49:12 2026 +0100 Initialize `logos-integration-test-framework` package with pytest plugins, event-wait helpers, and CI setup. diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml new file mode 100644 index 0000000..a11b3cd --- /dev/null +++ b/.github/workflows/unit.yml @@ -0,0 +1,34 @@ +name: unit + +on: + pull_request: + push: + branches: [main, master] + +concurrency: + group: unit-${{ github.ref }} + cancel-in-progress: true + +jobs: + unit: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + - name: Install + run: | + python -m pip install --upgrade pip + pip install -e '.[dev]' + - name: Ruff + run: ruff check src tests + - name: Mypy + run: mypy src tests + - name: Pytest (unit only) + run: pytest tests/unit -q diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5938219 --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.eggs/ +build/ +dist/ +.venv/ +venv/ +.env +.pytest_cache/ +.ruff_cache/ +.mypy_cache/ +.coverage +htmlcov/ +.idea/ +.vscode/ +*.swp +.DS_Store diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..04d6afb --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,16 @@ +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.5.7 + hooks: + - id: ruff + args: [--fix] + - id: ruff-format + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-toml + - id: check-added-large-files diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e28ff04 --- /dev/null +++ b/README.md @@ -0,0 +1,100 @@ +# logos-integration-test-framework + +Pytest plugin + helpers for writing integration tests against a `logoscore` daemon. Built on top of [`logos-co/logos-logoscore-py`](https://github.com/logos-co/logos-logoscore-py). + +## What's in the box + +``` +src/logos_integration_test_framework/ +├── __init__.py # exports: subscribe, wait_for_event, Waiter, EventTimeout +├── waits.py # Queue-backed adapters over LogoscoreClient.on_event +└── fixtures.py # pytest fixtures: local_daemon, local_client, docker_daemon, docker_client + # (auto-loaded via pytest11 entry-point — no import needed) +``` + +That's the whole package. Client / transport / topology layers live upstream in `logoscore-py` — don't reimplement them. + +## Install (consumer-side) + +In your module's test repo (e.g. `logos-chat-module/tests/integration/`): + +```toml +# pyproject.toml +[project.optional-dependencies] +test = [ + "logos-integration-test-framework @ git+https://github.com/logos-co/integration-tests.git@", + "pytest>=8.0", +] +``` + +Pin a commit SHA, not a branch. Then `pip install -e '.[test]'` and the four daemon/client fixtures are immediately available in your tests — no `pytest_plugins` declaration needed. + +## Writing a test (consumer) + +Open the subscription **before** triggering the action — the upstream `logoscore watch` subprocess takes a moment to come live, and events fired before that window are lost. A short sleep or a known-pumped sentinel event is enough. + +```python +import time + +from logos_integration_test_framework import subscribe + +def test_my_module(local_client): # local_client comes from the auto-loaded plugin + local_client.load_module("my_module") + with subscribe(local_client, "my_module", "DoneEvent") as w: + time.sleep(0.3) # let the watcher come live + request_id = local_client.call("my_module", "do_something", "arg") + event = w.next( + predicate=lambda e: e["data"][0] == request_id, + timeout=10.0, + ) + assert event["event"] == "DoneEvent" +``` + +Multiple waits in one test — re-use the same subscription: + +```python +def test_two_waits(local_client): + with subscribe(local_client, "my_module") as w: + time.sleep(0.3) + local_client.call("my_module", "fire", "first") + first = w.next(lambda e: e["data"][0] == 1, timeout=5.0) + local_client.call("my_module", "fire", "second") + second = w.next(lambda e: e["data"][0] == 2, timeout=5.0) + assert first != second +``` + +Predicate exceptions surface on the test thread (unlike exceptions raised inside an upstream `on_event` callback, which the watcher's pump catches and routes to `error_callback`). + +The one-shot `wait_for_event(...)` is a convenience for cases where the trigger has *already* happened and the event is still in flight; for the more common subscribe-then-trigger pattern, use `subscribe(...)` directly. + +## Fixtures + +| Fixture | Scope | Provides | Skip condition | +|---|---|---|---| +| `local_daemon` | module | `logoscore.LogoscoreDaemon` (local subprocess) | `logoscore` not on `PATH` or `LOGOS_MODULES_DIR` unset / missing | +| `local_client` | function | `LogoscoreClient` from `local_daemon` | (inherits) | +| `docker_daemon` | module | `logoscore.LogoscoreDockerDaemon` (containerised) | `docker` CLI absent, image absent, `LOGOSCORE_IMAGE`/`LOGOS_MODULES_DIR` unset | +| `docker_client` | function | `LogoscoreClient` from `docker_daemon` | (inherits) | + +Default scopes are `module` for daemons (function-scope is too slow given upstream's `startup_timeout=15.0`; session-scope hides cross-test leaks) and `function` for clients (`client.stop()` would otherwise poison every later test in the module). Override per-test with `pytest.fixture(scope=...)` if needed. + +The `docker_*` fixtures need the `docker` CLI on `PATH` — upstream `LogoscoreDockerDaemon` shells out to it (it does not use docker-py). The fixture skips automatically if the binary is absent. + +To opt out of the auto-loaded plugin in a particular run: `pytest -p no:logos_integration_test_framework`. + +## Contributing to this repo + +```bash +python -m venv .venv && source .venv/bin/activate +pip install -e '.[dev]' # ← required: registers the pytest11 entry-point +pytest tests/unit -q # 13 tests: surface check + waits.py +``` + +`pip install -e '.[dev]'` is **required** before `pytest` — otherwise the entry-point isn't registered and the smoke test (`tests/test_smoke.py`) won't see the `local_*` fixtures. CI does this automatically. + +Lint / type / unit runs are gated in `.github/workflows/unit.yml` (Python 3.11 + 3.12). Direct pushes to `master` are rejected by branch-protection rules; merge via PR with green CI. + +## Related + +- [`logos-co/logos-logoscore-py`](https://github.com/logos-co/logos-logoscore-py) — the Python wrapper this builds on (`LogoscoreDaemon` / `LogoscoreDockerDaemon` / `LogoscoreClient`). +- [`logos-co/logos-test-framework`](https://github.com/logos-co/logos-test-framework) — **different layer**: a C++ unit-test framework for module internals (gtest-style, link-time mock substitution). No overlap with this Python package. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..23b765a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,66 @@ +[build-system] +requires = ["hatchling>=1.24"] +build-backend = "hatchling.build" + +[project] +name = "logos-integration-test-framework" +version = "0.2.0" +description = "pytest infrastructure for writing logoscore integration tests" +readme = "README.md" +# Deliberate tightening: upstream logoscore allows >=3.10. Don't relax without +# auditing for `Self` / pattern-match / debug-f-string usage. +requires-python = ">=3.11" +license = { text = "MIT OR Apache-2.0" } +authors = [{ name = "IFT QA" }] +dependencies = [ + # logoscore is not on PyPI. Pinned to PR #1 head SHA, not the branch name, + # so an upstream force-push can't silently change what we install. + # Re-pin to `@master` after https://github.com/logos-co/logos-logoscore-py/pull/1 merges. + "logoscore @ git+https://github.com/logos-co/logos-logoscore-py.git@6e7155733d90a66d117211c33322594e1b50e742", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "ruff>=0.5", + "mypy>=1.10", + "pre-commit>=3.7", +] + +# Auto-load the fixtures module as a pytest plugin. After +# `pip install logos-integration-test-framework`, fixtures +# `local_daemon` / `local_client` / `docker_daemon` / `docker_client` +# are available without `pytest_plugins` declarations. +[project.entry-points.pytest11] +logos_integration_test_framework = "logos_integration_test_framework.fixtures" + +[tool.hatch.metadata] +# logoscore is installed via `git+https://...` — a direct reference. +allow-direct-references = true + +[tool.hatch.build.targets.wheel] +packages = ["src/logos_integration_test_framework"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-ra --strict-markers" + +[tool.ruff] +line-length = 100 +target-version = "py311" +src = ["src", "tests"] + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B", "SIM", "RUF"] +ignore = ["E501"] + +[tool.mypy] +python_version = "3.11" +strict = true +mypy_path = "src" + +# Upstream logoscore ships no py.typed marker — without this override, strict +# mode fails with import-untyped on first push. +[[tool.mypy.overrides]] +module = "logoscore.*" +ignore_missing_imports = true diff --git a/src/logos_integration_test_framework/__init__.py b/src/logos_integration_test_framework/__init__.py new file mode 100644 index 0000000..a6171de --- /dev/null +++ b/src/logos_integration_test_framework/__init__.py @@ -0,0 +1,17 @@ +"""pytest infrastructure for writing logoscore integration tests. + +Public surface: blocking event-wait helpers (`subscribe`, `wait_for_event`, +`Waiter`, `EventTimeout`). Daemon/client pytest fixtures live in +`logos_integration_test_framework.fixtures` and auto-load via the `pytest11` +entry-point — no explicit import needed in consumer test files. +""" + +from logos_integration_test_framework.waits import ( + EventTimeout, + Waiter, + subscribe, + wait_for_event, +) + +__all__ = ["EventTimeout", "Waiter", "subscribe", "wait_for_event"] +__version__ = "0.2.0" diff --git a/src/logos_integration_test_framework/fixtures.py b/src/logos_integration_test_framework/fixtures.py new file mode 100644 index 0000000..3971e62 --- /dev/null +++ b/src/logos_integration_test_framework/fixtures.py @@ -0,0 +1,83 @@ +"""Pytest fixtures wrapping `logoscore.LogoscoreDaemon` / `LogoscoreDockerDaemon`. + +Auto-loaded as a pytest plugin via the `pytest11` entry-point declared in +`pyproject.toml`. Any project that installs `logos-integration-test-framework` +gets these fixtures available without `pytest_plugins` declarations. + +Both daemon flavors auto-skip when their environmental requirements aren't +met (binary on PATH, image present, modules dir set, …) — there's no +in-package binary or image. Set `LOGOS_MODULES_DIR` (and `LOGOSCORE_IMAGE` +for the docker fixture) in CI/local env to enable. + +Daemons are `module`-scope (function-scope pays the upstream +`startup_timeout=15.0` per test; session-scope hides cross-test state leakage). +Clients are `function`-scope on top of the shared daemon — `client()` is a +cheap construction, but `client.stop()` shells out `logoscore stop` to the +daemon, which would otherwise poison every later test in the module. + +`logoscore` is imported **lazily** inside each fixture body. Many consumers +will only use `wait_for_event` and never touch a daemon fixture — eager +imports would charge them subprocess/Docker shim startup on every pytest run. +""" + +from __future__ import annotations + +import os +import shutil +from collections.abc import Iterator +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import pytest + +if TYPE_CHECKING: + from logoscore import LogoscoreClient, LogoscoreDaemon, LogoscoreDockerDaemon + + +def _modules_dir_or_skip() -> Path: + raw = os.environ.get("LOGOS_MODULES_DIR") + if not raw: + pytest.skip("LOGOS_MODULES_DIR not set") + path = Path(raw) + if not path.is_dir(): + pytest.skip(f"LOGOS_MODULES_DIR={raw!r} is not an existing directory") + return path + + +@pytest.fixture(scope="module") +def local_daemon() -> Iterator[LogoscoreDaemon]: + if shutil.which("logoscore") is None: + pytest.skip("`logoscore` binary not on PATH") + modules_dir = _modules_dir_or_skip() + + from logoscore import LogoscoreDaemon + + with LogoscoreDaemon(modules_dir=modules_dir) as daemon: + yield daemon + + +@pytest.fixture +def local_client(local_daemon: LogoscoreDaemon) -> LogoscoreClient: + return local_daemon.client() + + +@pytest.fixture(scope="module") +def docker_daemon() -> Iterator[LogoscoreDockerDaemon]: + from logoscore import LogoscoreDockerDaemon, docker_available, image_present + + if not docker_available(): + pytest.skip("docker not available on host") + image = os.environ.get("LOGOSCORE_IMAGE") + if not image: + pytest.skip("LOGOSCORE_IMAGE not set") + if not image_present(image): + pytest.skip(f"docker image {image!r} not present locally") + modules_dir = _modules_dir_or_skip() + + with LogoscoreDockerDaemon(image=image, modules_dir=modules_dir) as daemon: + yield daemon + + +@pytest.fixture +def docker_client(docker_daemon: LogoscoreDockerDaemon) -> Any: + return docker_daemon.client(binary="logoscore") diff --git a/src/logos_integration_test_framework/waits.py b/src/logos_integration_test_framework/waits.py new file mode 100644 index 0000000..cfd6e5a --- /dev/null +++ b/src/logos_integration_test_framework/waits.py @@ -0,0 +1,134 @@ +"""Blocking event-wait helpers over `LogoscoreClient.on_event`. + +Upstream's `on_event` is callback-based and runs the callback on a background +thread. Pytest scenarios want blocking semantics ("wait for an event matching +predicate, with a timeout") and need exceptions raised in predicate to fail +the test loudly, not get swallowed by the daemon thread's exception handler. + +The pattern is: callback enqueues raw payloads; the test thread dequeues and +evaluates the predicate. Errors from the watcher subprocess are surfaced +through `error_callback` and re-raised on the next `.next()` call. +""" + +from __future__ import annotations + +import time +from collections.abc import Callable, Iterator +from contextlib import contextmanager +from queue import Empty, Queue +from typing import Any, Literal, Protocol + +__all__ = ["EventTimeout", "Predicate", "Waiter", "subscribe", "wait_for_event"] + +_QueueItem = ( + tuple[Literal["event"], dict[str, Any]] | tuple[Literal["error"], BaseException] +) + + +class _ClientLike(Protocol): + def on_event( + self, + module: str, + event: str | None, + callback: Callable[[dict[str, Any]], None], + *, + error_callback: Callable[[BaseException], None] | None = ..., + ) -> Any: ... + + +class EventTimeout(TimeoutError): + """Raised when no matching event arrived within `timeout`.""" + + +Predicate = Callable[[dict[str, Any]], bool] + + +class Waiter: + """Yielded by `subscribe()`. Call `.next()` repeatedly within one subscription.""" + + def __init__(self, queue: Queue[_QueueItem], module: str, event: str | None) -> None: + self._queue = queue + self._module = module + self._event = event + + def next( + self, + predicate: Predicate | None = None, + *, + timeout: float, + ) -> dict[str, Any]: + """Block until an event matching `predicate` arrives, or `timeout` elapses. + + Predicate runs on the test thread (after the dequeue) — exceptions raised + inside it propagate to the test, unlike exceptions raised inside the + upstream callback (which the watcher's `_pump` catches and routes through + `error_callback`). + """ + deadline = time.monotonic() + timeout + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise EventTimeout( + f"no matching event for {self._module}/{self._event} within {timeout}s" + ) + try: + slot = self._queue.get(timeout=remaining) + except Empty as exc: + raise EventTimeout( + f"no matching event for {self._module}/{self._event} within {timeout}s" + ) from exc + if slot[0] == "error": + raise slot[1] + payload = slot[1] + if predicate is None or predicate(payload): + return payload + + +@contextmanager +def subscribe( + client: _ClientLike, + module: str, + event: str | None = None, +) -> Iterator[Waiter]: + """Open one `LogoscoreClient.on_event` subscription, yield a `Waiter`. + + Re-using a single subscription across multiple `.next()` calls avoids + paying the watcher-subprocess startup cost (a fresh `logoscore watch` + subprocess) per wait. + + NB: upstream's `Subscription.start()` returns immediately after + `Popen(...)` + `thread.start()`. The watcher needs a moment to come + live and start emitting NDJSON; events fired in that window are lost. + If your trigger is a synchronous local action (e.g. `client.call(...)` + that emits an event before returning), open the subscription, wait + briefly (typical: 0.2-0.5s, or use a known-pumped sentinel event), + then trigger. + """ + queue: Queue[_QueueItem] = Queue() + + def _on_event(payload: dict[str, Any]) -> None: + queue.put(("event", payload)) + + def _on_error(exc: BaseException) -> None: + queue.put(("error", exc)) + + sub = client.on_event(module, event, _on_event, error_callback=_on_error) + try: + yield Waiter(queue, module, event) + finally: + # tight teardown — upstream default is 5.0s twice; we accept harder + # kills to keep test feedback fast on flake. + sub.cancel(timeout=1.0) + + +def wait_for_event( + client: _ClientLike, + module: str, + event: str | None = None, + *, + predicate: Predicate | None = None, + timeout: float, +) -> dict[str, Any]: + """One-shot convenience: open a subscription, wait once, close.""" + with subscribe(client, module, event) as w: + return w.next(predicate, timeout=timeout) diff --git a/tests/scripts/check_plugin_discovery.sh b/tests/scripts/check_plugin_discovery.sh new file mode 100755 index 0000000..3790e44 --- /dev/null +++ b/tests/scripts/check_plugin_discovery.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# Verify the pytest11 entry-point auto-loads fixtures into a fresh consumer venv. +# +# What this proves: a downstream consumer who runs `pip install +# logos-integration-test-framework` (no `pytest_plugins` declarations, no +# `from ... import ...`) can still use `local_daemon` as a fixture in their +# tests because pytest discovered the plugin via dist-info entry-points. +# +# Why a separate venv: the dev venv already has the package as an editable +# install, so a local pytest run there proves nothing new — the entry-point +# *would* be found, but so would a stray `pytest_plugins` line in our own +# `tests/conftest.py`. A clean venv removes that ambiguity. +# +# This script is not run in CI yet; first downstream consumer to land theirs +# (chat-module) exercises the same code path for real. +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +TMPDIR_ROOT="$(mktemp -d)" +trap 'rm -rf "$TMPDIR_ROOT"' EXIT + +VENV_DIR="$TMPDIR_ROOT/venv" +TESTPROJ_DIR="$TMPDIR_ROOT/consumer" + +echo "==> Creating fresh venv at $VENV_DIR" +python3.12 -m venv "$VENV_DIR" +"$VENV_DIR/bin/pip" install --quiet --upgrade pip + +echo "==> Installing the framework from $REPO_ROOT (non-editable)" +"$VENV_DIR/bin/pip" install --quiet "$REPO_ROOT" + +echo "==> Installing pytest in the consumer venv" +"$VENV_DIR/bin/pip" install --quiet "pytest>=8.0" + +echo "==> Writing a minimal consumer test that uses the auto-loaded fixture" +mkdir -p "$TESTPROJ_DIR" +cat > "$TESTPROJ_DIR/test_consumer.py" <<'PY' +def test_local_daemon_fixture_is_visible(local_daemon): + # Will skip if `logoscore` binary isn't on PATH or LOGOS_MODULES_DIR is unset. + # Important: we just need pytest to *recognise* `local_daemon` as a fixture. + # If the entry-point auto-load failed, pytest would error with + # "fixture 'local_daemon' not found" — that's the regression we catch here. + pass +PY + +echo "==> Collecting (no execution) — proves the fixture is recognised" +cd "$TESTPROJ_DIR" +"$VENV_DIR/bin/pytest" --collect-only -q test_consumer.py + +echo "==> Running with skip-friendly env (no logoscore binary expected)" +"$VENV_DIR/bin/pytest" -q test_consumer.py || rc=$? +rc=${rc:-0} +# Acceptable outcomes: exit 0 (passed) or exit 5 (no tests) or skipped (which is exit 0 in pytest). +# An exit code of 1 (failures) or 4 (collection error → fixture-not-found) means broken plugin. +case "$rc" in + 0) echo "==> OK — fixture resolved (test ran or skipped)";; + 5) echo "==> No tests collected — unexpected"; exit 1;; + *) echo "==> FAIL — pytest exit $rc (likely fixture not found via entry-point)"; exit "$rc";; +esac diff --git a/tests/test_smoke.py b/tests/test_smoke.py new file mode 100644 index 0000000..9af4406 --- /dev/null +++ b/tests/test_smoke.py @@ -0,0 +1,29 @@ +"""End-to-end smoke against a real `logoscore` binary. + +Skipped without the binary + `LOGOS_MODULES_DIR` (the `local_*` fixtures +auto-skip; they're auto-loaded by the `pytest11` entry-point declared in +`pyproject.toml`). Not part of CI yet — runs locally for human verification. +""" + +from __future__ import annotations + +import pytest +from logoscore import LogoscoreClient + +from logos_integration_test_framework import EventTimeout, wait_for_event + + +def test_local_client_status(local_client: LogoscoreClient) -> None: + status = local_client.status() + assert isinstance(status, dict) + + +def test_wait_for_event_drives_watcher(local_client: LogoscoreClient) -> None: + """End-to-end check that `wait_for_event` actually spawns the watcher. + + No module is loaded so no event will ever fire; we just need to confirm + the helper drives `logoscore watch` against the real daemon and reaches + its timeout cleanly (no hangs, no exceptions other than `EventTimeout`). + """ + with pytest.raises(EventTimeout): + wait_for_event(local_client, "nonexistent-module", timeout=1.0) diff --git a/tests/unit/test_imports.py b/tests/unit/test_imports.py new file mode 100644 index 0000000..b2b5b4a --- /dev/null +++ b/tests/unit/test_imports.py @@ -0,0 +1,39 @@ +"""CI-runnable surface check. + +Catches upstream surface drift (e.g. PR #1 force-push removing a symbol) +before any fixture-dependent test fails opaquely. Runs anywhere — no +binary, no docker, no env vars. +""" + +from __future__ import annotations + + +def test_logoscore_surface() -> None: + # `issue_token` / `revoke_token` / `list_tokens` are deliberately not + # surfaced — we don't call them, and the master-branch fallback path + # (after PR #1 merges, or if we re-pin to @master before merge) may + # not export them yet. Adding them here would make a one-line re-pin + # break the import smoke spuriously. + from logoscore import ( # noqa: F401 + CONTAINER_TCP_PORT, + DaemonNotRunningError, + LogoscoreClient, + LogoscoreDaemon, + LogoscoreDockerDaemon, + LogoscoreError, + MethodError, + ModuleError, + Subscription, + docker_available, + image_present, + pick_free_port, + ) + + +def test_logos_integration_test_framework_surface() -> None: + from logos_integration_test_framework import ( # noqa: F401 + EventTimeout, + Waiter, + subscribe, + wait_for_event, + ) diff --git a/tests/unit/test_waits.py b/tests/unit/test_waits.py new file mode 100644 index 0000000..4054517 --- /dev/null +++ b/tests/unit/test_waits.py @@ -0,0 +1,198 @@ +"""Unit tests for `logos_integration_test_framework.waits`. + +Drives a stub client whose `on_event` immediately fires crafted payloads +through the registered callback. Verifies: +- match-on-first-event, +- predicate-mismatch-then-match, +- timeout raises `EventTimeout`, +- predicate exceptions surface on the test thread (not swallowed by the + daemon-thread handler the way they would inside the upstream callback), +- `error_callback` path delivers errors as raised exceptions on `.next()`. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +import pytest + +from logos_integration_test_framework import ( + EventTimeout, + Waiter, + subscribe, + wait_for_event, +) + + +class _StubSubscription: + def __init__(self) -> None: + self.cancelled = False + + def cancel(self, timeout: float = 5.0) -> None: + self.cancelled = True + + +class _StubClient: + """Mimics `LogoscoreClient.on_event`. Each registered callback can be + fired manually via `.fire(payload)` / `.fire_error(exc)`. + """ + + def __init__(self) -> None: + self.subs: list[ + tuple[ + str, + str | None, + Callable[[dict[str, Any]], None], + Callable[[BaseException], None] | None, + ] + ] = [] + + def on_event( + self, + module: str, + event: str | None, + callback: Callable[[dict[str, Any]], None], + *, + error_callback: Callable[[BaseException], None] | None = None, + ) -> _StubSubscription: + self.subs.append((module, event, callback, error_callback)) + return _StubSubscription() + + @property + def latest_callback(self) -> Callable[[dict[str, Any]], None]: + return self.subs[-1][2] + + @property + def latest_error_callback(self) -> Callable[[BaseException], None] | None: + return self.subs[-1][3] + + +def test_match_on_first_event() -> None: + client = _StubClient() + with subscribe(client, "delivery") as w: + client.latest_callback({"kind": "X", "n": 1}) + got = w.next(timeout=1.0) + assert got == {"kind": "X", "n": 1} + + +def test_predicate_skips_until_match() -> None: + client = _StubClient() + with subscribe(client, "delivery", "Sent") as w: + client.latest_callback({"n": 1}) + client.latest_callback({"n": 2}) + client.latest_callback({"n": 3}) + got = w.next(lambda e: e["n"] == 3, timeout=1.0) + assert got["n"] == 3 + + +def test_timeout_with_no_events() -> None: + client = _StubClient() + with subscribe(client, "delivery") as w, pytest.raises(EventTimeout): + w.next(timeout=0.05) + + +def test_timeout_when_predicate_never_matches() -> None: + client = _StubClient() + with subscribe(client, "delivery") as w: + client.latest_callback({"n": 1}) + client.latest_callback({"n": 2}) + with pytest.raises(EventTimeout): + w.next(lambda e: e["n"] == 99, timeout=0.05) + + +def test_predicate_exception_surfaces_on_test_thread() -> None: + """Crucial: predicate runs in `.next`, not in the on_event callback, + so a predicate raise reaches the test rather than being captured by + upstream's `Subscription._pump`. + """ + client = _StubClient() + with subscribe(client, "delivery") as w: + client.latest_callback({"n": 1}) + + def boom(_: dict[str, Any]) -> bool: + raise RuntimeError("predicate-boom") + + with pytest.raises(RuntimeError, match="predicate-boom"): + w.next(boom, timeout=1.0) + + +def test_error_callback_surfaces_via_next() -> None: + client = _StubClient() + with subscribe(client, "delivery") as w: + cb = client.latest_error_callback + assert cb is not None + cb(RuntimeError("watcher exploded")) + with pytest.raises(RuntimeError, match="watcher exploded"): + w.next(timeout=1.0) + + +def test_subscription_cancelled_on_exit() -> None: + client = _StubClient() + sub_holder: list[_StubSubscription] = [] + real_on_event = client.on_event + + def capture(*args: Any, **kwargs: Any) -> _StubSubscription: + sub = real_on_event(*args, **kwargs) + sub_holder.append(sub) + return sub + + client.on_event = capture # type: ignore[method-assign] + with subscribe(client, "delivery"): + pass + assert sub_holder[0].cancelled is True + + +def test_wait_for_event_one_shot() -> None: + client = _StubClient() + # Schedule the event before calling wait_for_event by registering through + # a wrapper that fires immediately upon subscription. + real_on_event = client.on_event + + def fire_immediately(*args: Any, **kwargs: Any) -> _StubSubscription: + sub = real_on_event(*args, **kwargs) + client.latest_callback({"kind": "ok"}) + return sub + + client.on_event = fire_immediately # type: ignore[method-assign] + got = wait_for_event(client, "delivery", "Sent", timeout=1.0) + assert got == {"kind": "ok"} + + +def test_waiter_type_exposed() -> None: + """Surface check: the public `Waiter` class is the one yielded.""" + client = _StubClient() + with subscribe(client, "delivery") as w: + assert isinstance(w, Waiter) + + +def test_subscription_reused_across_next_calls() -> None: + """The long-lived design must not re-call `on_event` per `.next()`.""" + client = _StubClient() + with subscribe(client, "delivery") as w: + client.latest_callback({"n": 1}) + client.latest_callback({"n": 2}) + first = w.next(timeout=1.0) + second = w.next(timeout=1.0) + assert first == {"n": 1} + assert second == {"n": 2} + assert len(client.subs) == 1 + + +def test_event_none_propagates() -> None: + """`subscribe(client, module)` must pass `event=None` to upstream.""" + client = _StubClient() + with subscribe(client, "delivery"): + pass + assert client.subs[0][1] is None + + +def test_resume_after_timeout() -> None: + """A `.next()` that timed out must not poison the queue.""" + client = _StubClient() + with subscribe(client, "delivery") as w: + with pytest.raises(EventTimeout): + w.next(timeout=0.05) + client.latest_callback({"n": 1}) + got = w.next(timeout=1.0) + assert got == {"n": 1}