nomos-specs/cryptarchia/test_stake_relativization.py

171 lines
6.0 KiB
Python
Raw Permalink Normal View History

from unittest import TestCase
from dataclasses import dataclass
import itertools
import numpy as np
from .cryptarchia import Config, Coin, Slot
from .test_common import mk_config, mk_genesis_state, mk_block, TestNode, Follower
class TestStakeRelativization(TestCase):
def test_ledger_leader_counting(self):
coins = [Coin(sk=i, value=10) for i in range(2)]
c_a, c_b = coins
config = mk_config(coins)
genesis = mk_genesis_state(coins)
follower = Follower(genesis, config)
# initially, there are 0 leaders
assert follower.tip_state().leader_count == 0
# after a block, 1 leader has been observed
b1 = mk_block(genesis.block, slot=1, coin=c_a)
follower.on_block(b1)
assert follower.tip_state().leader_count == 1
# on fork, tip state is not updated
orphan = mk_block(genesis.block, slot=1, coin=c_b)
follower.on_block(orphan)
assert follower.tip_state().block == b1.id()
assert follower.tip_state().leader_count == 1
# after orphan is adopted, leader count should jumpy by 2 (each orphan counts as a leader)
b2 = mk_block(b1.id(), slot=2, coin=c_a.evolve(), orphaned_proofs=[orphan])
follower.on_block(b2)
assert follower.tip_state().block == b2.id()
assert follower.tip_state().leader_count == 3
def test_inference_on_empty_genesis_epoch(self):
coin = Coin(sk=0, value=10)
config = mk_config([coin]).replace(
initial_total_active_stake=20,
total_active_stake_learning_rate=0.5,
active_slot_coeff=0.5,
)
genesis = mk_genesis_state([coin])
node = TestNode(config, genesis, coin)
# -- epoch 0 --
# ..... silence
# -- epoch 1 --
# Given no blocks produced in epoch 0,
epoch1_state = node.epoch_state(Slot(config.epoch_length))
# given learning rate of 0.5 and 0 occupied slots in epoch 0, we should see
# inferred total stake drop by half in epoch 1
assert epoch1_state.inferred_total_active_stake == 10
# -- epoch 2 --
epoch1_state = node.epoch_state(Slot(config.epoch_length * 2))
# and again, we should see inferred total stake drop by half in epoch 2 given
# no occupied slots in epoch 1
assert epoch1_state.inferred_total_active_stake == 5
def test_inferred_total_active_stake_close_to_true_total_stake(self):
PRINT_DEBUG = False
seed = 0
N = 3
EPOCHS = 2
np.random.seed(seed)
stake = np.array((np.random.pareto(10, N) + 1) * 1000, dtype=np.int64)
coins = [Coin(sk=i, value=int(s)) for i, s in enumerate(stake)]
config = Config.cryptarchia_v0_0_1(stake.sum() * 2).replace(k=40)
genesis = mk_genesis_state(coins)
nodes = [TestNode(config, genesis, c) for c in coins]
T = config.epoch_length * EPOCHS
slot_leaders = np.zeros(T, dtype=np.int32)
for slot in map(Slot, range(T)):
proposed_blocks = [n.on_slot(slot) for n in nodes]
slot_leaders[slot.absolute_slot] = N - proposed_blocks.count(None)
# now deliver the proposed blocks
for n_idx, node in enumerate(nodes):
# shuffle proposed blocks to simulate random delivery
block_order = list(range(N))
np.random.shuffle(block_order)
for block_idx in block_order:
if block := proposed_blocks[block_idx]:
node.on_block(block)
# Instead of inspecting state of each node, we group the nodes by their
# tip, and select a representative for each group to inspect.
#
# This makes debugging with large number of nodes more maneagable.
grouped_by_tip = _group_by(nodes, lambda n: n.follower.tip_id())
for group in grouped_by_tip.values():
ref_node = group[0]
ref_epoch_state = ref_node.epoch_state(Slot(T))
for node in group:
assert node.follower.tip_state() == ref_node.follower.tip_state()
assert node.epoch_state(Slot(T)) == ref_epoch_state
reps = [g[0] for g in grouped_by_tip.values()]
if PRINT_DEBUG:
print()
print("seed", seed)
print(f"T={T}, EPOCHS={EPOCHS}")
print(
f"lottery stats",
f"mean={slot_leaders.mean():.3f}",
f"var={slot_leaders.var():.3f}",
)
print("true total stake\t", stake.sum())
print("D_0\t", config.initial_total_stake)
inferred_stake_by_epoch_by_rep = [
[
r.epoch_state(Slot(e * config.epoch_length)).total_stake()
for e in range(EPOCHS + 1)
]
for r in reps
]
print(
f"D_{list(range(EPOCHS + 1))}\n\t",
"\n\t".join(
[
f"Rep {i}: {stakes}"
for i, stakes in inferred_stake_by_epoch_by_rep
]
),
)
print("true leader count\t", slot_leaders.sum())
print(
"follower leader counts\t",
[r.follower.tip_state().leader_count for r in reps],
)
assert all(
slot_leaders.sum() + 1 == len(n.follower.ledger_state) for n in nodes
), f"{slot_leaders.sum() + 1}!={[len(n.follower.ledger_state) for n in nodes]}"
for node in reps:
inferred_stake = node.epoch_state(Slot(T)).total_active_stake()
pct_err = (
abs(stake.sum() - inferred_stake) / config.initial_total_active_stake
)
eps = (1 - config.total_active_stake_learning_rate) ** EPOCHS
assert pct_err < eps, f"pct_err={pct_err} < eps={eps}"
def _group_by(iterable, key):
import itertools
return {
k: list(group) for k, group in itertools.groupby(sorted(iterable, key=key), key)
}