mirror of
https://github.com/logos-messaging/logos-integration-test-framework.git
synced 2026-05-19 10:59:49 +00:00
199 lines
6.1 KiB
Python
199 lines
6.1 KiB
Python
"""Unit tests for `logos_integration_test_framework.waits`.
|
|
|
|
Drives a stub client whose `on_event` immediately fires crafted payloads
|
|
through the registered callback. Verifies:
|
|
- match-on-first-event,
|
|
- predicate-mismatch-then-match,
|
|
- timeout raises `EventTimeout`,
|
|
- predicate exceptions surface on the test thread (not swallowed by the
|
|
daemon-thread handler the way they would inside the upstream callback),
|
|
- `error_callback` path delivers errors as raised exceptions on `.next()`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from logos_integration_test_framework import (
|
|
EventTimeout,
|
|
Waiter,
|
|
subscribe,
|
|
wait_for_event,
|
|
)
|
|
|
|
|
|
class _StubSubscription:
|
|
def __init__(self) -> None:
|
|
self.cancelled = False
|
|
|
|
def cancel(self, timeout: float = 5.0) -> None:
|
|
self.cancelled = True
|
|
|
|
|
|
class _StubClient:
|
|
"""Mimics `LogoscoreClient.on_event`. Each registered callback can be
|
|
fired manually via `.fire(payload)` / `.fire_error(exc)`.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.subs: list[
|
|
tuple[
|
|
str,
|
|
str | None,
|
|
Callable[[dict[str, Any]], None],
|
|
Callable[[BaseException], None] | None,
|
|
]
|
|
] = []
|
|
|
|
def on_event(
|
|
self,
|
|
module: str,
|
|
event: str | None,
|
|
callback: Callable[[dict[str, Any]], None],
|
|
*,
|
|
error_callback: Callable[[BaseException], None] | None = None,
|
|
) -> _StubSubscription:
|
|
self.subs.append((module, event, callback, error_callback))
|
|
return _StubSubscription()
|
|
|
|
@property
|
|
def latest_callback(self) -> Callable[[dict[str, Any]], None]:
|
|
return self.subs[-1][2]
|
|
|
|
@property
|
|
def latest_error_callback(self) -> Callable[[BaseException], None] | None:
|
|
return self.subs[-1][3]
|
|
|
|
|
|
def test_match_on_first_event() -> None:
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
client.latest_callback({"kind": "X", "n": 1})
|
|
got = w.next(timeout=1.0)
|
|
assert got == {"kind": "X", "n": 1}
|
|
|
|
|
|
def test_predicate_skips_until_match() -> None:
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery", "Sent") as w:
|
|
client.latest_callback({"n": 1})
|
|
client.latest_callback({"n": 2})
|
|
client.latest_callback({"n": 3})
|
|
got = w.next(lambda e: e["n"] == 3, timeout=1.0)
|
|
assert got["n"] == 3
|
|
|
|
|
|
def test_timeout_with_no_events() -> None:
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w, pytest.raises(EventTimeout):
|
|
w.next(timeout=0.05)
|
|
|
|
|
|
def test_timeout_when_predicate_never_matches() -> None:
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
client.latest_callback({"n": 1})
|
|
client.latest_callback({"n": 2})
|
|
with pytest.raises(EventTimeout):
|
|
w.next(lambda e: e["n"] == 99, timeout=0.05)
|
|
|
|
|
|
def test_predicate_exception_surfaces_on_test_thread() -> None:
|
|
"""Crucial: predicate runs in `.next`, not in the on_event callback,
|
|
so a predicate raise reaches the test rather than being captured by
|
|
upstream's `Subscription._pump`.
|
|
"""
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
client.latest_callback({"n": 1})
|
|
|
|
def boom(_: dict[str, Any]) -> bool:
|
|
raise RuntimeError("predicate-boom")
|
|
|
|
with pytest.raises(RuntimeError, match="predicate-boom"):
|
|
w.next(boom, timeout=1.0)
|
|
|
|
|
|
def test_error_callback_surfaces_via_next() -> None:
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
cb = client.latest_error_callback
|
|
assert cb is not None
|
|
cb(RuntimeError("watcher exploded"))
|
|
with pytest.raises(RuntimeError, match="watcher exploded"):
|
|
w.next(timeout=1.0)
|
|
|
|
|
|
def test_subscription_cancelled_on_exit() -> None:
|
|
client = _StubClient()
|
|
sub_holder: list[_StubSubscription] = []
|
|
real_on_event = client.on_event
|
|
|
|
def capture(*args: Any, **kwargs: Any) -> _StubSubscription:
|
|
sub = real_on_event(*args, **kwargs)
|
|
sub_holder.append(sub)
|
|
return sub
|
|
|
|
client.on_event = capture # type: ignore[method-assign]
|
|
with subscribe(client, "delivery"):
|
|
pass
|
|
assert sub_holder[0].cancelled is True
|
|
|
|
|
|
def test_wait_for_event_one_shot() -> None:
|
|
client = _StubClient()
|
|
# Schedule the event before calling wait_for_event by registering through
|
|
# a wrapper that fires immediately upon subscription.
|
|
real_on_event = client.on_event
|
|
|
|
def fire_immediately(*args: Any, **kwargs: Any) -> _StubSubscription:
|
|
sub = real_on_event(*args, **kwargs)
|
|
client.latest_callback({"kind": "ok"})
|
|
return sub
|
|
|
|
client.on_event = fire_immediately # type: ignore[method-assign]
|
|
got = wait_for_event(client, "delivery", "Sent", timeout=1.0)
|
|
assert got == {"kind": "ok"}
|
|
|
|
|
|
def test_waiter_type_exposed() -> None:
|
|
"""Surface check: the public `Waiter` class is the one yielded."""
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
assert isinstance(w, Waiter)
|
|
|
|
|
|
def test_subscription_reused_across_next_calls() -> None:
|
|
"""The long-lived design must not re-call `on_event` per `.next()`."""
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
client.latest_callback({"n": 1})
|
|
client.latest_callback({"n": 2})
|
|
first = w.next(timeout=1.0)
|
|
second = w.next(timeout=1.0)
|
|
assert first == {"n": 1}
|
|
assert second == {"n": 2}
|
|
assert len(client.subs) == 1
|
|
|
|
|
|
def test_event_none_propagates() -> None:
|
|
"""`subscribe(client, module)` must pass `event=None` to upstream."""
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery"):
|
|
pass
|
|
assert client.subs[0][1] is None
|
|
|
|
|
|
def test_resume_after_timeout() -> None:
|
|
"""A `.next()` that timed out must not poison the queue."""
|
|
client = _StubClient()
|
|
with subscribe(client, "delivery") as w:
|
|
with pytest.raises(EventTimeout):
|
|
w.next(timeout=0.05)
|
|
client.latest_callback({"n": 1})
|
|
got = w.next(timeout=1.0)
|
|
assert got == {"n": 1}
|