From 2a780a4435f3d06a9516ab63f42487da41a283e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9F=A3=20=E2=82=AC=E2=82=A5=E2=84=B5=E2=88=AA=E2=84=93?= =?UTF-8?q?=20=E2=9F=A2?= <34749913+emnul@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:47:36 -0400 Subject: [PATCH 1/5] fix(node.http): avoid exhausting ephemeral ports in backfill logic --- README.md | 4 ++-- src/node/api/http.py | 35 ++++++++++++++++------------------- src/node/lifespan.py | 2 ++ 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index bee646a..93c29cb 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ B <--> D["Database
(SQLite)"] 2. Run the block explorer: ```bash - uv run python -m main + PYTHONPATH=src uv run python -m main ``` By default, this will try to connect to a local Node running on port 18080. @@ -95,7 +95,7 @@ By default, this will try to connect to a local Node running on port 18080. - If you want to run the Explorer without a Node, make sure to set the `NBE_NODE_API` environment variable to `fake`: 1. ```bash - NBE_NODE_API=fake uv run python -m main + PYTHONPATH=src NBE_NODE_API=fake uv run python -m main ``` 2. ```bash docker run -e NBE_NODE_API=fake -p 8000:8000 nomos-block-explorer diff --git a/src/node/api/http.py b/src/node/api/http.py index cabd3c9..5c8665e 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -78,25 +78,22 @@ class HttpNodeApi(NodeApi): async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCK_BY_HASH) - response = requests.post( - url, - auth=self.authentication, - timeout=60, - json=block_hash, - ) - if response.status_code == 404: - return None - response.raise_for_status() - json_data = response.json() - if json_data is None: - logger.warning(f"Block {block_hash} returned null from API") - return None - block = BlockSerializer.model_validate(json_data) - # The storage endpoint doesn't include the block hash in the response, - # so we set it from the request body - if not block.header.hash: - block.header.hash = bytes.fromhex(block_hash) - return block + auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) + async with httpx.AsyncClient(timeout=60, auth=auth) as client: + response = await client.post(url, json=block_hash) + if response.status_code == 404: + return None + response.raise_for_status() + json_data = response.json() + if json_data is None: + logger.warning(f"Block {block_hash} returned null from API") + return None + block = BlockSerializer.model_validate(json_data) + # The storage endpoint doesn't include the block hash in the response, + # so we set it from the request body + if not block.header.hash: + block.header.hash = bytes.fromhex(block_hash) + return block async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) diff --git a/src/node/lifespan.py b/src/node/lifespan.py index 0ac8ae0..8f51909 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -31,6 +31,7 @@ async def backfill_to_lib(app: "NBE") -> None: info = await app.state.node_api.get_info() logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}") + logger.debug(f"Block {info.lib}...") await backfill_chain_from_hash(app, info.lib) return @@ -51,6 +52,7 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: while True: # Check if we already have this block + logger.debug(f"Block {current_hash}...") existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash)) if existing.is_some: logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk") From aef4b1b876a859902e18ca24262c9d377e643130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9F=A3=20=E2=82=AC=E2=82=A5=E2=84=B5=E2=88=AA=E2=84=93?= =?UTF-8?q?=20=E2=9F=A2?= <34749913+emnul@users.noreply.github.com> Date: Thu, 23 Apr 2026 20:19:22 -0400 Subject: [PATCH 2/5] feat(db): change function signature to avoid unpacking --- src/db/blocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/blocks.py b/src/db/blocks.py index 99721a7..76bd326 100644 --- a/src/db/blocks.py +++ b/src/db/blocks.py @@ -47,7 +47,7 @@ class BlockRepository: def __init__(self, client: DbClient): self.client = client - async def create(self, *blocks: Block, allow_chain_root: bool = False) -> None: + async def create(self, blocks: List[Block], allow_chain_root: bool = False) -> None: """ Insert blocks into the database with proper height calculation. From 0430fe0dd99afbf23a8d397412d646d6c4b57172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9F=A3=20=E2=82=AC=E2=82=A5=E2=84=B5=E2=88=AA=E2=84=93?= =?UTF-8?q?=20=E2=9F=A2?= <34749913+emnul@users.noreply.github.com> Date: Thu, 23 Apr 2026 20:23:46 -0400 Subject: [PATCH 3/5] feat(node): reuse http connection to avoid spamming connections Adds a new class attribute to NodeHttpApi that instantiates an httpx client to reuse during startup. Adds respective cleanup function for client connection Updates get_block_by_hash to use httpx client --- src/node/api/http.py | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/node/api/http.py b/src/node/api/http.py index 5c8665e..3be7a69 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -35,6 +35,12 @@ class HttpNodeApi(NodeApi): self.authentication: Option[Authentication] = ( Some(settings.node_api_auth) if settings.node_api_auth else Empty() ) + auth = self.authentication.map( + lambda _auth: _auth.for_httpx()).unwrap_or(None) + self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth) + + async def aclose(self) -> None: + await self._client.aclose() @property def base_url(self) -> str: @@ -78,28 +84,28 @@ class HttpNodeApi(NodeApi): async def get_block_by_hash(self, block_hash: str) -> Optional[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCK_BY_HASH) - auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) - async with httpx.AsyncClient(timeout=60, auth=auth) as client: - response = await client.post(url, json=block_hash) - if response.status_code == 404: - return None - response.raise_for_status() - json_data = response.json() - if json_data is None: - logger.warning(f"Block {block_hash} returned null from API") - return None - block = BlockSerializer.model_validate(json_data) - # The storage endpoint doesn't include the block hash in the response, - # so we set it from the request body - if not block.header.hash: - block.header.hash = bytes.fromhex(block_hash) - return block + response = await self._client.post(url, json=block_hash) + if response.status_code == 404: + return None + response.raise_for_status() + json_data = response.json() + if json_data is None: + logger.warning(f"Block {block_hash} returned null from API") + return None + block = BlockSerializer.model_validate(json_data) + # The storage endpoint doesn't include the block hash in the response, + # so we set it from the request body + if not block.header.hash: + block.header.hash = bytes.fromhex(block_hash) + return block async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) - auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) + auth = self.authentication.map( + lambda _auth: _auth.for_httpx()).unwrap_or(None) # Use no read timeout for streaming - blocks may arrive infrequently - stream_timeout = httpx.Timeout(connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) + stream_timeout = httpx.Timeout( + connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client: async with client.stream("GET", url) as response: response.raise_for_status() # TODO: Result From e55f7345fc8a237b65a797b1bd0ed89fdbfcd6e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9F=A3=20=E2=82=AC=E2=82=A5=E2=84=B5=E2=88=AA=E2=84=93?= =?UTF-8?q?=20=E2=9F=A2?= <34749913+emnul@users.noreply.github.com> Date: Thu, 23 Apr 2026 20:28:19 -0400 Subject: [PATCH 4/5] feat(node): implements batching to avoid sqlite query limits Batches updates to sqlite db in chunks of 10k. Well within SQlite query limits (32,766) as of v3.32.0 which is bundled with python 3.9+. Adds cleanup code to terminate httpx client connection in node_api. style: fmt files style: fmt file --- src/node/api/http.py | 9 +++------ src/node/lifespan.py | 29 +++++++++++++++-------------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/node/api/http.py b/src/node/api/http.py index 3be7a69..6f469d8 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -35,8 +35,7 @@ class HttpNodeApi(NodeApi): self.authentication: Option[Authentication] = ( Some(settings.node_api_auth) if settings.node_api_auth else Empty() ) - auth = self.authentication.map( - lambda _auth: _auth.for_httpx()).unwrap_or(None) + auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth) async def aclose(self) -> None: @@ -101,11 +100,9 @@ class HttpNodeApi(NodeApi): async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) - auth = self.authentication.map( - lambda _auth: _auth.for_httpx()).unwrap_or(None) + auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) # Use no read timeout for streaming - blocks may arrive infrequently - stream_timeout = httpx.Timeout( - connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) + stream_timeout = httpx.Timeout(connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client: async with client.stream("GET", url) as response: response.raise_for_status() # TODO: Result diff --git a/src/node/lifespan.py b/src/node/lifespan.py index 8f51909..2ef87c7 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -31,7 +31,6 @@ async def backfill_to_lib(app: "NBE") -> None: info = await app.state.node_api.get_info() logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}") - logger.debug(f"Block {info.lib}...") await backfill_chain_from_hash(app, info.lib) return @@ -52,7 +51,6 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: while True: # Check if we already have this block - logger.debug(f"Block {current_hash}...") existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash)) if existing.is_some: logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk") @@ -75,19 +73,18 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: logger.info("No new blocks to backfill") return - # Reverse so we insert from oldest to newest (parent before child) - blocks_to_insert.reverse() - - # Capture slot range before insert (blocks get detached from session after commit) - first_slot = blocks_to_insert[0].slot - last_slot = blocks_to_insert[-1].slot block_count = len(blocks_to_insert) - logger.info(f"Backfilling {block_count} blocks from chain walk...") - - # Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist - await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True) - logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})") + # Insert all blocks in 10k batches to avoid sqlite query limits + # allowing the first block to be a chain root if its parent doesn't exist + for idx, i in enumerate(range(block_count - 1, -1, -10_000)): + start = max(0, i - 9_999) + batch = blocks_to_insert[start : (i + 1)] + first_slot = batch[0].slot + last_slot = batch[-1].slot + # allow_chain_root true only on first iteration + await app.state.block_repository.create(batch, allow_chain_root=(idx == 0)) + logger.info(f"Backfilled {len(batch)} blocks (slots {first_slot} to {last_slot})") @asynccontextmanager @@ -112,6 +109,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: yield finally: + # Check if node api needs cleanup + if hasattr(app.state.node_api, "aclose"): + logger.info("Closing node_api connections...") + await app.state.node_api.aclose() logger.info("Stopping node...") await app.state.node_manager.stop() logger.info("Node stopped.") @@ -166,7 +167,7 @@ async def subscribe_to_new_blocks(app: "NBE"): block_slot = block.slot # Now we have the parent, store the block - await app.state.block_repository.create(block) + await app.state.block_repository.create([block]) logger.debug(f"Stored block at slot {block_slot}") except Exception as error: From 0d5f801a818e0a3673a7c81931e628b721ea2ef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9F=A3=20=E2=82=AC=E2=82=A5=E2=84=B5=E2=88=AA=E2=84=93?= =?UTF-8?q?=20=E2=9F=A2?= <34749913+emnul@users.noreply.github.com> Date: Fri, 24 Apr 2026 14:42:29 -0400 Subject: [PATCH 5/5] refactor(node): simplify batch logic using batched Simplifies batch logic and improves readability using a more functional approach with Python iterators --- src/node/lifespan.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/node/lifespan.py b/src/node/lifespan.py index 2ef87c7..deb37cc 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -2,6 +2,7 @@ import asyncio import logging from asyncio import create_task from contextlib import asynccontextmanager +from itertools import batched from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterator, List from db.blocks import BlockRepository @@ -16,6 +17,9 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Safe insert size for SQLite ^3.32.0 +SQLITE_BATCH_INSERT_SIZE = 10_000 + async def backfill_to_lib(app: "NBE") -> None: """ @@ -77,13 +81,12 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: # Insert all blocks in 10k batches to avoid sqlite query limits # allowing the first block to be a chain root if its parent doesn't exist - for idx, i in enumerate(range(block_count - 1, -1, -10_000)): - start = max(0, i - 9_999) - batch = blocks_to_insert[start : (i + 1)] + + for idx, batch in enumerate(batched(reversed(blocks_to_insert), SQLITE_BATCH_INSERT_SIZE)): first_slot = batch[0].slot last_slot = batch[-1].slot # allow_chain_root true only on first iteration - await app.state.block_repository.create(batch, allow_chain_root=(idx == 0)) + await app.state.block_repository.create(list(batch), allow_chain_root=(idx == 0)) logger.info(f"Backfilled {len(batch)} blocks (slots {first_slot} to {last_slot})")