diff --git a/README.md b/README.md index bee646a..93c29cb 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ B <--> D["Database
(SQLite)"] 2. Run the block explorer: ```bash - uv run python -m main + PYTHONPATH=src uv run python -m main ``` By default, this will try to connect to a local Node running on port 18080. @@ -95,7 +95,7 @@ By default, this will try to connect to a local Node running on port 18080. - If you want to run the Explorer without a Node, make sure to set the `NBE_NODE_API` environment variable to `fake`: 1. ```bash - NBE_NODE_API=fake uv run python -m main + PYTHONPATH=src NBE_NODE_API=fake uv run python -m main ``` 2. ```bash docker run -e NBE_NODE_API=fake -p 8000:8000 nomos-block-explorer diff --git a/src/db/blocks.py b/src/db/blocks.py index 8e47a9e..5f7dfcb 100644 --- a/src/db/blocks.py +++ b/src/db/blocks.py @@ -47,7 +47,7 @@ class BlockRepository: def __init__(self, client: DbClient): self.client = client - async def create(self, *blocks: Block, allow_chain_root: bool = False) -> None: + async def create(self, blocks: List[Block], allow_chain_root: bool = False) -> None: """ Insert blocks into the database with proper height calculation. diff --git a/src/node/api/http.py b/src/node/api/http.py index cabd3c9..6f469d8 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -35,6 +35,11 @@ class HttpNodeApi(NodeApi): self.authentication: Option[Authentication] = ( Some(settings.node_api_auth) if settings.node_api_auth else Empty() ) + auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) + self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth) + + async def aclose(self) -> None: + await self._client.aclose() @property def base_url(self) -> str: @@ -78,12 +83,7 @@ class HttpNodeApi(NodeApi): async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCK_BY_HASH) - response = requests.post( - url, - auth=self.authentication, - timeout=60, - json=block_hash, - ) + response = await self._client.post(url, json=block_hash) if response.status_code == 404: return None response.raise_for_status() diff --git a/src/node/lifespan.py b/src/node/lifespan.py index 0ac8ae0..deb37cc 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -2,6 +2,7 @@ import asyncio import logging from asyncio import create_task from contextlib import asynccontextmanager +from itertools import batched from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator, List from db.blocks import BlockRepository @@ -16,6 +17,9 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Safe insert size for SQLite ^3.32.0 +SQLITE_BATCH_INSERT_SIZE = 10_000 + async def backfill_to_lib(app: "NBE") -> None: """ @@ -73,19 +77,17 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: logger.info("No new blocks to backfill") return - # Reverse so we insert from oldest to newest (parent before child) - blocks_to_insert.reverse() - - # Capture slot range before insert (blocks get detached from session after commit) - first_slot = blocks_to_insert[0].slot - last_slot = blocks_to_insert[-1].slot block_count = len(blocks_to_insert) - logger.info(f"Backfilling {block_count} blocks from chain walk...") + # Insert all blocks in 10k batches to avoid sqlite query limits + # allowing the first block to be a chain root if its parent doesn't exist - # Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist - await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True) - logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})") + for idx, batch in enumerate(batched(reversed(blocks_to_insert), SQLITE_BATCH_INSERT_SIZE)): + first_slot = batch[0].slot + last_slot = batch[-1].slot + # allow_chain_root true only on first iteration + await app.state.block_repository.create(list(batch), allow_chain_root=(idx == 0)) + logger.info(f"Backfilled {len(batch)} blocks (slots {first_slot} to {last_slot})") @asynccontextmanager @@ -110,6 +112,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: yield finally: + # Check if node api needs cleanup + if hasattr(app.state.node_api, "aclose"): + logger.info("Closing node_api connections...") + await app.state.node_api.aclose() logger.info("Stopping node...") await app.state.node_manager.stop() logger.info("Node stopped.") @@ -164,7 +170,7 @@ async def subscribe_to_new_blocks(app: "NBE"): block_slot = block.slot # Now we have the parent, store the block - await app.state.block_repository.create(block) + await app.state.block_repository.create([block]) logger.debug(f"Stored block at slot {block_slot}") except Exception as error: