Merge pull request #14 from emnul/fix/node-http

fix(node.http): Fix node HTTP backfill reliability and performance
This commit is contained in:
davidrusu 2026-04-28 09:48:59 +04:00 committed by GitHub
commit 3e61dc8d80
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 26 additions and 20 deletions

View File

@ -84,7 +84,7 @@ B <--> D["Database<br/>(SQLite)"]
2. Run the block explorer:
```bash
uv run python -m main
PYTHONPATH=src uv run python -m main
```
By default, this will try to connect to a local Node running on port 18080.
@ -95,7 +95,7 @@ By default, this will try to connect to a local Node running on port 18080.
- If you want to run the Explorer without a Node, make sure to set the `NBE_NODE_API` environment variable to `fake`:
1. ```bash
NBE_NODE_API=fake uv run python -m main
PYTHONPATH=src NBE_NODE_API=fake uv run python -m main
```
2. ```bash
docker run -e NBE_NODE_API=fake -p 8000:8000 nomos-block-explorer

View File

@ -47,7 +47,7 @@ class BlockRepository:
def __init__(self, client: DbClient):
self.client = client
async def create(self, *blocks: Block, allow_chain_root: bool = False) -> None:
async def create(self, blocks: List[Block], allow_chain_root: bool = False) -> None:
"""
Insert blocks into the database with proper height calculation.

View File

@ -35,6 +35,11 @@ class HttpNodeApi(NodeApi):
self.authentication: Option[Authentication] = (
Some(settings.node_api_auth) if settings.node_api_auth else Empty()
)
auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None)
self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth)
async def aclose(self) -> None:
await self._client.aclose()
@property
def base_url(self) -> str:
@ -78,12 +83,7 @@ class HttpNodeApi(NodeApi):
async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]:
url = urljoin(self.base_url, self.ENDPOINT_BLOCK_BY_HASH)
response = requests.post(
url,
auth=self.authentication,
timeout=60,
json=block_hash,
)
response = await self._client.post(url, json=block_hash)
if response.status_code == 404:
return None
response.raise_for_status()

View File

@ -2,6 +2,7 @@ import asyncio
import logging
from asyncio import create_task
from contextlib import asynccontextmanager
from itertools import batched
from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator, List
from db.blocks import BlockRepository
@ -16,6 +17,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Safe insert size for SQLite ^3.32.0
SQLITE_BATCH_INSERT_SIZE = 10_000
async def backfill_to_lib(app: "NBE") -> None:
"""
@ -73,19 +77,17 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None:
logger.info("No new blocks to backfill")
return
# Reverse so we insert from oldest to newest (parent before child)
blocks_to_insert.reverse()
# Capture slot range before insert (blocks get detached from session after commit)
first_slot = blocks_to_insert[0].slot
last_slot = blocks_to_insert[-1].slot
block_count = len(blocks_to_insert)
logger.info(f"Backfilling {block_count} blocks from chain walk...")
# Insert all blocks in 10k batches to avoid sqlite query limits
# allowing the first block to be a chain root if its parent doesn't exist
# Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist
await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True)
logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})")
for idx, batch in enumerate(batched(reversed(blocks_to_insert), SQLITE_BATCH_INSERT_SIZE)):
first_slot = batch[0].slot
last_slot = batch[-1].slot
# allow_chain_root true only on first iteration
await app.state.block_repository.create(list(batch), allow_chain_root=(idx == 0))
logger.info(f"Backfilled {len(batch)} blocks (slots {first_slot} to {last_slot})")
@asynccontextmanager
@ -110,6 +112,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
yield
finally:
# Check if node api needs cleanup
if hasattr(app.state.node_api, "aclose"):
logger.info("Closing node_api connections...")
await app.state.node_api.aclose()
logger.info("Stopping node...")
await app.state.node_manager.stop()
logger.info("Node stopped.")
@ -164,7 +170,7 @@ async def subscribe_to_new_blocks(app: "NBE"):
block_slot = block.slot
# Now we have the parent, store the block
await app.state.block_repository.create(block)
await app.state.block_repository.create([block])
logger.debug(f"Stored block at slot {block_slot}")
except Exception as error: