nomos-specs/cryptarchia/test_ledger_state_update.py
davidrusu d2f6ad579a
Stake Relativization Specification + Fixes (#86)
* cryptarchia/relative-stake: failing test showing lack of inference

* implement stake-relativization spec

* test total stake inference in empty epoch

* move TestNode to test_common

* fix bug in Follower re-org logic

* improve orphan proof test coverage

* force orphans to already have been in one of the existing branches

* rename initial_inferred_total_stake ==> initial_total_stake

* add simple orphan import test

* Follower.unimported_orphans: ensure no orphans from same branch

* remove unnecessary LedgerState.slot

* cryptarchia: doc fixes

* factor out total stake inference

* docs for total stake inference

* rename total_stake to total_active_stake

* replace prints in cryptarchia with logging.logger
2024-03-23 05:50:00 +04:00

290 lines
11 KiB
Python

from unittest import TestCase
import numpy as np
from .cryptarchia import (
Follower,
TimeConfig,
BlockHeader,
Config,
Coin,
LedgerState,
MockLeaderProof,
Slot,
Id,
)
from .test_common import mk_config, mk_block, mk_genesis_state
class TestLedgerStateUpdate(TestCase):
def test_ledger_state_prevents_coin_reuse(self):
leader_coin = Coin(sk=0, value=100)
genesis = mk_genesis_state([leader_coin])
follower = Follower(genesis, mk_config([leader_coin]))
block = mk_block(slot=0, parent=genesis.block, coin=leader_coin)
follower.on_block(block)
# Follower should have accepted the block
assert follower.local_chain.length() == 1
assert follower.tip() == block
# Follower should have updated their ledger state to mark the leader coin as spent
assert follower.tip_state().verify_unspent(leader_coin.nullifier()) == False
reuse_coin_block = mk_block(slot=1, parent=block.id(), coin=leader_coin)
follower.on_block(block)
# Follower should *not* have accepted the block
assert follower.local_chain.length() == 1
assert follower.tip() == block
def test_ledger_state_is_properly_updated_on_reorg(self):
coin = [Coin(sk=0, value=100), Coin(sk=1, value=100), Coin(sk=2, value=100)]
genesis = mk_genesis_state(coin)
follower = Follower(genesis, mk_config(coin))
# 1) coin[0] & coin[1] both concurrently win slot 0
block_1 = mk_block(parent=genesis.block, slot=0, coin=coin[0])
block_2 = mk_block(parent=genesis.block, slot=0, coin=coin[1])
# 2) follower sees block 1 first
follower.on_block(block_1)
assert follower.tip() == block_1
assert not follower.tip_state().verify_unspent(coin[0].nullifier())
# 3) then sees block 2, but sticks with block_1 as the tip
follower.on_block(block_2)
assert follower.tip() == block_1
assert len(follower.forks) == 1, f"{len(follower.forks)}"
# 4) then coin[2] wins slot 1 and chooses to extend from block_2
block_3 = mk_block(parent=block_2.id(), slot=1, coin=coin[2])
follower.on_block(block_3)
# the follower should have switched over to the block_2 fork
assert follower.tip() == block_3
# and the original coin[0] should now be removed from the spent pool
assert follower.tip_state().verify_unspent(coin[0].nullifier())
def test_fork_creation(self):
coins = [Coin(sk=i, value=100) for i in range(7)]
genesis = mk_genesis_state(coins)
follower = Follower(genesis, mk_config(coins))
# coin_0 & coin_1 both concurrently win slot 0 based on the genesis block
# Both blocks are accepted, and a fork is created "from the genesis block"
block_1 = mk_block(parent=genesis.block, slot=0, coin=coins[0])
block_2 = mk_block(parent=genesis.block, slot=0, coin=coins[1])
follower.on_block(block_1)
follower.on_block(block_2)
assert follower.tip() == block_1
assert len(follower.forks) == 1, f"{len(follower.forks)}"
assert follower.forks[0].tip() == block_2
# coin_2 wins slot 1 and chooses to extend from block_1
# coin_3 also wins slot 1 and but chooses to extend from block_2
# Both blocks are accepted. Both the local chain and the fork grow. No fork is newly created.
block_3 = mk_block(parent=block_1.id(), slot=1, coin=coins[2])
block_4 = mk_block(parent=block_2.id(), slot=1, coin=coins[3])
follower.on_block(block_3)
follower.on_block(block_4)
assert follower.tip() == block_3
assert len(follower.forks) == 1, f"{len(follower.forks)}"
assert follower.forks[0].tip() == block_4
# coin_4 wins slot 1 and but chooses to extend from block_2 as well
# The block is accepted. A new fork is created "from the block_2".
block_5 = mk_block(parent=block_2.id(), slot=1, coin=coins[4])
follower.on_block(block_5)
assert follower.tip() == block_3
assert len(follower.forks) == 2, f"{len(follower.forks)}"
assert follower.forks[0].tip() == block_4
assert follower.forks[1].tip() == block_5
# A block based on an unknown parent is not accepted.
# Nothing changes from the local chain and forks.
unknown_block = mk_block(parent=block_5.id(), slot=2, coin=coins[5])
block_6 = mk_block(parent=unknown_block.id(), slot=2, coin=coins[6])
follower.on_block(block_6)
assert follower.tip() == block_3
assert len(follower.forks) == 2, f"{len(follower.forks)}"
assert follower.forks[0].tip() == block_4
assert follower.forks[1].tip() == block_5
def test_epoch_transition(self):
leader_coins = [Coin(sk=i, value=100) for i in range(4)]
genesis = mk_genesis_state(leader_coins)
config = mk_config(leader_coins)
follower = Follower(genesis, config)
# We assume an epoch length of 10 slots in this test.
assert config.epoch_length == 20, f"epoch len: {config.epoch_length}"
# ---- EPOCH 0 ----
block_1 = mk_block(slot=0, parent=genesis.block, coin=leader_coins[0])
follower.on_block(block_1)
assert follower.tip() == block_1
assert follower.tip().slot.epoch(config).epoch == 0
block_2 = mk_block(slot=19, parent=block_1.id(), coin=leader_coins[1])
follower.on_block(block_2)
assert follower.tip() == block_2
assert follower.tip().slot.epoch(config).epoch == 0
# ---- EPOCH 1 ----
block_3 = mk_block(slot=20, parent=block_2.id(), coin=leader_coins[2])
follower.on_block(block_3)
assert follower.tip() == block_3
assert follower.tip().slot.epoch(config).epoch == 1
# ---- EPOCH 2 ----
# when trying to propose a block for epoch 2, the stake distribution snapshot should be taken
# at the end of epoch 0, i.e. slot 9
# To ensure this is the case, we add a new coin just to the state associated with that slot,
# so that the new block can be accepted only if that is the snapshot used
# first, verify that if we don't change the state, the block is not accepted
block_4 = mk_block(slot=40, parent=block_3.id(), coin=Coin(sk=4, value=100))
follower.on_block(block_4)
assert follower.tip() == block_3
# then we add the coin to "spendable commitments" associated with slot 9
follower.ledger_state[block_2.id()].commitments_spend.add(
Coin(sk=4, value=100).commitment()
)
follower.on_block(block_4)
assert follower.tip() == block_4
assert follower.tip().slot.epoch(config).epoch == 2
def test_evolved_coin_is_eligible_for_leadership(self):
coin = Coin(sk=0, value=100)
genesis = mk_genesis_state([coin])
follower = Follower(genesis, mk_config([coin]))
# coin wins the first slot
block_1 = mk_block(slot=0, parent=genesis.block, coin=coin)
follower.on_block(block_1)
assert follower.tip() == block_1
# coin can't be reused to win following slots:
block_2_reuse = mk_block(slot=1, parent=block_1.id(), coin=coin)
follower.on_block(block_2_reuse)
assert follower.tip() == block_1
# but the evolved coin is eligible
block_2_evolve = mk_block(slot=1, parent=block_1.id(), coin=coin.evolve())
follower.on_block(block_2_evolve)
assert follower.tip() == block_2_evolve
def test_new_coins_becoming_eligible_after_stake_distribution_stabilizes(self):
coin = Coin(sk=0, value=100)
config = mk_config([coin])
genesis = mk_genesis_state([coin])
follower = Follower(genesis, config)
# We assume an epoch length of 20 slots in this test.
assert config.epoch_length == 20
# ---- EPOCH 0 ----
block_0_0 = mk_block(slot=0, parent=genesis.block, coin=coin)
follower.on_block(block_0_0)
assert follower.tip() == block_0_0
# mint a new coin to be used for leader elections in upcoming epochs
coin_new = Coin(sk=1, value=10)
follower.ledger_state[block_0_0.id()].commitments_spend.add(
coin_new.commitment()
)
# the new coin is not yet eligible for elections
block_0_1_attempt = mk_block(slot=1, parent=block_0_0.id(), coin=coin_new)
follower.on_block(block_0_1_attempt)
assert follower.tip() == block_0_0
# whereas the evolved coin from genesis can be spent immediately
block_0_1 = mk_block(slot=1, parent=block_0_0.id(), coin=coin.evolve())
follower.on_block(block_0_1)
assert follower.tip() == block_0_1
# ---- EPOCH 1 ----
# The newly minted coin is still not eligible in the following epoch since the
# stake distribution snapshot is taken at the beginning of the previous epoch
block_1_0 = mk_block(slot=20, parent=block_0_1.id(), coin=coin_new)
follower.on_block(block_1_0)
assert follower.tip() == block_0_1
# ---- EPOCH 2 ----
# The coin is finally eligible 2 epochs after it was first minted
block_2_0 = mk_block(
slot=40,
parent=block_0_1.id(),
coin=coin_new,
)
follower.on_block(block_2_0)
assert follower.tip() == block_2_0
# And now the minted coin can freely use the evolved coin for subsequent blocks
block_2_1 = mk_block(slot=40, parent=block_2_0.id(), coin=coin_new.evolve())
follower.on_block(block_2_1)
assert follower.tip() == block_2_1
def test_orphaned_proofs(self):
coin, coin_orphan = Coin(sk=0, value=100), Coin(sk=1, value=100)
genesis = mk_genesis_state([coin, coin_orphan])
follower = Follower(genesis, mk_config([coin, coin_orphan]))
block_0_0 = mk_block(slot=0, parent=genesis.block, coin=coin)
follower.on_block(block_0_0)
assert follower.tip() == block_0_0
coin_new = coin.evolve()
coin_new_new = coin_new.evolve()
block_0_1 = mk_block(slot=1, parent=block_0_0.id(), coin=coin_new_new)
follower.on_block(block_0_1)
# the coin evolved twice should not be accepted as it is not in the lead commitments
assert follower.tip() == block_0_0
# An orphaned proof will not be accepted until a node first sees the corresponding block.
#
# Also, notice that the block is using the evolved orphan coin which is not present on the main
# branch. The evolved orphan commitment is added from the orphan prior to validating the block
# header as part of orphan importing process
orphan = mk_block(parent=genesis.block, slot=0, coin=coin_orphan)
block_0_1 = mk_block(
slot=1,
parent=block_0_0.id(),
coin=coin_orphan.evolve(),
orphaned_proofs=[orphan],
)
follower.on_block(block_0_1)
# since follower had not seen this orphan prior to being included as
# an orphan proof, it will be rejected
assert follower.tip() == block_0_0
# but all is fine if the follower first sees the orphan block, and then
# is imported into the main chain
follower.on_block(orphan)
follower.on_block(block_0_1)
assert follower.tip() == block_0_1