diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8f7978b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +.venv/ +node_modules/ +.env +sqlite.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d66d184 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.14-slim + +# Project files +COPY . /app +WORKDIR /app + +# Environment variables +ENV NODE_COMPOSE_FILEPATH=/app/docker-compose.yml +ENV PYTHONPATH=/app:/app/src +ENV UV_INSTALL_DIR=/usr/local/bin +ENV NODE_API=http +ENV NODE_MANAGER=noop + +# Package manager and dependencies +# RUN apt-get update && apt-get install -y curl git +RUN --mount=target=/var/lib/apt/lists,type=cache,sharing=locked \ + --mount=target=/var/cache/apt,type=cache,sharing=locked \ + rm -f /etc/apt/apt.conf.d/docker-clean \ + && apt-get update \ + && apt-get install -y curl git +RUN curl -LsSf https://astral.sh/uv/install.sh | sh +RUN uv pip compile pyproject.toml -o requirements.txt && uv pip install --system -r requirements.txt + +# Ports +EXPOSE 8000 + +# Start application +CMD ["python", "/app/src/main.py"] diff --git a/README.md b/README.md index eef5db9..7270dc2 100644 --- a/README.md +++ b/README.md @@ -17,3 +17,7 @@ There are a few assumptions made to facilitate the development of the PoC: - Tests - Fix assumption of 1 block per slot - Log colouring +- Handle reconnections: + - Failures to connect to Node + - Timeouts + - Stream closed diff --git a/src/core/app.py b/src/core/app.py index df5581d..f8a7f56 100644 --- a/src/core/app.py +++ b/src/core/app.py @@ -1,5 +1,5 @@ from asyncio import Task, gather -from typing import Optional +from typing import Literal, Optional from fastapi import FastAPI from pydantic_settings import BaseSettings, SettingsConfigDict @@ -20,6 +20,14 @@ class NBESettings(BaseSettings): node_compose_filepath: str + node_api: Literal["http", "fake"] + node_manager: Literal["docker", "noop"] + + node_api_host: str = "127.0.0.1" + node_api_port: int = 8000 + node_api_timeout: int = 60 + node_api_protocol: str = "http" + class NBEState(State): signal_exit: bool = False # TODO: asyncio.Event @@ -54,4 +62,4 @@ class NBE(FastAPI): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.state = NBEState() - self.settings = NBESettings() # type: ignore[call-arg] # Missing parameter is filled from env file + self.settings = NBESettings() # type: ignore[call-arg] # The missing parameter is filled from the env file diff --git a/src/node/api/base.py b/src/node/api/base.py index 2e9ff57..70987cb 100644 --- a/src/node/api/base.py +++ b/src/node/api/base.py @@ -1,11 +1,18 @@ from abc import ABC, abstractmethod -from typing import AsyncIterator, List +from typing import TYPE_CHECKING, AsyncIterator, List from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer +if TYPE_CHECKING: + from core.app import NBESettings + class NodeApi(ABC): + @abstractmethod + def __init__(self, _settings: "NBESettings"): + pass + @abstractmethod async def get_health(self) -> HealthSerializer: pass diff --git a/src/node/api/builder.py b/src/node/api/builder.py new file mode 100644 index 0000000..e17c252 --- /dev/null +++ b/src/node/api/builder.py @@ -0,0 +1,18 @@ +from typing import TYPE_CHECKING + +from node.api.base import NodeApi +from node.api.fake import FakeNodeApi +from node.api.http import HttpNodeApi + +if TYPE_CHECKING: + from core.app import NBESettings + + +def build_node_api(settings: "NBESettings") -> NodeApi: + match settings.node_api: + case "http": + return HttpNodeApi(settings) + case "fake": + return FakeNodeApi(settings) + case _: + raise ValueError(f"Unknown API name: {settings.node_api}. Available options are: 'api', 'fake'.") diff --git a/src/node/api/fake.py b/src/node/api/fake.py index 8624813..5789641 100644 --- a/src/node/api/fake.py +++ b/src/node/api/fake.py @@ -1,6 +1,6 @@ from asyncio import sleep from random import choices, random -from typing import AsyncIterator, List +from typing import TYPE_CHECKING, AsyncIterator, List from rusty_results import Some @@ -8,6 +8,9 @@ from node.api.base import NodeApi from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer +if TYPE_CHECKING: + from core.app import NBESettings + def get_weighted_amount() -> int: items = [1, 2, 3] @@ -16,7 +19,7 @@ def get_weighted_amount() -> int: class FakeNodeApi(NodeApi): - def __init__(self): + def __init__(self, _settings: "NBESettings"): self.current_slot: int = 0 async def get_health(self) -> HealthSerializer: diff --git a/src/node/api/http.py b/src/node/api/http.py index 89a65de..9b1ff6c 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -1,14 +1,19 @@ import logging -from typing import AsyncIterator, List +from typing import TYPE_CHECKING, AsyncIterator, List from urllib.parse import urljoin import httpx import requests +from pydantic import ValidationError from node.api.base import NodeApi from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer +if TYPE_CHECKING: + from core.app import NBESettings + + logger = logging.getLogger(__name__) @@ -18,11 +23,11 @@ class HttpNodeApi(NodeApi): ENDPOINT_BLOCKS = "/cryptarchia/blocks" ENDPOINT_BLOCKS_STREAM = "/cryptarchia/blocks/stream" - def __init__(self, host: str, port: int, protocol: str = "http", timeout: int = 60): - self.host: str = host - self.port: int = port - self.protocol: str = protocol - self.timeout: int = timeout + def __init__(self, settings: "NBESettings"): + self.host: str = settings.node_api_host + self.port: int = settings.node_api_port + self.protocol: str = settings.node_api_protocol or "http" + self.timeout: int = settings.node_api_timeout or 60 @property def base_url(self): @@ -57,10 +62,9 @@ class HttpNodeApi(NodeApi): continue try: block = BlockSerializer.model_validate_json(line) - except Exception as e: - import traceback + except ValidationError as error: + logger.exception(error) + continue - traceback.print_exc() - raise e logger.debug(f"Received new block from Node: {block}") yield block diff --git a/src/node/lifespan.py b/src/node/lifespan.py index d1feb33..f81fcdd 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -9,11 +9,9 @@ from db.blocks import BlockRepository from db.clients import SqliteClient from db.transaction import TransactionRepository from models.block import Block -from node.api.fake import FakeNodeApi -from node.api.http import HttpNodeApi +from node.api.builder import build_node_api from node.api.serializers.block import BlockSerializer -from node.manager.docker import DockerModeManager -from node.manager.fake import FakeNodeManager +from node.manager.builder import build_node_manager if TYPE_CHECKING: from core.app import NBE @@ -23,16 +21,14 @@ logger = logging.getLogger(__name__) @asynccontextmanager async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: + app.state.node_manager = build_node_manager(app.settings) + app.state.node_api = build_node_api(app.settings) + db_client = SqliteClient() - - app.state.node_manager = FakeNodeManager() - # app.state.node_manager = DockerModeManager(app.settings.node_compose_filepath) - # app.state.node_api = FakeNodeApi() - app.state.node_api = HttpNodeApi(host="127.0.0.1", port=18080) - app.state.db_client = db_client app.state.block_repository = BlockRepository(db_client) app.state.transaction_repository = TransactionRepository(db_client) + try: logger.info("Starting node...") await app.state.node_manager.start() @@ -74,7 +70,6 @@ async def subscribe_to_updates(app: "NBE") -> None: logger.info("✅ Subscription to new blocks and transactions started.") async with TaskGroup() as tg: tg.create_task(subscribe_to_new_blocks(app)) - tg.create_task(subscribe_to_new_transactions(app)) logger.info("Subscription to new blocks and transactions finished.") @@ -96,39 +91,25 @@ async def subscribe_to_new_blocks(app: "NBE"): except TimeoutError: continue except StopAsyncIteration: - import traceback - - traceback.print_exc() - logger.error("Subscription to the new blocks stream ended unexpectedly. Please restart the node.") + logger.error(f"Subscription to the new blocks stream ended unexpectedly. Please restart the node.") break - except Exception as e: - import traceback - - traceback.print_exc() - logger.error(f"Error while fetching new blocks: {e}") + except Exception as error: + logger.exception(f"Error while fetching new blocks: {error}") continue try: block = block_serializer.into_block() await app.state.block_repository.create(block) - except Exception as e: - import traceback - - traceback.print_exc() - logger.error(f"Error while saving new block: {e}") + except Exception as error: + logger.exception(f"Error while storing new block: {error}") finally: await _gracefully_close_stream(blocks_stream) -async def subscribe_to_new_transactions(_app: "NBE"): - pass - - async def backfill(app: "NBE") -> None: logger.info("Backfilling started.") async with TaskGroup() as tg: tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3)) - tg.create_task(backfill_transactions(app)) logger.info("✅ Backfilling finished.") @@ -167,7 +148,3 @@ async def backfill_blocks(app: "NBE", *, db_hit_interval_seconds: int, batch_siz await app.state.block_repository.create(*blocks) slot_to = slot_from - 1 logger.info("Backfilling blocks completed.") - - -async def backfill_transactions(_app: "NBE"): - pass diff --git a/src/node/manager/base.py b/src/node/manager/base.py index a5fab97..e6c5efe 100644 --- a/src/node/manager/base.py +++ b/src/node/manager/base.py @@ -1,7 +1,15 @@ from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from core.app import NBESettings class NodeManager(ABC): + @abstractmethod + def __init__(self, _settings: "NBESettings"): + pass + @abstractmethod async def start(self): pass diff --git a/src/node/manager/builder.py b/src/node/manager/builder.py new file mode 100644 index 0000000..ed87641 --- /dev/null +++ b/src/node/manager/builder.py @@ -0,0 +1,18 @@ +from typing import TYPE_CHECKING + +from node.manager.base import NodeManager +from node.manager.docker import DockerModeManager +from node.manager.noop import NoopNodeManager + +if TYPE_CHECKING: + from core.app import NBESettings + + +def build_node_manager(settings: "NBESettings") -> NodeManager: + match settings.node_manager: + case "docker": + return DockerModeManager(settings) + case "noop": + return NoopNodeManager(settings) + case _: + raise ValueError(f"Unknown Manager name: {settings.node_manager}. Available options are: 'docker', 'noop'.") diff --git a/src/node/manager/docker.py b/src/node/manager/docker.py index 98b46ca..6e1ee0d 100644 --- a/src/node/manager/docker.py +++ b/src/node/manager/docker.py @@ -1,4 +1,5 @@ import logging +from typing import TYPE_CHECKING from python_on_whales import DockerException from python_on_whales.docker_client import DockerClient @@ -6,14 +7,18 @@ from rusty_results import Err, Ok, Result from node.manager.base import NodeManager +if TYPE_CHECKING: + from core.app import NBESettings + + logger = logging.getLogger(__name__) class DockerModeManager(NodeManager): - def __init__(self, compose_filepath: str): + def __init__(self, settings: "NBESettings"): self.client: DockerClient = DockerClient( client_type="docker", - compose_files=[compose_filepath], + compose_files=[settings.node_compose_filepath], ) match self.ps(): @@ -34,7 +39,7 @@ class DockerModeManager(NodeManager): async def start(self): services = self.ps().map(lambda _services: len(_services)).expect("Failed to get compose services.") if services > 0: - logger.warn("Compose services are already running.") + logger.warning("Compose services are already running.") return self.client.compose.up( diff --git a/src/node/manager/fake.py b/src/node/manager/fake.py deleted file mode 100644 index 6bdc626..0000000 --- a/src/node/manager/fake.py +++ /dev/null @@ -1,9 +0,0 @@ -from node.manager.base import NodeManager - - -class FakeNodeManager(NodeManager): - async def start(self): - pass - - async def stop(self): - pass diff --git a/src/node/manager/noop.py b/src/node/manager/noop.py new file mode 100644 index 0000000..aa4524f --- /dev/null +++ b/src/node/manager/noop.py @@ -0,0 +1,17 @@ +from typing import TYPE_CHECKING + +from node.manager.base import NodeManager + +if TYPE_CHECKING: + from core.app import NBESettings + + +class NoopNodeManager(NodeManager): + def __init__(self, _settings: "NBESettings"): + pass + + async def start(self): + pass + + async def stop(self): + pass