From 832ed183520f27c8b3d479d8957d5e511acf4721 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 15 Oct 2025 20:53:52 +0200 Subject: [PATCH] Integrate explorer with Node. --- .gitignore | 6 +- http-client.env.json | 5 + node.rest | 24 ++++ pyproject.toml | 2 + src/api/streams.py | 33 +++++ src/api/utils.py | 27 ---- src/api/v1/blocks.py | 6 +- src/api/v1/health.py | 6 +- src/api/v1/transactions.py | 4 +- src/core/app.py | 14 +- src/core/models.py | 83 ++++++++++- src/core/sqlmodel.py | 49 +++++++ src/db/blocks.py | 76 +++++++--- src/db/transaction.py | 12 +- src/logs.py | 108 +++++++------- src/main.py | 2 + src/node/api/base.py | 8 +- src/node/api/fake.py | 6 +- src/node/api/http.py | 60 ++++++-- src/node/lifespan.py | 146 ++++++++++--------- src/node/manager/docker.py | 33 +++-- src/node/models/blocks.py | 102 ++++++++++--- src/node/models/health.py | 6 + src/node/models/transactions.py | 90 +++++++++--- static/index.html | 244 ++++++++++++++++++++------------ 25 files changed, 819 insertions(+), 333 deletions(-) create mode 100644 http-client.env.json create mode 100644 node.rest create mode 100644 src/api/streams.py delete mode 100644 src/api/utils.py create mode 100644 src/core/sqlmodel.py diff --git a/.gitignore b/.gitignore index 9eaceec..8502b42 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ -.venv/** -.idea/** +.venv/ +.idea/ **/__pycache__/** sqlite.db *.ignore* +.env +uv.lock diff --git a/http-client.env.json b/http-client.env.json new file mode 100644 index 0000000..6d4323c --- /dev/null +++ b/http-client.env.json @@ -0,0 +1,5 @@ +{ + "dev": { + "host": "http://localhost:18080" + } +} diff --git a/node.rest b/node.rest new file mode 100644 index 0000000..b0b4a41 --- /dev/null +++ b/node.rest @@ -0,0 +1,24 @@ +GET {{host}}/cryptarchia/info +Accept: application/json + +### + +GET {{host}}/cryptarchia/headers +Accept: application/json + +### + +GET {{host}}/network/info +Accept: application/json + +### + +GET {{host}}/mantle/metrics + +### + +GET {{host}}/cryptarchia/blocks?slot_from=0&slot_to=10 + +### + +GET {{host}}/cryptarchia/blocks/stream diff --git a/pyproject.toml b/pyproject.toml index b2eee1f..be81afd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,6 +4,8 @@ version = "0.1.0" requires-python = ">=3.13,<3.14" dependencies = [ "fastapi~=0.118.0", + "httpx>=0.28.1", + "pydantic-settings>=2.11.0", "python-on-whales~=0.78.0", "requests~=2.32.5", "rusty-results~=1.1.1", diff --git a/src/api/streams.py b/src/api/streams.py new file mode 100644 index 0000000..d37a473 --- /dev/null +++ b/src/api/streams.py @@ -0,0 +1,33 @@ +import logging +from typing import AsyncIterable, AsyncIterator, List, Union + +from core.models import IdNbeModel + +Data = Union[IdNbeModel, List[IdNbeModel]] +Stream = AsyncIterator[Data] + + +logger = logging.getLogger(__name__) + + +def _into_ndjson_data(data: Data) -> bytes: + if isinstance(data, list): + return b"".join(item.model_dump_ndjson() for item in data) + else: + return data.model_dump_ndjson() + + +async def into_ndjson_stream(stream: Stream, bootstrap_data: Data = None) -> AsyncIterable[bytes]: + if bootstrap_data is not None: + ndjson_data = _into_ndjson_data(bootstrap_data) + if ndjson_data: + yield ndjson_data + else: + logger.debug("Ignoring streaming bootstrap data because it is empty.") + + async for data in stream: + ndjson_data = _into_ndjson_data(data) + if ndjson_data: + yield ndjson_data + else: + logger.debug("Ignoring streaming data because it is empty.") diff --git a/src/api/utils.py b/src/api/utils.py deleted file mode 100644 index a83ee91..0000000 --- a/src/api/utils.py +++ /dev/null @@ -1,27 +0,0 @@ -from asyncio import sleep -from typing import AsyncIterable, AsyncIterator, List, Union - -from core.models import IdNbeModel - -Data = Union[IdNbeModel, List[IdNbeModel]] -Stream = AsyncIterator[Data] - - -def _parse_stream_data(data: Data) -> bytes: - if isinstance(data, list): - return b"".join(item.model_dump_ndjson() for item in data) - else: - return data.model_dump_ndjson() - - -async def streamer( - stream: Stream, - bootstrap_data: Data = None, - interval: int = 5, -) -> AsyncIterable[bytes]: - if bootstrap_data is not None: - yield _parse_stream_data(bootstrap_data) - - async for data in stream: - yield _parse_stream_data(data) - await sleep(interval) diff --git a/src/api/v1/blocks.py b/src/api/v1/blocks.py index 71bb48f..e9302f0 100644 --- a/src/api/v1/blocks.py +++ b/src/api/v1/blocks.py @@ -2,14 +2,14 @@ from typing import List from starlette.responses import Response -from api.utils import streamer +from api.streams import into_ndjson_stream from core.api import NBERequest, NDJsonStreamingResponse from node.models.blocks import Block async def stream(request: NBERequest) -> Response: - bootstrap_blocks: List[Block] = await request.app.state.block_repository.get_latest(limit=5, descending=False) + bootstrap_blocks: List[Block] = await request.app.state.block_repository.get_latest(limit=5, ascending=True) highest_slot: int = max((block.slot for block in bootstrap_blocks), default=0) updates_stream = request.app.state.block_repository.updates_stream(slot_from=highest_slot + 1) - block_stream = streamer(stream=updates_stream, bootstrap_data=bootstrap_blocks) + block_stream = into_ndjson_stream(stream=updates_stream, bootstrap_data=bootstrap_blocks) return NDJsonStreamingResponse(block_stream) diff --git a/src/api/v1/health.py b/src/api/v1/health.py index 6555056..e1231c4 100644 --- a/src/api/v1/health.py +++ b/src/api/v1/health.py @@ -1,8 +1,9 @@ +from asyncio import sleep from typing import AsyncIterator from starlette.responses import JSONResponse, Response -from api.utils import streamer +from api.streams import into_ndjson_stream from core.api import NBERequest, NDJsonStreamingResponse from node.api.base import NodeApi from node.models.health import Health @@ -16,9 +17,10 @@ async def get(request: NBERequest) -> Response: async def _health_iterator(node_api: NodeApi) -> AsyncIterator[Health]: while True: yield await node_api.get_health_check() + await sleep(10) async def stream(request: NBERequest) -> Response: _stream = _health_iterator(request.app.state.node_api) - health_stream = streamer(stream=_stream, interval=2) + health_stream = into_ndjson_stream(stream=_stream) return NDJsonStreamingResponse(health_stream) diff --git a/src/api/v1/transactions.py b/src/api/v1/transactions.py index fe81cab..fcd720e 100644 --- a/src/api/v1/transactions.py +++ b/src/api/v1/transactions.py @@ -3,7 +3,7 @@ from typing import List from starlette.responses import Response -from api.utils import streamer +from api.streams import into_ndjson_stream from core.api import NBERequest, NDJsonStreamingResponse from node.models.transactions import Transaction from utils.datetime import increment_datetime @@ -19,5 +19,5 @@ async def stream(request: NBERequest) -> Response: updates_stream = request.app.state.transaction_repository.updates_stream( timestamp_from=increment_datetime(highest_timestamp) ) - transaction_stream = streamer(stream=updates_stream, bootstrap_data=bootstrap_transactions) + transaction_stream = into_ndjson_stream(stream=updates_stream, bootstrap_data=bootstrap_transactions) return NDJsonStreamingResponse(transaction_stream) diff --git a/src/core/app.py b/src/core/app.py index ad02a7a..df5581d 100644 --- a/src/core/app.py +++ b/src/core/app.py @@ -2,6 +2,7 @@ from asyncio import Task, gather from typing import Optional from fastapi import FastAPI +from pydantic_settings import BaseSettings, SettingsConfigDict from starlette.datastructures import State from db.blocks import BlockRepository @@ -9,10 +10,19 @@ from db.clients import DbClient from db.transaction import TransactionRepository from node.api.base import NodeApi from node.manager.base import NodeManager +from src import DIR_REPO + +ENV_FILEPATH = DIR_REPO.joinpath(".env") + + +class NBESettings(BaseSettings): + model_config = SettingsConfigDict(env_file=ENV_FILEPATH, extra="ignore") + + node_compose_filepath: str class NBEState(State): - signal_exit: bool = False + signal_exit: bool = False # TODO: asyncio.Event node_manager: Optional[NodeManager] node_api: Optional[NodeApi] db_client: DbClient @@ -39,7 +49,9 @@ class NBEState(State): class NBE(FastAPI): state: NBEState + settings: NBESettings def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.state = NBEState() + self.settings = NBESettings() # type: ignore[call-arg] # Missing parameter is filled from env file diff --git a/src/core/models.py b/src/core/models.py index 7a804c6..a00e8ae 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -1,12 +1,87 @@ -from typing import Optional +from abc import ABC, abstractmethod +from datetime import datetime +from json import loads +from typing import Any, Optional, Self +from pydantic import BaseModel +from pydantic.config import ExtraValues +from sqlalchemy import DateTime, func from sqlmodel import Field, SQLModel +# --- Generic --- + + +class NdjsonMixin(ABC): + @abstractmethod + def _dump_json(self) -> str: + pass -class NbeModel(SQLModel): def model_dump_ndjson(self) -> bytes: - return f"{self.model_dump_json()}\n".encode("utf-8") + return f"{self._dump_json()}\n".encode("utf-8") -class IdNbeModel(NbeModel): +# --- Pydantic --- + + +class NbeSchema(NdjsonMixin, BaseModel): + def _dump_json(self) -> str: + return self.model_dump_json() + + +# --- SQLModel --- + + +class NbeModel(NdjsonMixin, SQLModel): + def _dump_json(self) -> str: + return self.model_dump_json() + + @classmethod + def model_validate_json( + cls, + json_data: str | bytes | bytearray, + *, + strict: bool | None = None, + extra: ExtraValues | None = None, + context: Any | None = None, + by_alias: bool | None = None, + by_name: bool | None = None, + ) -> Self: + """ + Sourced from: https://github.com/fastapi/sqlmodel/discussions/852 + Related: + - https://github.com/fastapi/sqlmodel/issues/453 + - https://github.com/fastapi/sqlmodel/discussions/961 + + SQLModel's `model_validate_json` is broken on `table=True`, when using JSON columns linked to a Pydantic model. + Nested fields defined this way are transformed to plain dict/list instead of their respective Pydantic models. + Because `model_validate` has its behaviour fixed, we delegate to it. + + Note: `pydantic.TypeAdapter` also suffers from this issue. + """ + python_data = loads(json_data) + return cls.model_validate(obj=python_data, strict=strict, context=context) + + +class IdMixin: id: Optional[int] = Field(default=None, primary_key=True) + + +class TimestampedMixin: + created_at: datetime | None = Field( + default=None, + sa_type=DateTime(timezone=True), # type: ignore[arg-type] + sa_column_kwargs={"server_default": func.now(), "nullable": False}, + ) + updated_at: datetime | None = Field( + default=None, + sa_type=DateTime(timezone=True), # type: ignore[arg-type] + sa_column_kwargs={"server_default": func.now(), "onupdate": func.now(), "nullable": False}, + ) + + +class IdNbeModel(NbeModel, IdMixin): + pass + + +class TimestampedModel(IdNbeModel, TimestampedMixin): + pass diff --git a/src/core/sqlmodel.py b/src/core/sqlmodel.py new file mode 100644 index 0000000..6137f66 --- /dev/null +++ b/src/core/sqlmodel.py @@ -0,0 +1,49 @@ +from typing import Any, Generic, List, TypeVar + +from pydantic import TypeAdapter +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.types import JSON as SA_JSON, TypeDecorator + +T = TypeVar("T") + + +class PydanticJsonColumn(TypeDecorator, Generic[T]): + """ + Store/load a Pydantic v2 model (or list of models) in a JSON/JSONB column. + + Python -> DB: accepts Model | dict | list[Model] | list[dict] | JSON str/bytes, + emits dict or list[dict] (what JSON columns expect). + DB -> Python: returns Model or list[Model], preserving shape. + """ + + impl = SA_JSON + cache_ok = True + + def __init__(self, model: type[T], *, many: bool = False) -> None: + super().__init__() + self.many = many + self._ta = TypeAdapter(List[model] if many else model) + + # Use JSONB on Postgres, JSON elsewhere + def load_dialect_impl(self, dialect): + return dialect.type_descriptor(JSONB()) if dialect.name == "postgresql" else dialect.type_descriptor(SA_JSON()) + + # Python -> DB (on INSERT/UPDATE) + def process_bind_param(self, value: Any, _dialect) -> Any: + if value is None: + return [] if self.many else None + + # If given JSON text/bytes, validate from JSON; else from Python + if isinstance(value, (str, bytes, bytearray)): + model_value = self._ta.validate_json(value.decode() if not isinstance(value, str) else value) + else: + model_value = self._ta.validate_python(value) + + # Dump to plain Python (dict/list) for the JSON column + return self._ta.dump_python(model_value, mode="json") + + # DB -> Python (on SELECT) + def process_result_value(self, value: Any, _dialect): + if value is None: + return [] if self.many else None + return self._ta.validate_python(value) diff --git a/src/db/blocks.py b/src/db/blocks.py index b007b8c..c8b95e2 100644 --- a/src/db/blocks.py +++ b/src/db/blocks.py @@ -1,50 +1,90 @@ -from typing import AsyncIterator, Iterable, List +from asyncio import sleep +from typing import AsyncIterator, List, Literal from rusty_results import Empty, Option, Some -from sqlalchemy import Result +from sqlalchemy import Float, Integer, Result, String, cast, func +from sqlalchemy.orm import aliased from sqlmodel import select +from sqlmodel.sql._expression_select_cls import Select from db.clients import DbClient from node.models.blocks import Block +def order_by_json( + sql_expr, path: str, *, into_type: Literal["int", "float", "text"] = "text", descending: bool = False +): + expression = jget(sql_expr, path, into_type=into_type) + return expression.desc() if descending else expression.asc() + + +def jget(sql_expr, path: str, *, into_type: Literal["int", "float", "text"] = "text"): + expression = func.json_extract(sql_expr, path) + match into_type: + case "int": + expression = cast(expression, Integer) + case "float": + expression = cast(expression, Float) + case "text": + expression = cast(expression, String) + return expression + + +def get_latest_statement(limit: int, latest_ascending: bool = True) -> Select: + # Fetch latest + descending = order_by_json(Block.header, "$.slot", into_type="int", descending=True) + inner = select(Block).order_by(descending, Block.id.desc()).limit(limit).subquery() + + # Reorder + latest = aliased(Block, inner) + latest_order = order_by_json(latest.header, "$.slot", into_type="int", descending=(not latest_ascending)) + id_order = latest.id.asc() if latest_ascending else latest.id.desc() + statement = select(latest).order_by(latest_order, id_order) # type: ignore[arg-type] + return statement + + class BlockRepository: + """ + FIXME: Assumes slots are sequential and one block per slot + """ + def __init__(self, client: DbClient): self.client = client - async def create(self, block: Iterable[Block]) -> None: + async def create(self, *blocks: Block) -> None: with self.client.session() as session: - session.add_all(block) + session.add_all(list(blocks)) session.commit() - async def get_latest(self, limit: int, descending: bool = True) -> List[Block]: - statement = select(Block).limit(limit) - if descending: - statement = statement.order_by(Block.slot.desc()) - else: - statement = statement.order_by(Block.slot.asc()) + async def get_latest(self, limit: int, *, ascending: bool = True) -> List[Block]: + statement = get_latest_statement(limit, ascending) with self.client.session() as session: results: Result[Block] = session.exec(statement) return results.all() - async def updates_stream(self, slot_from: int) -> AsyncIterator[List[Block]]: - _slot_from = slot_from + async def updates_stream(self, slot_from: int, *, timeout_seconds: int = 1) -> AsyncIterator[List[Block]]: + slot_cursor = slot_from + block_slot_expression = jget(Block.header, "$.slot", into_type="int") + order = order_by_json(Block.header, "$.slot", into_type="int", descending=False) + while True: - statement = select(Block).where(Block.slot >= _slot_from).order_by(Block.slot.asc()) + where_clause = block_slot_expression >= slot_cursor + statement = select(Block).where(where_clause).order_by(order) with self.client.session() as session: blocks: List[Block] = session.exec(statement).all() if len(blocks) > 0: - # POC: Assumes slots are sequential and one block per slot - _slot_from = blocks[-1].slot + 1 - - yield blocks + slot_cursor = blocks[-1].slot + 1 + yield blocks + else: + await sleep(timeout_seconds) async def get_earliest(self) -> Option[Block]: with self.client.session() as session: - statement = select(Block).order_by(Block.slot.asc()).limit(1) + order = order_by_json(Block.header, "$.slot", into_type="int", descending=False) + statement = select(Block).order_by(order).limit(1) results: Result[Block] = session.exec(statement) if (block := results.first()) is not None: return Some(block) diff --git a/src/db/transaction.py b/src/db/transaction.py index 3190563..016f59a 100644 --- a/src/db/transaction.py +++ b/src/db/transaction.py @@ -1,4 +1,5 @@ -from datetime import datetime, timedelta +from asyncio import sleep +from datetime import datetime from typing import AsyncIterator, Iterable, List from rusty_results import Empty, Option, Some @@ -20,6 +21,8 @@ class TransactionRepository: session.commit() async def get_latest(self, limit: int, descending: bool = True) -> List[Transaction]: + return [] + statement = select(Transaction).limit(limit) if descending: statement = statement.order_by(Transaction.timestamp.desc()) @@ -31,6 +34,11 @@ class TransactionRepository: return results.all() async def updates_stream(self, timestamp_from: datetime) -> AsyncIterator[List[Transaction]]: + while True: + if False: + yield [] + await sleep(10) + _timestamp_from = timestamp_from while True: statement = ( @@ -49,6 +57,8 @@ class TransactionRepository: yield transactions async def get_earliest(self) -> Option[Transaction]: + return Empty() + with self.client.session() as session: statement = select(Transaction).order_by(Transaction.slot.asc()).limit(1) results: Result[Transaction] = session.exec(statement) diff --git a/src/logs.py b/src/logs.py index c696287..5379aab 100644 --- a/src/logs.py +++ b/src/logs.py @@ -1,62 +1,68 @@ import os from logging.config import dictConfig -_LEVEL = os.getenv("NBE_LOG_LEVEL", "INFO").upper() -_SQLA_LEVEL = os.getenv("SQLALCHEMY_LOG_LEVEL", "ERROR").upper() -_LOGGING_CONFIG = { - "version": 1, - "disable_existing_loggers": True, - "formatters": { - "standard": { - "format": "[%(asctime)s] [%(levelname)s] %(name)s: %(message)s", - "datefmt": "%Y-%m-%d %H:%M:%S", +def get_logging_config(nbe_log_level: str, sqla_log_level: str): + return { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "standard": { + "format": "[%(asctime)s] [%(levelname)s] [%(name)s] (%(module)s:%(lineno)d): %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + "uvicorn": { + "format": "[%(asctime)s] [%(levelname)s] [uvicorn] %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + "uvicorn_access": { + "format": '%(client_addr)s - "%(request_line)s" %(status_code)s', + }, }, - "uvicorn": { - "format": "[%(asctime)s] [%(levelname)s] [uvicorn] %(message)s", - "datefmt": "%Y-%m-%d %H:%M:%S", + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": "DEBUG", + "formatter": "standard", + }, + "uvicorn": { + "class": "logging.StreamHandler", + "level": "DEBUG", + "formatter": "uvicorn", + }, + "uvicorn_access": { + "class": "logging.StreamHandler", + "level": "WARNING", + "formatter": "uvicorn_access", + }, }, - "uvicorn_access": { - "format": '%(client_addr)s - "%(request_line)s" %(status_code)s', + "root": { + "handlers": ["console"], + "level": nbe_log_level, }, - }, - "handlers": { - "console": { - "class": "logging.StreamHandler", - "level": "DEBUG", - "formatter": "standard", + "loggers": { + # ---- SQLAlchemy / SQLModel ---- + "sqlalchemy": {"level": sqla_log_level, "handlers": [], "propagate": False}, + "sqlalchemy.engine": {"level": sqla_log_level, "handlers": [], "propagate": False}, + "sqlalchemy.pool": {"level": sqla_log_level, "handlers": [], "propagate": False}, + "sqlalchemy.orm": {"level": sqla_log_level, "handlers": [], "propagate": False}, + "sqlalchemy.dialects": {"level": sqla_log_level, "handlers": [], "propagate": False}, + # ---- Httpx / HttpCore / Urllib3 ---- + "httpx": {"level": "WARNING", "handlers": ["console"], "propagate": False}, + "httpcore": {"level": "WARNING", "handlers": ["console"], "propagate": False}, + "urllib3": {"level": "WARNING", "handlers": ["console"], "propagate": False}, + # ---- Uvicorn / FastAPI ---- + "uvicorn": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False}, + # "uvicorn.error": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False}, + "uvicorn.access": {"level": "WARNING", "handlers": ["uvicorn_access"], "propagate": False}, + # ---- Application ---- + "src": {"level": nbe_log_level, "handlers": ["console"], "propagate": False}, }, - "uvicorn": { - "class": "logging.StreamHandler", - "level": "DEBUG", - "formatter": "uvicorn", - }, - "uvicorn_access": { - "class": "logging.StreamHandler", - "level": "WARNING", - "formatter": "uvicorn_access", - }, - }, - "root": { - "handlers": ["console"], - "level": _LEVEL, - }, - "loggers": { - # ---- SQLAlchemy / SQLModel ---- - "sqlalchemy": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, - "sqlalchemy.engine": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, - "sqlalchemy.pool": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, - "sqlalchemy.orm": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, - "sqlalchemy.dialects": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False}, - # ---- Uvicorn / FastAPI ---- - "uvicorn": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False}, - # "uvicorn.error": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False}, - "uvicorn.access": {"level": "WARNING", "handlers": ["uvicorn_access"], "propagate": False}, - # Your app namespace - "app": {"level": _LEVEL, "handlers": ["console"], "propagate": False}, - }, -} + } def setup_logging(): - dictConfig(_LOGGING_CONFIG) + nbe_log_level = os.getenv("NBE_LOG_LEVEL", "INFO").upper() + sqla_log_level = os.getenv("SQLALCHEMY_LOG_LEVEL", "ERROR").upper() + logging_config = get_logging_config(nbe_log_level, sqla_log_level) + dictConfig(logging_config) diff --git a/src/main.py b/src/main.py index 5f27425..2940972 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,7 @@ import asyncio import uvicorn +from dotenv import load_dotenv from app import create_app from logs import setup_logging @@ -28,6 +29,7 @@ async def main(): # Pycharm-Debuggable Uvicorn Server if __name__ == "__main__": try: + load_dotenv() setup_logging() asyncio.run(main()) except KeyboardInterrupt: diff --git a/src/node/api/base.py b/src/node/api/base.py index 1422a17..6a58baa 100644 --- a/src/node/api/base.py +++ b/src/node/api/base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import List +from typing import AsyncIterator, List from node.models.blocks import Block from node.models.health import Health @@ -16,5 +16,9 @@ class NodeApi(ABC): pass @abstractmethod - async def get_blocks(self) -> List[Block]: + async def get_blocks(self, **kwargs) -> List[Block]: + pass + + @abstractmethod + async def get_blocks_stream(self) -> AsyncIterator[List[Block]]: pass diff --git a/src/node/api/fake.py b/src/node/api/fake.py index c65f0f7..394a683 100644 --- a/src/node/api/fake.py +++ b/src/node/api/fake.py @@ -1,5 +1,5 @@ from random import choices, random -from typing import List +from typing import AsyncIterator, List from node.api.base import NodeApi from node.models.blocks import Block @@ -25,3 +25,7 @@ class FakeNodeApi(NodeApi): async def get_blocks(self) -> List[Block]: return [Block.from_random() for _ in range(1)] + + async def get_blocks_stream(self) -> AsyncIterator[Block]: + while True: + yield Block.from_random() diff --git a/src/node/api/http.py b/src/node/api/http.py index 4a666fa..1860288 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -1,35 +1,67 @@ +import logging +from typing import AsyncIterator, List from urllib.parse import urljoin +import httpx import requests from node.api.base import NodeApi +from node.models.blocks import Block +from node.models.health import Health +from node.models.transactions import Transaction + +logger = logging.getLogger(__name__) class HttpNodeApi(NodeApi): - ENDPOINT_MANTLE_STATUS = "/mantle/status" - ENDPOINT_MANTLE_TRANSACTIONS = "/mantle/transactions" - ENDPOINT_MANTLE_BLOCKS = "/mantle/blocks" + ENDPOINT_INFO = "/cryptarchia/info" + ENDPOINT_TRANSACTIONS = "/cryptarchia/transactions" + ENDPOINT_BLOCKS = "/cryptarchia/blocks" + ENDPOINT_BLOCKS_STREAM = "/cryptarchia/blocks/stream" - def __init__(self, host: str, port: int, protocol: str = "http"): - self.protocol: str = protocol + def __init__(self, host: str, port: int, protocol: str = "http", timeout: int = 60): self.host: str = host self.port: int = port + self.protocol: str = protocol + self.timeout: int = timeout @property def base_url(self): return f"{self.protocol}://{self.host}:{self.port}" - async def get_health_check(self) -> dict: - url = urljoin(self.base_url, self.ENDPOINT_MANTLE_STATUS) + async def get_health_check(self) -> Health: + url = urljoin(self.base_url, self.ENDPOINT_INFO) response = requests.get(url, timeout=60) - return response.json() + if response.status_code == 200: + return Health.from_healthy() + else: + return Health.from_unhealthy() - async def get_transactions(self) -> list: - url = urljoin(self.base_url, self.ENDPOINT_MANTLE_TRANSACTIONS) + async def get_transactions(self) -> List[Transaction]: + url = urljoin(self.base_url, self.ENDPOINT_TRANSACTIONS) response = requests.get(url, timeout=60) - return response.json() + json = response.json() + return [Transaction.model_validate(item) for item in json] - async def get_blocks(self) -> list: - url = urljoin(self.base_url, self.ENDPOINT_MANTLE_BLOCKS) + async def get_blocks(self, slot_from: int, slot_to: int) -> List[Block]: + query_string = f"slot_from={slot_from}&slot_to={slot_to}" + endpoint = urljoin(self.base_url, self.ENDPOINT_BLOCKS) + url = f"{endpoint}?{query_string}" response = requests.get(url, timeout=60) - return response.json() + python_json = response.json() + blocks = [Block.model_validate(item) for item in python_json] + return blocks + + async def get_blocks_stream(self) -> AsyncIterator[Block]: + url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) + + async with httpx.AsyncClient(timeout=self.timeout) as client: + async with client.stream("GET", url) as response: + response.raise_for_status() # TODO: Result + + async for line in response.aiter_lines(): + if not line: + continue + block = Block.model_validate_json(line) + logger.debug(f"Received new block from Node: {block}") + yield block diff --git a/src/node/lifespan.py b/src/node/lifespan.py index 307791a..1935ebb 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -1,8 +1,7 @@ -import random +import logging from asyncio import TaskGroup, create_task, sleep from contextlib import asynccontextmanager -from itertools import chain -from typing import TYPE_CHECKING, AsyncGenerator +from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator from rusty_results import Option @@ -19,32 +18,34 @@ from node.models.transactions import Transaction if TYPE_CHECKING: from core.app import NBE +logger = logging.getLogger(__name__) + @asynccontextmanager async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: - # app.state.node_manager = DockerModeManager() - # app.state.node_api = HttpApi(host="127.0.0.1", port=3000) - db_client = SqliteClient() app.state.node_manager = FakeNodeManager() - app.state.node_api = FakeNodeApi() + # app.state.node_manager = DockerModeManager(app.settings.node_compose_filepath) + # app.state.node_api = FakeNodeApi() + app.state.node_api = HttpNodeApi(host="127.0.0.1", port=18080) + app.state.db_client = db_client app.state.block_repository = BlockRepository(db_client) app.state.transaction_repository = TransactionRepository(db_client) try: - print("Starting node...") + logger.info("Starting node...") await app.state.node_manager.start() - print("Node started.") + logger.info("Node started.") - app.state.subscription_to_updates_handle = create_task(subscription_to_updates(app)) + app.state.subscription_to_updates_handle = create_task(subscribe_to_updates(app)) app.state.backfill = create_task(backfill(app)) yield finally: - print("Stopping node...") + logger.info("Stopping node...") await app.state.node_manager.stop() - print("Node stopped.") + logger.info("Node stopped.") # ================ @@ -69,83 +70,88 @@ _SUBSCRIPTION_START_SLOT = 5 # Simplification for now. # ================ -async def subscription_to_updates(app: "NBE") -> None: - print("✅ Subscription to new blocks and transactions started.") +async def subscribe_to_updates(app: "NBE") -> None: + logger.info("✅ Subscription to new blocks and transactions started.") async with TaskGroup() as tg: tg.create_task(subscribe_to_new_blocks(app)) tg.create_task(subscribe_to_new_transactions(app)) - print("Subscription to new blocks and transactions finished.") + logger.info("Subscription to new blocks and transactions finished.") -async def subscribe_to_new_blocks( - app: "NBE", interval: int = 5, subscription_start_slot: int = _SUBSCRIPTION_START_SLOT -): - while app.state.is_running: +async def _gracefully_close_stream(stream: AsyncIterator) -> None: + aclose = getattr(stream, "aclose", None) + if aclose is not None: try: - new_block: Block = Block.from_random(slot_start=subscription_start_slot, slot_end=subscription_start_slot) - print("> New Block") - await app.state.block_repository.create((new_block,)) - subscription_start_slot += 1 + await aclose() except Exception as e: - print(f"Error while subscribing to new blocks: {e}") - finally: - await sleep(interval) + logger.error(f"Error while closing the new blocks stream: {e}") -async def subscribe_to_new_transactions(app: "NBE", interval: int = 5): - while app.state.is_running: - try: - new_transaction: Transaction = Transaction.from_random() - print("> New TX") - await app.state.transaction_repository.create((new_transaction,)) - except Exception as e: - print(f"Error while subscribing to new transactions: {e}") - finally: - await sleep(interval) +async def subscribe_to_new_blocks(app: "NBE"): + blocks_stream: AsyncGenerator[Block] = app.state.node_api.get_blocks_stream() # type: ignore[call-arg] + try: + while app.state.is_running: + try: + block = await anext(blocks_stream) # TODO: Use anext's Sentinel? + except StopAsyncIteration: + logger.error("Subscription to the new blocks stream ended unexpectedly. Please restart the node.") + break + except TimeoutError: + continue + except Exception as e: + logger.error(f"Error while fetching new blocks: {e}") + continue + + await app.state.block_repository.create(block) + finally: + await _gracefully_close_stream(blocks_stream) -async def backfill(app: "NBE", delay: int = 3) -> None: - await sleep(delay) # Wait for some data to be present: Simplification for now.s +async def subscribe_to_new_transactions(_app: "NBE"): + pass - print("Backfilling started.") + +async def backfill(app: "NBE") -> None: + logger.info("Backfilling started.") async with TaskGroup() as tg: - tg.create_task(backfill_blocks(app)) + tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3)) tg.create_task(backfill_transactions(app)) - print("✅ Backfilling finished.") + logger.info("✅ Backfilling finished.") -async def backfill_blocks(app: "NBE"): - # Assuming at most one gap. This will be either genesis block (no gap) or the earliest received block from subscription. - # If genesis, do nothing. - # If earliest received block from subscription, backfill. - print("Checking for block gaps to backfill...") +async def get_earliest_block_slot(app: "NBE") -> Option[int]: earliest_block: Option[Block] = await app.state.block_repository.get_earliest() - earliest_block: Block = earliest_block.expects("Subscription should have provided at least one block by now.") - earliest_block_slot = earliest_block.slot + return earliest_block.map(lambda block: block.slot) + + +async def backfill_blocks(app: "NBE", *, db_hit_interval_seconds: int, batch_size: int = 50): + """ + FIXME: This is a very naive implementation: + - One block per slot. + - There's at most one gap to backfill (from genesis to earliest block). + FIXME: First block received is slot=2 + """ + logger.info("Checking for block gaps to backfill...") + # Hit the database until we get a block + while (earliest_block_slot_option := await get_earliest_block_slot(app)).is_empty: + logger.debug("No blocks were found in the database yet. Waiting...") + await sleep(db_hit_interval_seconds) + earliest_block_slot: int = earliest_block_slot_option.unwrap() + if earliest_block_slot == 0: - print("No need to backfill blocks, genesis block already present.") + logger.info("No blocks to backfill.") return - print(f"Backfilling blocks from slot {earliest_block_slot - 1} down to 0...") - - def n_blocks(): - return random.choices((1, 2, 3), (6, 3, 1))[0] - - blocks = ( - (Block.from_random(slot_start=slot_index, slot_end=slot_index) for _ in range(n_blocks())) - for slot_index in reversed(range(0, earliest_block_slot)) - ) - flattened = list(chain.from_iterable(blocks)) - await sleep(10) # Simulate some backfilling delay - await app.state.block_repository.create(flattened) - print("Backfilling blocks completed.") + slot_to = earliest_block_slot - 1 + logger.info(f"Backfilling blocks from slot {slot_to} down to 0...") + while slot_to > 0: + slot_from = max(0, slot_to - batch_size) + blocks = await app.state.node_api.get_blocks(slot_from=slot_from, slot_to=slot_to) + logger.debug(f"Backfilling {len(blocks)} blocks from slot {slot_from} to {slot_to}...") + await app.state.block_repository.create(*blocks) + slot_to = slot_from + logger.info("Backfilling blocks completed.") -async def backfill_transactions(app: "NBE"): - # Assume there's some TXs to backfill - n = random.randint(0, 5) - print(f"Backfilling {n} transactions...") - transactions = (Transaction.from_random() for _ in range(n)) - await sleep(10) # Simulate some backfilling delay - await app.state.transaction_repository.create(transactions) - print("Backfilling transactions completed.") +async def backfill_transactions(_app: "NBE"): + pass diff --git a/src/node/manager/docker.py b/src/node/manager/docker.py index e55196c..a16c1e2 100644 --- a/src/node/manager/docker.py +++ b/src/node/manager/docker.py @@ -1,23 +1,40 @@ -from os import environ -from pathlib import Path +from logging import error, warn +from python_on_whales import DockerException from python_on_whales.docker_client import DockerClient +from rusty_results import Err, Ok, Result from node.manager.base import NodeManager class DockerModeManager(NodeManager): - COMPOSE_FILE: Path = Path(environ["NODE_COMPOSE_FILEPATH"]) - - def __init__(self): + def __init__(self, compose_filepath: str): self.client: DockerClient = DockerClient( client_type="docker", - compose_files=[ - self.COMPOSE_FILE, - ], + compose_files=[compose_filepath], ) + match self.ps(): + case Err(1): + error("Compose services are not running.") + exit(21) # FIXME: There's too much output here. + case Err(_): + error("Failed to run docker compose.") + exit(20) + + def ps(self, only_running: bool = True) -> Result: + try: + services = self.client.compose.ps(all=(not only_running)) # TODO: Filter compose services. + except DockerException as e: + return Err(e.return_code) + return Ok(services) + async def start(self): + services = self.ps().map(lambda _services: len(_services)).expect("Failed to get compose services.") + if services > 0: + warn("Compose services are already running.") + return + self.client.compose.up( detach=True, build=False, diff --git a/src/node/models/blocks.py b/src/node/models/blocks.py index 0759828..15e6e9d 100644 --- a/src/node/models/blocks.py +++ b/src/node/models/blocks.py @@ -1,28 +1,96 @@ -from datetime import datetime +import random +from typing import List +from sqlalchemy import Column from sqlmodel import Field -from core.models import IdNbeModel +from core.models import NbeSchema, TimestampedModel +from core.sqlmodel import PydanticJsonColumn +from node.models.transactions import Transaction from utils.random import random_hash -class Block(IdNbeModel, table=True): - __tablename__ = "blocks" - +class Public(NbeSchema): + aged_root: str + epoch_nonce: str + latest_root: str slot: int - hash: str - parent_hash: str - transaction_count: int - timestamp: datetime = Field(default=None, index=True) + total_stake: float @classmethod - def from_random(cls, slot_start=1, slot_end=10_000) -> "Block": - import random + def from_random(cls, slot: int = None) -> "Public": + if slot is not None: + slot = random.randint(1, 100) - return cls( - slot=random.randint(slot_start, slot_end), - hash=random_hash(), - parent_hash=random_hash(), - transaction_count=random.randint(0, 500), - timestamp=datetime.now(), + return Public( + aged_root=random_hash(), + epoch_nonce=random_hash(), + latest_root=random_hash(), + slot=slot, + total_stake=100.0, + ) + + +class ProofOfLeadership(NbeSchema): + entropy_contribution: str + leader_key: List[int] + proof: List[int] + public: Public + voucher_cm: str + + @classmethod + def from_random(cls, slot: int = None) -> "ProofOfLeadership": + random_hash_as_list = lambda: [random.randint(0, 255) for _ in range(64)] + + return ProofOfLeadership( + entropy_contribution=random_hash(), + leader_key=random_hash_as_list(), + proof=random_hash_as_list(), + public=Public.from_random(slot), + voucher_cm=random_hash(), + ) + + +class Header(NbeSchema): + block_root: str + parent_block: str + proof_of_leadership: ProofOfLeadership + slot: int + + @classmethod + def from_random(cls, slot_from: int = 1, slot_to: int = 100) -> "Header": + slot = random.randint(slot_from, slot_to) + return Header( + block_root=random_hash(), + parent_block=random_hash(), + proof_of_leadership=ProofOfLeadership.from_random(slot), + slot=slot, + ) + + +class Block(TimestampedModel, table=True): + __tablename__ = "blocks" + + header: Header = Field(sa_column=Column(PydanticJsonColumn(Header), nullable=False)) + transactions: List[Transaction] = Field( + default_factory=list, sa_column=Column(PydanticJsonColumn(Transaction, many=True), nullable=False) + ) + + @property + def slot(self) -> int: + return self.header.slot + + def __str__(self) -> str: + return f"Block(slot={self.slot})" + + def __repr__(self) -> str: + return f"" + + @classmethod + def from_random(cls, slot_from: int = 1, slot_to: int = 100) -> "Block": + n = random.randint(1, 10) + _transactions = [Transaction.from_random() for _ in range(n)] + return Block( + header=Header.from_random(slot_from, slot_to), + transactions=[], ) diff --git a/src/node/models/health.py b/src/node/models/health.py index 6a942b0..1d25108 100644 --- a/src/node/models/health.py +++ b/src/node/models/health.py @@ -11,3 +11,9 @@ class Health(IdNbeModel): @classmethod def from_unhealthy(cls) -> "Health": return cls(healthy=False) + + def __str__(self): + return "Healthy" if self.healthy else "Unhealthy" + + def __repr__(self): + return f"" diff --git a/src/node/models/transactions.py b/src/node/models/transactions.py index 80cf931..09c4ebb 100644 --- a/src/node/models/transactions.py +++ b/src/node/models/transactions.py @@ -1,30 +1,84 @@ import random -from datetime import datetime -from typing import Optional +from enum import StrEnum +from typing import List +from sqlalchemy import JSON, Column from sqlmodel import Field -from core.models import IdNbeModel -from utils.random import random_address, random_hash +from core.models import NbeSchema, TimestampedModel +from utils.random import random_address + +Value = int +Fr = int +Gas = float +PublicKey = bytes -class Transaction(IdNbeModel, table=True): - __tablename__ = "transactions" +class Operation(StrEnum): + CHANNEL_INSCRIBE = ("ChannelInscribe",) # (InscriptionOp) + CHANNEL_BLOB = ("ChannelBlob",) # (BlobOp) + CHANNEL_SET_KEYS = ("ChannelSetKeys",) # (SetKeysOp) + NATIVE = ("Native",) # (NativeOp) + SDP_DECLARE = ("SDPDeclare",) # (SDPDeclareOp) + SDP_WITHDRAW = ("SDPWithdraw",) # (SDPWithdrawOp) + SDP_ACTIVE = ("SDPActive",) # (SDPActiveOp) + LEADER_CLAIM = ("LeaderClaim",) # (LeaderClaimOp) - hash: str - block_hash: Optional[str] = Field(default=None, index=True) - sender: str - recipient: str - amount: float - timestamp: datetime = Field(default=None, index=True) + +class Note(NbeSchema): + value: Value + public_key: PublicKey + + @classmethod + def from_random(cls) -> "Note": + return Note( + value=random.randint(1, 100), + public_key=random_address(), + ) + + +class LedgerTransaction(NbeSchema): + """ + Tx + """ + + inputs: List[Fr] = Field(default_factory=list, sa_column=Column(JSON, nullable=False)) + outputs: List[Note] = Field(default_factory=list, sa_column=Column(JSON, nullable=False)) + + @classmethod + def from_random(cls) -> "LedgerTransaction": + return LedgerTransaction( + inputs=[random.randint(1, 100) for _ in range(10)], + outputs=[Note.from_random() for _ in range(10)], + ) + + +class Transaction(NbeSchema): # table=true # It currently lives inside Block + """ + MantleTx + """ + + # __tablename__ = "transactions" + + # TODO: hash + operations: List[str] = Field(alias="ops", default_factory=list, sa_column=Column(JSON, nullable=False)) + ledger_transaction: LedgerTransaction = Field(default_factory=dict, sa_column=Column(JSON, nullable=False)) + execution_gas_price: Gas + storage_gas_price: Gas + + def __str__(self) -> str: + return f"Transaction({self.operations})" + + def __repr__(self) -> str: + return f"" @classmethod def from_random(cls) -> "Transaction": + n = random.randint(1, 10) + operations = [random.choice(list(Operation)).value for _ in range(n)] return Transaction( - hash=random_hash(), - block_hash=random_hash(), - sender=random_address(), - recipient=random_address(), - amount=round(random.uniform(0.0001, 100.0), 6), - timestamp=datetime.now(), + operations=operations, + ledger_transaction=LedgerTransaction.from_random(), + execution_gas_price=random.random(), + storage_gas_price=random.random(), ) diff --git a/static/index.html b/static/index.html index 4ae351c..f618dad 100644 --- a/static/index.html +++ b/static/index.html @@ -22,35 +22,12 @@ .card { background: var(--card); border: 1px solid #20263a; border-radius: 10px; overflow: hidden; } .card-header { display:flex; justify-content:space-between; align-items:center; padding:12px 14px; border-bottom: 1px solid #1f2435; } - /* SCROLLER */ - .table-wrapper { - overflow-y: auto; - overflow-x: auto; /* horizontal scrolling back */ - -webkit-overflow-scrolling: touch; - max-height: 60vh; - scrollbar-gutter: stable both-edges; - padding-right: 8px; - } + .table-wrapper { overflow: auto; -webkit-overflow-scrolling: touch; max-height: 60vh; scrollbar-gutter: stable both-edges; padding-right: 8px; } + table { border-collapse: collapse; table-layout: fixed; width: 100%; font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; } + .table-wrapper .table--blocks { min-width: 860px; } + .table-wrapper .table--txs { min-width: 980px; } - table { - border-collapse: collapse; - table-layout: fixed; - font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; - width: 100%; - } - /* Force tables to be wider than the container when needed so horizontal scroll appears */ - .table-wrapper .table--blocks { min-width: 560px; } - .table-wrapper .table--txs { min-width: 980px; } /* Hash + From→To + Amount + Time */ - - th, td { - text-align:left; - padding:8px 10px; - border-bottom:1px solid #1f2435; - vertical-align: top; - white-space: nowrap; - overflow: hidden; - text-overflow: ellipsis; - } + th, td { text-align:left; padding:8px 10px; border-bottom:1px solid #1f2435; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; vertical-align: top; } th { color: var(--muted); font-weight: normal; font-size: 13px; position: sticky; top: 0; background: var(--card); z-index: 1; } tbody td { height: 28px; } tr:nth-child(odd) { background: #121728; } @@ -58,10 +35,13 @@ .twocol { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin-top: 16px; } @media (max-width: 960px) { .twocol { grid-template-columns: 1fr; } } - .table--txs th:last-child, .table--txs td:last-child { padding-right: 16px; } - - .mono { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; } + .mono { font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; } .amount { font-variant-numeric: tabular-nums; } + .linkish { color: var(--fg); text-decoration: none; border-bottom: 1px dotted #2a3350; } + .linkish:hover { border-bottom-color: var(--fg); } + + /* Placeholder rows for fixed table height */ + tr.ph td { opacity: .35; } @@ -81,15 +61,19 @@
- - - + + + + + - + + + @@ -107,9 +91,9 @@
SlotHashBlock RootParent TxsTime
- + - + @@ -133,9 +117,26 @@ const TXS_ENDPOINT = `${API_PREFIX}/transactions/stream`; const TABLE_SIZE = 10; + // ------------------ RELOAD/UNLOAD ABORT PATCH (prevents “Error in input stream”) ------------------ + // We keep AbortControllers for each stream and abort them when the page unloads/reloads, + // so the fetch ReadableStreams don't throw TypeError during navigation. + let healthController = null; + let blocksController = null; + let txsController = null; + addEventListener("beforeunload", () => { + healthController?.abort(); + blocksController?.abort(); + txsController?.abort(); + }, { passive: true }); + addEventListener("pagehide", () => { + healthController?.abort(); + blocksController?.abort(); + txsController?.abort(); + }, { passive: true }); + // -------------------------------------------------------------------------------------------------- + // ---- Health pill ---- const pill = document.getElementById("status-pill"); - let healthController = null; let state = "connecting"; function setState(next) { if (next === state) return; @@ -156,7 +157,9 @@ if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`); const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = ""; while (true) { - const { value, done } = await reader.read(); if (done) break; + let chunk; + try { chunk = await reader.read(); } catch { if (healthController.signal.aborted) return; else break; } + const { value, done } = chunk; if (done) break; buf += decoder.decode(value, { stream: true }); const lines = buf.split("\n"); buf = lines.pop() ?? ""; for (const line of lines) { if (!line.trim()) continue; try { applyHealth(JSON.parse(line)); } catch {} } @@ -165,55 +168,35 @@ } connectHealth(); - // ---- NDJSON reader ---- + // ---- NDJSON reader (shared) ---- async function streamNDJSON(url, onItem, { signal } = {}) { const res = await fetch(url, { headers: { "accept": "application/x-ndjson" }, signal, cache: "no-cache" }); if (!res.ok || !res.body) throw new Error(`Stream failed: ${res.status}`); const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = ""; while (true) { - const { value, done } = await reader.read(); if (done) break; + let chunk; + try { chunk = await reader.read(); } + catch { if (signal?.aborted) return; else break; } // quiet on navigation abort + const { value, done } = chunk; if (done) break; buf += decoder.decode(value, { stream: true }); let idx; while ((idx = buf.indexOf("\n")) >= 0) { const line = buf.slice(0, idx).trim(); buf = buf.slice(idx + 1); - if (!line) continue; try { onItem(JSON.parse(line)); } catch {} + if (!line) continue; + try { onItem(JSON.parse(line)); } catch {} } } const last = buf.trim(); if (last) { try { onItem(JSON.parse(last)); } catch {} } } - // ---- Table helpers ---- - function ensureSize(tbody, colCount) { - while (tbody.rows.length < TABLE_SIZE) { - const row = document.createElement("tr"); - for (let i = 0; i < colCount; i++) { - const td = document.createElement("td"); - td.innerHTML = " "; - row.appendChild(td); - } - tbody.appendChild(row); - } + // ---- Helpers ---- + function shortHex(s, left = 10, right = 8) { + if (!s) return ""; + return s.length <= left + right + 1 ? s : `${s.slice(0,left)}…${s.slice(-right)}`; } - function appendRow(tbody, cells) { - const tr = document.createElement("tr"); - for (const cell of cells) { - const td = document.createElement("td"); - if (cell && cell.html !== undefined) td.innerHTML = cell.html; - else td.textContent = String(cell ?? ""); - if (cell && cell.class) td.className = cell.class; - tr.appendChild(td); - } - tbody.insertBefore(tr, tbody.firstChild); - while (tbody.rows.length > TABLE_SIZE) tbody.deleteRow(-1); - } - function shortHex(s, left = 6, right = 4) { if (!s) return ""; return s.length <= left + right + 2 ? s : `${s.slice(0,left)}…${s.slice(-right)}`; } - - // Flexible timestamp formatter function fmtTime(ts) { if (ts == null) return ""; - let d; - if (typeof ts === "number") d = new Date(ts < 1e12 ? ts * 1000 : ts); - else d = new Date(ts); + let d = typeof ts === "number" ? new Date(ts < 1e12 ? ts * 1000 : ts) : new Date(ts); if (isNaN(d)) return ""; return d.toLocaleString(undefined, { year: "numeric", month: "2-digit", day: "2-digit", @@ -221,37 +204,114 @@ }); } - // ---- Blocks ---- + // Keep table exactly TABLE_SIZE rows using placeholders + function ensureSize(tbody, cols, size) { + // remove existing placeholders first + for (let i = tbody.rows.length - 1; i >= 0; i--) { + if (tbody.rows[i].classList.contains("ph")) tbody.deleteRow(i); + } + let real = tbody.rows.length; + // pad + for (let i = 0; i < size - real; i++) { + const tr = document.createElement("tr"); tr.className = "ph"; + for (let c = 0; c < cols; c++) { const td = document.createElement("td"); td.innerHTML = " "; tr.appendChild(td); } + tbody.appendChild(tr); + } + } + + // ---- Blocks (dedupe by slot:id + fixed-size table) ---- (function initBlocks() { const body = document.getElementById("blocks-body"); const counter = document.getElementById("blocks-count"); - let n = 0; - ensureSize(body, 3); - streamNDJSON(BLOCKS_ENDPOINT, (b) => { - appendRow(body, [ - { html: `${b.slot}` }, - { html: `${shortHex(b.hash, 10, 8)}` }, - b.transaction_count, - ]); - counter.textContent = (++n).toString(); - }).catch(err => console.error("Blocks stream error:", err)); + const seen = new Set(); // keys "slot:id" + + function pruneAndPad() { + // remove placeholders + for (let i = body.rows.length - 1; i >= 0; i--) if (body.rows[i].classList.contains("ph")) body.deleteRow(i); + // cap real rows and drop keys for removed rows + while ([...body.rows].filter(r => !r.classList.contains("ph")).length > TABLE_SIZE) { + const last = body.rows[body.rows.length - 1]; + const key = last?.dataset?.key; + if (key) seen.delete(key); + body.deleteRow(-1); + } + // pad back to TABLE_SIZE + const real = [...body.rows].filter(r => !r.classList.contains("ph")).length; + for (let i = 0; i < TABLE_SIZE - real; i++) { + const tr = document.createElement("tr"); tr.className = "ph"; + for (let c = 0; c < 5; c++) { const td = document.createElement("td"); td.innerHTML = " "; tr.appendChild(td); } + body.appendChild(tr); + } + counter.textContent = String(real); + } + + function appendBlockRow(b, key) { + const tr = document.createElement("tr"); + tr.dataset.key = key; + const td = (html) => { const x = document.createElement("td"); x.innerHTML = html; return x; }; + tr.appendChild(td(`${b.slot}`)); + tr.appendChild(td(`${shortHex(b.root)}`)); + tr.appendChild(td(`${shortHex(b.parent)}`)); + tr.appendChild(td(`${b.txCount}`)); + tr.appendChild(td(`${fmtTime(b.time)}`)); + body.insertBefore(tr, body.firstChild); + pruneAndPad(); + } + + function normalizeBlock(raw) { + const h = raw.header ?? raw; + const created = raw.created_at ?? raw.header?.created_at ?? null; + return { + id: Number(raw.id ?? 0), // include id for (slot,id) dedupe + slot: Number(h?.slot ?? raw.slot ?? 0), + root: h?.block_root ?? raw.block_root ?? "", + parent: h?.parent_block ?? raw.parent_block ?? "", + txCount: Array.isArray(raw.transactions) ? raw.transactions.length + : (typeof raw.transaction_count === "number" ? raw.transaction_count : 0), + time: created + }; + } + + ensureSize(body, 5, TABLE_SIZE); + + // start stream (with reload-abort controller) + if (blocksController) blocksController.abort(); + blocksController = new AbortController(); + + streamNDJSON(BLOCKS_ENDPOINT, (raw) => { + const b = normalizeBlock(raw); + const key = `${b.slot}:${b.id}`; + if (seen.has(key)) { pruneAndPad(); return; } + seen.add(key); + appendBlockRow(b, key); + }, { signal: blocksController.signal }).catch(err => { + if (!blocksController.signal.aborted) console.error("Blocks stream error:", err); + }); })(); - // ---- Transactions ---- + // ---- Transactions (kept simple placeholder; adapt to your API shape) ---- (function initTxs() { const body = document.getElementById("txs-body"); const counter = document.getElementById("txs-count"); let n = 0; - ensureSize(body, 4); // 4 columns now + ensureSize(body, 4, TABLE_SIZE); + + if (txsController) txsController.abort(); + txsController = new AbortController(); + streamNDJSON(TXS_ENDPOINT, (t) => { - appendRow(body, [ - { html: `${shortHex(t.hash, 10, 8)}` }, - { html: `${shortHex(t.sender)}${shortHex(t.recipient)}` }, - { html: `${Number(t.amount).toLocaleString(undefined, { maximumFractionDigits: 8 })}` }, - { html: `${fmtTime(t.timestamp)}` }, - ]); - counter.textContent = (++n).toString(); - }).catch(err => console.error("Tx stream error:", err)); + const tr = document.createElement("tr"); + const td = (html) => { const x = document.createElement("td"); x.innerHTML = html; return x; }; + tr.appendChild(td(`${shortHex(t.hash ?? "")}`)); + tr.appendChild(td(`${shortHex(t.sender ?? "")}${shortHex(t.recipient ?? "")}`)); + tr.appendChild(td(`${(t.amount ?? 0).toLocaleString(undefined, { maximumFractionDigits: 8 })}`)); + tr.appendChild(td(`${fmtTime(t.timestamp)}`)); + body.insertBefore(tr, body.firstChild); + while (body.rows.length > TABLE_SIZE) body.deleteRow(-1); + counter.textContent = String(++n); + }, { signal: txsController.signal }).catch(err => { + if (!txsController.signal.aborted) console.error("Tx stream error:", err); + }); })();