Initialize logos-integration-test-framework package with pytest plugins, event-wait helpers, and CI setup.

This commit is contained in:
Egor Rachkovskii 2026-04-30 14:49:12 +01:00
commit 890ea07b85
13 changed files with 794 additions and 0 deletions

34
.github/workflows/unit.yml vendored Normal file
View File

@ -0,0 +1,34 @@
name: unit
on:
pull_request:
push:
branches: [main, master]
concurrency:
group: unit-${{ github.ref }}
cancel-in-progress: true
jobs:
unit:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
- name: Install
run: |
python -m pip install --upgrade pip
pip install -e '.[dev]'
- name: Ruff
run: ruff check src tests
- name: Mypy
run: mypy src tests
- name: Pytest (unit only)
run: pytest tests/unit -q

18
.gitignore vendored Normal file
View File

@ -0,0 +1,18 @@
__pycache__/
*.py[cod]
*.egg-info/
.eggs/
build/
dist/
.venv/
venv/
.env
.pytest_cache/
.ruff_cache/
.mypy_cache/
.coverage
htmlcov/
.idea/
.vscode/
*.swp
.DS_Store

16
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,16 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.7
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-toml
- id: check-added-large-files

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.12

100
README.md Normal file
View File

@ -0,0 +1,100 @@
# logos-integration-test-framework
Pytest plugin + helpers for writing integration tests against a `logoscore` daemon. Built on top of [`logos-co/logos-logoscore-py`](https://github.com/logos-co/logos-logoscore-py).
## What's in the box
```
src/logos_integration_test_framework/
├── __init__.py # exports: subscribe, wait_for_event, Waiter, EventTimeout
├── waits.py # Queue-backed adapters over LogoscoreClient.on_event
└── fixtures.py # pytest fixtures: local_daemon, local_client, docker_daemon, docker_client
# (auto-loaded via pytest11 entry-point — no import needed)
```
That's the whole package. Client / transport / topology layers live upstream in `logoscore-py` — don't reimplement them.
## Install (consumer-side)
In your module's test repo (e.g. `logos-chat-module/tests/integration/`):
```toml
# pyproject.toml
[project.optional-dependencies]
test = [
"logos-integration-test-framework @ git+https://github.com/logos-co/integration-tests.git@<commit-sha>",
"pytest>=8.0",
]
```
Pin a commit SHA, not a branch. Then `pip install -e '.[test]'` and the four daemon/client fixtures are immediately available in your tests — no `pytest_plugins` declaration needed.
## Writing a test (consumer)
Open the subscription **before** triggering the action — the upstream `logoscore watch` subprocess takes a moment to come live, and events fired before that window are lost. A short sleep or a known-pumped sentinel event is enough.
```python
import time
from logos_integration_test_framework import subscribe
def test_my_module(local_client): # local_client comes from the auto-loaded plugin
local_client.load_module("my_module")
with subscribe(local_client, "my_module", "DoneEvent") as w:
time.sleep(0.3) # let the watcher come live
request_id = local_client.call("my_module", "do_something", "arg")
event = w.next(
predicate=lambda e: e["data"][0] == request_id,
timeout=10.0,
)
assert event["event"] == "DoneEvent"
```
Multiple waits in one test — re-use the same subscription:
```python
def test_two_waits(local_client):
with subscribe(local_client, "my_module") as w:
time.sleep(0.3)
local_client.call("my_module", "fire", "first")
first = w.next(lambda e: e["data"][0] == 1, timeout=5.0)
local_client.call("my_module", "fire", "second")
second = w.next(lambda e: e["data"][0] == 2, timeout=5.0)
assert first != second
```
Predicate exceptions surface on the test thread (unlike exceptions raised inside an upstream `on_event` callback, which the watcher's pump catches and routes to `error_callback`).
The one-shot `wait_for_event(...)` is a convenience for cases where the trigger has *already* happened and the event is still in flight; for the more common subscribe-then-trigger pattern, use `subscribe(...)` directly.
## Fixtures
| Fixture | Scope | Provides | Skip condition |
|---|---|---|---|
| `local_daemon` | module | `logoscore.LogoscoreDaemon` (local subprocess) | `logoscore` not on `PATH` or `LOGOS_MODULES_DIR` unset / missing |
| `local_client` | function | `LogoscoreClient` from `local_daemon` | (inherits) |
| `docker_daemon` | module | `logoscore.LogoscoreDockerDaemon` (containerised) | `docker` CLI absent, image absent, `LOGOSCORE_IMAGE`/`LOGOS_MODULES_DIR` unset |
| `docker_client` | function | `LogoscoreClient` from `docker_daemon` | (inherits) |
Default scopes are `module` for daemons (function-scope is too slow given upstream's `startup_timeout=15.0`; session-scope hides cross-test leaks) and `function` for clients (`client.stop()` would otherwise poison every later test in the module). Override per-test with `pytest.fixture(scope=...)` if needed.
The `docker_*` fixtures need the `docker` CLI on `PATH` — upstream `LogoscoreDockerDaemon` shells out to it (it does not use docker-py). The fixture skips automatically if the binary is absent.
To opt out of the auto-loaded plugin in a particular run: `pytest -p no:logos_integration_test_framework`.
## Contributing to this repo
```bash
python -m venv .venv && source .venv/bin/activate
pip install -e '.[dev]' # ← required: registers the pytest11 entry-point
pytest tests/unit -q # 13 tests: surface check + waits.py
```
`pip install -e '.[dev]'` is **required** before `pytest` — otherwise the entry-point isn't registered and the smoke test (`tests/test_smoke.py`) won't see the `local_*` fixtures. CI does this automatically.
Lint / type / unit runs are gated in `.github/workflows/unit.yml` (Python 3.11 + 3.12). Direct pushes to `master` are rejected by branch-protection rules; merge via PR with green CI.
## Related
- [`logos-co/logos-logoscore-py`](https://github.com/logos-co/logos-logoscore-py) — the Python wrapper this builds on (`LogoscoreDaemon` / `LogoscoreDockerDaemon` / `LogoscoreClient`).
- [`logos-co/logos-test-framework`](https://github.com/logos-co/logos-test-framework) — **different layer**: a C++ unit-test framework for module internals (gtest-style, link-time mock substitution). No overlap with this Python package.

66
pyproject.toml Normal file
View File

@ -0,0 +1,66 @@
[build-system]
requires = ["hatchling>=1.24"]
build-backend = "hatchling.build"
[project]
name = "logos-integration-test-framework"
version = "0.2.0"
description = "pytest infrastructure for writing logoscore integration tests"
readme = "README.md"
# Deliberate tightening: upstream logoscore allows >=3.10. Don't relax without
# auditing for `Self` / pattern-match / debug-f-string usage.
requires-python = ">=3.11"
license = { text = "MIT OR Apache-2.0" }
authors = [{ name = "IFT QA" }]
dependencies = [
# logoscore is not on PyPI. Pinned to PR #1 head SHA, not the branch name,
# so an upstream force-push can't silently change what we install.
# Re-pin to `@master` after https://github.com/logos-co/logos-logoscore-py/pull/1 merges.
"logoscore @ git+https://github.com/logos-co/logos-logoscore-py.git@6e7155733d90a66d117211c33322594e1b50e742",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"ruff>=0.5",
"mypy>=1.10",
"pre-commit>=3.7",
]
# Auto-load the fixtures module as a pytest plugin. After
# `pip install logos-integration-test-framework`, fixtures
# `local_daemon` / `local_client` / `docker_daemon` / `docker_client`
# are available without `pytest_plugins` declarations.
[project.entry-points.pytest11]
logos_integration_test_framework = "logos_integration_test_framework.fixtures"
[tool.hatch.metadata]
# logoscore is installed via `git+https://...` — a direct reference.
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["src/logos_integration_test_framework"]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-ra --strict-markers"
[tool.ruff]
line-length = 100
target-version = "py311"
src = ["src", "tests"]
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM", "RUF"]
ignore = ["E501"]
[tool.mypy]
python_version = "3.11"
strict = true
mypy_path = "src"
# Upstream logoscore ships no py.typed marker — without this override, strict
# mode fails with import-untyped on first push.
[[tool.mypy.overrides]]
module = "logoscore.*"
ignore_missing_imports = true

View File

@ -0,0 +1,17 @@
"""pytest infrastructure for writing logoscore integration tests.
Public surface: blocking event-wait helpers (`subscribe`, `wait_for_event`,
`Waiter`, `EventTimeout`). Daemon/client pytest fixtures live in
`logos_integration_test_framework.fixtures` and auto-load via the `pytest11`
entry-point no explicit import needed in consumer test files.
"""
from logos_integration_test_framework.waits import (
EventTimeout,
Waiter,
subscribe,
wait_for_event,
)
__all__ = ["EventTimeout", "Waiter", "subscribe", "wait_for_event"]
__version__ = "0.2.0"

View File

@ -0,0 +1,83 @@
"""Pytest fixtures wrapping `logoscore.LogoscoreDaemon` / `LogoscoreDockerDaemon`.
Auto-loaded as a pytest plugin via the `pytest11` entry-point declared in
`pyproject.toml`. Any project that installs `logos-integration-test-framework`
gets these fixtures available without `pytest_plugins` declarations.
Both daemon flavors auto-skip when their environmental requirements aren't
met (binary on PATH, image present, modules dir set, ) there's no
in-package binary or image. Set `LOGOS_MODULES_DIR` (and `LOGOSCORE_IMAGE`
for the docker fixture) in CI/local env to enable.
Daemons are `module`-scope (function-scope pays the upstream
`startup_timeout=15.0` per test; session-scope hides cross-test state leakage).
Clients are `function`-scope on top of the shared daemon `client()` is a
cheap construction, but `client.stop()` shells out `logoscore stop` to the
daemon, which would otherwise poison every later test in the module.
`logoscore` is imported **lazily** inside each fixture body. Many consumers
will only use `wait_for_event` and never touch a daemon fixture eager
imports would charge them subprocess/Docker shim startup on every pytest run.
"""
from __future__ import annotations
import os
import shutil
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any
import pytest
if TYPE_CHECKING:
from logoscore import LogoscoreClient, LogoscoreDaemon, LogoscoreDockerDaemon
def _modules_dir_or_skip() -> Path:
raw = os.environ.get("LOGOS_MODULES_DIR")
if not raw:
pytest.skip("LOGOS_MODULES_DIR not set")
path = Path(raw)
if not path.is_dir():
pytest.skip(f"LOGOS_MODULES_DIR={raw!r} is not an existing directory")
return path
@pytest.fixture(scope="module")
def local_daemon() -> Iterator[LogoscoreDaemon]:
if shutil.which("logoscore") is None:
pytest.skip("`logoscore` binary not on PATH")
modules_dir = _modules_dir_or_skip()
from logoscore import LogoscoreDaemon
with LogoscoreDaemon(modules_dir=modules_dir) as daemon:
yield daemon
@pytest.fixture
def local_client(local_daemon: LogoscoreDaemon) -> LogoscoreClient:
return local_daemon.client()
@pytest.fixture(scope="module")
def docker_daemon() -> Iterator[LogoscoreDockerDaemon]:
from logoscore import LogoscoreDockerDaemon, docker_available, image_present
if not docker_available():
pytest.skip("docker not available on host")
image = os.environ.get("LOGOSCORE_IMAGE")
if not image:
pytest.skip("LOGOSCORE_IMAGE not set")
if not image_present(image):
pytest.skip(f"docker image {image!r} not present locally")
modules_dir = _modules_dir_or_skip()
with LogoscoreDockerDaemon(image=image, modules_dir=modules_dir) as daemon:
yield daemon
@pytest.fixture
def docker_client(docker_daemon: LogoscoreDockerDaemon) -> Any:
return docker_daemon.client(binary="logoscore")

View File

@ -0,0 +1,134 @@
"""Blocking event-wait helpers over `LogoscoreClient.on_event`.
Upstream's `on_event` is callback-based and runs the callback on a background
thread. Pytest scenarios want blocking semantics ("wait for an event matching
predicate, with a timeout") and need exceptions raised in predicate to fail
the test loudly, not get swallowed by the daemon thread's exception handler.
The pattern is: callback enqueues raw payloads; the test thread dequeues and
evaluates the predicate. Errors from the watcher subprocess are surfaced
through `error_callback` and re-raised on the next `.next()` call.
"""
from __future__ import annotations
import time
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from queue import Empty, Queue
from typing import Any, Literal, Protocol
__all__ = ["EventTimeout", "Predicate", "Waiter", "subscribe", "wait_for_event"]
_QueueItem = (
tuple[Literal["event"], dict[str, Any]] | tuple[Literal["error"], BaseException]
)
class _ClientLike(Protocol):
def on_event(
self,
module: str,
event: str | None,
callback: Callable[[dict[str, Any]], None],
*,
error_callback: Callable[[BaseException], None] | None = ...,
) -> Any: ...
class EventTimeout(TimeoutError):
"""Raised when no matching event arrived within `timeout`."""
Predicate = Callable[[dict[str, Any]], bool]
class Waiter:
"""Yielded by `subscribe()`. Call `.next()` repeatedly within one subscription."""
def __init__(self, queue: Queue[_QueueItem], module: str, event: str | None) -> None:
self._queue = queue
self._module = module
self._event = event
def next(
self,
predicate: Predicate | None = None,
*,
timeout: float,
) -> dict[str, Any]:
"""Block until an event matching `predicate` arrives, or `timeout` elapses.
Predicate runs on the test thread (after the dequeue) exceptions raised
inside it propagate to the test, unlike exceptions raised inside the
upstream callback (which the watcher's `_pump` catches and routes through
`error_callback`).
"""
deadline = time.monotonic() + timeout
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise EventTimeout(
f"no matching event for {self._module}/{self._event} within {timeout}s"
)
try:
slot = self._queue.get(timeout=remaining)
except Empty as exc:
raise EventTimeout(
f"no matching event for {self._module}/{self._event} within {timeout}s"
) from exc
if slot[0] == "error":
raise slot[1]
payload = slot[1]
if predicate is None or predicate(payload):
return payload
@contextmanager
def subscribe(
client: _ClientLike,
module: str,
event: str | None = None,
) -> Iterator[Waiter]:
"""Open one `LogoscoreClient.on_event` subscription, yield a `Waiter`.
Re-using a single subscription across multiple `.next()` calls avoids
paying the watcher-subprocess startup cost (a fresh `logoscore watch`
subprocess) per wait.
NB: upstream's `Subscription.start()` returns immediately after
`Popen(...)` + `thread.start()`. The watcher needs a moment to come
live and start emitting NDJSON; events fired in that window are lost.
If your trigger is a synchronous local action (e.g. `client.call(...)`
that emits an event before returning), open the subscription, wait
briefly (typical: 0.2-0.5s, or use a known-pumped sentinel event),
then trigger.
"""
queue: Queue[_QueueItem] = Queue()
def _on_event(payload: dict[str, Any]) -> None:
queue.put(("event", payload))
def _on_error(exc: BaseException) -> None:
queue.put(("error", exc))
sub = client.on_event(module, event, _on_event, error_callback=_on_error)
try:
yield Waiter(queue, module, event)
finally:
# tight teardown — upstream default is 5.0s twice; we accept harder
# kills to keep test feedback fast on flake.
sub.cancel(timeout=1.0)
def wait_for_event(
client: _ClientLike,
module: str,
event: str | None = None,
*,
predicate: Predicate | None = None,
timeout: float,
) -> dict[str, Any]:
"""One-shot convenience: open a subscription, wait once, close."""
with subscribe(client, module, event) as w:
return w.next(predicate, timeout=timeout)

View File

@ -0,0 +1,59 @@
#!/usr/bin/env bash
# Verify the pytest11 entry-point auto-loads fixtures into a fresh consumer venv.
#
# What this proves: a downstream consumer who runs `pip install
# logos-integration-test-framework` (no `pytest_plugins` declarations, no
# `from ... import ...`) can still use `local_daemon` as a fixture in their
# tests because pytest discovered the plugin via dist-info entry-points.
#
# Why a separate venv: the dev venv already has the package as an editable
# install, so a local pytest run there proves nothing new — the entry-point
# *would* be found, but so would a stray `pytest_plugins` line in our own
# `tests/conftest.py`. A clean venv removes that ambiguity.
#
# This script is not run in CI yet; first downstream consumer to land theirs
# (chat-module) exercises the same code path for real.
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)"
TMPDIR_ROOT="$(mktemp -d)"
trap 'rm -rf "$TMPDIR_ROOT"' EXIT
VENV_DIR="$TMPDIR_ROOT/venv"
TESTPROJ_DIR="$TMPDIR_ROOT/consumer"
echo "==> Creating fresh venv at $VENV_DIR"
python3.12 -m venv "$VENV_DIR"
"$VENV_DIR/bin/pip" install --quiet --upgrade pip
echo "==> Installing the framework from $REPO_ROOT (non-editable)"
"$VENV_DIR/bin/pip" install --quiet "$REPO_ROOT"
echo "==> Installing pytest in the consumer venv"
"$VENV_DIR/bin/pip" install --quiet "pytest>=8.0"
echo "==> Writing a minimal consumer test that uses the auto-loaded fixture"
mkdir -p "$TESTPROJ_DIR"
cat > "$TESTPROJ_DIR/test_consumer.py" <<'PY'
def test_local_daemon_fixture_is_visible(local_daemon):
# Will skip if `logoscore` binary isn't on PATH or LOGOS_MODULES_DIR is unset.
# Important: we just need pytest to *recognise* `local_daemon` as a fixture.
# If the entry-point auto-load failed, pytest would error with
# "fixture 'local_daemon' not found" — that's the regression we catch here.
pass
PY
echo "==> Collecting (no execution) — proves the fixture is recognised"
cd "$TESTPROJ_DIR"
"$VENV_DIR/bin/pytest" --collect-only -q test_consumer.py
echo "==> Running with skip-friendly env (no logoscore binary expected)"
"$VENV_DIR/bin/pytest" -q test_consumer.py || rc=$?
rc=${rc:-0}
# Acceptable outcomes: exit 0 (passed) or exit 5 (no tests) or skipped (which is exit 0 in pytest).
# An exit code of 1 (failures) or 4 (collection error → fixture-not-found) means broken plugin.
case "$rc" in
0) echo "==> OK — fixture resolved (test ran or skipped)";;
5) echo "==> No tests collected — unexpected"; exit 1;;
*) echo "==> FAIL — pytest exit $rc (likely fixture not found via entry-point)"; exit "$rc";;
esac

29
tests/test_smoke.py Normal file
View File

@ -0,0 +1,29 @@
"""End-to-end smoke against a real `logoscore` binary.
Skipped without the binary + `LOGOS_MODULES_DIR` (the `local_*` fixtures
auto-skip; they're auto-loaded by the `pytest11` entry-point declared in
`pyproject.toml`). Not part of CI yet runs locally for human verification.
"""
from __future__ import annotations
import pytest
from logoscore import LogoscoreClient
from logos_integration_test_framework import EventTimeout, wait_for_event
def test_local_client_status(local_client: LogoscoreClient) -> None:
status = local_client.status()
assert isinstance(status, dict)
def test_wait_for_event_drives_watcher(local_client: LogoscoreClient) -> None:
"""End-to-end check that `wait_for_event` actually spawns the watcher.
No module is loaded so no event will ever fire; we just need to confirm
the helper drives `logoscore watch` against the real daemon and reaches
its timeout cleanly (no hangs, no exceptions other than `EventTimeout`).
"""
with pytest.raises(EventTimeout):
wait_for_event(local_client, "nonexistent-module", timeout=1.0)

View File

@ -0,0 +1,39 @@
"""CI-runnable surface check.
Catches upstream surface drift (e.g. PR #1 force-push removing a symbol)
before any fixture-dependent test fails opaquely. Runs anywhere no
binary, no docker, no env vars.
"""
from __future__ import annotations
def test_logoscore_surface() -> None:
# `issue_token` / `revoke_token` / `list_tokens` are deliberately not
# surfaced — we don't call them, and the master-branch fallback path
# (after PR #1 merges, or if we re-pin to @master before merge) may
# not export them yet. Adding them here would make a one-line re-pin
# break the import smoke spuriously.
from logoscore import ( # noqa: F401
CONTAINER_TCP_PORT,
DaemonNotRunningError,
LogoscoreClient,
LogoscoreDaemon,
LogoscoreDockerDaemon,
LogoscoreError,
MethodError,
ModuleError,
Subscription,
docker_available,
image_present,
pick_free_port,
)
def test_logos_integration_test_framework_surface() -> None:
from logos_integration_test_framework import ( # noqa: F401
EventTimeout,
Waiter,
subscribe,
wait_for_event,
)

198
tests/unit/test_waits.py Normal file
View File

@ -0,0 +1,198 @@
"""Unit tests for `logos_integration_test_framework.waits`.
Drives a stub client whose `on_event` immediately fires crafted payloads
through the registered callback. Verifies:
- match-on-first-event,
- predicate-mismatch-then-match,
- timeout raises `EventTimeout`,
- predicate exceptions surface on the test thread (not swallowed by the
daemon-thread handler the way they would inside the upstream callback),
- `error_callback` path delivers errors as raised exceptions on `.next()`.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any
import pytest
from logos_integration_test_framework import (
EventTimeout,
Waiter,
subscribe,
wait_for_event,
)
class _StubSubscription:
def __init__(self) -> None:
self.cancelled = False
def cancel(self, timeout: float = 5.0) -> None:
self.cancelled = True
class _StubClient:
"""Mimics `LogoscoreClient.on_event`. Each registered callback can be
fired manually via `.fire(payload)` / `.fire_error(exc)`.
"""
def __init__(self) -> None:
self.subs: list[
tuple[
str,
str | None,
Callable[[dict[str, Any]], None],
Callable[[BaseException], None] | None,
]
] = []
def on_event(
self,
module: str,
event: str | None,
callback: Callable[[dict[str, Any]], None],
*,
error_callback: Callable[[BaseException], None] | None = None,
) -> _StubSubscription:
self.subs.append((module, event, callback, error_callback))
return _StubSubscription()
@property
def latest_callback(self) -> Callable[[dict[str, Any]], None]:
return self.subs[-1][2]
@property
def latest_error_callback(self) -> Callable[[BaseException], None] | None:
return self.subs[-1][3]
def test_match_on_first_event() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"kind": "X", "n": 1})
got = w.next(timeout=1.0)
assert got == {"kind": "X", "n": 1}
def test_predicate_skips_until_match() -> None:
client = _StubClient()
with subscribe(client, "delivery", "Sent") as w:
client.latest_callback({"n": 1})
client.latest_callback({"n": 2})
client.latest_callback({"n": 3})
got = w.next(lambda e: e["n"] == 3, timeout=1.0)
assert got["n"] == 3
def test_timeout_with_no_events() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w, pytest.raises(EventTimeout):
w.next(timeout=0.05)
def test_timeout_when_predicate_never_matches() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"n": 1})
client.latest_callback({"n": 2})
with pytest.raises(EventTimeout):
w.next(lambda e: e["n"] == 99, timeout=0.05)
def test_predicate_exception_surfaces_on_test_thread() -> None:
"""Crucial: predicate runs in `.next`, not in the on_event callback,
so a predicate raise reaches the test rather than being captured by
upstream's `Subscription._pump`.
"""
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"n": 1})
def boom(_: dict[str, Any]) -> bool:
raise RuntimeError("predicate-boom")
with pytest.raises(RuntimeError, match="predicate-boom"):
w.next(boom, timeout=1.0)
def test_error_callback_surfaces_via_next() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w:
cb = client.latest_error_callback
assert cb is not None
cb(RuntimeError("watcher exploded"))
with pytest.raises(RuntimeError, match="watcher exploded"):
w.next(timeout=1.0)
def test_subscription_cancelled_on_exit() -> None:
client = _StubClient()
sub_holder: list[_StubSubscription] = []
real_on_event = client.on_event
def capture(*args: Any, **kwargs: Any) -> _StubSubscription:
sub = real_on_event(*args, **kwargs)
sub_holder.append(sub)
return sub
client.on_event = capture # type: ignore[method-assign]
with subscribe(client, "delivery"):
pass
assert sub_holder[0].cancelled is True
def test_wait_for_event_one_shot() -> None:
client = _StubClient()
# Schedule the event before calling wait_for_event by registering through
# a wrapper that fires immediately upon subscription.
real_on_event = client.on_event
def fire_immediately(*args: Any, **kwargs: Any) -> _StubSubscription:
sub = real_on_event(*args, **kwargs)
client.latest_callback({"kind": "ok"})
return sub
client.on_event = fire_immediately # type: ignore[method-assign]
got = wait_for_event(client, "delivery", "Sent", timeout=1.0)
assert got == {"kind": "ok"}
def test_waiter_type_exposed() -> None:
"""Surface check: the public `Waiter` class is the one yielded."""
client = _StubClient()
with subscribe(client, "delivery") as w:
assert isinstance(w, Waiter)
def test_subscription_reused_across_next_calls() -> None:
"""The long-lived design must not re-call `on_event` per `.next()`."""
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"n": 1})
client.latest_callback({"n": 2})
first = w.next(timeout=1.0)
second = w.next(timeout=1.0)
assert first == {"n": 1}
assert second == {"n": 2}
assert len(client.subs) == 1
def test_event_none_propagates() -> None:
"""`subscribe(client, module)` must pass `event=None` to upstream."""
client = _StubClient()
with subscribe(client, "delivery"):
pass
assert client.subs[0][1] is None
def test_resume_after_timeout() -> None:
"""A `.next()` that timed out must not poison the queue."""
client = _StubClient()
with subscribe(client, "delivery") as w:
with pytest.raises(EventTimeout):
w.next(timeout=0.05)
client.latest_callback({"n": 1})
got = w.next(timeout=1.0)
assert got == {"n": 1}