Mirror of https://github.com/logos-blockchain/logos-blockchain-block-explorer-template.git (synced 2026-02-16 19:13:25 +00:00)

commit 7b8a2fd1ce (parent 1d8c0fdec9)

    parent-walking based backfilling
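The idea of the change, in brief: instead of backfilling by querying slot ranges, the explorer asks the node for its info (LIB and tip), then walks parent hashes backwards until it reaches a block it already has, and inserts the missing ancestors oldest-first. A minimal sketch of that walk — the helper names (`walk_parents`, `fetch_block`, `have_block`) are illustrative, not the project's API; the real implementation is `backfill_chain_from_hash` in the diff below:

    from typing import Callable, List, Optional


    def walk_parents(
        start_hash: str,
        fetch_block: Callable[[str], Optional[dict]],
        have_block: Callable[[str], bool],
    ) -> List[dict]:
        """Collect the missing ancestors of start_hash, returned oldest-first."""
        missing: List[dict] = []
        current = start_hash
        while not have_block(current):
            block = fetch_block(current)  # e.g. {"hash": ..., "parent": ..., "slot": ...} or None
            if block is None:
                # Walked past the chain root (or pruned history): stop.
                break
            missing.append(block)
            current = block["parent"]
        missing.reverse()  # insert parents before their children
        return missing

Because the walk only follows parent links the node can actually serve, it copes naturally with pruned or missing slots, which the old slot-range queries could not.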
@@ -12,6 +12,7 @@ class BlockRead(NbeSchema):
     hash: HexBytes
     parent_block_hash: HexBytes
     slot: int
+    height: int
     block_root: HexBytes
     proof_of_leadership: ProofOfLeadership
     transactions: List[Transaction]
@@ -23,6 +24,7 @@ class BlockRead(NbeSchema):
             hash=block.hash,
             parent_block_hash=block.parent_block,
             slot=block.slot,
+            height=block.height,
             block_root=block.block_root,
             proof_of_leadership=block.proof_of_leadership,
             transactions=block.transactions,
@@ -51,7 +51,6 @@ class NBEState(State):
     block_repository: BlockRepository
     transaction_repository: TransactionRepository
     subscription_to_updates_handle: Task
-    backfill_handle: Task

     @property
     def is_running(self) -> bool:
@@ -64,7 +63,6 @@ class NBEState(State):
     async def _wait_tasks_finished(self):
         await gather(
             self.subscription_to_updates_handle,
-            self.backfill_handle,
             return_exceptions=True,
         )
143  src/db/blocks.py

@@ -1,6 +1,6 @@
 import logging
 from asyncio import sleep
-from typing import AsyncIterator, List
+from typing import AsyncIterator, Dict, List

 from rusty_results import Empty, Option, Some
 from sqlalchemy import Result, Select
@@ -11,30 +11,135 @@ from db.clients import DbClient
 from models.block import Block


 logger = logging.getLogger(__name__)


 def get_latest_statement(limit: int, *, output_ascending: bool = True) -> Select:
-    # Fetch the latest N blocks in descending slot order
-    base = select(Block).order_by(Block.slot.desc(), Block.id.desc()).limit(limit)
+    # Fetch the latest N blocks in descending height order
+    base = select(Block).order_by(Block.height.desc()).limit(limit)
     if not output_ascending:
         return base

     # Reorder for output
     inner = base.subquery()
     latest = aliased(Block, inner)
-    return select(latest).options().order_by(latest.slot.asc(), latest.id.asc())  # type: ignore[arg-type]
+    return select(latest).options().order_by(latest.height.asc())  # type: ignore[arg-type]


 class BlockRepository:
     """
     FIXME: Assumes slots are sequential and one block per slot
     """

     def __init__(self, client: DbClient):
         self.client = client

-    async def create(self, *blocks: Block) -> None:
+    async def create(self, *blocks: Block, allow_chain_root: bool = False) -> None:
+        """
+        Insert blocks into the database with proper height calculation.
+
+        Args:
+            blocks: Blocks to insert
+            allow_chain_root: If True, allow the first block (by slot) to be a chain root
+                even if its parent doesn't exist. Used during chain-walk backfills.
+        """
+        if not blocks:
+            return
+
         with self.client.session() as session:
-            session.add_all(list(blocks))
-            session.commit()
+            # Collect all unique parent hashes we need to look up
+            parent_hashes = {block.parent_block for block in blocks}
+
+            # Fetch existing parent blocks to get their heights
+            parent_heights: Dict[bytes, int] = {}
+            if parent_hashes:
+                statement = select(Block).where(Block.hash.in_(parent_hashes))
+                existing_parents = session.exec(statement).all()
+                for parent in existing_parents:
+                    parent_heights[parent.hash] = parent.height
+
+            # Also check if any of the blocks we're inserting are parents of others
+            blocks_by_hash = {block.hash: block for block in blocks}
+
+            # Find the chain root candidate (lowest slot block whose parent isn't in the batch)
+            chain_root_hash = None
+            if allow_chain_root:
+                sorted_blocks = sorted(blocks, key=lambda b: b.slot)
+                for block in sorted_blocks:
+                    if block.parent_block not in blocks_by_hash and block.parent_block not in parent_heights:
+                        chain_root_hash = block.hash
+                        break
+
+            # Handle blocks in batch that depend on each other:
+            # resolve dependencies iteratively, skipping orphans
+            resolved = set()
+            orphans = set()
+            max_iterations = len(blocks) * 2  # Prevent infinite loops
+            iterations = 0
+
+            while iterations < max_iterations:
+                iterations += 1
+                made_progress = False
+
+                for block in blocks:
+                    if block.hash in resolved or block.hash in orphans:
+                        continue
+
+                    if block.parent_block in parent_heights:
+                        # Parent found in DB or already resolved
+                        block.height = parent_heights[block.parent_block] + 1
+                        parent_heights[block.hash] = block.height
+                        resolved.add(block.hash)
+                        made_progress = True
+                    elif block.parent_block in blocks_by_hash:
+                        parent = blocks_by_hash[block.parent_block]
+                        if parent.hash in resolved:
+                            # Parent in same batch and already resolved
+                            block.height = parent.height + 1
+                            parent_heights[block.hash] = block.height
+                            resolved.add(block.hash)
+                            made_progress = True
+                        elif parent.hash in orphans:
+                            # Parent is an orphan, so this block is also an orphan
+                            orphans.add(block.hash)
+                            made_progress = True
+                        # else: parent not yet resolved, try again next iteration
+                    else:
+                        # Parent not found anywhere
+                        if block.slot == 0 or block.hash == chain_root_hash:
+                            # Genesis block or chain root - no parent requirement
+                            block.height = 0
+                            parent_heights[block.hash] = block.height
+                            resolved.add(block.hash)
+                            made_progress = True
+                            if block.hash == chain_root_hash:
+                                logger.info(
+                                    f"Chain root block: hash={block.hash.hex()[:16]}..., "
+                                    f"slot={block.slot}, height=0"
+                                )
+                        else:
+                            # Orphan block - parent doesn't exist
+                            logger.warning(
+                                f"Dropping orphaned block: hash={block.hash.hex()}, "
+                                f"slot={block.slot}, parent={block.parent_block.hex()} (parent not found)"
+                            )
+                            orphans.add(block.hash)
+                            made_progress = True
+
+                # If no progress was made and we still have unresolved blocks, break
+                if not made_progress:
+                    break
+
+            # Check for any blocks that couldn't be resolved (circular dependencies or other issues)
+            unresolved = set(block.hash for block in blocks) - resolved - orphans
+            for block in blocks:
+                if block.hash in unresolved:
+                    logger.warning(
+                        f"Dropping unresolvable block: hash={block.hash.hex()}, "
+                        f"slot={block.slot}, parent={block.parent_block.hex()}"
+                    )
+
+            # Only add resolved blocks
+            blocks_to_add = [block for block in blocks if block.hash in resolved]
+            if blocks_to_add:
+                session.add_all(blocks_to_add)
+                session.commit()

     async def get_by_id(self, block_id: int) -> Option[Block]:
         statement = select(Block).where(Block.id == block_id)
@@ -68,7 +173,7 @@ class BlockRepository:
         return b

     async def get_earliest(self) -> Option[Block]:
-        statement = select(Block).order_by(Block.slot.asc()).limit(1)
+        statement = select(Block).order_by(Block.height.asc()).limit(1)

         with self.client.session() as session:
             results: Result[Block] = session.exec(statement)
@@ -79,7 +184,7 @@ class BlockRepository:

     async def get_paginated(self, page: int, page_size: int) -> tuple[List[Block], int]:
         """
-        Get blocks with pagination, ordered by slot descending (newest first).
+        Get blocks with pagination, ordered by height descending (newest first).
         Returns a tuple of (blocks, total_count).
         """
         offset = page * page_size
@@ -93,7 +198,7 @@ class BlockRepository:
         # Get paginated blocks
         statement = (
             select(Block)
-            .order_by(Block.slot.desc(), Block.id.desc())
+            .order_by(Block.height.desc())
            .offset(offset)
            .limit(page_size)
        )
@@ -104,22 +209,20 @@ class BlockRepository:
     async def updates_stream(
         self, block_from: Option[Block], *, timeout_seconds: int = 1
     ) -> AsyncIterator[List[Block]]:
-        slot_cursor: int = block_from.map(lambda block: block.slot).unwrap_or(0)
-        id_cursor: int = block_from.map(lambda block: block.id + 1).unwrap_or(0)
+        height_cursor: int = block_from.map(lambda block: block.height + 1).unwrap_or(0)

         while True:
             statement = (
                 select(Block)
-                .where(Block.slot >= slot_cursor, Block.id >= id_cursor)
-                .order_by(Block.slot.asc(), Block.id.asc())
+                .where(Block.height >= height_cursor)
+                .order_by(Block.height.asc())
             )

             with self.client.session() as session:
                 blocks: List[Block] = session.exec(statement).all()

             if len(blocks) > 0:
-                slot_cursor = blocks[-1].slot
-                id_cursor = blocks[-1].id + 1
+                height_cursor = blocks[-1].height + 1
                 yield blocks
             else:
                 await sleep(timeout_seconds)
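To make the new `create()` behaviour concrete, here is a self-contained restatement of its height-resolution rule on plain dicts — a sketch for reasoning about edge cases, not the repository code itself (`assign_heights` and the dict shapes are illustrative): a block whose parent is known gets parent height + 1, the lowest-slot block with an unknown parent may be admitted as a chain root at height 0 when `allow_chain_root=True`, and everything else is dropped as an orphan.

    from typing import Dict, List

    Blk = dict  # {"hash": bytes, "parent": bytes, "slot": int}


    def assign_heights(batch: List[Blk], db_heights: Dict[bytes, int], *, allow_chain_root: bool = False) -> List[Blk]:
        by_hash = {b["hash"]: b for b in batch}
        heights = dict(db_heights)  # hash -> height, seeded from the database

        # The lowest-slot block whose parent is unknown may become the chain root.
        root = None
        if allow_chain_root:
            for b in sorted(batch, key=lambda b: b["slot"]):
                if b["parent"] not in by_hash and b["parent"] not in heights:
                    root = b["hash"]
                    break

        resolved: List[Blk] = []
        progress = True
        while progress:
            progress = False
            for b in batch:
                if b["hash"] in heights:
                    continue  # already resolved
                if b["parent"] in heights:
                    b["height"] = heights[b["parent"]] + 1
                elif b["slot"] == 0 or b["hash"] == root:
                    b["height"] = 0  # genesis or admitted chain root
                else:
                    continue  # parent unknown so far; orphan if never resolved
                heights[b["hash"]] = b["height"]
                resolved.append(b)
                progress = True
        return resolved  # blocks not in the result are orphans and get dropped


    batch = [
        {"hash": b"B", "parent": b"A", "slot": 11},
        {"hash": b"A", "parent": b"?", "slot": 10},
    ]
    for blk in assign_heights(batch, {}, allow_chain_root=True):
        print(blk["hash"], blk["height"])  # b'A' 0, then b'B' 1

Running it on a two-block batch whose oldest parent is unknown yields the root at height 0 and its child at height 1, which is exactly the chain-walk backfill case the `allow_chain_root` flag exists for.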
@@ -1,4 +1,3 @@
-import logging
 from asyncio import sleep
 from typing import AsyncIterator, List

@@ -13,11 +12,11 @@ from models.transactions.transaction import Transaction


 def get_latest_statement(limit: int, *, output_ascending: bool, preload_relationships: bool) -> Select:
-    # Join with Block to order by Block's slot and fetch the latest N transactions in descending order
+    # Join with Block to order by Block's height and fetch the latest N transactions in descending order
     base = (
-        select(Transaction, Block.slot.label("block__slot"), Block.id.label("block__id"))
+        select(Transaction, Block.height.label("block__height"))
         .join(Block, Transaction.block_id == Block.id)
-        .order_by(Block.slot.desc(), Block.id.desc(), Transaction.id.desc())
+        .order_by(Block.height.desc(), Transaction.id.desc())
         .limit(limit)
     )
     if not output_ascending:
@@ -26,7 +25,7 @@ def get_latest_statement(limit: int, *, output_ascending: bool, preload_relationships: bool) -> Select:
     # Reorder for output
     inner = base.subquery()
     latest = aliased(Transaction, inner)
-    statement = select(latest).order_by(inner.c.block__slot.asc(), inner.c.block__id.asc(), latest.id.asc())
+    statement = select(latest).order_by(inner.c.block__height.asc(), latest.id.asc())
     if preload_relationships:
         statement = statement.options(selectinload(latest.block))
     return statement
@@ -76,8 +75,7 @@ class TransactionRepository:
     async def updates_stream(
         self, transaction_from: Option[Transaction], *, timeout_seconds: int = 1
     ) -> AsyncIterator[List[Transaction]]:
-        slot_cursor = transaction_from.map(lambda transaction: transaction.block.slot).unwrap_or(0)
-        block_id_cursor = transaction_from.map(lambda transaction: transaction.block.id).unwrap_or(0)
+        height_cursor = transaction_from.map(lambda transaction: transaction.block.height).unwrap_or(0)
         transaction_id_cursor = transaction_from.map(lambda transaction: transaction.id + 1).unwrap_or(0)

         while True:
@@ -86,19 +84,17 @@ class TransactionRepository:
                 .options(selectinload(Transaction.block))
                 .join(Block, Transaction.block_id == Block.id)
                 .where(
-                    Block.slot >= slot_cursor,
-                    Block.id >= block_id_cursor,
+                    Block.height >= height_cursor,
                     Transaction.id >= transaction_id_cursor,
                 )
-                .order_by(Block.slot.asc(), Block.id.asc(), Transaction.id.asc())
+                .order_by(Block.height.asc(), Transaction.id.asc())
             )

             with self.client.session() as session:
                 transactions: List[Transaction] = session.exec(statement).all()

             if len(transactions) > 0:
-                slot_cursor = transactions[-1].block.slot
-                block_id_cursor = transactions[-1].block.id
+                height_cursor = transactions[-1].block.height
                 transaction_id_cursor = transactions[-1].id + 1
                 yield transactions
             else:
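Both repositories now page on a single monotonic height cursor instead of the compound `(slot, id)` cursor, so a consumer just iterates the stream. A sketch, assuming an already-constructed `BlockRepository` (wiring elided):

    import asyncio

    from rusty_results import Empty


    async def follow_blocks(repository) -> None:
        # Empty() starts from height 0; pass Some(block) to resume after a known block.
        async for batch in repository.updates_stream(Empty(), timeout_seconds=1):
            for block in batch:
                print(f"height={block.height} slot={block.slot} hash={block.hash.hex()[:16]}")


    # asyncio.run(follow_blocks(block_repository))  # given a constructed BlockRepository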
@@ -24,6 +24,7 @@ class Block(TimestampedModel, table=True):
     hash: HexBytes = Field(nullable=False, unique=True)
     parent_block: HexBytes = Field(nullable=False)
     slot: int = Field(nullable=False)
+    height: int = Field(nullable=False, default=0)
     block_root: HexBytes = Field(nullable=False)
     proof_of_leadership: ProofOfLeadership = Field(
         sa_column=Column(PydanticJsonColumn(ProofOfLeadership), nullable=False)
@@ -1,8 +1,9 @@
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, AsyncIterator, List
+from typing import TYPE_CHECKING, AsyncIterator, Optional

 from node.api.serializers.block import BlockSerializer
 from node.api.serializers.health import HealthSerializer
+from node.api.serializers.info import InfoSerializer

 if TYPE_CHECKING:
     from core.app import NBESettings
@@ -18,9 +19,13 @@ class NodeApi(ABC):
         pass

     @abstractmethod
-    async def get_blocks(self, **kwargs) -> List[BlockSerializer]:
+    async def get_info(self) -> InfoSerializer:
         pass

     @abstractmethod
-    async def get_blocks_stream(self) -> AsyncIterator[List[BlockSerializer]]:
+    async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]:
         pass
+
+    @abstractmethod
+    async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]:
+        pass
@@ -1,23 +1,18 @@
 from asyncio import sleep
-from random import choices, random
-from typing import TYPE_CHECKING, AsyncIterator, List
+from random import random
+from typing import TYPE_CHECKING, AsyncIterator, Optional

 from rusty_results import Some

 from node.api.base import NodeApi
 from node.api.serializers.block import BlockSerializer
 from node.api.serializers.health import HealthSerializer
+from node.api.serializers.info import InfoSerializer

 if TYPE_CHECKING:
     from core.app import NBESettings


-def get_weighted_amount() -> int:
-    items = [1, 2, 3]
-    weights = [0.6, 0.3, 0.1]
-    return choices(items, weights=weights, k=1)[0]
-
-
 class FakeNodeApi(NodeApi):
     def __init__(self, _settings: "NBESettings"):
         self.current_slot: int = 0
@@ -28,12 +23,18 @@ class FakeNodeApi(NodeApi):
         else:
             return HealthSerializer.from_healthy()

-    async def get_blocks(self, **kwargs) -> List[BlockSerializer]:
-        n = get_weighted_amount()
-        assert n >= 1
-        blocks = [BlockSerializer.from_random() for _ in range(n)]
-        self.current_slot = max(blocks, key=lambda block: block.slot).slot
-        return blocks
+    async def get_info(self) -> InfoSerializer:
+        return InfoSerializer(
+            lib="0" * 64,
+            tip="0" * 64,
+            slot=self.current_slot,
+            height=0,
+            mode="Fake",
+        )
+
+    async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]:
+        # Fake API doesn't track blocks by hash
+        return None

     async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]:
         while True:
@@ -1,6 +1,6 @@
 import json
 import logging
-from typing import TYPE_CHECKING, AsyncIterator, List, Optional
+from typing import TYPE_CHECKING, AsyncIterator, Optional
 from urllib.parse import urljoin, urlunparse

 import httpx
@@ -12,6 +12,7 @@ from core.authentication import Authentication
 from node.api.base import NodeApi
 from node.api.serializers.block import BlockSerializer
 from node.api.serializers.health import HealthSerializer
+from node.api.serializers.info import InfoSerializer

 if TYPE_CHECKING:
     from core.app import NBESettings
@@ -23,9 +24,8 @@ logger = logging.getLogger(__name__)
 class HttpNodeApi(NodeApi):
     # Paths can't have a leading slash since they are relative to the base URL
+    ENDPOINT_INFO = "cryptarchia/info"
     ENDPOINT_TRANSACTIONS = "cryptarchia/transactions"
-    ENDPOINT_BLOCKS = "cryptarchia/blocks"
     ENDPOINT_BLOCKS_STREAM = "cryptarchia/events/blocks/stream"
+    ENDPOINT_BLOCK_BY_HASH = "storage/block"

     def __init__(self, settings: "NBESettings"):
         self.host: str = settings.node_api_host
@@ -70,19 +70,35 @@ class HttpNodeApi(NodeApi):
         else:
             return HealthSerializer.from_unhealthy()

-    async def get_blocks(self, slot_from: int, slot_to: int) -> List[BlockSerializer]:
-        query_string = f"slot_from={slot_from}&slot_to={slot_to}"
-        endpoint = urljoin(self.base_url, self.ENDPOINT_BLOCKS)
-        url = f"{endpoint}?{query_string}"
-        response = requests.get(url, auth=self.authentication, timeout=60)
-        python_json = response.json()
-        blocks = [BlockSerializer.model_validate(item) for item in python_json]
-        return blocks
+    async def get_info(self) -> InfoSerializer:
+        url = urljoin(self.base_url, self.ENDPOINT_INFO)
+        response = requests.get(url, auth=self.authentication, timeout=60)
+        response.raise_for_status()
+        return InfoSerializer.model_validate(response.json())
+
+    async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]:
+        url = urljoin(self.base_url, f"{self.ENDPOINT_BLOCK_BY_HASH}/{block_hash}")
+        response = requests.get(url, auth=self.authentication, timeout=60)
+        if response.status_code == 404:
+            return None
+        response.raise_for_status()
+        json_data = response.json()
+        if json_data is None:
+            logger.warning(f"Block {block_hash} returned null from API")
+            return None
+        block = BlockSerializer.model_validate(json_data)
+        # The storage endpoint doesn't include the block hash in the response,
+        # so we set it from the URL parameter
+        if not block.header.hash:
+            block.header.hash = bytes.fromhex(block_hash)
+        return block

     async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]:
         url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM)
         auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None)
-        async with httpx.AsyncClient(timeout=self.timeout, auth=auth) as client:
+        # Use no read timeout for streaming - blocks may arrive infrequently
+        stream_timeout = httpx.Timeout(connect=self.timeout, read=None, write=self.timeout, pool=self.timeout)
+        async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client:
             async with client.stream("GET", url) as response:
                 response.raise_for_status()  # TODO: Result
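The streaming change deserves a note: httpx applies its read timeout between chunks, so a quiet chain would eventually kill a long-lived block stream. The fix keeps the connect/write/pool bounds and disables only the read timeout. A standalone sketch of the pattern (the URL and host are illustrative):

    import asyncio

    import httpx


    async def consume_stream(url: str, base_timeout: float = 10.0) -> None:
        # Bound connect/write/pool, but never time out between reads:
        # blocks on a quiet chain may arrive minutes apart.
        timeout = httpx.Timeout(connect=base_timeout, read=None, write=base_timeout, pool=base_timeout)
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("GET", url) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    print(line)


    # asyncio.run(consume_stream("http://localhost:8080/cryptarchia/events/blocks/stream"))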
@@ -15,7 +15,7 @@ from utils.random import random_hash


 class HeaderSerializer(NbeSerializer, FromRandom):
-    hash: BytesFromHex = Field(alias="id", description="Hash id in hex format.")
+    hash: BytesFromHex = Field(default=b"", alias="id", description="Hash id in hex format.")
     parent_block: BytesFromHex = Field(description="Hash in hex format.")
     slot: int = Field(description="Integer in u64 format.")
     block_root: BytesFromHex = Field(description="Hash in hex format.")
@@ -1,16 +1,13 @@
 import logging
-from asyncio import TaskGroup, create_task, sleep
+from asyncio import create_task
 from contextlib import asynccontextmanager
 from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator, List

-from rusty_results import Option
-
 from db.blocks import BlockRepository
 from db.clients import SqliteClient
 from db.transaction import TransactionRepository
 from models.block import Block
 from node.api.builder import build_node_api
-from node.api.serializers.block import BlockSerializer
 from node.manager.builder import build_node_manager

 if TYPE_CHECKING:
@@ -19,6 +16,69 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)


+async def backfill_to_lib(app: "NBE") -> None:
+    """
+    Fetch the LIB (Last Irreversible Block) from the node and backfill by walking the chain backwards.
+    This traverses parent links instead of querying by slot range, which handles pruned/missing blocks.
+    """
+    try:
+        info = await app.state.node_api.get_info()
+        logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}")
+
+        await backfill_chain_from_hash(app, info.lib)
+
+    except Exception as error:
+        logger.exception(f"Error during initial backfill to LIB: {error}")
+        # Don't raise - we can still try to subscribe to new blocks
+
+
+async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None:
+    """
+    Walk the chain backwards from block_hash, fetching blocks until we hit
+    a block we already have or a genesis block (parent doesn't exist).
+    """
+    blocks_to_insert: List[Block] = []
+    current_hash = block_hash
+
+    while True:
+        # Check if we already have this block
+        existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash))
+        if existing.is_some:
+            logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk")
+            break
+
+        # Fetch the block from the node
+        block_serializer = await app.state.node_api.get_block_by_hash(current_hash)
+        if block_serializer is None:
+            logger.info(f"Block {current_hash[:16]}... not found on node (likely genesis parent), stopping chain walk")
+            break
+
+        block = block_serializer.into_block()
+        blocks_to_insert.append(block)
+        logger.debug(f"Queued block at slot {block.slot} (hash={current_hash[:16]}...) for insertion")
+
+        # Move to parent
+        current_hash = block.parent_block.hex()
+
+    if not blocks_to_insert:
+        logger.info("No new blocks to backfill")
+        return
+
+    # Reverse so we insert from oldest to newest (parent before child)
+    blocks_to_insert.reverse()
+
+    # Capture slot range before insert (blocks get detached from session after commit)
+    first_slot = blocks_to_insert[0].slot
+    last_slot = blocks_to_insert[-1].slot
+    block_count = len(blocks_to_insert)
+
+    logger.info(f"Backfilling {block_count} blocks from chain walk...")
+
+    # Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist
+    await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True)
+    logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})")
+
+
 @asynccontextmanager
 async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
     app.state.node_manager = build_node_manager(app.settings)
@@ -34,8 +94,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
         await app.state.node_manager.start()
         logger.info("Node started.")

-        app.state.subscription_to_updates_handle = create_task(subscribe_to_updates(app))
-        app.state.backfill = create_task(backfill(app))
+        # Backfill to LIB on startup
+        await backfill_to_lib(app)
+
+        app.state.subscription_to_updates_handle = create_task(subscribe_to_new_blocks(app))

         yield
     finally:
@@ -44,35 +106,6 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
         logger.info("Node stopped.")


-# ================
-# BACKFILLING
-# ================
-# Legend:
-#   BT = Block and/or Transaction
-# Steps:
-#   1. Subscribe to new BT and store them in the database.
-#   2. Backfill gaps between the earliest received BT from subscription (step 1.) and the latest BT in the database.
-#   3. Backfill gaps between the earliest BT in the database and genesis BT (slot 0).
-# Assumptions:
-#   - BT are always filled correctly.
-#   - There's at most 1 gap in the BT sequence: From genesis to earliest received BT from subscription.
-#   - Slots are populated fully or not at all (no partial slots).
-# Notes:
-#   - Upsert always.
-
-# ================
-# Fake
-_SUBSCRIPTION_START_SLOT = 5  # Simplification for now.
-# ================
-
-
-async def subscribe_to_updates(app: "NBE") -> None:
-    logger.info("✅ Subscription to new blocks and transactions started.")
-    async with TaskGroup() as tg:
-        tg.create_task(subscribe_to_new_blocks(app))
-    logger.info("Subscription to new blocks and transactions finished.")
-
-
 async def _gracefully_close_stream(stream: AsyncIterator) -> None:
     aclose = getattr(stream, "aclose", None)
     if aclose is not None:
@@ -83,15 +116,17 @@ async def _gracefully_close_stream(stream: AsyncIterator) -> None:


 async def subscribe_to_new_blocks(app: "NBE"):
-    blocks_stream: AsyncGenerator[BlockSerializer] = app.state.node_api.get_blocks_stream()  # type: ignore[call-arg]
+    logger.info("Subscription to new blocks started.")
+    blocks_stream = app.state.node_api.get_blocks_stream()

     try:
         while app.state.is_running:
             try:
-                block_serializer = await anext(blocks_stream)  # TODO: Use anext's Sentinel?
+                block_serializer = await anext(blocks_stream)
             except TimeoutError:
                 continue
             except StopAsyncIteration:
-                logger.error(f"Subscription to the new blocks stream ended unexpectedly. Please restart the node.")
+                logger.error("Subscription to the new blocks stream ended unexpectedly. Please restart the node.")
                 break
             except Exception as error:
                 logger.exception(f"Error while fetching new blocks: {error}")
@@ -99,52 +134,31 @@ async def subscribe_to_new_blocks(app: "NBE"):

             try:
                 block = block_serializer.into_block()
+
+                # Check if parent exists in DB
+                parent_exists = (await app.state.block_repository.get_by_hash(block.parent_block)).is_some
+
+                if not parent_exists:
+                    # Need to backfill the chain from this block's parent
+                    logger.info(f"Parent block not found for block at slot {block.slot}. Initiating chain backfill...")
+                    await backfill_chain_from_hash(app, block.parent_block.hex())
+
+                    # Re-check if parent now exists after backfill
+                    parent_exists = (await app.state.block_repository.get_by_hash(block.parent_block)).is_some
+                    if not parent_exists:
+                        logger.warning(f"Parent block still not found after backfill for block at slot {block.slot}. Skipping block.")
+                        continue
+
+                # Capture values before create() detaches the block from the session
+                block_slot = block.slot
+
+                # Now we have the parent, store the block
+                await app.state.block_repository.create(block)
+                logger.debug(f"Stored block at slot {block_slot}")
+
             except Exception as error:
                 logger.exception(f"Error while storing new block: {error}")
     finally:
         await _gracefully_close_stream(blocks_stream)
-
-
-async def backfill(app: "NBE") -> None:
-    logger.info("Backfilling started.")
-    async with TaskGroup() as tg:
-        tg.create_task(backfill_blocks(app, db_hit_interval_seconds=3))
-    logger.info("✅ Backfilling finished.")
-
-
-async def get_earliest_block_slot(app: "NBE") -> Option[int]:
-    earliest_block: Option[Block] = await app.state.block_repository.get_earliest()
-    return earliest_block.map(lambda block: block.slot)
-
-
-async def backfill_blocks(app: "NBE", *, db_hit_interval_seconds: int, batch_size: int = 50):
-    """
-    FIXME: This is a very naive implementation:
-        - One block per slot.
-        - There's at most one gap to backfill (from genesis to earliest block).
-    FIXME: First block received is slot=2
-    """
-    logger.info("Checking for block gaps to backfill...")
-    # Hit the database until we get a block
-    while (earliest_block_slot_option := await get_earliest_block_slot(app)).is_empty:
-        logger.debug("No blocks were found in the database yet. Waiting...")
-        await sleep(db_hit_interval_seconds)
-    earliest_block_slot: int = earliest_block_slot_option.unwrap()
-
-    if earliest_block_slot == 0:
-        logger.info("No blocks to backfill.")
-        return
-
-    slot_to = earliest_block_slot - 1
-    logger.info(f"Backfilling blocks from slot {slot_to} down to 0...")
-    while slot_to > 0:
-        slot_from = max(0, slot_to - batch_size)
-        blocks_serializers: List[BlockSerializer] = await app.state.node_api.get_blocks(
-            slot_from=slot_from, slot_to=slot_to
-        )
-        blocks: List[Block] = [block_serializer.into_block() for block_serializer in blocks_serializers]
-        logger.debug(f"Backfilling {len(blocks)} blocks from slot {slot_from} to {slot_to}...")
-        await app.state.block_repository.create(*blocks)
-        slot_to = slot_from - 1
-    logger.info("Backfilling blocks completed.")
+    logger.info("Subscription to new blocks finished.")
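The same parent walk also heals gaps during live subscription: when a streamed block's parent is missing, the chain is backfilled before the block is stored. A toy, self-contained simulation of that rule, with plain dicts and sets standing in for the node API and the repository:

    # Toy simulation of the gap-healing rule in subscribe_to_new_blocks:
    # if an incoming block's parent is unknown, backfill the chain first, then store.
    # Purely illustrative - the real code goes through BlockRepository and the node API.
    node = {"a": None, "b": "a", "c": "b", "d": "c"}  # hash -> parent, as served by the node
    db = {"a"}  # the explorer only has the root so far


    def on_new_block(block_hash: str) -> None:
        missing = []
        cursor = node[block_hash]
        while cursor is not None and cursor not in db:
            missing.append(cursor)
            cursor = node[cursor]
        for ancestor in reversed(missing):  # insert oldest-first, parents before children
            db.add(ancestor)
        db.add(block_hash)


    on_new_block("d")   # arrives while "b" and "c" are still missing
    print(sorted(db))   # ['a', 'b', 'c', 'd'] - the gap is healed before "d" is stored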
@@ -17,7 +17,7 @@ function AppShell(props) {
   return h(
     Fragment,
     null,
-    h('header', null, h('h1', null, 'Nomos Block Explorer'), h(HealthPill, null)),
+    h('header', null, h('h1', null, 'λ Blockchain Block Explorer'), h(HealthPill, null)),
     props.children,
   );
 }
@@ -15,6 +15,7 @@ const normalize = (raw) => {

   return {
     id: Number(raw.id ?? 0),
+    height: Number(raw.height ?? 0),
     slot: Number(raw.slot ?? header?.slot ?? 0),
     hash: raw.hash ?? header?.hash ?? '',
     parent: raw.parent_block_hash ?? header?.parent_block ?? raw.parent_block ?? '',
@@ -146,6 +147,8 @@ export default function BlocksTable() {
           shortenHex(b.hash),
         ),
       ),
+      // Height
+      h('td', null, h('span', { class: 'mono' }, String(b.height))),
       // Slot
       h('td', null, h('span', { class: 'mono' }, String(b.slot))),
       // Parent
@@ -182,6 +185,7 @@ export default function BlocksTable() {
     h('td', null, '\u00A0'),
     h('td', null, '\u00A0'),
     h('td', null, '\u00A0'),
+    h('td', null, '\u00A0'),
   );
 };
@@ -244,11 +248,12 @@ export default function BlocksTable() {
       h(
         'colgroup',
         null,
-        h('col', { style: 'width:240px' }), // Hash
-        h('col', { style: 'width:90px' }), // Slot
-        h('col', { style: 'width:240px' }), // Parent
-        h('col', { style: 'width:240px' }), // Block Root
-        h('col', { style: 'width:120px' }), // Transactions
+        h('col', { style: 'width:200px' }), // Hash
+        h('col', { style: 'width:70px' }), // Height
+        h('col', { style: 'width:80px' }), // Slot
+        h('col', { style: 'width:200px' }), // Parent
+        h('col', { style: 'width:200px' }), // Block Root
+        h('col', { style: 'width:100px' }), // Transactions
       ),
       h(
         'thead',
@@ -257,6 +262,7 @@ export default function BlocksTable() {
         'tr',
         null,
         h('th', null, 'Hash'),
+        h('th', null, 'Height'),
         h('th', null, 'Slot'),
         h('th', null, 'Parent'),
         h('th', null, 'Block Root'),
@@ -1,10 +1,10 @@
 <!doctype html>
 <html lang="en">
   <head>
-    <title>Nomos Block Explorer</title>
+    <title>λ Blockchain Block Explorer</title>
     <meta charset="utf-8" />
     <meta name="viewport" content="width=device-width, initial-scale=1" />
-    <meta name="description" content="Lightweight Nomos block Explorer UI" />
+    <meta name="description" content="Lightweight λ Blockchain block Explorer UI" />

     <!-- Styles -->
     <link rel="stylesheet" href="/static/styles.css" />
@@ -133,6 +133,7 @@ export default function BlockDetailPage({ parameters }) {
   const transactions = Array.isArray(block?.transactions) ? block.transactions : [];

   // Prefer new top-level fields; fallback to legacy header.*
+  const height = block?.height ?? null;
   const slot = block?.slot ?? header?.slot ?? null;
   const blockRoot = block?.block_root ?? header?.block_root ?? '';
   const currentBlockHash = block?.hash ?? header?.hash ?? '';
@@ -185,6 +186,7 @@ export default function BlockDetailPage({ parameters }) {
       h(
         'div',
         { style: 'margin-left:auto; display:flex; gap:8px; flex-wrap:wrap;' },
+        height != null && h('span', { class: 'pill', title: 'Height' }, `Height ${String(height)}`),
         slot != null && h('span', { class: 'pill', title: 'Slot' }, `Slot ${String(slot)}`),
       ),
     ),