Add header id and message format specification (#52)

* Create messages.abnf

* add missing block rule

* Add content id to header message

* add header id definition + implementation in python

* address review comments
This commit is contained in:
Giacomo Pasini 2024-01-31 23:09:03 +01:00 committed by GitHub
parent 0f6bcf11b1
commit 734b038c50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 67 additions and 18 deletions

View File

@ -1,5 +1,5 @@
from typing import TypeAlias, List, Optional from typing import TypeAlias, List, Optional
from hashlib import sha256 from hashlib import sha256, blake2b
# Please note this is still a work in progress # Please note this is still a work in progress
from dataclasses import dataclass from dataclasses import dataclass
@ -46,8 +46,31 @@ class Config:
class BlockHeader: class BlockHeader:
slot: Slot slot: Slot
parent: Id parent: Id
# TODO: spec out the block id, this is just a placeholder to unblock tests content_size: int
id: Id content_id: Id
# **Attention**:
# The ID of a block header is defined as the 32byte blake2b hash of its fields
# as serialized in the format specified by the 'HEADER' rule in 'messages.abnf'.
#
# The following code is to be considered as a reference implementation, mostly to be used for testing.
def id(self) -> Id:
# version byte
h = blake2b(digest_size=32)
h.update(b"\x01")
# header type
h.update(b"\x00")
# content size
h.update(int.to_bytes(self.content_size, length=4, byteorder="big"))
# content id
assert len(self.content_id) == 32
h.update(self.content_id)
# slot
h.update(int.to_bytes(self.slot.absolute_slot, length=8, byteorder="big"))
# parent
assert len(self.parent) == 32
h.update(self.parent)
return h.digest()
@dataclass @dataclass
@ -216,7 +239,7 @@ class Leader:
def common_prefix_len(a: Chain, b: Chain) -> int: def common_prefix_len(a: Chain, b: Chain) -> int:
for i, (x, y) in enumerate(zip(a.blocks, b.blocks)): for i, (x, y) in enumerate(zip(a.blocks, b.blocks)):
if x.id != y.id: if x.id() != y.id():
return i return i
return min(len(a.blocks), len(b.blocks)) return min(len(a.blocks), len(b.blocks))

21
cryptarchia/messages.abnf Normal file
View File

@ -0,0 +1,21 @@
; VERSION 0.1
; ------------ BLOCK ----------------------
BLOCK = HEADER CONTENT
; ------------ HEADER ---------------------
VERSION = %x01
HEADER = VERSION HEADER-UNSIGNED
HEADER-UNSIGNED = %x00 HEADER-COMMON
HEADER-COMMON = CONTENT-SIZE CONTENT-ID BLOCK-DATE PARENT-ID
CONTENT-SIZE = U32
BLOCK-DATE = BLOCK-SLOT
BLOCK-SLOT = U64
PARENT-ID = HEADER-ID
; ------------ CONTENT --------------------
CONTENT = *OCTET
; ------------- MISC ----------------------
U32 = 4OCTET ; unsigned integer 32 bit (BE)
U64 = 8OCTET ; unsigned integer 32 bit (BE)
HEADER-ID = 32OCTET
CONTENT-ID = 32OCTET

View File

@ -1,5 +1,5 @@
from unittest import TestCase from unittest import TestCase
from itertools import repeat
import numpy as np import numpy as np
import hashlib import hashlib
@ -7,27 +7,32 @@ from copy import deepcopy
from cryptarchia.cryptarchia import maxvalid_bg, Chain, BlockHeader, Slot, Id from cryptarchia.cryptarchia import maxvalid_bg, Chain, BlockHeader, Slot, Id
def make_block(parent_id: Id, slot: Slot, block_id: Id) -> BlockHeader: def make_block(parent_id: Id, slot: Slot, content: bytes) -> BlockHeader:
return BlockHeader(parent=parent_id, id=block_id, slot=slot) assert len(parent_id) == 32
content_id = hashlib.sha256(content).digest()
return BlockHeader(
parent=parent_id, content_size=1, slot=slot, content_id=content_id
)
class TestLeader(TestCase): class TestLeader(TestCase):
def test_fork_choice_long_sparse_chain(self): def test_fork_choice_long_sparse_chain(self):
# The longest chain is not dense after the fork # The longest chain is not dense after the fork
common = [make_block(b"", Slot(i), str(i).encode()) for i in range(1, 50)] common = [make_block(bytes(32), Slot(i), bytes(i)) for i in range(1, 50)]
long_chain = deepcopy(common) long_chain = deepcopy(common)
short_chain = deepcopy(common) short_chain = deepcopy(common)
for slot in range(50, 100): for slot in range(50, 100):
# make arbitrary ids for the different chain so that the blocks appear to be different # make arbitrary ids for the different chain so that the blocks appear to be different
long_id = hashlib.sha256(f"{slot}-long".encode()).digest() long_content = f"{slot}-long".encode()
short_id = hashlib.sha256(f"{slot}-short".encode()).digest() short_content = f"{slot}-short".encode()
if slot % 2 == 0: if slot % 2 == 0:
long_chain.append(make_block(b"", Slot(slot), long_id)) long_chain.append(make_block(bytes(32), Slot(slot), long_content))
short_chain.append(make_block(b"", Slot(slot), short_id)) short_chain.append(make_block(bytes(32), Slot(slot), short_content))
# add more blocks to the long chain # add more blocks to the long chain
for slot in range(100, 200): for slot in range(100, 200):
long_chain.append(make_block(b"", Slot(slot), long_id)) long_content = f"{slot}-long".encode()
long_chain.append(make_block(bytes(32), Slot(slot), long_content))
assert len(long_chain) > len(short_chain) assert len(long_chain) > len(short_chain)
# by setting a low k we trigger the density choice rule # by setting a low k we trigger the density choice rule
k = 1 k = 1
@ -44,16 +49,16 @@ class TestLeader(TestCase):
def test_fork_choice_long_dense_chain(self): def test_fork_choice_long_dense_chain(self):
# The longest chain is also the densest after the fork # The longest chain is also the densest after the fork
common = [make_block(b"", Slot(i), str(i).encode()) for i in range(1, 50)] common = [make_block(bytes(32), Slot(i), bytes(i)) for i in range(1, 50)]
long_chain = deepcopy(common) long_chain = deepcopy(common)
short_chain = deepcopy(common) short_chain = deepcopy(common)
for slot in range(50, 100): for slot in range(50, 100):
# make arbitrary ids for the different chain so that the blocks appear to be different # make arbitrary ids for the different chain so that the blocks appear to be different
long_id = hashlib.sha256(f"{slot}-long".encode()).digest() long_content = f"{slot}-long".encode()
short_id = hashlib.sha256(f"{slot}-short".encode()).digest() short_content = f"{slot}-short".encode()
long_chain.append(make_block(b"", Slot(slot), long_id)) long_chain.append(make_block(bytes(32), Slot(slot), long_content))
if slot % 2 == 0: if slot % 2 == 0:
short_chain.append(make_block(b"", Slot(slot), short_id)) short_chain.append(make_block(bytes(32), Slot(slot), short_content))
k = 1 k = 1
s = 50 s = 50
assert maxvalid_bg(Chain(short_chain), [Chain(long_chain)], k, s) == Chain( assert maxvalid_bg(Chain(short_chain), [Chain(long_chain)], k, s) == Chain(