199 lines
6.1 KiB
Python
Raw Permalink Normal View History

"""Unit tests for `logos_integration_test_framework.waits`.
Drives a stub client whose `on_event` immediately fires crafted payloads
through the registered callback. Verifies:
- match-on-first-event,
- predicate-mismatch-then-match,
- timeout raises `EventTimeout`,
- predicate exceptions surface on the test thread (not swallowed by the
daemon-thread handler the way they would inside the upstream callback),
- `error_callback` path delivers errors as raised exceptions on `.next()`.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any
import pytest
from logos_integration_test_framework import (
EventTimeout,
Waiter,
subscribe,
wait_for_event,
)
class _StubSubscription:
def __init__(self) -> None:
self.cancelled = False
def cancel(self, timeout: float = 5.0) -> None:
self.cancelled = True
class _StubClient:
"""Mimics `LogoscoreClient.on_event`. Each registered callback can be
fired manually via `.fire(payload)` / `.fire_error(exc)`.
"""
def __init__(self) -> None:
self.subs: list[
tuple[
str,
str | None,
Callable[[dict[str, Any]], None],
Callable[[BaseException], None] | None,
]
] = []
def on_event(
self,
module: str,
event: str | None,
callback: Callable[[dict[str, Any]], None],
*,
error_callback: Callable[[BaseException], None] | None = None,
) -> _StubSubscription:
self.subs.append((module, event, callback, error_callback))
return _StubSubscription()
@property
def latest_callback(self) -> Callable[[dict[str, Any]], None]:
return self.subs[-1][2]
@property
def latest_error_callback(self) -> Callable[[BaseException], None] | None:
return self.subs[-1][3]
def test_match_on_first_event() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"kind": "X", "n": 1})
got = w.next(timeout=1.0)
assert got == {"kind": "X", "n": 1}
def test_predicate_skips_until_match() -> None:
client = _StubClient()
with subscribe(client, "delivery", "Sent") as w:
client.latest_callback({"n": 1})
client.latest_callback({"n": 2})
client.latest_callback({"n": 3})
got = w.next(lambda e: e["n"] == 3, timeout=1.0)
assert got["n"] == 3
def test_timeout_with_no_events() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w, pytest.raises(EventTimeout):
w.next(timeout=0.05)
def test_timeout_when_predicate_never_matches() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"n": 1})
client.latest_callback({"n": 2})
with pytest.raises(EventTimeout):
w.next(lambda e: e["n"] == 99, timeout=0.05)
def test_predicate_exception_surfaces_on_test_thread() -> None:
"""Crucial: predicate runs in `.next`, not in the on_event callback,
so a predicate raise reaches the test rather than being captured by
upstream's `Subscription._pump`.
"""
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"n": 1})
def boom(_: dict[str, Any]) -> bool:
raise RuntimeError("predicate-boom")
with pytest.raises(RuntimeError, match="predicate-boom"):
w.next(boom, timeout=1.0)
def test_error_callback_surfaces_via_next() -> None:
client = _StubClient()
with subscribe(client, "delivery") as w:
cb = client.latest_error_callback
assert cb is not None
cb(RuntimeError("watcher exploded"))
with pytest.raises(RuntimeError, match="watcher exploded"):
w.next(timeout=1.0)
def test_subscription_cancelled_on_exit() -> None:
client = _StubClient()
sub_holder: list[_StubSubscription] = []
real_on_event = client.on_event
def capture(*args: Any, **kwargs: Any) -> _StubSubscription:
sub = real_on_event(*args, **kwargs)
sub_holder.append(sub)
return sub
client.on_event = capture # type: ignore[method-assign]
with subscribe(client, "delivery"):
pass
assert sub_holder[0].cancelled is True
def test_wait_for_event_one_shot() -> None:
client = _StubClient()
# Schedule the event before calling wait_for_event by registering through
# a wrapper that fires immediately upon subscription.
real_on_event = client.on_event
def fire_immediately(*args: Any, **kwargs: Any) -> _StubSubscription:
sub = real_on_event(*args, **kwargs)
client.latest_callback({"kind": "ok"})
return sub
client.on_event = fire_immediately # type: ignore[method-assign]
got = wait_for_event(client, "delivery", "Sent", timeout=1.0)
assert got == {"kind": "ok"}
def test_waiter_type_exposed() -> None:
"""Surface check: the public `Waiter` class is the one yielded."""
client = _StubClient()
with subscribe(client, "delivery") as w:
assert isinstance(w, Waiter)
def test_subscription_reused_across_next_calls() -> None:
"""The long-lived design must not re-call `on_event` per `.next()`."""
client = _StubClient()
with subscribe(client, "delivery") as w:
client.latest_callback({"n": 1})
client.latest_callback({"n": 2})
first = w.next(timeout=1.0)
second = w.next(timeout=1.0)
assert first == {"n": 1}
assert second == {"n": 2}
assert len(client.subs) == 1
def test_event_none_propagates() -> None:
"""`subscribe(client, module)` must pass `event=None` to upstream."""
client = _StubClient()
with subscribe(client, "delivery"):
pass
assert client.subs[0][1] is None
def test_resume_after_timeout() -> None:
"""A `.next()` that timed out must not poison the queue."""
client = _StubClient()
with subscribe(client, "delivery") as w:
with pytest.raises(EventTimeout):
w.next(timeout=0.05)
client.latest_callback({"n": 1})
got = w.next(timeout=1.0)
assert got == {"n": 1}