Integrate explorer with Node.

This commit is contained in:
Alejandro Cabeza Romero 2025-10-15 20:53:52 +02:00
parent 226a1712d3
commit 832ed18352
No known key found for this signature in database
GPG Key ID: DA3D14AE478030FD
25 changed files with 819 additions and 333 deletions

6
.gitignore vendored
View File

@ -1,5 +1,7 @@
.venv/** .venv/
.idea/** .idea/
**/__pycache__/** **/__pycache__/**
sqlite.db sqlite.db
*.ignore* *.ignore*
.env
uv.lock

5
http-client.env.json Normal file
View File

@ -0,0 +1,5 @@
{
"dev": {
"host": "http://localhost:18080"
}
}

24
node.rest Normal file
View File

@ -0,0 +1,24 @@
GET {{host}}/cryptarchia/info
Accept: application/json
###
GET {{host}}/cryptarchia/headers
Accept: application/json
###
GET {{host}}/network/info
Accept: application/json
###
GET {{host}}/mantle/metrics
###
GET {{host}}/cryptarchia/blocks?slot_from=0&slot_to=10
###
GET {{host}}/cryptarchia/blocks/stream

View File

@ -4,6 +4,8 @@ version = "0.1.0"
requires-python = ">=3.13,<3.14" requires-python = ">=3.13,<3.14"
dependencies = [ dependencies = [
"fastapi~=0.118.0", "fastapi~=0.118.0",
"httpx>=0.28.1",
"pydantic-settings>=2.11.0",
"python-on-whales~=0.78.0", "python-on-whales~=0.78.0",
"requests~=2.32.5", "requests~=2.32.5",
"rusty-results~=1.1.1", "rusty-results~=1.1.1",

33
src/api/streams.py Normal file
View File

@ -0,0 +1,33 @@
import logging
from typing import AsyncIterable, AsyncIterator, List, Union
from core.models import IdNbeModel
Data = Union[IdNbeModel, List[IdNbeModel]]
Stream = AsyncIterator[Data]
logger = logging.getLogger(__name__)
def _into_ndjson_data(data: Data) -> bytes:
if isinstance(data, list):
return b"".join(item.model_dump_ndjson() for item in data)
else:
return data.model_dump_ndjson()
async def into_ndjson_stream(stream: Stream, bootstrap_data: Data = None) -> AsyncIterable[bytes]:
if bootstrap_data is not None:
ndjson_data = _into_ndjson_data(bootstrap_data)
if ndjson_data:
yield ndjson_data
else:
logger.debug("Ignoring streaming bootstrap data because it is empty.")
async for data in stream:
ndjson_data = _into_ndjson_data(data)
if ndjson_data:
yield ndjson_data
else:
logger.debug("Ignoring streaming data because it is empty.")

View File

@ -1,27 +0,0 @@
from asyncio import sleep
from typing import AsyncIterable, AsyncIterator, List, Union
from core.models import IdNbeModel
Data = Union[IdNbeModel, List[IdNbeModel]]
Stream = AsyncIterator[Data]
def _parse_stream_data(data: Data) -> bytes:
if isinstance(data, list):
return b"".join(item.model_dump_ndjson() for item in data)
else:
return data.model_dump_ndjson()
async def streamer(
stream: Stream,
bootstrap_data: Data = None,
interval: int = 5,
) -> AsyncIterable[bytes]:
if bootstrap_data is not None:
yield _parse_stream_data(bootstrap_data)
async for data in stream:
yield _parse_stream_data(data)
await sleep(interval)

View File

@ -2,14 +2,14 @@ from typing import List
from starlette.responses import Response from starlette.responses import Response
from api.utils import streamer from api.streams import into_ndjson_stream
from core.api import NBERequest, NDJsonStreamingResponse from core.api import NBERequest, NDJsonStreamingResponse
from node.models.blocks import Block from node.models.blocks import Block
async def stream(request: NBERequest) -> Response: async def stream(request: NBERequest) -> Response:
bootstrap_blocks: List[Block] = await request.app.state.block_repository.get_latest(limit=5, descending=False) bootstrap_blocks: List[Block] = await request.app.state.block_repository.get_latest(limit=5, ascending=True)
highest_slot: int = max((block.slot for block in bootstrap_blocks), default=0) highest_slot: int = max((block.slot for block in bootstrap_blocks), default=0)
updates_stream = request.app.state.block_repository.updates_stream(slot_from=highest_slot + 1) updates_stream = request.app.state.block_repository.updates_stream(slot_from=highest_slot + 1)
block_stream = streamer(stream=updates_stream, bootstrap_data=bootstrap_blocks) block_stream = into_ndjson_stream(stream=updates_stream, bootstrap_data=bootstrap_blocks)
return NDJsonStreamingResponse(block_stream) return NDJsonStreamingResponse(block_stream)

View File

@ -1,8 +1,9 @@
from asyncio import sleep
from typing import AsyncIterator from typing import AsyncIterator
from starlette.responses import JSONResponse, Response from starlette.responses import JSONResponse, Response
from api.utils import streamer from api.streams import into_ndjson_stream
from core.api import NBERequest, NDJsonStreamingResponse from core.api import NBERequest, NDJsonStreamingResponse
from node.api.base import NodeApi from node.api.base import NodeApi
from node.models.health import Health from node.models.health import Health
@ -16,9 +17,10 @@ async def get(request: NBERequest) -> Response:
async def _health_iterator(node_api: NodeApi) -> AsyncIterator[Health]: async def _health_iterator(node_api: NodeApi) -> AsyncIterator[Health]:
while True: while True:
yield await node_api.get_health_check() yield await node_api.get_health_check()
await sleep(10)
async def stream(request: NBERequest) -> Response: async def stream(request: NBERequest) -> Response:
_stream = _health_iterator(request.app.state.node_api) _stream = _health_iterator(request.app.state.node_api)
health_stream = streamer(stream=_stream, interval=2) health_stream = into_ndjson_stream(stream=_stream)
return NDJsonStreamingResponse(health_stream) return NDJsonStreamingResponse(health_stream)

View File

@ -3,7 +3,7 @@ from typing import List
from starlette.responses import Response from starlette.responses import Response
from api.utils import streamer from api.streams import into_ndjson_stream
from core.api import NBERequest, NDJsonStreamingResponse from core.api import NBERequest, NDJsonStreamingResponse
from node.models.transactions import Transaction from node.models.transactions import Transaction
from utils.datetime import increment_datetime from utils.datetime import increment_datetime
@ -19,5 +19,5 @@ async def stream(request: NBERequest) -> Response:
updates_stream = request.app.state.transaction_repository.updates_stream( updates_stream = request.app.state.transaction_repository.updates_stream(
timestamp_from=increment_datetime(highest_timestamp) timestamp_from=increment_datetime(highest_timestamp)
) )
transaction_stream = streamer(stream=updates_stream, bootstrap_data=bootstrap_transactions) transaction_stream = into_ndjson_stream(stream=updates_stream, bootstrap_data=bootstrap_transactions)
return NDJsonStreamingResponse(transaction_stream) return NDJsonStreamingResponse(transaction_stream)

View File

@ -2,6 +2,7 @@ from asyncio import Task, gather
from typing import Optional from typing import Optional
from fastapi import FastAPI from fastapi import FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict
from starlette.datastructures import State from starlette.datastructures import State
from db.blocks import BlockRepository from db.blocks import BlockRepository
@ -9,10 +10,19 @@ from db.clients import DbClient
from db.transaction import TransactionRepository from db.transaction import TransactionRepository
from node.api.base import NodeApi from node.api.base import NodeApi
from node.manager.base import NodeManager from node.manager.base import NodeManager
from src import DIR_REPO
ENV_FILEPATH = DIR_REPO.joinpath(".env")
class NBESettings(BaseSettings):
model_config = SettingsConfigDict(env_file=ENV_FILEPATH, extra="ignore")
node_compose_filepath: str
class NBEState(State): class NBEState(State):
signal_exit: bool = False signal_exit: bool = False # TODO: asyncio.Event
node_manager: Optional[NodeManager] node_manager: Optional[NodeManager]
node_api: Optional[NodeApi] node_api: Optional[NodeApi]
db_client: DbClient db_client: DbClient
@ -39,7 +49,9 @@ class NBEState(State):
class NBE(FastAPI): class NBE(FastAPI):
state: NBEState state: NBEState
settings: NBESettings
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.state = NBEState() self.state = NBEState()
self.settings = NBESettings() # type: ignore[call-arg] # Missing parameter is filled from env file

View File

@ -1,12 +1,87 @@
from typing import Optional from abc import ABC, abstractmethod
from datetime import datetime
from json import loads
from typing import Any, Optional, Self
from pydantic import BaseModel
from pydantic.config import ExtraValues
from sqlalchemy import DateTime, func
from sqlmodel import Field, SQLModel from sqlmodel import Field, SQLModel
# --- Generic ---
class NdjsonMixin(ABC):
@abstractmethod
def _dump_json(self) -> str:
pass
class NbeModel(SQLModel):
def model_dump_ndjson(self) -> bytes: def model_dump_ndjson(self) -> bytes:
return f"{self.model_dump_json()}\n".encode("utf-8") return f"{self._dump_json()}\n".encode("utf-8")
class IdNbeModel(NbeModel): # --- Pydantic ---
class NbeSchema(NdjsonMixin, BaseModel):
def _dump_json(self) -> str:
return self.model_dump_json()
# --- SQLModel ---
class NbeModel(NdjsonMixin, SQLModel):
def _dump_json(self) -> str:
return self.model_dump_json()
@classmethod
def model_validate_json(
cls,
json_data: str | bytes | bytearray,
*,
strict: bool | None = None,
extra: ExtraValues | None = None,
context: Any | None = None,
by_alias: bool | None = None,
by_name: bool | None = None,
) -> Self:
"""
Sourced from: https://github.com/fastapi/sqlmodel/discussions/852
Related:
- https://github.com/fastapi/sqlmodel/issues/453
- https://github.com/fastapi/sqlmodel/discussions/961
SQLModel's `model_validate_json` is broken on `table=True`, when using JSON columns linked to a Pydantic model.
Nested fields defined this way are transformed to plain dict/list instead of their respective Pydantic models.
Because `model_validate` has its behaviour fixed, we delegate to it.
Note: `pydantic.TypeAdapter` also suffers from this issue.
"""
python_data = loads(json_data)
return cls.model_validate(obj=python_data, strict=strict, context=context)
class IdMixin:
id: Optional[int] = Field(default=None, primary_key=True) id: Optional[int] = Field(default=None, primary_key=True)
class TimestampedMixin:
created_at: datetime | None = Field(
default=None,
sa_type=DateTime(timezone=True), # type: ignore[arg-type]
sa_column_kwargs={"server_default": func.now(), "nullable": False},
)
updated_at: datetime | None = Field(
default=None,
sa_type=DateTime(timezone=True), # type: ignore[arg-type]
sa_column_kwargs={"server_default": func.now(), "onupdate": func.now(), "nullable": False},
)
class IdNbeModel(NbeModel, IdMixin):
pass
class TimestampedModel(IdNbeModel, TimestampedMixin):
pass

49
src/core/sqlmodel.py Normal file
View File

@ -0,0 +1,49 @@
from typing import Any, Generic, List, TypeVar
from pydantic import TypeAdapter
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.types import JSON as SA_JSON, TypeDecorator
T = TypeVar("T")
class PydanticJsonColumn(TypeDecorator, Generic[T]):
"""
Store/load a Pydantic v2 model (or list of models) in a JSON/JSONB column.
Python -> DB: accepts Model | dict | list[Model] | list[dict] | JSON str/bytes,
emits dict or list[dict] (what JSON columns expect).
DB -> Python: returns Model or list[Model], preserving shape.
"""
impl = SA_JSON
cache_ok = True
def __init__(self, model: type[T], *, many: bool = False) -> None:
super().__init__()
self.many = many
self._ta = TypeAdapter(List[model] if many else model)
# Use JSONB on Postgres, JSON elsewhere
def load_dialect_impl(self, dialect):
return dialect.type_descriptor(JSONB()) if dialect.name == "postgresql" else dialect.type_descriptor(SA_JSON())
# Python -> DB (on INSERT/UPDATE)
def process_bind_param(self, value: Any, _dialect) -> Any:
if value is None:
return [] if self.many else None
# If given JSON text/bytes, validate from JSON; else from Python
if isinstance(value, (str, bytes, bytearray)):
model_value = self._ta.validate_json(value.decode() if not isinstance(value, str) else value)
else:
model_value = self._ta.validate_python(value)
# Dump to plain Python (dict/list) for the JSON column
return self._ta.dump_python(model_value, mode="json")
# DB -> Python (on SELECT)
def process_result_value(self, value: Any, _dialect):
if value is None:
return [] if self.many else None
return self._ta.validate_python(value)

View File

@ -1,50 +1,90 @@
from typing import AsyncIterator, Iterable, List from asyncio import sleep
from typing import AsyncIterator, List, Literal
from rusty_results import Empty, Option, Some from rusty_results import Empty, Option, Some
from sqlalchemy import Result from sqlalchemy import Float, Integer, Result, String, cast, func
from sqlalchemy.orm import aliased
from sqlmodel import select from sqlmodel import select
from sqlmodel.sql._expression_select_cls import Select
from db.clients import DbClient from db.clients import DbClient
from node.models.blocks import Block from node.models.blocks import Block
def order_by_json(
sql_expr, path: str, *, into_type: Literal["int", "float", "text"] = "text", descending: bool = False
):
expression = jget(sql_expr, path, into_type=into_type)
return expression.desc() if descending else expression.asc()
def jget(sql_expr, path: str, *, into_type: Literal["int", "float", "text"] = "text"):
expression = func.json_extract(sql_expr, path)
match into_type:
case "int":
expression = cast(expression, Integer)
case "float":
expression = cast(expression, Float)
case "text":
expression = cast(expression, String)
return expression
def get_latest_statement(limit: int, latest_ascending: bool = True) -> Select:
# Fetch latest
descending = order_by_json(Block.header, "$.slot", into_type="int", descending=True)
inner = select(Block).order_by(descending, Block.id.desc()).limit(limit).subquery()
# Reorder
latest = aliased(Block, inner)
latest_order = order_by_json(latest.header, "$.slot", into_type="int", descending=(not latest_ascending))
id_order = latest.id.asc() if latest_ascending else latest.id.desc()
statement = select(latest).order_by(latest_order, id_order) # type: ignore[arg-type]
return statement
class BlockRepository: class BlockRepository:
"""
FIXME: Assumes slots are sequential and one block per slot
"""
def __init__(self, client: DbClient): def __init__(self, client: DbClient):
self.client = client self.client = client
async def create(self, block: Iterable[Block]) -> None: async def create(self, *blocks: Block) -> None:
with self.client.session() as session: with self.client.session() as session:
session.add_all(block) session.add_all(list(blocks))
session.commit() session.commit()
async def get_latest(self, limit: int, descending: bool = True) -> List[Block]: async def get_latest(self, limit: int, *, ascending: bool = True) -> List[Block]:
statement = select(Block).limit(limit) statement = get_latest_statement(limit, ascending)
if descending:
statement = statement.order_by(Block.slot.desc())
else:
statement = statement.order_by(Block.slot.asc())
with self.client.session() as session: with self.client.session() as session:
results: Result[Block] = session.exec(statement) results: Result[Block] = session.exec(statement)
return results.all() return results.all()
async def updates_stream(self, slot_from: int) -> AsyncIterator[List[Block]]: async def updates_stream(self, slot_from: int, *, timeout_seconds: int = 1) -> AsyncIterator[List[Block]]:
_slot_from = slot_from slot_cursor = slot_from
block_slot_expression = jget(Block.header, "$.slot", into_type="int")
order = order_by_json(Block.header, "$.slot", into_type="int", descending=False)
while True: while True:
statement = select(Block).where(Block.slot >= _slot_from).order_by(Block.slot.asc()) where_clause = block_slot_expression >= slot_cursor
statement = select(Block).where(where_clause).order_by(order)
with self.client.session() as session: with self.client.session() as session:
blocks: List[Block] = session.exec(statement).all() blocks: List[Block] = session.exec(statement).all()
if len(blocks) > 0: if len(blocks) > 0:
# POC: Assumes slots are sequential and one block per slot slot_cursor = blocks[-1].slot + 1
_slot_from = blocks[-1].slot + 1 yield blocks
else:
yield blocks await sleep(timeout_seconds)
async def get_earliest(self) -> Option[Block]: async def get_earliest(self) -> Option[Block]:
with self.client.session() as session: with self.client.session() as session:
statement = select(Block).order_by(Block.slot.asc()).limit(1) order = order_by_json(Block.header, "$.slot", into_type="int", descending=False)
statement = select(Block).order_by(order).limit(1)
results: Result[Block] = session.exec(statement) results: Result[Block] = session.exec(statement)
if (block := results.first()) is not None: if (block := results.first()) is not None:
return Some(block) return Some(block)

View File

@ -1,4 +1,5 @@
from datetime import datetime, timedelta from asyncio import sleep
from datetime import datetime
from typing import AsyncIterator, Iterable, List from typing import AsyncIterator, Iterable, List
from rusty_results import Empty, Option, Some from rusty_results import Empty, Option, Some
@ -20,6 +21,8 @@ class TransactionRepository:
session.commit() session.commit()
async def get_latest(self, limit: int, descending: bool = True) -> List[Transaction]: async def get_latest(self, limit: int, descending: bool = True) -> List[Transaction]:
return []
statement = select(Transaction).limit(limit) statement = select(Transaction).limit(limit)
if descending: if descending:
statement = statement.order_by(Transaction.timestamp.desc()) statement = statement.order_by(Transaction.timestamp.desc())
@ -31,6 +34,11 @@ class TransactionRepository:
return results.all() return results.all()
async def updates_stream(self, timestamp_from: datetime) -> AsyncIterator[List[Transaction]]: async def updates_stream(self, timestamp_from: datetime) -> AsyncIterator[List[Transaction]]:
while True:
if False:
yield []
await sleep(10)
_timestamp_from = timestamp_from _timestamp_from = timestamp_from
while True: while True:
statement = ( statement = (
@ -49,6 +57,8 @@ class TransactionRepository:
yield transactions yield transactions
async def get_earliest(self) -> Option[Transaction]: async def get_earliest(self) -> Option[Transaction]:
return Empty()
with self.client.session() as session: with self.client.session() as session:
statement = select(Transaction).order_by(Transaction.slot.asc()).limit(1) statement = select(Transaction).order_by(Transaction.slot.asc()).limit(1)
results: Result[Transaction] = session.exec(statement) results: Result[Transaction] = session.exec(statement)

View File

@ -1,62 +1,68 @@
import os import os
from logging.config import dictConfig from logging.config import dictConfig
_LEVEL = os.getenv("NBE_LOG_LEVEL", "INFO").upper()
_SQLA_LEVEL = os.getenv("SQLALCHEMY_LOG_LEVEL", "ERROR").upper()
_LOGGING_CONFIG = { def get_logging_config(nbe_log_level: str, sqla_log_level: str):
"version": 1, return {
"disable_existing_loggers": True, "version": 1,
"formatters": { "disable_existing_loggers": False,
"standard": { "formatters": {
"format": "[%(asctime)s] [%(levelname)s] %(name)s: %(message)s", "standard": {
"datefmt": "%Y-%m-%d %H:%M:%S", "format": "[%(asctime)s] [%(levelname)s] [%(name)s] (%(module)s:%(lineno)d): %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
},
"uvicorn": {
"format": "[%(asctime)s] [%(levelname)s] [uvicorn] %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
},
"uvicorn_access": {
"format": '%(client_addr)s - "%(request_line)s" %(status_code)s',
},
}, },
"uvicorn": { "handlers": {
"format": "[%(asctime)s] [%(levelname)s] [uvicorn] %(message)s", "console": {
"datefmt": "%Y-%m-%d %H:%M:%S", "class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "standard",
},
"uvicorn": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "uvicorn",
},
"uvicorn_access": {
"class": "logging.StreamHandler",
"level": "WARNING",
"formatter": "uvicorn_access",
},
}, },
"uvicorn_access": { "root": {
"format": '%(client_addr)s - "%(request_line)s" %(status_code)s', "handlers": ["console"],
"level": nbe_log_level,
}, },
}, "loggers": {
"handlers": { # ---- SQLAlchemy / SQLModel ----
"console": { "sqlalchemy": {"level": sqla_log_level, "handlers": [], "propagate": False},
"class": "logging.StreamHandler", "sqlalchemy.engine": {"level": sqla_log_level, "handlers": [], "propagate": False},
"level": "DEBUG", "sqlalchemy.pool": {"level": sqla_log_level, "handlers": [], "propagate": False},
"formatter": "standard", "sqlalchemy.orm": {"level": sqla_log_level, "handlers": [], "propagate": False},
"sqlalchemy.dialects": {"level": sqla_log_level, "handlers": [], "propagate": False},
# ---- Httpx / HttpCore / Urllib3 ----
"httpx": {"level": "WARNING", "handlers": ["console"], "propagate": False},
"httpcore": {"level": "WARNING", "handlers": ["console"], "propagate": False},
"urllib3": {"level": "WARNING", "handlers": ["console"], "propagate": False},
# ---- Uvicorn / FastAPI ----
"uvicorn": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False},
# "uvicorn.error": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False},
"uvicorn.access": {"level": "WARNING", "handlers": ["uvicorn_access"], "propagate": False},
# ---- Application ----
"src": {"level": nbe_log_level, "handlers": ["console"], "propagate": False},
}, },
"uvicorn": { }
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "uvicorn",
},
"uvicorn_access": {
"class": "logging.StreamHandler",
"level": "WARNING",
"formatter": "uvicorn_access",
},
},
"root": {
"handlers": ["console"],
"level": _LEVEL,
},
"loggers": {
# ---- SQLAlchemy / SQLModel ----
"sqlalchemy": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False},
"sqlalchemy.engine": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False},
"sqlalchemy.pool": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False},
"sqlalchemy.orm": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False},
"sqlalchemy.dialects": {"level": _SQLA_LEVEL, "handlers": [], "propagate": False},
# ---- Uvicorn / FastAPI ----
"uvicorn": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False},
# "uvicorn.error": {"level": "INFO", "handlers": ["uvicorn"], "propagate": False},
"uvicorn.access": {"level": "WARNING", "handlers": ["uvicorn_access"], "propagate": False},
# Your app namespace
"app": {"level": _LEVEL, "handlers": ["console"], "propagate": False},
},
}
def setup_logging(): def setup_logging():
dictConfig(_LOGGING_CONFIG) nbe_log_level = os.getenv("NBE_LOG_LEVEL", "INFO").upper()
sqla_log_level = os.getenv("SQLALCHEMY_LOG_LEVEL", "ERROR").upper()
logging_config = get_logging_config(nbe_log_level, sqla_log_level)
dictConfig(logging_config)

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
import uvicorn import uvicorn
from dotenv import load_dotenv
from app import create_app from app import create_app
from logs import setup_logging from logs import setup_logging
@ -28,6 +29,7 @@ async def main():
# Pycharm-Debuggable Uvicorn Server # Pycharm-Debuggable Uvicorn Server
if __name__ == "__main__": if __name__ == "__main__":
try: try:
load_dotenv()
setup_logging() setup_logging()
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List from typing import AsyncIterator, List
from node.models.blocks import Block from node.models.blocks import Block
from node.models.health import Health from node.models.health import Health
@ -16,5 +16,9 @@ class NodeApi(ABC):
pass pass
@abstractmethod @abstractmethod
async def get_blocks(self) -> List[Block]: async def get_blocks(self, **kwargs) -> List[Block]:
pass
@abstractmethod
async def get_blocks_stream(self) -> AsyncIterator[List[Block]]:
pass pass

View File

@ -1,5 +1,5 @@
from random import choices, random from random import choices, random
from typing import List from typing import AsyncIterator, List
from node.api.base import NodeApi from node.api.base import NodeApi
from node.models.blocks import Block from node.models.blocks import Block
@ -25,3 +25,7 @@ class FakeNodeApi(NodeApi):
async def get_blocks(self) -> List[Block]: async def get_blocks(self) -> List[Block]:
return [Block.from_random() for _ in range(1)] return [Block.from_random() for _ in range(1)]
async def get_blocks_stream(self) -> AsyncIterator[Block]:
while True:
yield Block.from_random()

View File

@ -1,35 +1,67 @@
import logging
from typing import AsyncIterator, List
from urllib.parse import urljoin from urllib.parse import urljoin
import httpx
import requests import requests
from node.api.base import NodeApi from node.api.base import NodeApi
from node.models.blocks import Block
from node.models.health import Health
from node.models.transactions import Transaction
logger = logging.getLogger(__name__)
class HttpNodeApi(NodeApi): class HttpNodeApi(NodeApi):
ENDPOINT_MANTLE_STATUS = "/mantle/status" ENDPOINT_INFO = "/cryptarchia/info"
ENDPOINT_MANTLE_TRANSACTIONS = "/mantle/transactions" ENDPOINT_TRANSACTIONS = "/cryptarchia/transactions"
ENDPOINT_MANTLE_BLOCKS = "/mantle/blocks" ENDPOINT_BLOCKS = "/cryptarchia/blocks"
ENDPOINT_BLOCKS_STREAM = "/cryptarchia/blocks/stream"
def __init__(self, host: str, port: int, protocol: str = "http"): def __init__(self, host: str, port: int, protocol: str = "http", timeout: int = 60):
self.protocol: str = protocol
self.host: str = host self.host: str = host
self.port: int = port self.port: int = port
self.protocol: str = protocol
self.timeout: int = timeout
@property @property
def base_url(self): def base_url(self):
return f"{self.protocol}://{self.host}:{self.port}" return f"{self.protocol}://{self.host}:{self.port}"
async def get_health_check(self) -> dict: async def get_health_check(self) -> Health:
url = urljoin(self.base_url, self.ENDPOINT_MANTLE_STATUS) url = urljoin(self.base_url, self.ENDPOINT_INFO)
response = requests.get(url, timeout=60) response = requests.get(url, timeout=60)
return response.json() if response.status_code == 200:
return Health.from_healthy()
else:
return Health.from_unhealthy()
async def get_transactions(self) -> list: async def get_transactions(self) -> List[Transaction]:
url = urljoin(self.base_url, self.ENDPOINT_MANTLE_TRANSACTIONS) url = urljoin(self.base_url, self.ENDPOINT_TRANSACTIONS)
response = requests.get(url, timeout=60) response = requests.get(url, timeout=60)
return response.json() json = response.json()
return [Transaction.model_validate(item) for item in json]
async def get_blocks(self) -> list: async def get_blocks(self, slot_from: int, slot_to: int) -> List[Block]:
url = urljoin(self.base_url, self.ENDPOINT_MANTLE_BLOCKS) query_string = f"slot_from={slot_from}&slot_to={slot_to}"
endpoint = urljoin(self.base_url, self.ENDPOINT_BLOCKS)
url = f"{endpoint}?{query_string}"
response = requests.get(url, timeout=60) response = requests.get(url, timeout=60)
return response.json() python_json = response.json()
blocks = [Block.model_validate(item) for item in python_json]
return blocks
async def get_blocks_stream(self) -> AsyncIterator[Block]:
url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM)
async with httpx.AsyncClient(timeout=self.timeout) as client:
async with client.stream("GET", url) as response:
response.raise_for_status() # TODO: Result
async for line in response.aiter_lines():
if not line:
continue
block = Block.model_validate_json(line)
logger.debug(f"Received new block from Node: {block}")
yield block

View File

@ -1,8 +1,7 @@
import random import logging
from asyncio import TaskGroup, create_task, sleep from asyncio import TaskGroup, create_task, sleep
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from itertools import chain from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator
from typing import TYPE_CHECKING, AsyncGenerator
from rusty_results import Option from rusty_results import Option
@ -19,32 +18,34 @@ from node.models.transactions import Transaction
if TYPE_CHECKING: if TYPE_CHECKING:
from core.app import NBE from core.app import NBE
logger = logging.getLogger(__name__)
@asynccontextmanager @asynccontextmanager
async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
# app.state.node_manager = DockerModeManager()
# app.state.node_api = HttpApi(host="127.0.0.1", port=3000)
db_client = SqliteClient() db_client = SqliteClient()
app.state.node_manager = FakeNodeManager() app.state.node_manager = FakeNodeManager()
app.state.node_api = FakeNodeApi() # app.state.node_manager = DockerModeManager(app.settings.node_compose_filepath)
# app.state.node_api = FakeNodeApi()
app.state.node_api = HttpNodeApi(host="127.0.0.1", port=18080)
app.state.db_client = db_client app.state.db_client = db_client
app.state.block_repository = BlockRepository(db_client) app.state.block_repository = BlockRepository(db_client)
app.state.transaction_repository = TransactionRepository(db_client) app.state.transaction_repository = TransactionRepository(db_client)
try: try:
print("Starting node...") logger.info("Starting node...")
await app.state.node_manager.start() await app.state.node_manager.start()
print("Node started.") logger.info("Node started.")
app.state.subscription_to_updates_handle = create_task(subscription_to_updates(app)) app.state.subscription_to_updates_handle = create_task(subscribe_to_updates(app))
app.state.backfill = create_task(backfill(app)) app.state.backfill = create_task(backfill(app))
yield yield
finally: finally:
print("Stopping node...") logger.info("Stopping node...")
await app.state.node_manager.stop() await app.state.node_manager.stop()
print("Node stopped.") logger.info("Node stopped.")
# ================ # ================
@ -69,83 +70,88 @@ _SUBSCRIPTION_START_SLOT = 5 # Simplification for now.
# ================ # ================
async def subscription_to_updates(app: "NBE") -> None: async def subscribe_to_updates(app: "NBE") -> None:
print("✅ Subscription to new blocks and transactions started.") logger.info("✅ Subscription to new blocks and transactions started.")
async with TaskGroup() as tg: async with TaskGroup() as tg:
tg.create_task(subscribe_to_new_blocks(app)) tg.create_task(subscribe_to_new_blocks(app))
tg.create_task(subscribe_to_new_transactions(app)) tg.create_task(subscribe_to_new_transactions(app))
print("Subscription to new blocks and transactions finished.") logger.info("Subscription to new blocks and transactions finished.")
async def subscribe_to_new_blocks( async def _gracefully_close_stream(stream: AsyncIterator) -> None:
app: "NBE", interval: int = 5, subscription_start_slot: int = _SUBSCRIPTION_START_SLOT aclose = getattr(stream, "aclose", None)
): if aclose is not None:
while app.state.is_running:
try: try:
new_block: Block = Block.from_random(slot_start=subscription_start_slot, slot_end=subscription_start_slot) await aclose()
print("> New Block")
await app.state.block_repository.create((new_block,))
subscription_start_slot += 1
except Exception as e: except Exception as e:
print(f"Error while subscribing to new blocks: {e}") logger.error(f"Error while closing the new blocks stream: {e}")
finally:
await sleep(interval)
async def subscribe_to_new_transactions(app: "NBE", interval: int = 5): async def subscribe_to_new_blocks(app: "NBE"):
while app.state.is_running: blocks_stream: AsyncGenerator[Block] = app.state.node_api.get_blocks_stream() # type: ignore[call-arg]
try: try:
new_transaction: Transaction = Transaction.from_random() while app.state.is_running:
print("> New TX") try:
await app.state.transaction_repository.create((new_transaction,)) block = await anext(blocks_stream) # TODO: Use anext's Sentinel?
except Exception as e: except StopAsyncIteration:
print(f"Error while subscribing to new transactions: {e}") logger.error("Subscription to the new blocks stream ended unexpectedly. Please restart the node.")
finally: break
await sleep(interval) except TimeoutError:
continue
except Exception as e:
logger.error(f"Error while fetching new blocks: {e}")
continue
await app.state.block_repository.create(block)
finally:
await _gracefully_close_stream(blocks_stream)
async def backfill(app: "NBE", delay: int = 3) -> None: async def subscribe_to_new_transactions(_app: "NBE"):
await sleep(delay) # Wait for some data to be present: Simplification for now.s pass
print("Backfilling started.")
async def backfill(app: "NBE") -> None:
logger.info("Backfilling started.")
async with TaskGroup() as tg: async with TaskGroup() as tg:
tg.create_task(backfill_blocks(app)) tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3))
tg.create_task(backfill_transactions(app)) tg.create_task(backfill_transactions(app))
print("✅ Backfilling finished.") logger.info("✅ Backfilling finished.")
async def backfill_blocks(app: "NBE"): async def get_earliest_block_slot(app: "NBE") -> Option[int]:
# Assuming at most one gap. This will be either genesis block (no gap) or the earliest received block from subscription.
# If genesis, do nothing.
# If earliest received block from subscription, backfill.
print("Checking for block gaps to backfill...")
earliest_block: Option[Block] = await app.state.block_repository.get_earliest() earliest_block: Option[Block] = await app.state.block_repository.get_earliest()
earliest_block: Block = earliest_block.expects("Subscription should have provided at least one block by now.") return earliest_block.map(lambda block: block.slot)
earliest_block_slot = earliest_block.slot
async def backfill_blocks(app: "NBE", *, db_hit_interval_seconds: int, batch_size: int = 50):
"""
FIXME: This is a very naive implementation:
- One block per slot.
- There's at most one gap to backfill (from genesis to earliest block).
FIXME: First block received is slot=2
"""
logger.info("Checking for block gaps to backfill...")
# Hit the database until we get a block
while (earliest_block_slot_option := await get_earliest_block_slot(app)).is_empty:
logger.debug("No blocks were found in the database yet. Waiting...")
await sleep(db_hit_interval_seconds)
earliest_block_slot: int = earliest_block_slot_option.unwrap()
if earliest_block_slot == 0: if earliest_block_slot == 0:
print("No need to backfill blocks, genesis block already present.") logger.info("No blocks to backfill.")
return return
print(f"Backfilling blocks from slot {earliest_block_slot - 1} down to 0...") slot_to = earliest_block_slot - 1
logger.info(f"Backfilling blocks from slot {slot_to} down to 0...")
def n_blocks(): while slot_to > 0:
return random.choices((1, 2, 3), (6, 3, 1))[0] slot_from = max(0, slot_to - batch_size)
blocks = await app.state.node_api.get_blocks(slot_from=slot_from, slot_to=slot_to)
blocks = ( logger.debug(f"Backfilling {len(blocks)} blocks from slot {slot_from} to {slot_to}...")
(Block.from_random(slot_start=slot_index, slot_end=slot_index) for _ in range(n_blocks())) await app.state.block_repository.create(*blocks)
for slot_index in reversed(range(0, earliest_block_slot)) slot_to = slot_from
) logger.info("Backfilling blocks completed.")
flattened = list(chain.from_iterable(blocks))
await sleep(10) # Simulate some backfilling delay
await app.state.block_repository.create(flattened)
print("Backfilling blocks completed.")
async def backfill_transactions(app: "NBE"): async def backfill_transactions(_app: "NBE"):
# Assume there's some TXs to backfill pass
n = random.randint(0, 5)
print(f"Backfilling {n} transactions...")
transactions = (Transaction.from_random() for _ in range(n))
await sleep(10) # Simulate some backfilling delay
await app.state.transaction_repository.create(transactions)
print("Backfilling transactions completed.")

View File

@ -1,23 +1,40 @@
from os import environ from logging import error, warn
from pathlib import Path
from python_on_whales import DockerException
from python_on_whales.docker_client import DockerClient from python_on_whales.docker_client import DockerClient
from rusty_results import Err, Ok, Result
from node.manager.base import NodeManager from node.manager.base import NodeManager
class DockerModeManager(NodeManager): class DockerModeManager(NodeManager):
COMPOSE_FILE: Path = Path(environ["NODE_COMPOSE_FILEPATH"]) def __init__(self, compose_filepath: str):
def __init__(self):
self.client: DockerClient = DockerClient( self.client: DockerClient = DockerClient(
client_type="docker", client_type="docker",
compose_files=[ compose_files=[compose_filepath],
self.COMPOSE_FILE,
],
) )
match self.ps():
case Err(1):
error("Compose services are not running.")
exit(21) # FIXME: There's too much output here.
case Err(_):
error("Failed to run docker compose.")
exit(20)
def ps(self, only_running: bool = True) -> Result:
try:
services = self.client.compose.ps(all=(not only_running)) # TODO: Filter compose services.
except DockerException as e:
return Err(e.return_code)
return Ok(services)
async def start(self): async def start(self):
services = self.ps().map(lambda _services: len(_services)).expect("Failed to get compose services.")
if services > 0:
warn("Compose services are already running.")
return
self.client.compose.up( self.client.compose.up(
detach=True, detach=True,
build=False, build=False,

View File

@ -1,28 +1,96 @@
from datetime import datetime import random
from typing import List
from sqlalchemy import Column
from sqlmodel import Field from sqlmodel import Field
from core.models import IdNbeModel from core.models import NbeSchema, TimestampedModel
from core.sqlmodel import PydanticJsonColumn
from node.models.transactions import Transaction
from utils.random import random_hash from utils.random import random_hash
class Block(IdNbeModel, table=True): class Public(NbeSchema):
__tablename__ = "blocks" aged_root: str
epoch_nonce: str
latest_root: str
slot: int slot: int
hash: str total_stake: float
parent_hash: str
transaction_count: int
timestamp: datetime = Field(default=None, index=True)
@classmethod @classmethod
def from_random(cls, slot_start=1, slot_end=10_000) -> "Block": def from_random(cls, slot: int = None) -> "Public":
import random if slot is not None:
slot = random.randint(1, 100)
return cls( return Public(
slot=random.randint(slot_start, slot_end), aged_root=random_hash(),
hash=random_hash(), epoch_nonce=random_hash(),
parent_hash=random_hash(), latest_root=random_hash(),
transaction_count=random.randint(0, 500), slot=slot,
timestamp=datetime.now(), total_stake=100.0,
)
class ProofOfLeadership(NbeSchema):
entropy_contribution: str
leader_key: List[int]
proof: List[int]
public: Public
voucher_cm: str
@classmethod
def from_random(cls, slot: int = None) -> "ProofOfLeadership":
random_hash_as_list = lambda: [random.randint(0, 255) for _ in range(64)]
return ProofOfLeadership(
entropy_contribution=random_hash(),
leader_key=random_hash_as_list(),
proof=random_hash_as_list(),
public=Public.from_random(slot),
voucher_cm=random_hash(),
)
class Header(NbeSchema):
block_root: str
parent_block: str
proof_of_leadership: ProofOfLeadership
slot: int
@classmethod
def from_random(cls, slot_from: int = 1, slot_to: int = 100) -> "Header":
slot = random.randint(slot_from, slot_to)
return Header(
block_root=random_hash(),
parent_block=random_hash(),
proof_of_leadership=ProofOfLeadership.from_random(slot),
slot=slot,
)
class Block(TimestampedModel, table=True):
__tablename__ = "blocks"
header: Header = Field(sa_column=Column(PydanticJsonColumn(Header), nullable=False))
transactions: List[Transaction] = Field(
default_factory=list, sa_column=Column(PydanticJsonColumn(Transaction, many=True), nullable=False)
)
@property
def slot(self) -> int:
return self.header.slot
def __str__(self) -> str:
return f"Block(slot={self.slot})"
def __repr__(self) -> str:
return f"<Block(id={self.id}, created_at={self.created_at}, slot={self.slot}, parent={self.header['parent_block']})>"
@classmethod
def from_random(cls, slot_from: int = 1, slot_to: int = 100) -> "Block":
n = random.randint(1, 10)
_transactions = [Transaction.from_random() for _ in range(n)]
return Block(
header=Header.from_random(slot_from, slot_to),
transactions=[],
) )

View File

@ -11,3 +11,9 @@ class Health(IdNbeModel):
@classmethod @classmethod
def from_unhealthy(cls) -> "Health": def from_unhealthy(cls) -> "Health":
return cls(healthy=False) return cls(healthy=False)
def __str__(self):
return "Healthy" if self.healthy else "Unhealthy"
def __repr__(self):
return f"<Health(healthy={self.healthy})>"

View File

@ -1,30 +1,84 @@
import random import random
from datetime import datetime from enum import StrEnum
from typing import Optional from typing import List
from sqlalchemy import JSON, Column
from sqlmodel import Field from sqlmodel import Field
from core.models import IdNbeModel from core.models import NbeSchema, TimestampedModel
from utils.random import random_address, random_hash from utils.random import random_address
Value = int
Fr = int
Gas = float
PublicKey = bytes
class Transaction(IdNbeModel, table=True): class Operation(StrEnum):
__tablename__ = "transactions" CHANNEL_INSCRIBE = ("ChannelInscribe",) # (InscriptionOp)
CHANNEL_BLOB = ("ChannelBlob",) # (BlobOp)
CHANNEL_SET_KEYS = ("ChannelSetKeys",) # (SetKeysOp)
NATIVE = ("Native",) # (NativeOp)
SDP_DECLARE = ("SDPDeclare",) # (SDPDeclareOp)
SDP_WITHDRAW = ("SDPWithdraw",) # (SDPWithdrawOp)
SDP_ACTIVE = ("SDPActive",) # (SDPActiveOp)
LEADER_CLAIM = ("LeaderClaim",) # (LeaderClaimOp)
hash: str
block_hash: Optional[str] = Field(default=None, index=True) class Note(NbeSchema):
sender: str value: Value
recipient: str public_key: PublicKey
amount: float
timestamp: datetime = Field(default=None, index=True) @classmethod
def from_random(cls) -> "Note":
return Note(
value=random.randint(1, 100),
public_key=random_address(),
)
class LedgerTransaction(NbeSchema):
"""
Tx
"""
inputs: List[Fr] = Field(default_factory=list, sa_column=Column(JSON, nullable=False))
outputs: List[Note] = Field(default_factory=list, sa_column=Column(JSON, nullable=False))
@classmethod
def from_random(cls) -> "LedgerTransaction":
return LedgerTransaction(
inputs=[random.randint(1, 100) for _ in range(10)],
outputs=[Note.from_random() for _ in range(10)],
)
class Transaction(NbeSchema): # table=true # It currently lives inside Block
"""
MantleTx
"""
# __tablename__ = "transactions"
# TODO: hash
operations: List[str] = Field(alias="ops", default_factory=list, sa_column=Column(JSON, nullable=False))
ledger_transaction: LedgerTransaction = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
execution_gas_price: Gas
storage_gas_price: Gas
def __str__(self) -> str:
return f"Transaction({self.operations})"
def __repr__(self) -> str:
return f"<Transaction(id={self.id}, created_at={self.created_at}, operations={self.operations})>"
@classmethod @classmethod
def from_random(cls) -> "Transaction": def from_random(cls) -> "Transaction":
n = random.randint(1, 10)
operations = [random.choice(list(Operation)).value for _ in range(n)]
return Transaction( return Transaction(
hash=random_hash(), operations=operations,
block_hash=random_hash(), ledger_transaction=LedgerTransaction.from_random(),
sender=random_address(), execution_gas_price=random.random(),
recipient=random_address(), storage_gas_price=random.random(),
amount=round(random.uniform(0.0001, 100.0), 6),
timestamp=datetime.now(),
) )

View File

@ -22,35 +22,12 @@
.card { background: var(--card); border: 1px solid #20263a; border-radius: 10px; overflow: hidden; } .card { background: var(--card); border: 1px solid #20263a; border-radius: 10px; overflow: hidden; }
.card-header { display:flex; justify-content:space-between; align-items:center; padding:12px 14px; border-bottom: 1px solid #1f2435; } .card-header { display:flex; justify-content:space-between; align-items:center; padding:12px 14px; border-bottom: 1px solid #1f2435; }
/* SCROLLER */ .table-wrapper { overflow: auto; -webkit-overflow-scrolling: touch; max-height: 60vh; scrollbar-gutter: stable both-edges; padding-right: 8px; }
.table-wrapper { table { border-collapse: collapse; table-layout: fixed; width: 100%; font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; }
overflow-y: auto; .table-wrapper .table--blocks { min-width: 860px; }
overflow-x: auto; /* horizontal scrolling back */ .table-wrapper .table--txs { min-width: 980px; }
-webkit-overflow-scrolling: touch;
max-height: 60vh;
scrollbar-gutter: stable both-edges;
padding-right: 8px;
}
table { th, td { text-align:left; padding:8px 10px; border-bottom:1px solid #1f2435; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; vertical-align: top; }
border-collapse: collapse;
table-layout: fixed;
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace;
width: 100%;
}
/* Force tables to be wider than the container when needed so horizontal scroll appears */
.table-wrapper .table--blocks { min-width: 560px; }
.table-wrapper .table--txs { min-width: 980px; } /* Hash + From→To + Amount + Time */
th, td {
text-align:left;
padding:8px 10px;
border-bottom:1px solid #1f2435;
vertical-align: top;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
th { color: var(--muted); font-weight: normal; font-size: 13px; position: sticky; top: 0; background: var(--card); z-index: 1; } th { color: var(--muted); font-weight: normal; font-size: 13px; position: sticky; top: 0; background: var(--card); z-index: 1; }
tbody td { height: 28px; } tbody td { height: 28px; }
tr:nth-child(odd) { background: #121728; } tr:nth-child(odd) { background: #121728; }
@ -58,10 +35,13 @@
.twocol { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin-top: 16px; } .twocol { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; margin-top: 16px; }
@media (max-width: 960px) { .twocol { grid-template-columns: 1fr; } } @media (max-width: 960px) { .twocol { grid-template-columns: 1fr; } }
.table--txs th:last-child, .table--txs td:last-child { padding-right: 16px; } .mono { font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; }
.mono { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace; }
.amount { font-variant-numeric: tabular-nums; } .amount { font-variant-numeric: tabular-nums; }
.linkish { color: var(--fg); text-decoration: none; border-bottom: 1px dotted #2a3350; }
.linkish:hover { border-bottom-color: var(--fg); }
/* Placeholder rows for fixed table height */
tr.ph td { opacity: .35; }
</style> </style>
</head> </head>
<body> <body>
@ -81,15 +61,19 @@
<div class="table-wrapper"> <div class="table-wrapper">
<table class="table--blocks"> <table class="table--blocks">
<colgroup> <colgroup>
<col style="width:100px" /> <col style="width:90px" /> <!-- Slot -->
<col style="width:260px" /> <col style="width:260px" /> <!-- Block Root -->
<col style="width:80px" /> <col style="width:260px" /> <!-- Parent -->
<col style="width:80px" /> <!-- Txs -->
<col style="width:180px" /> <!-- Time -->
</colgroup> </colgroup>
<thead> <thead>
<tr> <tr>
<th>Slot</th> <th>Slot</th>
<th>Hash</th> <th>Block Root</th>
<th>Parent</th>
<th>Txs</th> <th>Txs</th>
<th>Time</th>
</tr> </tr>
</thead> </thead>
<tbody id="blocks-body"></tbody> <tbody id="blocks-body"></tbody>
@ -107,9 +91,9 @@
<table class="table--txs"> <table class="table--txs">
<colgroup> <colgroup>
<col style="width:260px" /> <col style="width:260px" />
<col /> <!-- From → To --> <col />
<col style="width:120px" /> <col style="width:120px" />
<col style="width:180px" /> <!-- Time --> <col style="width:180px" />
</colgroup> </colgroup>
<thead> <thead>
<tr> <tr>
@ -133,9 +117,26 @@
const TXS_ENDPOINT = `${API_PREFIX}/transactions/stream`; const TXS_ENDPOINT = `${API_PREFIX}/transactions/stream`;
const TABLE_SIZE = 10; const TABLE_SIZE = 10;
// ------------------ RELOAD/UNLOAD ABORT PATCH (prevents “Error in input stream”) ------------------
// We keep AbortControllers for each stream and abort them when the page unloads/reloads,
// so the fetch ReadableStreams don't throw TypeError during navigation.
let healthController = null;
let blocksController = null;
let txsController = null;
addEventListener("beforeunload", () => {
healthController?.abort();
blocksController?.abort();
txsController?.abort();
}, { passive: true });
addEventListener("pagehide", () => {
healthController?.abort();
blocksController?.abort();
txsController?.abort();
}, { passive: true });
// --------------------------------------------------------------------------------------------------
// ---- Health pill ---- // ---- Health pill ----
const pill = document.getElementById("status-pill"); const pill = document.getElementById("status-pill");
let healthController = null;
let state = "connecting"; let state = "connecting";
function setState(next) { function setState(next) {
if (next === state) return; if (next === state) return;
@ -156,7 +157,9 @@
if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`); if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`);
const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = ""; const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = "";
while (true) { while (true) {
const { value, done } = await reader.read(); if (done) break; let chunk;
try { chunk = await reader.read(); } catch { if (healthController.signal.aborted) return; else break; }
const { value, done } = chunk; if (done) break;
buf += decoder.decode(value, { stream: true }); buf += decoder.decode(value, { stream: true });
const lines = buf.split("\n"); buf = lines.pop() ?? ""; const lines = buf.split("\n"); buf = lines.pop() ?? "";
for (const line of lines) { if (!line.trim()) continue; try { applyHealth(JSON.parse(line)); } catch {} } for (const line of lines) { if (!line.trim()) continue; try { applyHealth(JSON.parse(line)); } catch {} }
@ -165,55 +168,35 @@
} }
connectHealth(); connectHealth();
// ---- NDJSON reader ---- // ---- NDJSON reader (shared) ----
async function streamNDJSON(url, onItem, { signal } = {}) { async function streamNDJSON(url, onItem, { signal } = {}) {
const res = await fetch(url, { headers: { "accept": "application/x-ndjson" }, signal, cache: "no-cache" }); const res = await fetch(url, { headers: { "accept": "application/x-ndjson" }, signal, cache: "no-cache" });
if (!res.ok || !res.body) throw new Error(`Stream failed: ${res.status}`); if (!res.ok || !res.body) throw new Error(`Stream failed: ${res.status}`);
const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = ""; const reader = res.body.getReader(); const decoder = new TextDecoder(); let buf = "";
while (true) { while (true) {
const { value, done } = await reader.read(); if (done) break; let chunk;
try { chunk = await reader.read(); }
catch { if (signal?.aborted) return; else break; } // quiet on navigation abort
const { value, done } = chunk; if (done) break;
buf += decoder.decode(value, { stream: true }); buf += decoder.decode(value, { stream: true });
let idx; let idx;
while ((idx = buf.indexOf("\n")) >= 0) { while ((idx = buf.indexOf("\n")) >= 0) {
const line = buf.slice(0, idx).trim(); buf = buf.slice(idx + 1); const line = buf.slice(0, idx).trim(); buf = buf.slice(idx + 1);
if (!line) continue; try { onItem(JSON.parse(line)); } catch {} if (!line) continue;
try { onItem(JSON.parse(line)); } catch {}
} }
} }
const last = buf.trim(); if (last) { try { onItem(JSON.parse(last)); } catch {} } const last = buf.trim(); if (last) { try { onItem(JSON.parse(last)); } catch {} }
} }
// ---- Table helpers ---- // ---- Helpers ----
function ensureSize(tbody, colCount) { function shortHex(s, left = 10, right = 8) {
while (tbody.rows.length < TABLE_SIZE) { if (!s) return "";
const row = document.createElement("tr"); return s.length <= left + right + 1 ? s : `${s.slice(0,left)}…${s.slice(-right)}`;
for (let i = 0; i < colCount; i++) {
const td = document.createElement("td");
td.innerHTML = "&nbsp;";
row.appendChild(td);
}
tbody.appendChild(row);
}
} }
function appendRow(tbody, cells) {
const tr = document.createElement("tr");
for (const cell of cells) {
const td = document.createElement("td");
if (cell && cell.html !== undefined) td.innerHTML = cell.html;
else td.textContent = String(cell ?? "");
if (cell && cell.class) td.className = cell.class;
tr.appendChild(td);
}
tbody.insertBefore(tr, tbody.firstChild);
while (tbody.rows.length > TABLE_SIZE) tbody.deleteRow(-1);
}
function shortHex(s, left = 6, right = 4) { if (!s) return ""; return s.length <= left + right + 2 ? s : `${s.slice(0,left)}…${s.slice(-right)}`; }
// Flexible timestamp formatter
function fmtTime(ts) { function fmtTime(ts) {
if (ts == null) return ""; if (ts == null) return "";
let d; let d = typeof ts === "number" ? new Date(ts < 1e12 ? ts * 1000 : ts) : new Date(ts);
if (typeof ts === "number") d = new Date(ts < 1e12 ? ts * 1000 : ts);
else d = new Date(ts);
if (isNaN(d)) return ""; if (isNaN(d)) return "";
return d.toLocaleString(undefined, { return d.toLocaleString(undefined, {
year: "numeric", month: "2-digit", day: "2-digit", year: "numeric", month: "2-digit", day: "2-digit",
@ -221,37 +204,114 @@
}); });
} }
// ---- Blocks ---- // Keep table exactly TABLE_SIZE rows using placeholders
function ensureSize(tbody, cols, size) {
// remove existing placeholders first
for (let i = tbody.rows.length - 1; i >= 0; i--) {
if (tbody.rows[i].classList.contains("ph")) tbody.deleteRow(i);
}
let real = tbody.rows.length;
// pad
for (let i = 0; i < size - real; i++) {
const tr = document.createElement("tr"); tr.className = "ph";
for (let c = 0; c < cols; c++) { const td = document.createElement("td"); td.innerHTML = "&nbsp;"; tr.appendChild(td); }
tbody.appendChild(tr);
}
}
// ---- Blocks (dedupe by slot:id + fixed-size table) ----
(function initBlocks() { (function initBlocks() {
const body = document.getElementById("blocks-body"); const body = document.getElementById("blocks-body");
const counter = document.getElementById("blocks-count"); const counter = document.getElementById("blocks-count");
let n = 0; const seen = new Set(); // keys "slot:id"
ensureSize(body, 3);
streamNDJSON(BLOCKS_ENDPOINT, (b) => { function pruneAndPad() {
appendRow(body, [ // remove placeholders
{ html: `<span class="mono">${b.slot}</span>` }, for (let i = body.rows.length - 1; i >= 0; i--) if (body.rows[i].classList.contains("ph")) body.deleteRow(i);
{ html: `<span class="mono" title="${b.hash}">${shortHex(b.hash, 10, 8)}</span>` }, // cap real rows and drop keys for removed rows
b.transaction_count, while ([...body.rows].filter(r => !r.classList.contains("ph")).length > TABLE_SIZE) {
]); const last = body.rows[body.rows.length - 1];
counter.textContent = (++n).toString(); const key = last?.dataset?.key;
}).catch(err => console.error("Blocks stream error:", err)); if (key) seen.delete(key);
body.deleteRow(-1);
}
// pad back to TABLE_SIZE
const real = [...body.rows].filter(r => !r.classList.contains("ph")).length;
for (let i = 0; i < TABLE_SIZE - real; i++) {
const tr = document.createElement("tr"); tr.className = "ph";
for (let c = 0; c < 5; c++) { const td = document.createElement("td"); td.innerHTML = "&nbsp;"; tr.appendChild(td); }
body.appendChild(tr);
}
counter.textContent = String(real);
}
function appendBlockRow(b, key) {
const tr = document.createElement("tr");
tr.dataset.key = key;
const td = (html) => { const x = document.createElement("td"); x.innerHTML = html; return x; };
tr.appendChild(td(`<span class="mono">${b.slot}</span>`));
tr.appendChild(td(`<span class="mono" title="${b.root}">${shortHex(b.root)}</span>`));
tr.appendChild(td(`<span class="mono" title="${b.parent}">${shortHex(b.parent)}</span>`));
tr.appendChild(td(`<span class="mono">${b.txCount}</span>`));
tr.appendChild(td(`<span class="mono" title="${b.time ?? ""}">${fmtTime(b.time)}</span>`));
body.insertBefore(tr, body.firstChild);
pruneAndPad();
}
function normalizeBlock(raw) {
const h = raw.header ?? raw;
const created = raw.created_at ?? raw.header?.created_at ?? null;
return {
id: Number(raw.id ?? 0), // include id for (slot,id) dedupe
slot: Number(h?.slot ?? raw.slot ?? 0),
root: h?.block_root ?? raw.block_root ?? "",
parent: h?.parent_block ?? raw.parent_block ?? "",
txCount: Array.isArray(raw.transactions) ? raw.transactions.length
: (typeof raw.transaction_count === "number" ? raw.transaction_count : 0),
time: created
};
}
ensureSize(body, 5, TABLE_SIZE);
// start stream (with reload-abort controller)
if (blocksController) blocksController.abort();
blocksController = new AbortController();
streamNDJSON(BLOCKS_ENDPOINT, (raw) => {
const b = normalizeBlock(raw);
const key = `${b.slot}:${b.id}`;
if (seen.has(key)) { pruneAndPad(); return; }
seen.add(key);
appendBlockRow(b, key);
}, { signal: blocksController.signal }).catch(err => {
if (!blocksController.signal.aborted) console.error("Blocks stream error:", err);
});
})(); })();
// ---- Transactions ---- // ---- Transactions (kept simple placeholder; adapt to your API shape) ----
(function initTxs() { (function initTxs() {
const body = document.getElementById("txs-body"); const body = document.getElementById("txs-body");
const counter = document.getElementById("txs-count"); const counter = document.getElementById("txs-count");
let n = 0; let n = 0;
ensureSize(body, 4); // 4 columns now ensureSize(body, 4, TABLE_SIZE);
if (txsController) txsController.abort();
txsController = new AbortController();
streamNDJSON(TXS_ENDPOINT, (t) => { streamNDJSON(TXS_ENDPOINT, (t) => {
appendRow(body, [ const tr = document.createElement("tr");
{ html: `<span class="mono" title="${t.hash}">${shortHex(t.hash, 10, 8)}</span>` }, const td = (html) => { const x = document.createElement("td"); x.innerHTML = html; return x; };
{ html: `<span class="mono" title="${t.sender}">${shortHex(t.sender)}</span><span class="mono" title="${t.recipient}">${shortHex(t.recipient)}</span>` }, tr.appendChild(td(`<span class="mono" title="${t.hash ?? ""}">${shortHex(t.hash ?? "")}</span>`));
{ html: `<span class="amount">${Number(t.amount).toLocaleString(undefined, { maximumFractionDigits: 8 })}</span>` }, tr.appendChild(td(`<span class="mono" title="${t.sender ?? ""}">${shortHex(t.sender ?? "")}</span><span class="mono" title="${t.recipient ?? ""}">${shortHex(t.recipient ?? "")}</span>`));
{ html: `<span class="mono" title="${t.timestamp}">${fmtTime(t.timestamp)}</span>` }, tr.appendChild(td(`<span class="amount">${(t.amount ?? 0).toLocaleString(undefined, { maximumFractionDigits: 8 })}</span>`));
]); tr.appendChild(td(`<span class="mono" title="${t.timestamp ?? ""}">${fmtTime(t.timestamp)}</span>`));
counter.textContent = (++n).toString(); body.insertBefore(tr, body.firstChild);
}).catch(err => console.error("Tx stream error:", err)); while (body.rows.length > TABLE_SIZE) body.deleteRow(-1);
counter.textContent = String(++n);
}, { signal: txsController.signal }).catch(err => {
if (!txsController.signal.aborted) console.error("Tx stream error:", err);
});
})(); })();
</script> </script>
</body> </body>