"""WebSocket client that records signals received from a ``/signals`` endpoint."""

import json
import logging
import time

import websocket

logger = logging.getLogger(__name__)


class SignalClient:
    """Collect JSON signals delivered over a WebSocket and let callers wait for them.

    Each incoming message is parsed as JSON. Messages whose ``"type"`` value is
    one of ``await_signals`` are stored, in arrival order, in
    ``received_signals[type]``.
    """

    # Seconds to sleep between two polls of the stored signals.
    _POLL_INTERVAL = 0.2

    def __init__(self, ws_url, await_signals):
        """Set up the storage for awaited signals; no connection is opened here.

        Args:
            ws_url: Base WebSocket URL; ``/signals`` is appended to it.
            await_signals: Signal type names to record. It is iterated once
                here and used for membership tests on every message.
        """
        self.url = f"{ws_url}/signals"
        self.await_signals = await_signals
        self.received_signals = {signal: [] for signal in self.await_signals}

    def on_message(self, ws, signal):
        """Parse an incoming message and store it when its type is awaited.

        Payloads that are not valid JSON, or whose top-level value is not a
        JSON object, are logged and ignored.

        Args:
            ws: The WebSocket object that delivered the message (unused).
            signal: Raw JSON payload of the message.
        """
        try:
            signal_data = json.loads(signal)
        except (TypeError, ValueError):
            logger.error("Ignoring signal that is not valid JSON: %r", signal)
            return
        if not isinstance(signal_data, dict):
            logger.error("Ignoring signal that is not a JSON object: %r", signal_data)
            return

        logger.info("Received signal: %s", signal_data)

        signal_type = signal_data.get("type")
        if signal_type in self.await_signals:
            self.received_signals[signal_type].append(signal_data)

    @staticmethod
    def _event_matches(event, expected_event):
        """Return True when ``event`` holds every key/value pair of ``expected_event``."""
        # A missing, null or non-object event can never satisfy an expectation.
        if not isinstance(event, dict):
            return False
        return all(event.get(key) == value for key, value in expected_event.items())

    def wait_for_signal(self, signal_type, expected_event=None, timeout=20):
        """Poll the stored signals until a suitable one of ``signal_type`` exists.

        Without ``expected_event`` the first stored signal of the type is
        returned. With ``expected_event`` every stored signal of the type is
        examined in arrival order, and the first one whose ``"event"`` dict
        contains all the expected key/value pairs is returned.

        Args:
            signal_type: Value of the ``"type"`` key to wait for.
            expected_event: Optional dict of key/value pairs the signal's
                ``"event"`` must contain. A falsy value disables the check.
            timeout: Maximum number of seconds to keep polling.

        Returns:
            The stored signal (a dict) that satisfied the request.

        Raises:
            TimeoutError: If no suitable signal was found within ``timeout``.
        """
        deadline = time.monotonic() + timeout
        # Signals before this index were already compared and rejected; the
        # expectation does not change during the call, so they are skipped.
        checked = 0

        while time.monotonic() < deadline:
            signals = self.received_signals.get(signal_type) or []

            if signals and not expected_event:
                logger.info(
                    "Signal %s received without specific event validation.",
                    signal_type,
                )
                return signals[0]

            # The slice is a snapshot, so signals appended by on_message while
            # this loop runs are simply picked up on the next poll.
            pending = signals[checked:]
            for received_signal in pending:
                event = received_signal.get("event", {})
                if self._event_matches(event, expected_event):
                    logger.info(
                        "Signal %s with event %s received and matched.",
                        signal_type,
                        expected_event,
                    )
                    return received_signal
                logger.debug(
                    "Signal %s received but event did not match expected event: %s. "
                    "Received event: %s",
                    signal_type,
                    expected_event,
                    event,
                )
            checked += len(pending)

            time.sleep(self._POLL_INTERVAL)

        raise TimeoutError(
            f"Signal {signal_type} with event {expected_event} not received in {timeout} seconds"
        )

    def _on_error(self, ws, error):
        """Log an error reported through the WebSocket error callback."""
        logger.error("WebSocket error: %s", error)

    def _on_close(self, ws, close_status_code, close_msg):
        """Log the status code and message passed to the close callback."""
        logger.info("WebSocket connection closed: %s, %s", close_status_code, close_msg)

    def _on_open(self, ws):
        """Log that the open callback fired."""
        logger.info("WebSocket connection opened")

    def _connect(self):
        """Create the WebSocket app for ``self.url`` and run its event loop.

        The call returns only when ``run_forever`` returns.
        """
        ws = websocket.WebSocketApp(
            self.url,
            on_open=self._on_open,
            on_message=self.on_message,
            on_error=self._on_error,
            on_close=self._on_close,
        )
        ws.run_forever()