import logging from typing import TYPE_CHECKING, AsyncIterator, List from urllib.parse import urljoin import httpx import requests from pydantic import ValidationError from node.api.base import NodeApi from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer if TYPE_CHECKING: from core.app import NBESettings logger = logging.getLogger(__name__) class HttpNodeApi(NodeApi): ENDPOINT_INFO = "/cryptarchia/info" ENDPOINT_TRANSACTIONS = "/cryptarchia/transactions" ENDPOINT_BLOCKS = "/cryptarchia/blocks" ENDPOINT_BLOCKS_STREAM = "/cryptarchia/blocks/stream" def __init__(self, settings: "NBESettings"): self.host: str = settings.node_api_host self.port: int = settings.node_api_port self.protocol: str = settings.node_api_protocol or "http" self.timeout: int = settings.node_api_timeout or 60 @property def base_url(self): return f"{self.protocol}://{self.host}:{self.port}" async def get_health(self) -> HealthSerializer: url = urljoin(self.base_url, self.ENDPOINT_INFO) response = requests.get(url, timeout=60) if response.status_code == 200: return HealthSerializer.from_healthy() else: return HealthSerializer.from_unhealthy() async def get_blocks(self, slot_from: int, slot_to: int) -> List[BlockSerializer]: query_string = f"slot_from={slot_from}&slot_to={slot_to}" endpoint = urljoin(self.base_url, self.ENDPOINT_BLOCKS) url = f"{endpoint}?{query_string}" response = requests.get(url, timeout=60) python_json = response.json() blocks = [BlockSerializer.model_validate(item) for item in python_json] return blocks async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) async with httpx.AsyncClient(timeout=self.timeout) as client: async with client.stream("GET", url) as response: response.raise_for_status() # TODO: Result async for line in response.aiter_lines(): if not line: continue try: block = BlockSerializer.model_validate_json(line) except ValidationError as error: logger.exception(error) continue logger.debug(f"Received new block from Node: {block}") yield block