wip: checkpoint sync

This commit is contained in:
Youngjoon Lee 2025-02-28 17:47:57 +09:00
parent 974208a1f5
commit 512ade7f58
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
4 changed files with 49 additions and 5 deletions

View File

@ -11,6 +11,8 @@ from typing import Dict, Generator, List, TypeAlias
import numpy as np
from sortedcontainers import SortedDict
from cryptarchia.error import ParentNotFound
logger = logging.getLogger(__name__)
@ -454,6 +456,8 @@ class Follower:
logger.warning("invalid header")
return
if block.parent not in self.ledger_state:
raise ParentNotFound
new_state = self.ledger_state[block.parent].copy()
new_state.apply(block)
self.ledger_state[block.id()] = new_state

2
cryptarchia/error.py Normal file
View File

@ -0,0 +1,2 @@
class ParentNotFound(Exception):
pass

View File

@ -0,0 +1,23 @@
from cryptarchia.cryptarchia import Follower, LedgerState, iter_chain
from cryptarchia.sync.full_sync import full_sync
def get_checkpoint(follower: Follower) -> LedgerState:
iter = iter_chain(follower.tip_id(), follower.ledger_state)
for _ in range(follower.config.k):
next(iter)
return next(iter)
def checkpoint_sync(local: Follower, checkpoint: LedgerState, remotes: list[Follower]):
# apply the checkpoint to the local
checkpoint_block_id = checkpoint.block.id()
local.ledger_state[checkpoint_block_id] = checkpoint
local.local_chain = checkpoint_block_id
local.forks.remove(checkpoint_block_id)
local.block_storage.add_block(checkpoint.block)
# start forwards sync from the checkpoint
orphans = full_sync(local, remotes, local.tip().slot)
if orphans:
raise NotImplementedError("Orphaned blocks after checkpoint sync")

View File

@ -1,31 +1,46 @@
from collections import defaultdict
from typing import Generator
from cryptarchia.cryptarchia import Follower, Id, Slot
from cryptarchia.cryptarchia import BlockHeader, Follower, Id, Slot
from cryptarchia.error import ParentNotFound
SLOT_TOLERANCE = 1
def full_sync(local: Follower, remotes: list[Follower], start_slot: Slot):
def full_sync(
local: Follower, remotes: list[Follower], start_slot: Slot
) -> list[BlockHeader]:
# Start a full sync from the remotes to the local, starting from the given slot.
# Return orphaned blocks that could not be applied to the local.
# Sync only with remotes that are at least SLOT_TOLERANCE ahead of the local.
# Continue until there is no target to sync with.
#
# Group the remotes by their tip slot, and sync only with one remote per group.
# This is safe as long as the remote provides all blocks necessary for the future fork choice.
orphans: list[BlockHeader] = []
while groups := group_targets(remotes, start_slot):
for _tip_id, group in groups.items():
remote = group[0]
range_sync(local, remote, start_slot, remote.tip().slot)
for orphan in range_sync(local, remote, start_slot, remote.tip().slot):
orphans.append(orphan)
# Update the start_slot to check if the sync should continue.
start_slot = Slot(local.tip().slot.absolute_slot + 1)
return orphans
def range_sync(local: Follower, remote: Follower, from_slot: Slot, to_slot: Slot):
def range_sync(
local: Follower, remote: Follower, from_slot: Slot, to_slot: Slot
) -> Generator[BlockHeader, None, None]:
# Fetch blocks in the given range of slots from the remote and apply them to the local.
# Blocks should be fetched in order of slot.
for block in remote.block_storage.blocks_by_range(from_slot, to_slot):
local.on_block(block)
try:
local.on_block(block)
except ParentNotFound:
yield block
except:
raise
def group_targets(