feat(node): implements batching to avoid sqlite query limits

Batches updates to sqlite db in chunks of 10k. Well within SQlite query
limits (32,766) as of v3.32.0 which is bundled with python 3.9+.

Adds cleanup code to terminate httpx client connection in node_api.

style: fmt files

style: fmt file
This commit is contained in:
⟣ €₥ℵ∪ℓ ⟢ 2026-04-23 20:28:19 -04:00
parent 0430fe0dd9
commit e55f7345fc
No known key found for this signature in database
GPG Key ID: F57D7381FBAFD773
2 changed files with 18 additions and 20 deletions

View File

@ -35,8 +35,7 @@ class HttpNodeApi(NodeApi):
self.authentication: Option[Authentication] = ( self.authentication: Option[Authentication] = (
Some(settings.node_api_auth) if settings.node_api_auth else Empty() Some(settings.node_api_auth) if settings.node_api_auth else Empty()
) )
auth = self.authentication.map( auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None)
lambda _auth: _auth.for_httpx()).unwrap_or(None)
self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth) self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth)
async def aclose(self) -> None: async def aclose(self) -> None:
@ -101,11 +100,9 @@ class HttpNodeApi(NodeApi):
async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]:
url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM)
auth = self.authentication.map( auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None)
lambda _auth: _auth.for_httpx()).unwrap_or(None)
# Use no read timeout for streaming - blocks may arrive infrequently # Use no read timeout for streaming - blocks may arrive infrequently
stream_timeout = httpx.Timeout( stream_timeout = httpx.Timeout(connect=self.timeout, read=None, write=self.timeout, pool=self.timeout)
connect=self.timeout, read=None, write=self.timeout, pool=self.timeout)
async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client: async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client:
async with client.stream("GET", url) as response: async with client.stream("GET", url) as response:
response.raise_for_status() # TODO: Result response.raise_for_status() # TODO: Result

View File

@ -31,7 +31,6 @@ async def backfill_to_lib(app: "NBE") -> None:
info = await app.state.node_api.get_info() info = await app.state.node_api.get_info()
logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}") logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}")
logger.debug(f"Block {info.lib}...")
await backfill_chain_from_hash(app, info.lib) await backfill_chain_from_hash(app, info.lib)
return return
@ -52,7 +51,6 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None:
while True: while True:
# Check if we already have this block # Check if we already have this block
logger.debug(f"Block {current_hash}...")
existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash)) existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash))
if existing.is_some: if existing.is_some:
logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk") logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk")
@ -75,19 +73,18 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None:
logger.info("No new blocks to backfill") logger.info("No new blocks to backfill")
return return
# Reverse so we insert from oldest to newest (parent before child)
blocks_to_insert.reverse()
# Capture slot range before insert (blocks get detached from session after commit)
first_slot = blocks_to_insert[0].slot
last_slot = blocks_to_insert[-1].slot
block_count = len(blocks_to_insert) block_count = len(blocks_to_insert)
logger.info(f"Backfilling {block_count} blocks from chain walk...") # Insert all blocks in 10k batches to avoid sqlite query limits
# allowing the first block to be a chain root if its parent doesn't exist
# Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist for idx, i in enumerate(range(block_count - 1, -1, -10_000)):
await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True) start = max(0, i - 9_999)
logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})") batch = blocks_to_insert[start : (i + 1)]
first_slot = batch[0].slot
last_slot = batch[-1].slot
# allow_chain_root true only on first iteration
await app.state.block_repository.create(batch, allow_chain_root=(idx == 0))
logger.info(f"Backfilled {len(batch)} blocks (slots {first_slot} to {last_slot})")
@asynccontextmanager @asynccontextmanager
@ -112,6 +109,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]:
yield yield
finally: finally:
# Check if node api needs cleanup
if hasattr(app.state.node_api, "aclose"):
logger.info("Closing node_api connections...")
await app.state.node_api.aclose()
logger.info("Stopping node...") logger.info("Stopping node...")
await app.state.node_manager.stop() await app.state.node_manager.stop()
logger.info("Node stopped.") logger.info("Node stopped.")
@ -166,7 +167,7 @@ async def subscribe_to_new_blocks(app: "NBE"):
block_slot = block.slot block_slot = block.slot
# Now we have the parent, store the block # Now we have the parent, store the block
await app.state.block_repository.create(block) await app.state.block_repository.create([block])
logger.debug(f"Stored block at slot {block_slot}") logger.debug(f"Stored block at slot {block_slot}")
except Exception as error: except Exception as error: