From 7b8a2fd1ce68cee58c675c994d76f7081537c840 Mon Sep 17 00:00:00 2001 From: David Rusu Date: Thu, 5 Feb 2026 19:45:42 +0400 Subject: [PATCH] parent-walking based backfilling --- src/api/v1/serializers/blocks.py | 2 + src/core/app.py | 2 - src/db/blocks.py | 143 +++++++++++++++++++---- src/db/transaction.py | 20 ++-- src/models/block.py | 1 + src/node/api/base.py | 11 +- src/node/api/fake.py | 29 ++--- src/node/api/http.py | 38 +++++-- src/node/api/serializers/header.py | 2 +- src/node/lifespan.py | 176 ++++++++++++++++------------- static/app.js | 2 +- static/components/BlocksTable.js | 16 ++- static/index.html | 4 +- static/pages/BlockDetail.js | 2 + 14 files changed, 296 insertions(+), 152 deletions(-) diff --git a/src/api/v1/serializers/blocks.py b/src/api/v1/serializers/blocks.py index 611ff51..3c9cb48 100644 --- a/src/api/v1/serializers/blocks.py +++ b/src/api/v1/serializers/blocks.py @@ -12,6 +12,7 @@ class BlockRead(NbeSchema): hash: HexBytes parent_block_hash: HexBytes slot: int + height: int block_root: HexBytes proof_of_leadership: ProofOfLeadership transactions: List[Transaction] @@ -23,6 +24,7 @@ class BlockRead(NbeSchema): hash=block.hash, parent_block_hash=block.parent_block, slot=block.slot, + height=block.height, block_root=block.block_root, proof_of_leadership=block.proof_of_leadership, transactions=block.transactions, diff --git a/src/core/app.py b/src/core/app.py index eb4a835..ddb5ef2 100644 --- a/src/core/app.py +++ b/src/core/app.py @@ -51,7 +51,6 @@ class NBEState(State): block_repository: BlockRepository transaction_repository: TransactionRepository subscription_to_updates_handle: Task - backfill_handle: Task @property def is_running(self) -> bool: @@ -64,7 +63,6 @@ class NBEState(State): async def _wait_tasks_finished(self): await gather( self.subscription_to_updates_handle, - self.backfill_handle, return_exceptions=True, ) diff --git a/src/db/blocks.py b/src/db/blocks.py index 5f5e191..85b347b 100644 --- a/src/db/blocks.py +++ b/src/db/blocks.py @@ -1,6 +1,6 @@ import logging from asyncio import sleep -from typing import AsyncIterator, List +from typing import AsyncIterator, Dict, List from rusty_results import Empty, Option, Some from sqlalchemy import Result, Select @@ -11,30 +11,135 @@ from db.clients import DbClient from models.block import Block +logger = logging.getLogger(__name__) + + def get_latest_statement(limit: int, *, output_ascending: bool = True) -> Select: - # Fetch the latest N blocks in descending slot order - base = select(Block).order_by(Block.slot.desc(), Block.id.desc()).limit(limit) + # Fetch the latest N blocks in descending height order + base = select(Block).order_by(Block.height.desc()).limit(limit) if not output_ascending: return base # Reorder for output inner = base.subquery() latest = aliased(Block, inner) - return select(latest).options().order_by(latest.slot.asc(), latest.id.asc()) # type: ignore[arg-type] + return select(latest).options().order_by(latest.height.asc()) # type: ignore[arg-type] class BlockRepository: - """ - FIXME: Assumes slots are sequential and one block per slot - """ - def __init__(self, client: DbClient): self.client = client - async def create(self, *blocks: Block) -> None: + async def create(self, *blocks: Block, allow_chain_root: bool = False) -> None: + """ + Insert blocks into the database with proper height calculation. + + Args: + blocks: Blocks to insert + allow_chain_root: If True, allow the first block (by slot) to be a chain root + even if its parent doesn't exist. Used during chain-walk backfills. + """ + if not blocks: + return + with self.client.session() as session: - session.add_all(list(blocks)) - session.commit() + # Collect all unique parent hashes we need to look up + parent_hashes = {block.parent_block for block in blocks} + + # Fetch existing parent blocks to get their heights + parent_heights: Dict[bytes, int] = {} + if parent_hashes: + statement = select(Block).where(Block.hash.in_(parent_hashes)) + existing_parents = session.exec(statement).all() + for parent in existing_parents: + parent_heights[parent.hash] = parent.height + + # Also check if any of the blocks we're inserting are parents of others + blocks_by_hash = {block.hash: block for block in blocks} + + # Find the chain root candidate (lowest slot block whose parent isn't in the batch) + chain_root_hash = None + if allow_chain_root: + sorted_blocks = sorted(blocks, key=lambda b: b.slot) + for block in sorted_blocks: + if block.parent_block not in blocks_by_hash and block.parent_block not in parent_heights: + chain_root_hash = block.hash + break + + # Handle blocks in batch that depend on each other + # Resolve dependencies iteratively, skipping orphans + resolved = set() + orphans = set() + max_iterations = len(blocks) * 2 # Prevent infinite loops + iterations = 0 + + while iterations < max_iterations: + iterations += 1 + made_progress = False + + for block in blocks: + if block.hash in resolved or block.hash in orphans: + continue + + if block.parent_block in parent_heights: + # Parent found in DB or already resolved + block.height = parent_heights[block.parent_block] + 1 + parent_heights[block.hash] = block.height + resolved.add(block.hash) + made_progress = True + elif block.parent_block in blocks_by_hash: + parent = blocks_by_hash[block.parent_block] + if parent.hash in resolved: + # Parent in same batch and already resolved + block.height = parent.height + 1 + parent_heights[block.hash] = block.height + resolved.add(block.hash) + made_progress = True + elif parent.hash in orphans: + # Parent is an orphan, so this block is also an orphan + orphans.add(block.hash) + made_progress = True + # else: parent not yet resolved, try again next iteration + else: + # Parent not found anywhere + if block.slot == 0 or block.hash == chain_root_hash: + # Genesis block or chain root - no parent requirement + block.height = 0 + parent_heights[block.hash] = block.height + resolved.add(block.hash) + made_progress = True + if block.hash == chain_root_hash: + logger.info( + f"Chain root block: hash={block.hash.hex()[:16]}..., " + f"slot={block.slot}, height=0" + ) + else: + # Orphan block - parent doesn't exist + logger.warning( + f"Dropping orphaned block: hash={block.hash.hex()}, " + f"slot={block.slot}, parent={block.parent_block.hex()} (parent not found)" + ) + orphans.add(block.hash) + made_progress = True + + # If no progress was made and we still have unresolved blocks, break + if not made_progress: + break + + # Check for any blocks that couldn't be resolved (circular dependencies or other issues) + unresolved = set(block.hash for block in blocks) - resolved - orphans + for block in blocks: + if block.hash in unresolved: + logger.warning( + f"Dropping unresolvable block: hash={block.hash.hex()}, " + f"slot={block.slot}, parent={block.parent_block.hex()}" + ) + + # Only add resolved blocks + blocks_to_add = [block for block in blocks if block.hash in resolved] + if blocks_to_add: + session.add_all(blocks_to_add) + session.commit() async def get_by_id(self, block_id: int) -> Option[Block]: statement = select(Block).where(Block.id == block_id) @@ -68,7 +173,7 @@ class BlockRepository: return b async def get_earliest(self) -> Option[Block]: - statement = select(Block).order_by(Block.slot.asc()).limit(1) + statement = select(Block).order_by(Block.height.asc()).limit(1) with self.client.session() as session: results: Result[Block] = session.exec(statement) @@ -79,7 +184,7 @@ class BlockRepository: async def get_paginated(self, page: int, page_size: int) -> tuple[List[Block], int]: """ - Get blocks with pagination, ordered by slot descending (newest first). + Get blocks with pagination, ordered by height descending (newest first). Returns a tuple of (blocks, total_count). """ offset = page * page_size @@ -93,7 +198,7 @@ class BlockRepository: # Get paginated blocks statement = ( select(Block) - .order_by(Block.slot.desc(), Block.id.desc()) + .order_by(Block.height.desc()) .offset(offset) .limit(page_size) ) @@ -104,22 +209,20 @@ class BlockRepository: async def updates_stream( self, block_from: Option[Block], *, timeout_seconds: int = 1 ) -> AsyncIterator[List[Block]]: - slot_cursor: int = block_from.map(lambda block: block.slot).unwrap_or(0) - id_cursor: int = block_from.map(lambda block: block.id + 1).unwrap_or(0) + height_cursor: int = block_from.map(lambda block: block.height + 1).unwrap_or(0) while True: statement = ( select(Block) - .where(Block.slot >= slot_cursor, Block.id >= id_cursor) - .order_by(Block.slot.asc(), Block.id.asc()) + .where(Block.height >= height_cursor) + .order_by(Block.height.asc()) ) with self.client.session() as session: blocks: List[Block] = session.exec(statement).all() if len(blocks) > 0: - slot_cursor = blocks[-1].slot - id_cursor = blocks[-1].id + 1 + height_cursor = blocks[-1].height + 1 yield blocks else: await sleep(timeout_seconds) diff --git a/src/db/transaction.py b/src/db/transaction.py index 667cd5b..698bbe4 100644 --- a/src/db/transaction.py +++ b/src/db/transaction.py @@ -1,4 +1,3 @@ -import logging from asyncio import sleep from typing import AsyncIterator, List @@ -13,11 +12,11 @@ from models.transactions.transaction import Transaction def get_latest_statement(limit: int, *, output_ascending: bool, preload_relationships: bool) -> Select: - # Join with Block to order by Block's slot and fetch the latest N transactions in descending order + # Join with Block to order by Block's height and fetch the latest N transactions in descending order base = ( - select(Transaction, Block.slot.label("block__slot"), Block.id.label("block__id")) + select(Transaction, Block.height.label("block__height")) .join(Block, Transaction.block_id == Block.id) - .order_by(Block.slot.desc(), Block.id.desc(), Transaction.id.desc()) + .order_by(Block.height.desc(), Transaction.id.desc()) .limit(limit) ) if not output_ascending: @@ -26,7 +25,7 @@ def get_latest_statement(limit: int, *, output_ascending: bool, preload_relation # Reorder for output inner = base.subquery() latest = aliased(Transaction, inner) - statement = select(latest).order_by(inner.c.block__slot.asc(), inner.c.block__id.asc(), latest.id.asc()) + statement = select(latest).order_by(inner.c.block__height.asc(), latest.id.asc()) if preload_relationships: statement = statement.options(selectinload(latest.block)) return statement @@ -76,8 +75,7 @@ class TransactionRepository: async def updates_stream( self, transaction_from: Option[Transaction], *, timeout_seconds: int = 1 ) -> AsyncIterator[List[Transaction]]: - slot_cursor = transaction_from.map(lambda transaction: transaction.block.slot).unwrap_or(0) - block_id_cursor = transaction_from.map(lambda transaction: transaction.block.id).unwrap_or(0) + height_cursor = transaction_from.map(lambda transaction: transaction.block.height).unwrap_or(0) transaction_id_cursor = transaction_from.map(lambda transaction: transaction.id + 1).unwrap_or(0) while True: @@ -86,19 +84,17 @@ class TransactionRepository: .options(selectinload(Transaction.block)) .join(Block, Transaction.block_id == Block.id) .where( - Block.slot >= slot_cursor, - Block.id >= block_id_cursor, + Block.height >= height_cursor, Transaction.id >= transaction_id_cursor, ) - .order_by(Block.slot.asc(), Block.id.asc(), Transaction.id.asc()) + .order_by(Block.height.asc(), Transaction.id.asc()) ) with self.client.session() as session: transactions: List[Transaction] = session.exec(statement).all() if len(transactions) > 0: - slot_cursor = transactions[-1].block.slot - block_id_cursor = transactions[-1].block.id + height_cursor = transactions[-1].block.height transaction_id_cursor = transactions[-1].id + 1 yield transactions else: diff --git a/src/models/block.py b/src/models/block.py index 5833918..4323de0 100644 --- a/src/models/block.py +++ b/src/models/block.py @@ -24,6 +24,7 @@ class Block(TimestampedModel, table=True): hash: HexBytes = Field(nullable=False, unique=True) parent_block: HexBytes = Field(nullable=False) slot: int = Field(nullable=False) + height: int = Field(nullable=False, default=0) block_root: HexBytes = Field(nullable=False) proof_of_leadership: ProofOfLeadership = Field( sa_column=Column(PydanticJsonColumn(ProofOfLeadership), nullable=False) diff --git a/src/node/api/base.py b/src/node/api/base.py index 70987cb..f7280b7 100644 --- a/src/node/api/base.py +++ b/src/node/api/base.py @@ -1,8 +1,9 @@ from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, AsyncIterator, List +from typing import TYPE_CHECKING, AsyncIterator, Optional from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer +from node.api.serializers.info import InfoSerializer if TYPE_CHECKING: from core.app import NBESettings @@ -18,9 +19,13 @@ class NodeApi(ABC): pass @abstractmethod - async def get_blocks(self, **kwargs) -> List[BlockSerializer]: + async def get_info(self) -> InfoSerializer: pass @abstractmethod - async def get_blocks_stream(self) -> AsyncIterator[List[BlockSerializer]]: + async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]: + pass + + @abstractmethod + async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: pass diff --git a/src/node/api/fake.py b/src/node/api/fake.py index 5789641..9dc6c77 100644 --- a/src/node/api/fake.py +++ b/src/node/api/fake.py @@ -1,23 +1,18 @@ from asyncio import sleep -from random import choices, random -from typing import TYPE_CHECKING, AsyncIterator, List +from random import random +from typing import TYPE_CHECKING, AsyncIterator, Optional from rusty_results import Some from node.api.base import NodeApi from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer +from node.api.serializers.info import InfoSerializer if TYPE_CHECKING: from core.app import NBESettings -def get_weighted_amount() -> int: - items = [1, 2, 3] - weights = [0.6, 0.3, 0.1] - return choices(items, weights=weights, k=1)[0] - - class FakeNodeApi(NodeApi): def __init__(self, _settings: "NBESettings"): self.current_slot: int = 0 @@ -28,12 +23,18 @@ class FakeNodeApi(NodeApi): else: return HealthSerializer.from_healthy() - async def get_blocks(self, **kwargs) -> List[BlockSerializer]: - n = get_weighted_amount() - assert n >= 1 - blocks = [BlockSerializer.from_random() for _ in range(n)] - self.current_slot = max(blocks, key=lambda block: block.slot).slot - return blocks + async def get_info(self) -> InfoSerializer: + return InfoSerializer( + lib="0" * 64, + tip="0" * 64, + slot=self.current_slot, + height=0, + mode="Fake", + ) + + async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]: + # Fake API doesn't track blocks by hash + return None async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: while True: diff --git a/src/node/api/http.py b/src/node/api/http.py index aa317cc..b144dda 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -1,6 +1,6 @@ import json import logging -from typing import TYPE_CHECKING, AsyncIterator, List, Optional +from typing import TYPE_CHECKING, AsyncIterator, Optional from urllib.parse import urljoin, urlunparse import httpx @@ -12,6 +12,7 @@ from core.authentication import Authentication from node.api.base import NodeApi from node.api.serializers.block import BlockSerializer from node.api.serializers.health import HealthSerializer +from node.api.serializers.info import InfoSerializer if TYPE_CHECKING: from core.app import NBESettings @@ -23,9 +24,8 @@ logger = logging.getLogger(__name__) class HttpNodeApi(NodeApi): # Paths can't have a leading slash since they are relative to the base URL ENDPOINT_INFO = "cryptarchia/info" - ENDPOINT_TRANSACTIONS = "cryptarchia/transactions" - ENDPOINT_BLOCKS = "cryptarchia/blocks" ENDPOINT_BLOCKS_STREAM = "cryptarchia/events/blocks/stream" + ENDPOINT_BLOCK_BY_HASH = "storage/block" def __init__(self, settings: "NBESettings"): self.host: str = settings.node_api_host @@ -70,19 +70,35 @@ class HttpNodeApi(NodeApi): else: return HealthSerializer.from_unhealthy() - async def get_blocks(self, slot_from: int, slot_to: int) -> List[BlockSerializer]: - query_string = f"slot_from={slot_from}&slot_to={slot_to}" - endpoint = urljoin(self.base_url, self.ENDPOINT_BLOCKS) - url = f"{endpoint}?{query_string}" + async def get_info(self) -> InfoSerializer: + url = urljoin(self.base_url, self.ENDPOINT_INFO) response = requests.get(url, auth=self.authentication, timeout=60) - python_json = response.json() - blocks = [BlockSerializer.model_validate(item) for item in python_json] - return blocks + response.raise_for_status() + return InfoSerializer.model_validate(response.json()) + + async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]: + url = urljoin(self.base_url, f"{self.ENDPOINT_BLOCK_BY_HASH}/{block_hash}") + response = requests.get(url, auth=self.authentication, timeout=60) + if response.status_code == 404: + return None + response.raise_for_status() + json_data = response.json() + if json_data is None: + logger.warning(f"Block {block_hash} returned null from API") + return None + block = BlockSerializer.model_validate(json_data) + # The storage endpoint doesn't include the block hash in the response, + # so we set it from the URL parameter + if not block.header.hash: + block.header.hash = bytes.fromhex(block_hash) + return block async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) - async with httpx.AsyncClient(timeout=self.timeout, auth=auth) as client: + # Use no read timeout for streaming - blocks may arrive infrequently + stream_timeout = httpx.Timeout(connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) + async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client: async with client.stream("GET", url) as response: response.raise_for_status() # TODO: Result diff --git a/src/node/api/serializers/header.py b/src/node/api/serializers/header.py index c7268dc..642f4cf 100644 --- a/src/node/api/serializers/header.py +++ b/src/node/api/serializers/header.py @@ -15,7 +15,7 @@ from utils.random import random_hash class HeaderSerializer(NbeSerializer, FromRandom): - hash: BytesFromHex = Field(alias="id", description="Hash id in hex format.") + hash: BytesFromHex = Field(default=b"", alias="id", description="Hash id in hex format.") parent_block: BytesFromHex = Field(description="Hash in hex format.") slot: int = Field(description="Integer in u64 format.") block_root: BytesFromHex = Field(description="Hash in hex format.") diff --git a/src/node/lifespan.py b/src/node/lifespan.py index f81fcdd..6c3db97 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -1,16 +1,13 @@ import logging -from asyncio import TaskGroup, create_task, sleep +from asyncio import create_task from contextlib import asynccontextmanager from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator, List -from rusty_results import Option - from db.blocks import BlockRepository from db.clients import SqliteClient from db.transaction import TransactionRepository from models.block import Block from node.api.builder import build_node_api -from node.api.serializers.block import BlockSerializer from node.manager.builder import build_node_manager if TYPE_CHECKING: @@ -19,6 +16,69 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +async def backfill_to_lib(app: "NBE") -> None: + """ + Fetch the LIB (Last Irreversible Block) from the node and backfill by walking the chain backwards. + This traverses parent links instead of querying by slot range, which handles pruned/missing blocks. + """ + try: + info = await app.state.node_api.get_info() + logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}") + + await backfill_chain_from_hash(app, info.lib) + + except Exception as error: + logger.exception(f"Error during initial backfill to LIB: {error}") + # Don't raise - we can still try to subscribe to new blocks + + +async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: + """ + Walk the chain backwards from block_hash, fetching blocks until we hit + a block we already have or a genesis block (parent doesn't exist). + """ + blocks_to_insert: List[Block] = [] + current_hash = block_hash + + while True: + # Check if we already have this block + existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash)) + if existing.is_some: + logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk") + break + + # Fetch the block from the node + block_serializer = await app.state.node_api.get_block_by_hash(current_hash) + if block_serializer is None: + logger.info(f"Block {current_hash[:16]}... not found on node (likely genesis parent), stopping chain walk") + break + + block = block_serializer.into_block() + blocks_to_insert.append(block) + logger.debug(f"Queued block at slot {block.slot} (hash={current_hash[:16]}...) for insertion") + + # Move to parent + current_hash = block.parent_block.hex() + + if not blocks_to_insert: + logger.info("No new blocks to backfill") + return + + # Reverse so we insert from oldest to newest (parent before child) + blocks_to_insert.reverse() + + # Capture slot range before insert (blocks get detached from session after commit) + first_slot = blocks_to_insert[0].slot + last_slot = blocks_to_insert[-1].slot + block_count = len(blocks_to_insert) + + logger.info(f"Backfilling {block_count} blocks from chain walk...") + + # Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist + await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True) + logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})") + + @asynccontextmanager async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: app.state.node_manager = build_node_manager(app.settings) @@ -34,8 +94,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: await app.state.node_manager.start() logger.info("Node started.") - app.state.subscription_to_updates_handle = create_task(subscribe_to_updates(app)) - app.state.backfill = create_task(backfill(app)) + # Backfill to LIB on startup + await backfill_to_lib(app) + + app.state.subscription_to_updates_handle = create_task(subscribe_to_new_blocks(app)) yield finally: @@ -44,35 +106,6 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: logger.info("Node stopped.") -# ================ -# BACKFILLING -# ================ -# Legend: -# BT = Block and/or Transaction -# Steps: -# 1. Subscribe to new BT and store them in the database. -# 2. Backfill gaps between the earliest received BT from subscription (step 1.) and the latest BT in the database. -# 3. Backfill gaps between the earliest BT in the database and genesis BT (slot 0). -# Assumptions: -# - BT are always filled correctly. -# - There's at most 1 gap in the BT sequence: From genesis to earliest received BT from subscription. -# - Slots are populated fully or not at all (no partial slots). -# Notes: -# - Upsert always. - -# ================ -# Fake -_SUBSCRIPTION_START_SLOT = 5 # Simplification for now. -# ================ - - -async def subscribe_to_updates(app: "NBE") -> None: - logger.info("✅ Subscription to new blocks and transactions started.") - async with TaskGroup() as tg: - tg.create_task(subscribe_to_new_blocks(app)) - logger.info("Subscription to new blocks and transactions finished.") - - async def _gracefully_close_stream(stream: AsyncIterator) -> None: aclose = getattr(stream, "aclose", None) if aclose is not None: @@ -83,15 +116,17 @@ async def _gracefully_close_stream(stream: AsyncIterator) -> None: async def subscribe_to_new_blocks(app: "NBE"): - blocks_stream: AsyncGenerator[BlockSerializer] = app.state.node_api.get_blocks_stream() # type: ignore[call-arg] + logger.info("Subscription to new blocks started.") + blocks_stream = app.state.node_api.get_blocks_stream() + try: while app.state.is_running: try: - block_serializer = await anext(blocks_stream) # TODO: Use anext's Sentinel? + block_serializer = await anext(blocks_stream) except TimeoutError: continue except StopAsyncIteration: - logger.error(f"Subscription to the new blocks stream ended unexpectedly. Please restart the node.") + logger.error("Subscription to the new blocks stream ended unexpectedly. Please restart the node.") break except Exception as error: logger.exception(f"Error while fetching new blocks: {error}") @@ -99,52 +134,31 @@ async def subscribe_to_new_blocks(app: "NBE"): try: block = block_serializer.into_block() + + # Check if parent exists in DB + parent_exists = (await app.state.block_repository.get_by_hash(block.parent_block)).is_some + + if not parent_exists: + # Need to backfill the chain from this block's parent + logger.info(f"Parent block not found for block at slot {block.slot}. Initiating chain backfill...") + await backfill_chain_from_hash(app, block.parent_block.hex()) + + # Re-check if parent now exists after backfill + parent_exists = (await app.state.block_repository.get_by_hash(block.parent_block)).is_some + if not parent_exists: + logger.warning(f"Parent block still not found after backfill for block at slot {block.slot}. Skipping block.") + continue + + # Capture values before create() detaches the block from the session + block_slot = block.slot + + # Now we have the parent, store the block await app.state.block_repository.create(block) + logger.debug(f"Stored block at slot {block_slot}") + except Exception as error: logger.exception(f"Error while storing new block: {error}") finally: await _gracefully_close_stream(blocks_stream) - -async def backfill(app: "NBE") -> None: - logger.info("Backfilling started.") - async with TaskGroup() as tg: - tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3)) - logger.info("✅ Backfilling finished.") - - -async def get_earliest_block_slot(app: "NBE") -> Option[int]: - earliest_block: Option[Block] = await app.state.block_repository.get_earliest() - return earliest_block.map(lambda block: block.slot) - - -async def backfill_blocks(app: "NBE", *, db_hit_interval_seconds: int, batch_size: int = 50): - """ - FIXME: This is a very naive implementation: - - One block per slot. - - There's at most one gap to backfill (from genesis to earliest block). - FIXME: First block received is slot=2 - """ - logger.info("Checking for block gaps to backfill...") - # Hit the database until we get a block - while (earliest_block_slot_option := await get_earliest_block_slot(app)).is_empty: - logger.debug("No blocks were found in the database yet. Waiting...") - await sleep(db_hit_interval_seconds) - earliest_block_slot: int = earliest_block_slot_option.unwrap() - - if earliest_block_slot == 0: - logger.info("No blocks to backfill.") - return - - slot_to = earliest_block_slot - 1 - logger.info(f"Backfilling blocks from slot {slot_to} down to 0...") - while slot_to > 0: - slot_from = max(0, slot_to - batch_size) - blocks_serializers: List[BlockSerializer] = await app.state.node_api.get_blocks( - slot_from=slot_from, slot_to=slot_to - ) - blocks: List[Block] = [block_serializer.into_block() for block_serializer in blocks_serializers] - logger.debug(f"Backfilling {len(blocks)} blocks from slot {slot_from} to {slot_to}...") - await app.state.block_repository.create(*blocks) - slot_to = slot_from - 1 - logger.info("Backfilling blocks completed.") + logger.info("Subscription to new blocks finished.") diff --git a/static/app.js b/static/app.js index 273e91d..f83f9e8 100644 --- a/static/app.js +++ b/static/app.js @@ -17,7 +17,7 @@ function AppShell(props) { return h( Fragment, null, - h('header', null, h('h1', null, 'Nomos Block Explorer'), h(HealthPill, null)), + h('header', null, h('h1', null, 'λ Blockchain Block Explorer'), h(HealthPill, null)), props.children, ); } diff --git a/static/components/BlocksTable.js b/static/components/BlocksTable.js index b3390c6..9765b43 100644 --- a/static/components/BlocksTable.js +++ b/static/components/BlocksTable.js @@ -15,6 +15,7 @@ const normalize = (raw) => { return { id: Number(raw.id ?? 0), + height: Number(raw.height ?? 0), slot: Number(raw.slot ?? header?.slot ?? 0), hash: raw.hash ?? header?.hash ?? '', parent: raw.parent_block_hash ?? header?.parent_block ?? raw.parent_block ?? '', @@ -146,6 +147,8 @@ export default function BlocksTable() { shortenHex(b.hash), ), ), + // Height + h('td', null, h('span', { class: 'mono' }, String(b.height))), // Slot h('td', null, h('span', { class: 'mono' }, String(b.slot))), // Parent @@ -182,6 +185,7 @@ export default function BlocksTable() { h('td', null, '\u00A0'), h('td', null, '\u00A0'), h('td', null, '\u00A0'), + h('td', null, '\u00A0'), ); }; @@ -244,11 +248,12 @@ export default function BlocksTable() { h( 'colgroup', null, - h('col', { style: 'width:240px' }), // Hash - h('col', { style: 'width:90px' }), // Slot - h('col', { style: 'width:240px' }), // Parent - h('col', { style: 'width:240px' }), // Block Root - h('col', { style: 'width:120px' }), // Transactions + h('col', { style: 'width:200px' }), // Hash + h('col', { style: 'width:70px' }), // Height + h('col', { style: 'width:80px' }), // Slot + h('col', { style: 'width:200px' }), // Parent + h('col', { style: 'width:200px' }), // Block Root + h('col', { style: 'width:100px' }), // Transactions ), h( 'thead', @@ -257,6 +262,7 @@ export default function BlocksTable() { 'tr', null, h('th', null, 'Hash'), + h('th', null, 'Height'), h('th', null, 'Slot'), h('th', null, 'Parent'), h('th', null, 'Block Root'), diff --git a/static/index.html b/static/index.html index 26f6a0d..840b0e7 100644 --- a/static/index.html +++ b/static/index.html @@ -1,10 +1,10 @@ - Nomos Block Explorer + λ Blockchain Block Explorer - + diff --git a/static/pages/BlockDetail.js b/static/pages/BlockDetail.js index cfaa6e3..1d103a9 100644 --- a/static/pages/BlockDetail.js +++ b/static/pages/BlockDetail.js @@ -133,6 +133,7 @@ export default function BlockDetailPage({ parameters }) { const transactions = Array.isArray(block?.transactions) ? block.transactions : []; // Prefer new top-level fields; fallback to legacy header.* + const height = block?.height ?? null; const slot = block?.slot ?? header?.slot ?? null; const blockRoot = block?.block_root ?? header?.block_root ?? ''; const currentBlockHash = block?.hash ?? header?.hash ?? ''; @@ -185,6 +186,7 @@ export default function BlockDetailPage({ parameters }) { h( 'div', { style: 'margin-left:auto; display:flex; gap:8px; flex-wrap:wrap;' }, + height != null && h('span', { class: 'pill', title: 'Height' }, `Height ${String(height)}`), slot != null && h('span', { class: 'pill', title: 'Slot' }, `Slot ${String(slot)}`), ), ),