Add fork choice rule (#58)

* add fork choice rule

* add comments explaining k and s

* add tests

* fix test import
This commit is contained in:
Giacomo Pasini 2024-01-29 14:29:56 +01:00 committed by GitHub
parent a587e3a164
commit 45c303ef14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 109 additions and 9 deletions

View File

@ -46,13 +46,8 @@ class Config:
class BlockHeader:
slot: Slot
parent: Id
def parent(self) -> Id:
return self.parent
def id(self) -> Id:
# TODO: spec out the block id
raise NotImplemented()
# TODO: spec out the block id, this is just a placeholder to unblock tests
id: Id
@dataclass
@ -124,8 +119,9 @@ class Follower:
# in that case, just ignore the block
# Evaluate the fork choice rule and return the block header of the block that should be the head of the chain
def fork_choice(local_chain: Chain, forks: List[Chain]) -> BlockHeader:
pass
def fork_choice(local_chain: Chain, forks: List[Chain]) -> Chain:
# TODO: define k and s
return maxvalid_bg(local_chain, forks, 0, 0)
def tip(self) -> BlockHeader:
return self.fork_choice()
@ -218,5 +214,48 @@ class Leader:
return BlockHeader(parent=parent.id(), slot=slot)
def common_prefix_len(a: Chain, b: Chain) -> int:
for i, (x, y) in enumerate(zip(a.blocks, b.blocks)):
if x.id != y.id:
return i
return min(len(a.blocks), len(b.blocks))
def chain_density(chain: Chain, slot: Slot) -> int:
return len(
[
block
for block in chain.blocks
if block.slot.absolute_slot < slot.absolute_slot
]
)
# Implementation of the fork choice rule as defined in the Ouroboros Genesis paper
# k defines the forking depth of chain we accept without more analysis
# s defines the length of time after the fork happened we will inspect for chain density
def maxvalid_bg(local_chain: Chain, forks: List[Chain], k: int, s: int) -> Chain:
cmax = local_chain
for chain in forks:
lowest_common_ancestor = common_prefix_len(cmax, chain)
m = cmax.length() - lowest_common_ancestor
if m <= k:
# Classic longest chain rule with parameter k
if cmax.length() < chain.length():
cmax = chain
else:
# The chain is forking too much, we need to pay a bit more attention
# In particular, select the chain that is the densest after the fork
forking_slot = Slot(
cmax.blocks[lowest_common_ancestor].slot.absolute_slot + s
)
cmax_density = chain_density(cmax, forking_slot)
candidate_density = chain_density(chain, forking_slot)
if cmax_density < candidate_density:
cmax = chain
return cmax
if __name__ == "__main__":
pass

View File

@ -0,0 +1,61 @@
from unittest import TestCase
import numpy as np
import hashlib
from copy import deepcopy
from cryptarchia.cryptarchia import maxvalid_bg, Chain, BlockHeader, Slot, Id
def make_block(parent_id: Id, slot: Slot, block_id: Id) -> BlockHeader:
return BlockHeader(parent=parent_id, id=block_id, slot=slot)
class TestLeader(TestCase):
def test_fork_choice_long_sparse_chain(self):
# The longest chain is not dense after the fork
common = [make_block(b"", Slot(i), str(i).encode()) for i in range(1, 50)]
long_chain = deepcopy(common)
short_chain = deepcopy(common)
for slot in range(50, 100):
# make arbitrary ids for the different chain so that the blocks appear to be different
long_id = hashlib.sha256(f"{slot}-long".encode()).digest()
short_id = hashlib.sha256(f"{slot}-short".encode()).digest()
if slot % 2 == 0:
long_chain.append(make_block(b"", Slot(slot), long_id))
short_chain.append(make_block(b"", Slot(slot), short_id))
# add more blocks to the long chain
for slot in range(100, 200):
long_chain.append(make_block(b"", Slot(slot), long_id))
assert len(long_chain) > len(short_chain)
# by setting a low k we trigger the density choice rule
k = 1
s = 50
assert maxvalid_bg(Chain(short_chain), [Chain(long_chain)], k, s) == Chain(
short_chain
)
# However, if we set k to the fork length, it will be accepted
k = len(long_chain)
assert maxvalid_bg(Chain(short_chain), [Chain(long_chain)], k, s) == Chain(
long_chain
)
def test_fork_choice_long_dense_chain(self):
# The longest chain is also the densest after the fork
common = [make_block(b"", Slot(i), str(i).encode()) for i in range(1, 50)]
long_chain = deepcopy(common)
short_chain = deepcopy(common)
for slot in range(50, 100):
# make arbitrary ids for the different chain so that the blocks appear to be different
long_id = hashlib.sha256(f"{slot}-long".encode()).digest()
short_id = hashlib.sha256(f"{slot}-short".encode()).digest()
long_chain.append(make_block(b"", Slot(slot), long_id))
if slot % 2 == 0:
short_chain.append(make_block(b"", Slot(slot), short_id))
k = 1
s = 50
assert maxvalid_bg(Chain(short_chain), [Chain(long_chain)], k, s) == Chain(
long_chain
)