diff --git a/cryptarchia/sync.py b/cryptarchia/sync.py index 323e368..56cb379 100644 --- a/cryptarchia/sync.py +++ b/cryptarchia/sync.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from collections import defaultdict from typing import Generator @@ -23,7 +25,7 @@ def sync(local: Follower, peers: list[Follower], checkpoint: LedgerState | None # If the backfilling fails, it means that the checkpoint chain is invalid, # and the sync process should be cancelled. if checkpoint: - backfill_fork(local, checkpoint.block, block_fetcher) + backfill_fork(local, checkpoint.block, None, block_fetcher) # Repeat the sync process until no peer has a tip ahead of the local tip, # because peers' tips may advance during the sync process. @@ -33,9 +35,9 @@ def sync(local: Follower, peers: list[Follower], checkpoint: LedgerState | None # Gather orphaned blocks, which are blocks from forks that are absent in the local block tree. start_slot = local.tip().slot - orphans: set[BlockHeader] = set() + orphans: dict[BlockHeader, BlockFetcher.PeerId] = dict() num_blocks = 0 - for block in block_fetcher.fetch_blocks_from(start_slot): + for block, peer_id in block_fetcher.fetch_blocks_from(start_slot): num_blocks += 1 # Reject blocks that have been rejected in the past # or whose parent has been rejected. @@ -45,9 +47,9 @@ def sync(local: Follower, peers: list[Follower], checkpoint: LedgerState | None try: local.on_block(block) - orphans.discard(block) + orphans.pop(block, None) except ParentNotFound: - orphans.add(block) + orphans[block] = peer_id except Exception: rejected_blocks.add(block.id()) @@ -59,7 +61,7 @@ def sync(local: Follower, peers: list[Follower], checkpoint: LedgerState | None # Backfill the orphan forks starting from the orphan blocks with applying fork choice rule. # # Sort the orphan blocks by slot in descending order to minimize the number of backfillings. - for orphan in sorted(orphans, key=lambda b: b.slot, reverse=True): + for orphan, peer_id in sorted(orphans.items(), key=lambda item: item[0].slot, reverse=True): # Skip the orphan block if it has been processed during the previous backfillings # (i.e. if it has been already added to the local block tree). # Or, skip if it has been rejected during the previous backfillings. @@ -68,7 +70,7 @@ def sync(local: Follower, peers: list[Follower], checkpoint: LedgerState | None and orphan.id() not in rejected_blocks ): try: - backfill_fork(local, orphan, block_fetcher) + backfill_fork(local, orphan, peer_id, block_fetcher) except InvalidBlockFromBackfillFork as e: rejected_blocks.update(block.id() for block in e.invalid_suffix) @@ -76,14 +78,15 @@ def sync(local: Follower, peers: list[Follower], checkpoint: LedgerState | None def backfill_fork( local: Follower, fork_tip: BlockHeader, - block_fetcher: "BlockFetcher", + fork_peer_id: BlockFetcher.PeerId | None, + block_fetcher: BlockFetcher, ): # Backfills a fork, which is absent in the local block tree, by fetching blocks from the peers. # During backfilling, the fork choice rule is continuously applied. suffix = find_missing_part( local, - block_fetcher.fetch_chain_backward(fork_tip.id(), local), + block_fetcher.fetch_chain_backward(fork_tip.id(), fork_peer_id), ) # Add blocks in the fork suffix with applying fork choice rule. @@ -116,39 +119,43 @@ class BlockFetcher: # NOTE: This class is a mock, which uses a naive approach to fetch blocks from multiple peers. # In real implementation, any optimized way can be used, such as parallel fetching. - def __init__(self, peers: list[Follower]): - self.peers = peers + PeerId = int - def fetch_blocks_from(self, start_slot: Slot) -> Generator[BlockHeader, None, None]: + def __init__(self, peers: list[Follower]): + self.peers = dict() + for peer_id, peer in enumerate(peers): + self.peers[peer_id] = peer + + def fetch_blocks_from(self, start_slot: Slot) -> Generator[tuple[BlockHeader, PeerId], None, None]: # Filter peers that have a tip ahead of the local tip # and group peers by their tip to minimize the number of fetches. groups = self.filter_and_group_peers_by_tip(start_slot) for group in groups.values(): - for block in BlockFetcher.fetch_blocks_by_slot(group, start_slot): - yield block + for block, peer_id in BlockFetcher.fetch_blocks_by_slot(group, start_slot): + yield block, peer_id def filter_and_group_peers_by_tip( self, start_slot: Slot - ) -> dict[BlockHeader, list[Follower]]: + ) -> dict[BlockHeader, dict[PeerId, Follower]]: # Group peers by their tip. # Filter only the peers whose tip is ahead of the start_slot. - groups: dict[BlockHeader, list[Follower]] = defaultdict(list) - for peer in self.peers: + groups = defaultdict(dict) + for peer_id, peer in self.peers.items(): if peer.tip().slot.absolute_slot > start_slot.absolute_slot: - groups[peer.tip()].append(peer) + groups[peer.tip()][peer_id] = peer return groups @staticmethod def fetch_blocks_by_slot( - peers: list[Follower], start_slot: Slot - ) -> Generator[BlockHeader, None, None]: + peers: dict[PeerId, Follower], start_slot: Slot + ) -> Generator[tuple[BlockHeader, PeerId], None, None]: # Fetch blocks in the given range of slots from one of the peers. # Blocks should be returned in order of slot. # If a peer fails, try the next peer. - for peer in peers: + for peer_id, peer in peers.items(): try: for block in peer.blocks_by_slot(start_slot): - yield block + yield block, peer_id # Update start_slot for the potential try with the next peer. start_slot = block.slot # The peer successfully returned all blocks. No need to try the next peer. @@ -157,25 +164,18 @@ class BlockFetcher: continue def fetch_chain_backward( - self, tip: Hash, local: Follower + self, tip: Hash, peer_id: PeerId | None, ) -> Generator[BlockHeader, None, None]: - # Fetches a chain of blocks from the peers, starting from the given tip to the genesis. - # Attempts to extend the chain as much as possible by querying multiple peers, - # considering that not all peers may have the full chain (from the genesis). - + # Fetches a chain of blocks from a peer, starting from the given tip to the genesis. + # If peer_id is not set, fetch the chain by querying multiple peers. id = tip - # First, try to iterate the chain from the local block tree. - for block in iter_chain_blocks(id, local.ledger_state): - yield block - if block.id() == local.genesis_state.block.id(): - return - id = block.parent - - # Try to continue by fetching the remaining blocks from the peers - for peer in self.peers: + peers = [self.peers[peer_id]] if peer_id is not None else list(self.peers.values()) + for peer in peers: for block in iter_chain_blocks(id, peer.ledger_state): yield block - if block.id() == local.genesis_state.block.id(): + if block.id() == peer.genesis_state.block.id(): + # Received the entire chain from the peer. + # No need to continue with the next peer. return id = block.parent