commit 89bea8c7a5bb55380c047eaea297c5b5fea3a3db Author: Alejandro Cabeza Romero Date: Fri Oct 3 22:27:30 2025 +0200 Basic block explorer with faked data. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9eaceec --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.venv/** +.idea/** +**/__pycache__/** +sqlite.db +*.ignore* diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..730979f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,18 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + + - repo: https://github.com/psf/black + rev: 25.9.0 + hooks: + - id: black + + - repo: https://github.com/PyCQA/isort + rev: 6.1.0 + hooks: + - id: isort diff --git a/README.md b/README.md new file mode 100644 index 0000000..8cf3fd1 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Nomos Block Explorer diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9fc1095 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,11 @@ +[tool.black] +line-length = 120 +target-version = ["py313"] +skip-string-normalization = false + +[tool.isort] +profile = "black" +combine_as_imports = true +src_paths = ["src"] +skip_gitignore = true +ensure_newline_before_comments = true diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..96d7244 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +fastapi~=0.118.0 +uvicorn~=0.37.0 +requests~=2.32.5 +python-on-whales~=0.78.0 +sqlmodel~=0.0.25 +rusty-results~=1.1.1 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..f74cace --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,4 @@ +from pathlib import Path + +DIR_SRC = Path(__file__).resolve().parent +DIR_REPO = DIR_SRC.parent diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/api/router.py b/src/api/router.py new file mode 100644 index 0000000..b956898 --- /dev/null +++ b/src/api/router.py @@ -0,0 +1,9 @@ +from fastapi import APIRouter + +from .v1.router import router as v1_router + + +def create_api_router() -> APIRouter: + router = APIRouter() + router.include_router(v1_router, prefix="/v1") + return router diff --git a/src/api/utils.py b/src/api/utils.py new file mode 100644 index 0000000..a83ee91 --- /dev/null +++ b/src/api/utils.py @@ -0,0 +1,27 @@ +from asyncio import sleep +from typing import AsyncIterable, AsyncIterator, List, Union + +from core.models import IdNbeModel + +Data = Union[IdNbeModel, List[IdNbeModel]] +Stream = AsyncIterator[Data] + + +def _parse_stream_data(data: Data) -> bytes: + if isinstance(data, list): + return b"".join(item.model_dump_ndjson() for item in data) + else: + return data.model_dump_ndjson() + + +async def streamer( + stream: Stream, + bootstrap_data: Data = None, + interval: int = 5, +) -> AsyncIterable[bytes]: + if bootstrap_data is not None: + yield _parse_stream_data(bootstrap_data) + + async for data in stream: + yield _parse_stream_data(data) + await sleep(interval) diff --git a/src/api/v1/__init__.py b/src/api/v1/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/api/v1/blocks.py b/src/api/v1/blocks.py new file mode 100644 index 0000000..71bb48f --- /dev/null +++ b/src/api/v1/blocks.py @@ -0,0 +1,15 @@ +from typing import List + +from starlette.responses import Response + +from api.utils import streamer +from core.api import NBERequest, NDJsonStreamingResponse +from node.models.blocks import Block + + +async def stream(request: NBERequest) -> Response: + bootstrap_blocks: List[Block] = await request.app.state.block_repository.get_latest(limit=5, descending=False) + highest_slot: int = max((block.slot for block in bootstrap_blocks), default=0) + updates_stream = request.app.state.block_repository.updates_stream(slot_from=highest_slot + 1) + block_stream = streamer(stream=updates_stream, bootstrap_data=bootstrap_blocks) + return NDJsonStreamingResponse(block_stream) diff --git a/src/api/v1/health.py b/src/api/v1/health.py new file mode 100644 index 0000000..6555056 --- /dev/null +++ b/src/api/v1/health.py @@ -0,0 +1,24 @@ +from typing import AsyncIterator + +from starlette.responses import JSONResponse, Response + +from api.utils import streamer +from core.api import NBERequest, NDJsonStreamingResponse +from node.api.base import NodeApi +from node.models.health import Health + + +async def get(request: NBERequest) -> Response: + response = await request.app.state.node_api.get_health_check() + return JSONResponse(response) + + +async def _health_iterator(node_api: NodeApi) -> AsyncIterator[Health]: + while True: + yield await node_api.get_health_check() + + +async def stream(request: NBERequest) -> Response: + _stream = _health_iterator(request.app.state.node_api) + health_stream = streamer(stream=_stream, interval=2) + return NDJsonStreamingResponse(health_stream) diff --git a/src/api/v1/index.py b/src/api/v1/index.py new file mode 100644 index 0000000..336d6b0 --- /dev/null +++ b/src/api/v1/index.py @@ -0,0 +1,8 @@ +from starlette.responses import JSONResponse, Response + +from core.api import NBERequest + + +async def index(_request: NBERequest) -> Response: + content = {"version": "1"} + return JSONResponse(content) diff --git a/src/api/v1/router.py b/src/api/v1/router.py new file mode 100644 index 0000000..060f7a6 --- /dev/null +++ b/src/api/v1/router.py @@ -0,0 +1,11 @@ +from fastapi import APIRouter + +from . import blocks, health, index, transactions + +router = APIRouter() +router.add_api_route("/", index.index, methods=["GET", "HEAD"]) +router.add_api_route("/health", health.get, methods=["GET", "HEAD"]) +router.add_api_route("/health/stream", health.stream, methods=["GET", "HEAD"]) + +router.add_api_route("/transactions/stream", transactions.stream, methods=["GET"]) +router.add_api_route("/blocks/stream", blocks.stream, methods=["GET"]) diff --git a/src/api/v1/transactions.py b/src/api/v1/transactions.py new file mode 100644 index 0000000..fe81cab --- /dev/null +++ b/src/api/v1/transactions.py @@ -0,0 +1,23 @@ +from datetime import datetime +from typing import List + +from starlette.responses import Response + +from api.utils import streamer +from core.api import NBERequest, NDJsonStreamingResponse +from node.models.transactions import Transaction +from utils.datetime import increment_datetime + + +async def stream(request: NBERequest) -> Response: + bootstrap_transactions: List[Transaction] = await request.app.state.transaction_repository.get_latest( + limit=5, descending=False + ) + highest_timestamp: datetime = max( + (transaction.timestamp for transaction in bootstrap_transactions), default=datetime.min + ) + updates_stream = request.app.state.transaction_repository.updates_stream( + timestamp_from=increment_datetime(highest_timestamp) + ) + transaction_stream = streamer(stream=updates_stream, bootstrap_data=bootstrap_transactions) + return NDJsonStreamingResponse(transaction_stream) diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000..91e2ec4 --- /dev/null +++ b/src/app.py @@ -0,0 +1,11 @@ +from fastapi import FastAPI + +from core.app import NBE +from lifespan import lifespan +from router import create_router + + +def create_app() -> FastAPI: + app = NBE(lifespan=lifespan) + app.include_router(create_router()) + return app diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/api.py b/src/core/api.py new file mode 100644 index 0000000..e237ba8 --- /dev/null +++ b/src/core/api.py @@ -0,0 +1,21 @@ +from starlette.requests import Request +from starlette.responses import ContentStream, StreamingResponse + +from core.app import NBE + + +class NBERequest(Request): + app: NBE + + +class NDJsonStreamingResponse(StreamingResponse): + def __init__(self, content: ContentStream): + super().__init__( + content, + media_type="application/x-ndjson", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/src/core/app.py b/src/core/app.py new file mode 100644 index 0000000..ad02a7a --- /dev/null +++ b/src/core/app.py @@ -0,0 +1,45 @@ +from asyncio import Task, gather +from typing import Optional + +from fastapi import FastAPI +from starlette.datastructures import State + +from db.blocks import BlockRepository +from db.clients import DbClient +from db.transaction import TransactionRepository +from node.api.base import NodeApi +from node.manager.base import NodeManager + + +class NBEState(State): + signal_exit: bool = False + node_manager: Optional[NodeManager] + node_api: Optional[NodeApi] + db_client: DbClient + block_repository: BlockRepository + transaction_repository: TransactionRepository + subscription_to_updates_handle: Task + backfill_handle: Task + + @property + def is_running(self) -> bool: + return not self.signal_exit + + async def stop(self): + self.signal_exit = True + await self._wait_tasks_finished() + + async def _wait_tasks_finished(self): + await gather( + self.subscription_to_updates_handle, + self.backfill_handle, + return_exceptions=True, + ) + + +class NBE(FastAPI): + state: NBEState + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.state = NBEState() diff --git a/src/core/encodings.py b/src/core/encodings.py new file mode 100644 index 0000000..1d9625e --- /dev/null +++ b/src/core/encodings.py @@ -0,0 +1,5 @@ +import json + + +def ndjson(json_data: dict | list) -> bytes: + return (json.dumps(json_data) + "\n").encode("utf-8") diff --git a/src/core/models.py b/src/core/models.py new file mode 100644 index 0000000..7a804c6 --- /dev/null +++ b/src/core/models.py @@ -0,0 +1,12 @@ +from typing import Optional + +from sqlmodel import Field, SQLModel + + +class NbeModel(SQLModel): + def model_dump_ndjson(self) -> bytes: + return f"{self.model_dump_json()}\n".encode("utf-8") + + +class IdNbeModel(NbeModel): + id: Optional[int] = Field(default=None, primary_key=True) diff --git a/src/db/__init__.py b/src/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/db/blocks.py b/src/db/blocks.py new file mode 100644 index 0000000..b007b8c --- /dev/null +++ b/src/db/blocks.py @@ -0,0 +1,52 @@ +from typing import AsyncIterator, Iterable, List + +from rusty_results import Empty, Option, Some +from sqlalchemy import Result +from sqlmodel import select + +from db.clients import DbClient +from node.models.blocks import Block + + +class BlockRepository: + def __init__(self, client: DbClient): + self.client = client + + async def create(self, block: Iterable[Block]) -> None: + with self.client.session() as session: + session.add_all(block) + session.commit() + + async def get_latest(self, limit: int, descending: bool = True) -> List[Block]: + statement = select(Block).limit(limit) + if descending: + statement = statement.order_by(Block.slot.desc()) + else: + statement = statement.order_by(Block.slot.asc()) + + with self.client.session() as session: + results: Result[Block] = session.exec(statement) + return results.all() + + async def updates_stream(self, slot_from: int) -> AsyncIterator[List[Block]]: + _slot_from = slot_from + while True: + statement = select(Block).where(Block.slot >= _slot_from).order_by(Block.slot.asc()) + + with self.client.session() as session: + blocks: List[Block] = session.exec(statement).all() + + if len(blocks) > 0: + # POC: Assumes slots are sequential and one block per slot + _slot_from = blocks[-1].slot + 1 + + yield blocks + + async def get_earliest(self) -> Option[Block]: + with self.client.session() as session: + statement = select(Block).order_by(Block.slot.asc()).limit(1) + results: Result[Block] = session.exec(statement) + if (block := results.first()) is not None: + return Some(block) + else: + return Empty() diff --git a/src/db/clients/__init__.py b/src/db/clients/__init__.py new file mode 100644 index 0000000..bcbfb8d --- /dev/null +++ b/src/db/clients/__init__.py @@ -0,0 +1,2 @@ +from .base import DbClient +from .sqlite import SqliteClient diff --git a/src/db/clients/base.py b/src/db/clients/base.py new file mode 100644 index 0000000..3d9a2ea --- /dev/null +++ b/src/db/clients/base.py @@ -0,0 +1,18 @@ +from abc import ABC, abstractmethod +from typing import Iterator + +from sqlmodel.ext.asyncio.session import AsyncSession + + +class DbClient(ABC): + @abstractmethod + def connect(self): + pass + + @abstractmethod + def session(self) -> Iterator[AsyncSession]: + pass + + @abstractmethod + def disconnect(self): + pass diff --git a/src/db/clients/sqlite.py b/src/db/clients/sqlite.py new file mode 100644 index 0000000..cd39dd9 --- /dev/null +++ b/src/db/clients/sqlite.py @@ -0,0 +1,28 @@ +from contextlib import contextmanager +from typing import Iterator + +from sqlalchemy.engine.base import Engine +from sqlmodel import Session, SQLModel, create_engine + +from db.clients.base import DbClient +from src import DIR_REPO + +SQLITE_DB_PATH = DIR_REPO.joinpath("sqlite.db") + + +# TODO: Async +class SqliteClient(DbClient): + def __init__(self, sqlite_db_path: str = f"sqlite:///{SQLITE_DB_PATH}") -> None: + self.engine: Engine = create_engine(sqlite_db_path) + SQLModel.metadata.create_all(self.engine) + + def connect(self): + pass + + @contextmanager + def session(self) -> Iterator[Session]: + with Session(self.engine) as connection: + yield connection + + def disconnect(self): + self.engine.dispose() diff --git a/src/db/transaction.py b/src/db/transaction.py new file mode 100644 index 0000000..3190563 --- /dev/null +++ b/src/db/transaction.py @@ -0,0 +1,58 @@ +from datetime import datetime, timedelta +from typing import AsyncIterator, Iterable, List + +from rusty_results import Empty, Option, Some +from sqlalchemy import Result +from sqlmodel import select + +from db.clients import DbClient +from node.models.transactions import Transaction +from utils.datetime import increment_datetime + + +class TransactionRepository: + def __init__(self, client: DbClient): + self.client = client + + async def create(self, transaction: Iterable[Transaction]) -> None: + with self.client.session() as session: + session.add_all(transaction) + session.commit() + + async def get_latest(self, limit: int, descending: bool = True) -> List[Transaction]: + statement = select(Transaction).limit(limit) + if descending: + statement = statement.order_by(Transaction.timestamp.desc()) + else: + statement = statement.order_by(Transaction.timestamp.asc()) + + with self.client.session() as session: + results: Result[Transaction] = session.exec(statement) + return results.all() + + async def updates_stream(self, timestamp_from: datetime) -> AsyncIterator[List[Transaction]]: + _timestamp_from = timestamp_from + while True: + statement = ( + select(Transaction) + .where(Transaction.timestamp >= _timestamp_from) + .order_by(Transaction.timestamp.asc()) + ) + + with self.client.session() as session: + transactions: List[Transaction] = session.exec(statement).all() + + if len(transactions) > 0: + # POC: Assumes transactions are inserted in order and with a minimum 1 of second difference + _timestamp_from = increment_datetime(transactions[-1].timestamp) + + yield transactions + + async def get_earliest(self) -> Option[Transaction]: + with self.client.session() as session: + statement = select(Transaction).order_by(Transaction.slot.asc()).limit(1) + results: Result[Transaction] = session.exec(statement) + if (transaction := results.first()) is not None: + return Some(transaction) + else: + return Empty() diff --git a/src/frontend/__init__.py b/src/frontend/__init__.py new file mode 100644 index 0000000..f10a8a1 --- /dev/null +++ b/src/frontend/__init__.py @@ -0,0 +1 @@ +from .mount import create_frontend_router diff --git a/src/frontend/mount.py b/src/frontend/mount.py new file mode 100644 index 0000000..19689be --- /dev/null +++ b/src/frontend/mount.py @@ -0,0 +1,21 @@ +from fastapi import APIRouter +from fastapi.staticfiles import StaticFiles +from starlette.responses import FileResponse, Response + +from src import DIR_REPO + +STATIC_DIR = DIR_REPO.joinpath("static") +INDEX_FILE = STATIC_DIR.joinpath("index.html") + + +def index() -> Response: + return FileResponse(INDEX_FILE) + + +def create_frontend_router() -> APIRouter: + router = APIRouter() + + router.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") + router.get("/", include_in_schema=False)(index) + + return router diff --git a/src/lifespan.py b/src/lifespan.py new file mode 100644 index 0000000..fc42355 --- /dev/null +++ b/src/lifespan.py @@ -0,0 +1,14 @@ +import typing +from contextlib import AsyncExitStack, asynccontextmanager + +from node.lifespan import node_lifespan + +if typing.TYPE_CHECKING: + from core.app import NBE + + +@asynccontextmanager +async def lifespan(app: "NBE"): + async with AsyncExitStack() as stack: + await stack.enter_async_context(node_lifespan(app)) + yield diff --git a/src/logs.py b/src/logs.py new file mode 100644 index 0000000..c696287 --- /dev/null +++ b/src/logs.py @@ -0,0 +1,62 @@ +import os +from logging.config import dictConfig + +_LEVEL = os.getenv("NBE_LOG_LEVEL", "INFO").upper() +_SQLA_LEVEL = os.getenv("SQLALCHEMY_LOG_LEVEL", "ERROR").upper() + +_LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": True, + "formatters": { + "standard": { + "format": "[%(asctime)s] [%(levelname)s] %(name)s: %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + "uvicorn": { + "format": "[%(asctime)s] [%(levelname)s] [uvicorn] %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + "uvicorn_access": { + "format": '%(client_addr)s - "%(request_line)s" %(status_code)s', + }, + }, + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": "DEBUG", + "formatter": "standard", + }, + "uvicorn": { + "class": "logging.StreamHandler", + "level": "DEBUG", + "formatter": "uvicorn", + }, + "uvicorn_access": { + "class": "logging.StreamHandler", + "level": "WARNING", + "formatter": "uvicorn_access", + }, + }, + "root": { + "handlers": ["console"], + "level": _LEVEL, + }, + "loggers": { + # ---- SQLAlchemy / SQLModel ---- + "sqlalchemy": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, + "sqlalchemy.engine": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, + "sqlalchemy.pool": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, + "sqlalchemy.orm": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, + "sqlalchemy.dialects": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, + # ---- Uvicorn / FastAPI ---- + "uvicorn": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False}, + # "uvicorn.error": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False}, + "uvicorn.access": {"level": "WARNING", "handlers": ["uvicorn_access"], "propagate": False}, + # Your app namespace + "app": {"level": _LEVEL, "handlers": ["console"], "propagate": False}, + }, +} + + +def setup_logging(): + dictConfig(_LOGGING_CONFIG) diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..5f27425 --- /dev/null +++ b/src/main.py @@ -0,0 +1,35 @@ +import asyncio + +import uvicorn + +from app import create_app +from logs import setup_logging + + +async def main(): + app = create_app() + config = uvicorn.Config( + app, + host="127.0.0.1", + port=8000, + reload=False, + loop="asyncio", + log_config=None, + ) + server = uvicorn.Server(config) + + try: + await server.serve() + except KeyboardInterrupt: + # Swallow debugger’s SIGINT + pass + + +# Pycharm-Debuggable Uvicorn Server +if __name__ == "__main__": + try: + setup_logging() + asyncio.run(main()) + except KeyboardInterrupt: + # Graceful stop triggered by debugger/CTRL-C + pass diff --git a/src/node/__init__.py b/src/node/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/node/api/__init__.py b/src/node/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/node/api/base.py b/src/node/api/base.py new file mode 100644 index 0000000..1422a17 --- /dev/null +++ b/src/node/api/base.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod +from typing import List + +from node.models.blocks import Block +from node.models.health import Health +from node.models.transactions import Transaction + + +class NodeApi(ABC): + @abstractmethod + async def get_health_check(self) -> Health: + pass + + @abstractmethod + async def get_transactions(self) -> List[Transaction]: + pass + + @abstractmethod + async def get_blocks(self) -> List[Block]: + pass diff --git a/src/node/api/fake.py b/src/node/api/fake.py new file mode 100644 index 0000000..c65f0f7 --- /dev/null +++ b/src/node/api/fake.py @@ -0,0 +1,27 @@ +from random import choices, random +from typing import List + +from node.api.base import NodeApi +from node.models.blocks import Block +from node.models.health import Health +from node.models.transactions import Transaction + + +def get_weighted_amount() -> int: + items = [1, 2, 3] + weights = [0.6, 0.3, 0.1] + return choices(items, weights=weights, k=1)[0] + + +class FakeNodeApi(NodeApi): + async def get_health_check(self) -> Health: + if random() < 0.1: + return Health.from_unhealthy() + else: + return Health.from_healthy() + + async def get_transactions(self) -> List[Transaction]: + return [Transaction.from_random() for _ in range(get_weighted_amount())] + + async def get_blocks(self) -> List[Block]: + return [Block.from_random() for _ in range(1)] diff --git a/src/node/api/http.py b/src/node/api/http.py new file mode 100644 index 0000000..4a666fa --- /dev/null +++ b/src/node/api/http.py @@ -0,0 +1,35 @@ +from urllib.parse import urljoin + +import requests + +from node.api.base import NodeApi + + +class HttpNodeApi(NodeApi): + ENDPOINT_MANTLE_STATUS = "/mantle/status" + ENDPOINT_MANTLE_TRANSACTIONS = "/mantle/transactions" + ENDPOINT_MANTLE_BLOCKS = "/mantle/blocks" + + def __init__(self, host: str, port: int, protocol: str = "http"): + self.protocol: str = protocol + self.host: str = host + self.port: int = port + + @property + def base_url(self): + return f"{self.protocol}://{self.host}:{self.port}" + + async def get_health_check(self) -> dict: + url = urljoin(self.base_url, self.ENDPOINT_MANTLE_STATUS) + response = requests.get(url, timeout=60) + return response.json() + + async def get_transactions(self) -> list: + url = urljoin(self.base_url, self.ENDPOINT_MANTLE_TRANSACTIONS) + response = requests.get(url, timeout=60) + return response.json() + + async def get_blocks(self) -> list: + url = urljoin(self.base_url, self.ENDPOINT_MANTLE_BLOCKS) + response = requests.get(url, timeout=60) + return response.json() diff --git a/src/node/lifespan.py b/src/node/lifespan.py new file mode 100644 index 0000000..307791a --- /dev/null +++ b/src/node/lifespan.py @@ -0,0 +1,151 @@ +import random +from asyncio import TaskGroup, create_task, sleep +from contextlib import asynccontextmanager +from itertools import chain +from typing import TYPE_CHECKING, AsyncGenerator + +from rusty_results import Option + +from db.blocks import BlockRepository +from db.clients import SqliteClient +from db.transaction import TransactionRepository +from node.api.fake import FakeNodeApi +from node.api.http import HttpNodeApi +from node.manager.docker import DockerModeManager +from node.manager.fake import FakeNodeManager +from node.models.blocks import Block +from node.models.transactions import Transaction + +if TYPE_CHECKING: + from core.app import NBE + + +@asynccontextmanager +async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: + # app.state.node_manager = DockerModeManager() + # app.state.node_api = HttpApi(host="127.0.0.1", port=3000) + + db_client = SqliteClient() + + app.state.node_manager = FakeNodeManager() + app.state.node_api = FakeNodeApi() + app.state.db_client = db_client + app.state.block_repository = BlockRepository(db_client) + app.state.transaction_repository = TransactionRepository(db_client) + try: + print("Starting node...") + await app.state.node_manager.start() + print("Node started.") + + app.state.subscription_to_updates_handle = create_task(subscription_to_updates(app)) + app.state.backfill = create_task(backfill(app)) + + yield + finally: + print("Stopping node...") + await app.state.node_manager.stop() + print("Node stopped.") + + +# ================ +# BACKFILLING +# ================ +# Legend: +# BT = Block and/or Transaction +# Steps: +# 1. Subscribe to new BT and store them in the database. +# 2. Backfill gaps between the earliest received BT from subscription (step 1.) and the latest BT in the database. +# 3. Backfill gaps between the earliest BT in the database and genesis BT (slot 0). +# Assumptions: +# - BT are always filled correctly. +# - There's at most 1 gap in the BT sequence: From genesis to earliest received BT from subscription. +# - Slots are populated fully or not at all (no partial slots). +# Notes: +# - Upsert always. + +# ================ +# Fake +_SUBSCRIPTION_START_SLOT = 5 # Simplification for now. +# ================ + + +async def subscription_to_updates(app: "NBE") -> None: + print("✅ Subscription to new blocks and transactions started.") + async with TaskGroup() as tg: + tg.create_task(subscribe_to_new_blocks(app)) + tg.create_task(subscribe_to_new_transactions(app)) + print("Subscription to new blocks and transactions finished.") + + +async def subscribe_to_new_blocks( + app: "NBE", interval: int = 5, subscription_start_slot: int = _SUBSCRIPTION_START_SLOT +): + while app.state.is_running: + try: + new_block: Block = Block.from_random(slot_start=subscription_start_slot, slot_end=subscription_start_slot) + print("> New Block") + await app.state.block_repository.create((new_block,)) + subscription_start_slot += 1 + except Exception as e: + print(f"Error while subscribing to new blocks: {e}") + finally: + await sleep(interval) + + +async def subscribe_to_new_transactions(app: "NBE", interval: int = 5): + while app.state.is_running: + try: + new_transaction: Transaction = Transaction.from_random() + print("> New TX") + await app.state.transaction_repository.create((new_transaction,)) + except Exception as e: + print(f"Error while subscribing to new transactions: {e}") + finally: + await sleep(interval) + + +async def backfill(app: "NBE", delay: int = 3) -> None: + await sleep(delay) # Wait for some data to be present: Simplification for now.s + + print("Backfilling started.") + async with TaskGroup() as tg: + tg.create_task(backfill_blocks(app)) + tg.create_task(backfill_transactions(app)) + print("✅ Backfilling finished.") + + +async def backfill_blocks(app: "NBE"): + # Assuming at most one gap. This will be either genesis block (no gap) or the earliest received block from subscription. + # If genesis, do nothing. + # If earliest received block from subscription, backfill. + print("Checking for block gaps to backfill...") + earliest_block: Option[Block] = await app.state.block_repository.get_earliest() + earliest_block: Block = earliest_block.expects("Subscription should have provided at least one block by now.") + earliest_block_slot = earliest_block.slot + if earliest_block_slot == 0: + print("No need to backfill blocks, genesis block already present.") + return + + print(f"Backfilling blocks from slot {earliest_block_slot - 1} down to 0...") + + def n_blocks(): + return random.choices((1, 2, 3), (6, 3, 1))[0] + + blocks = ( + (Block.from_random(slot_start=slot_index, slot_end=slot_index) for _ in range(n_blocks())) + for slot_index in reversed(range(0, earliest_block_slot)) + ) + flattened = list(chain.from_iterable(blocks)) + await sleep(10) # Simulate some backfilling delay + await app.state.block_repository.create(flattened) + print("Backfilling blocks completed.") + + +async def backfill_transactions(app: "NBE"): + # Assume there's some TXs to backfill + n = random.randint(0, 5) + print(f"Backfilling {n} transactions...") + transactions = (Transaction.from_random() for _ in range(n)) + await sleep(10) # Simulate some backfilling delay + await app.state.transaction_repository.create(transactions) + print("Backfilling transactions completed.") diff --git a/src/node/manager/__init__.py b/src/node/manager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/node/manager/base.py b/src/node/manager/base.py new file mode 100644 index 0000000..a5fab97 --- /dev/null +++ b/src/node/manager/base.py @@ -0,0 +1,11 @@ +from abc import ABC, abstractmethod + + +class NodeManager(ABC): + @abstractmethod + async def start(self): + pass + + @abstractmethod + async def stop(self): + pass diff --git a/src/node/manager/docker.py b/src/node/manager/docker.py new file mode 100644 index 0000000..e55196c --- /dev/null +++ b/src/node/manager/docker.py @@ -0,0 +1,28 @@ +from os import environ +from pathlib import Path + +from python_on_whales.docker_client import DockerClient + +from node.manager.base import NodeManager + + +class DockerModeManager(NodeManager): + COMPOSE_FILE: Path = Path(environ["NODE_COMPOSE_FILEPATH"]) + + def __init__(self): + self.client: DockerClient = DockerClient( + client_type="docker", + compose_files=[ + self.COMPOSE_FILE, + ], + ) + + async def start(self): + self.client.compose.up( + detach=True, + build=False, + remove_orphans=True, + ) + + async def stop(self): + self.client.compose.down(remove_orphans=True, volumes=True) diff --git a/src/node/manager/fake.py b/src/node/manager/fake.py new file mode 100644 index 0000000..6bdc626 --- /dev/null +++ b/src/node/manager/fake.py @@ -0,0 +1,9 @@ +from node.manager.base import NodeManager + + +class FakeNodeManager(NodeManager): + async def start(self): + pass + + async def stop(self): + pass diff --git a/src/node/models/__init__.py b/src/node/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/node/models/blocks.py b/src/node/models/blocks.py new file mode 100644 index 0000000..0759828 --- /dev/null +++ b/src/node/models/blocks.py @@ -0,0 +1,28 @@ +from datetime import datetime + +from sqlmodel import Field + +from core.models import IdNbeModel +from utils.random import random_hash + + +class Block(IdNbeModel, table=True): + __tablename__ = "blocks" + + slot: int + hash: str + parent_hash: str + transaction_count: int + timestamp: datetime = Field(default=None, index=True) + + @classmethod + def from_random(cls, slot_start=1, slot_end=10_000) -> "Block": + import random + + return cls( + slot=random.randint(slot_start, slot_end), + hash=random_hash(), + parent_hash=random_hash(), + transaction_count=random.randint(0, 500), + timestamp=datetime.now(), + ) diff --git a/src/node/models/health.py b/src/node/models/health.py new file mode 100644 index 0000000..6a942b0 --- /dev/null +++ b/src/node/models/health.py @@ -0,0 +1,13 @@ +from core.models import IdNbeModel + + +class Health(IdNbeModel): + healthy: bool + + @classmethod + def from_healthy(cls) -> "Health": + return cls(healthy=True) + + @classmethod + def from_unhealthy(cls) -> "Health": + return cls(healthy=False) diff --git a/src/node/models/transactions.py b/src/node/models/transactions.py new file mode 100644 index 0000000..80cf931 --- /dev/null +++ b/src/node/models/transactions.py @@ -0,0 +1,30 @@ +import random +from datetime import datetime +from typing import Optional + +from sqlmodel import Field + +from core.models import IdNbeModel +from utils.random import random_address, random_hash + + +class Transaction(IdNbeModel, table=True): + __tablename__ = "transactions" + + hash: str + block_hash: Optional[str] = Field(default=None, index=True) + sender: str + recipient: str + amount: float + timestamp: datetime = Field(default=None, index=True) + + @classmethod + def from_random(cls) -> "Transaction": + return Transaction( + hash=random_hash(), + block_hash=random_hash(), + sender=random_address(), + recipient=random_address(), + amount=round(random.uniform(0.0001, 100.0), 6), + timestamp=datetime.now(), + ) diff --git a/src/router.py b/src/router.py new file mode 100644 index 0000000..982d7d9 --- /dev/null +++ b/src/router.py @@ -0,0 +1,35 @@ +from functools import partial, update_wrapper +from os import environ +from urllib.request import Request + +from fastapi import APIRouter +from starlette.responses import JSONResponse, Response + +from api.router import create_api_router +from frontend import create_frontend_router + + +async def _debug_router(_request: Request, *_, router: APIRouter) -> Response: + content = [ + { + "path": route.path, + "name": route.name, + "methods": list(route.methods), + } + for route in router.routes + ] + return JSONResponse(content) + + +def create_router() -> APIRouter: + router = APIRouter() + + debug_router = partial(_debug_router, router=router) + update_wrapper(debug_router, _debug_router) + + router.include_router(create_api_router(), prefix="/api") + if bool(environ.get("DEBUG")): + router.add_route("/debug", debug_router) + router.include_router(create_frontend_router()) + + return router diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/datetime.py b/src/utils/datetime.py new file mode 100644 index 0000000..feab333 --- /dev/null +++ b/src/utils/datetime.py @@ -0,0 +1,8 @@ +from datetime import datetime, timedelta + + +# Increments a timestamp by the smallest possible unit (1 microsecond), in terms of DB precision. +# This is used to avoid returning the same record again when querying for updates. +# FIXME: Hardcoded +def increment_datetime(timestamp: datetime) -> datetime: + return timestamp + timedelta(microseconds=1) diff --git a/src/utils/random.py b/src/utils/random.py new file mode 100644 index 0000000..6790394 --- /dev/null +++ b/src/utils/random.py @@ -0,0 +1,13 @@ +import random + + +def random_hex(length: int) -> str: + return f"0x{random.getrandbits(length * 4):0{length}x}" + + +def random_hash() -> str: + return random_hex(64) + + +def random_address() -> str: + return random_hex(40) diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..4ae351c --- /dev/null +++ b/static/index.html @@ -0,0 +1,258 @@ + + + + + Nomos Block Explorer + + + + +
+

Nomos Block Explorer

+ Connecting… +
+ +
+
+ +
+
+
Blocks 0
+
/api/v1/blocks/stream
+
+
+ + + + + + + + + + + + + + +
SlotHashTxs
+
+
+ + +
+
+
Transactions 0
+
/api/v1/transactions/stream
+
+
+ + + + + + + + + + + + + + + + +
HashFrom → ToAmountTime
+
+
+
+
+ + + +