diff --git a/src/node/api/http.py b/src/node/api/http.py index 3be7a69..6f469d8 100644 --- a/src/node/api/http.py +++ b/src/node/api/http.py @@ -35,8 +35,7 @@ class HttpNodeApi(NodeApi): self.authentication: Option[Authentication] = ( Some(settings.node_api_auth) if settings.node_api_auth else Empty() ) - auth = self.authentication.map( - lambda _auth: _auth.for_httpx()).unwrap_or(None) + auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) self._client = httpx.AsyncClient(timeout=self.timeout, auth=auth) async def aclose(self) -> None: @@ -101,11 +100,9 @@ class HttpNodeApi(NodeApi): async def get_blocks_stream(self) -> AsyncIterator[BlockSerializer]: url = urljoin(self.base_url, self.ENDPOINT_BLOCKS_STREAM) - auth = self.authentication.map( - lambda _auth: _auth.for_httpx()).unwrap_or(None) + auth = self.authentication.map(lambda _auth: _auth.for_httpx()).unwrap_or(None) # Use no read timeout for streaming - blocks may arrive infrequently - stream_timeout = httpx.Timeout( - connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) + stream_timeout = httpx.Timeout(connect=self.timeout, read=None, write=self.timeout, pool=self.timeout) async with httpx.AsyncClient(timeout=stream_timeout, auth=auth) as client: async with client.stream("GET", url) as response: response.raise_for_status() # TODO: Result diff --git a/src/node/lifespan.py b/src/node/lifespan.py index 8f51909..2ef87c7 100644 --- a/src/node/lifespan.py +++ b/src/node/lifespan.py @@ -31,7 +31,6 @@ async def backfill_to_lib(app: "NBE") -> None: info = await app.state.node_api.get_info() logger.info(f"Node info: LIB={info.lib}, tip={info.tip}, slot={info.slot}, height={info.height}") - logger.debug(f"Block {info.lib}...") await backfill_chain_from_hash(app, info.lib) return @@ -52,7 +51,6 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: while True: # Check if we already have this block - logger.debug(f"Block {current_hash}...") existing = await app.state.block_repository.get_by_hash(bytes.fromhex(current_hash)) if existing.is_some: logger.debug(f"Block {current_hash[:16]}... already exists, stopping chain walk") @@ -75,19 +73,18 @@ async def backfill_chain_from_hash(app: "NBE", block_hash: str) -> None: logger.info("No new blocks to backfill") return - # Reverse so we insert from oldest to newest (parent before child) - blocks_to_insert.reverse() - - # Capture slot range before insert (blocks get detached from session after commit) - first_slot = blocks_to_insert[0].slot - last_slot = blocks_to_insert[-1].slot block_count = len(blocks_to_insert) - logger.info(f"Backfilling {block_count} blocks from chain walk...") - - # Insert all blocks, allowing the first one to be a chain root if its parent doesn't exist - await app.state.block_repository.create(*blocks_to_insert, allow_chain_root=True) - logger.info(f"Backfilled {block_count} blocks (slots {first_slot} to {last_slot})") + # Insert all blocks in 10k batches to avoid sqlite query limits + # allowing the first block to be a chain root if its parent doesn't exist + for idx, i in enumerate(range(block_count - 1, -1, -10_000)): + start = max(0, i - 9_999) + batch = blocks_to_insert[start : (i + 1)] + first_slot = batch[0].slot + last_slot = batch[-1].slot + # allow_chain_root true only on first iteration + await app.state.block_repository.create(batch, allow_chain_root=(idx == 0)) + logger.info(f"Backfilled {len(batch)} blocks (slots {first_slot} to {last_slot})") @asynccontextmanager @@ -112,6 +109,10 @@ async def node_lifespan(app: "NBE") -> AsyncGenerator[None]: yield finally: + # Check if node api needs cleanup + if hasattr(app.state.node_api, "aclose"): + logger.info("Closing node_api connections...") + await app.state.node_api.aclose() logger.info("Stopping node...") await app.state.node_manager.stop() logger.info("Node stopped.") @@ -166,7 +167,7 @@ async def subscribe_to_new_blocks(app: "NBE"): block_slot = block.slot # Now we have the parent, store the block - await app.state.block_repository.create(block) + await app.state.block_repository.create([block]) logger.debug(f"Stored block at slot {block_slot}") except Exception as error: