Add builder for node and manager.

This commit is contained in:
Alejandro Cabeza Romero 2025-11-03 13:17:19 +01:00
parent 0a241f8915
commit 8896b95836
No known key found for this signature in database
GPG Key ID: DA3D14AE478030FD
14 changed files with 153 additions and 61 deletions

4
.dockerignore Normal file
View File

@ -0,0 +1,4 @@
.venv/
node_modules/
.env
sqlite.db

28
Dockerfile Normal file
View File

@ -0,0 +1,28 @@
FROM python:3.14-slim
# Project files
COPY . /app
WORKDIR /app
# Environment variables
ENV NODE_COMPOSE_FILEPATH=/app/docker-compose.yml
ENV PYTHONPATH=/app:/app/src
ENV UV_INSTALL_DIR=/usr/local/bin
ENV NODE_API=http
ENV NODE_MANAGER=noop
# Package manager and dependencies
# RUN apt-get update && apt-get install -y curl git
RUN --mount=target=/var/lib/apt/lists,type=cache,sharing=locked \
--mount=target=/var/cache/apt,type=cache,sharing=locked \
rm -f /etc/apt/apt.conf.d/docker-clean \
&& apt-get update \
&& apt-get install -y curl git
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
RUN uv pip compile pyproject.toml -o requirements.txt && uv pip install --system -r requirements.txt
# Ports
EXPOSE 8000
# Start application
CMD ["python", "/app/src/main.py"]

View File

@ -17,3 +17,7 @@ There are a few assumptions made to facilitate the development of the PoC:
- Tests - Tests
- Fix assumption of 1 block per slot - Fix assumption of 1 block per slot
- Log colouring - Log colouring
- Handle reconnections:
- Failures to connect to Node
- Timeouts
- Stream closed

View File

@ -1,5 +1,5 @@
from asyncio import Task, gather from asyncio import Task, gather
from typing import Optional from typing import Literal, Optional
from fastapi import FastAPI from fastapi import FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@ -20,6 +20,14 @@ class NBESettings(BaseSettings):
node_compose_filepath: str node_compose_filepath: str
node_api: Literal["http", "fake"]
node_manager: Literal["docker", "noop"]
node_api_host: str = "127.0.0.1"
node_api_port: int = 8000
node_api_timeout: int = 60
node_api_protocol: str = "http"
class NBEState(State): class NBEState(State):
signal_exit: bool = False # TODO: asyncio.Event signal_exit: bool = False # TODO: asyncio.Event
@ -54,4 +62,4 @@ class NBE(FastAPI):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.state = NBEState() self.state = NBEState()
self.settings = NBESettings() # type: ignore[call-arg] # Missing parameter is filled from env file self.settings = NBESettings() # type: ignore[call-arg] # The missing parameter is filled from the env file

View File

@ -1,11 +1,18 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import AsyncIterator, List from typing import TYPE_CHECKING, AsyncIterator, List
from node.api.serializers.block import BlockSerializer from node.api.serializers.block import BlockSerializer
from node.api.serializers.health import HealthSerializer from node.api.serializers.health import HealthSerializer
if TYPE_CHECKING:
from core.app import NBESettings
class NodeApi(ABC): class NodeApi(ABC):
@abstractmethod
def __init__(self, _settings: "NBESettings"):
pass
@abstractmethod @abstractmethod
async def get_health(self) -> HealthSerializer: async def get_health(self) -> HealthSerializer:
pass pass

18
src/node/api/builder.py Normal file
View File

@ -0,0 +1,18 @@
from typing import TYPE_CHECKING
from node.api.base import NodeApi
from node.api.fake import FakeNodeApi
from node.api.http import HttpNodeApi
if TYPE_CHECKING:
from core.app import NBESettings
def build_node_api(settings: "NBESettings") -> NodeApi:
match settings.node_api:
case "http":
return HttpNodeApi(settings)
case "fake":
return FakeNodeApi(settings)
case _:
raise ValueError(f"Unknown API name: {settings.node_api}. Available options are: 'api', 'fake'.")

View File

@ -1,6 +1,6 @@
from asyncio import sleep from asyncio import sleep
from random import choices, random from random import choices, random
from typing import AsyncIterator, List from typing import TYPE_CHECKING, AsyncIterator, List
from rusty_results import Some from rusty_results import Some
@ -8,6 +8,9 @@ from node.api.base import NodeApi
from node.api.serializers.block import BlockSerializer from node.api.serializers.block import BlockSerializer
from node.api.serializers.health import HealthSerializer from node.api.serializers.health import HealthSerializer
if TYPE_CHECKING:
from core.app import NBESettings
def get_weighted_amount() -> int: def get_weighted_amount() -> int:
items = [1, 2, 3] items = [1, 2, 3]
@ -16,7 +19,7 @@ def get_weighted_amount() -> int:
class FakeNodeApi(NodeApi): class FakeNodeApi(NodeApi):
def __init__(self): def __init__(self, _settings: "NBESettings"):
self.current_slot: int = 0 self.current_slot: int = 0
async def get_health(self) -> HealthSerializer: async def get_health(self) -> HealthSerializer:

View File

@ -1,14 +1,19 @@
import logging import logging
from typing import AsyncIterator, List from typing import TYPE_CHECKING, AsyncIterator, List
from urllib.parse import urljoin from urllib.parse import urljoin
import httpx import httpx
import requests import requests
from pydantic import ValidationError
from node.api.base import NodeApi from node.api.base import NodeApi
from node.api.serializers.block import BlockSerializer from node.api.serializers.block import BlockSerializer
from node.api.serializers.health import HealthSerializer from node.api.serializers.health import HealthSerializer
if TYPE_CHECKING:
from core.app import NBESettings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -18,11 +23,11 @@ class HttpNodeApi(NodeApi):
ENDPOINT_BLOCKS = "/cryptarchia/blocks" ENDPOINT_BLOCKS = "/cryptarchia/blocks"
ENDPOINT_BLOCKS_STREAM = "/cryptarchia/blocks/stream" ENDPOINT_BLOCKS_STREAM = "/cryptarchia/blocks/stream"
def __init__(self, host: str, port: int, protocol: str = "http", timeout: int = 60): def __init__(self, settings: "NBESettings"):
self.host: str = host self.host: str = settings.node_api_host
self.port: int = port self.port: int = settings.node_api_port
self.protocol: str = protocol self.protocol: str = settings.node_api_protocol or "http"
self.timeout: int = timeout self.timeout: int = settings.node_api_timeout or 60
@property @property
def base_url(self): def base_url(self):
@ -57,10 +62,9 @@ class HttpNodeApi(NodeApi):
continue continue
try: try:
block = BlockSerializer.model_validate_json(line) block = BlockSerializer.model_validate_json(line)
except Exception as e: except ValidationError as error:
import traceback logger.exception(error)
continue
traceback.print_exc()
raise e
logger.debug(f"Received new block from Node: {block}") logger.debug(f"Received new block from Node: {block}")
yield block yield block

View File

@ -9,11 +9,9 @@ from db.blocks import BlockRepository
from db.clients import SqliteClient from db.clients import SqliteClient
from db.transaction import TransactionRepository from db.transaction import TransactionRepository
from models.block import Block from models.block import Block
from node.api.fake import FakeNodeApi from node.api.builder import build_node_api
from node.api.http import HttpNodeApi
from node.api.serializers.block import BlockSerializer from node.api.serializers.block import BlockSerializer
from node.manager.docker import DockerModeManager from node.manager.builder import build_node_manager
from node.manager.fake import FakeNodeManager
if TYPE_CHECKING: if TYPE_CHECKING:
from core.app import NBE from core.app import NBE
@ -23,16 +21,14 @@ logger = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
app.state.node_manager = build_node_manager(app.settings)
app.state.node_api = build_node_api(app.settings)
db_client = SqliteClient() db_client = SqliteClient()
app.state.node_manager = FakeNodeManager()
# app.state.node_manager = DockerModeManager(app.settings.node_compose_filepath)
# app.state.node_api = FakeNodeApi()
app.state.node_api = HttpNodeApi(host="127.0.0.1", port=18080)
app.state.db_client = db_client app.state.db_client = db_client
app.state.block_repository = BlockRepository(db_client) app.state.block_repository = BlockRepository(db_client)
app.state.transaction_repository = TransactionRepository(db_client) app.state.transaction_repository = TransactionRepository(db_client)
try: try:
logger.info("Starting node...") logger.info("Starting node...")
await app.state.node_manager.start() await app.state.node_manager.start()
@ -74,7 +70,6 @@ async def subscribe_to_updates(app: "NBE") -> None:
logger.info("✅ Subscription to new blocks and transactions started.") logger.info("✅ Subscription to new blocks and transactions started.")
async with TaskGroup() as tg: async with TaskGroup() as tg:
tg.create_task(subscribe_to_new_blocks(app)) tg.create_task(subscribe_to_new_blocks(app))
tg.create_task(subscribe_to_new_transactions(app))
logger.info("Subscription to new blocks and transactions finished.") logger.info("Subscription to new blocks and transactions finished.")
@ -96,39 +91,25 @@ async def subscribe_to_new_blocks(app: "NBE"):
except TimeoutError: except TimeoutError:
continue continue
except StopAsyncIteration: except StopAsyncIteration:
import traceback logger.error(f"Subscription to the new blocks stream ended unexpectedly. Please restart the node.")
traceback.print_exc()
logger.error("Subscription to the new blocks stream ended unexpectedly. Please restart the node.")
break break
except Exception as e: except Exception as error:
import traceback logger.exception(f"Error while fetching new blocks: {error}")
traceback.print_exc()
logger.error(f"Error while fetching new blocks: {e}")
continue continue
try: try:
block = block_serializer.into_block() block = block_serializer.into_block()
await app.state.block_repository.create(block) await app.state.block_repository.create(block)
except Exception as e: except Exception as error:
import traceback logger.exception(f"Error while storing new block: {error}")
traceback.print_exc()
logger.error(f"Error while saving new block: {e}")
finally: finally:
await _gracefully_close_stream(blocks_stream) await _gracefully_close_stream(blocks_stream)
async def subscribe_to_new_transactions(_app: "NBE"):
pass
async def backfill(app: "NBE") -> None: async def backfill(app: "NBE") -> None:
logger.info("Backfilling started.") logger.info("Backfilling started.")
async with TaskGroup() as tg: async with TaskGroup() as tg:
tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3)) tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3))
tg.create_task(backfill_transactions(app))
logger.info("✅ Backfilling finished.") logger.info("✅ Backfilling finished.")
@ -167,7 +148,3 @@ async def backfill_blocks(app: "NBE", *, db_hit_interval_seconds: int, batch_siz
await app.state.block_repository.create(*blocks) await app.state.block_repository.create(*blocks)
slot_to = slot_from - 1 slot_to = slot_from - 1
logger.info("Backfilling blocks completed.") logger.info("Backfilling blocks completed.")
async def backfill_transactions(_app: "NBE"):
pass

View File

@ -1,7 +1,15 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.app import NBESettings
class NodeManager(ABC): class NodeManager(ABC):
@abstractmethod
def __init__(self, _settings: "NBESettings"):
pass
@abstractmethod @abstractmethod
async def start(self): async def start(self):
pass pass

View File

@ -0,0 +1,18 @@
from typing import TYPE_CHECKING
from node.manager.base import NodeManager
from node.manager.docker import DockerModeManager
from node.manager.noop import NoopNodeManager
if TYPE_CHECKING:
from core.app import NBESettings
def build_node_manager(settings: "NBESettings") -> NodeManager:
match settings.node_manager:
case "docker":
return DockerModeManager(settings)
case "noop":
return NoopNodeManager(settings)
case _:
raise ValueError(f"Unknown Manager name: {settings.node_manager}. Available options are: 'docker', 'noop'.")

View File

@ -1,4 +1,5 @@
import logging import logging
from typing import TYPE_CHECKING
from python_on_whales import DockerException from python_on_whales import DockerException
from python_on_whales.docker_client import DockerClient from python_on_whales.docker_client import DockerClient
@ -6,14 +7,18 @@ from rusty_results import Err, Ok, Result
from node.manager.base import NodeManager from node.manager.base import NodeManager
if TYPE_CHECKING:
from core.app import NBESettings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DockerModeManager(NodeManager): class DockerModeManager(NodeManager):
def __init__(self, compose_filepath: str): def __init__(self, settings: "NBESettings"):
self.client: DockerClient = DockerClient( self.client: DockerClient = DockerClient(
client_type="docker", client_type="docker",
compose_files=[compose_filepath], compose_files=[settings.node_compose_filepath],
) )
match self.ps(): match self.ps():
@ -34,7 +39,7 @@ class DockerModeManager(NodeManager):
async def start(self): async def start(self):
services = self.ps().map(lambda _services: len(_services)).expect("Failed to get compose services.") services = self.ps().map(lambda _services: len(_services)).expect("Failed to get compose services.")
if services > 0: if services > 0:
logger.warn("Compose services are already running.") logger.warning("Compose services are already running.")
return return
self.client.compose.up( self.client.compose.up(

View File

@ -1,9 +0,0 @@
from node.manager.base import NodeManager
class FakeNodeManager(NodeManager):
async def start(self):
pass
async def stop(self):
pass

17
src/node/manager/noop.py Normal file
View File

@ -0,0 +1,17 @@
from typing import TYPE_CHECKING
from node.manager.base import NodeManager
if TYPE_CHECKING:
from core.app import NBESettings
class NoopNodeManager(NodeManager):
def __init__(self, _settings: "NBESettings"):
pass
async def start(self):
pass
async def stop(self):
pass