Random Beacon revision (#29)

* Random Beacon revision

This is a proposed revision of the random beacon specification.
First of all it fixes a few little mistakes in the signing
process:
* Use BasicSchemeMPL instead of PoPSchemeMPL since we don't use
  Proof of Possession.
* Hashing the values prior to the call to BasicSchemeMPL.sing()
  is not necessary. This step has been removed.

In addition, all data inside the random beacon state that
anyone willing to verify must know anyway has been removed.
In the current version this includes the 'context' and the public
key of the signer. The verifier has to independently check that
those values have been correctly obtained anyway, so there's no
need to include them in the state that is passed around.

Lastly, the beacon context view has been changed from using a
string encoding to a little endian variable-length encoding and
is now tied to qc.view instead of current_view of the processing
node.

* actually use the version const
This commit is contained in:
Giacomo Pasini 2023-05-25 10:26:35 +02:00 committed by GitHub
parent b0edea6a98
commit 34617dc911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 118 deletions

View File

@ -5,67 +5,68 @@ from typing import TypeAlias
# carnot imports
# lib imports
from blspy import PrivateKey, Util, PopSchemeMPL, G2Element, G1Element
from blspy import PrivateKey, Util, BasicSchemeMPL, G2Element, G1Element
# stdlib imports
from hashlib import sha256
View: TypeAlias = int
Beacon: TypeAlias = bytes
Proof: TypeAlias = bytes # For now this is gonna be a public key, in future research we may pivot to zk proofs.
Sig: TypeAlias = bytes
Entropy: TypeAlias = bytes
PublicKey: TypeAlias = G1Element
VERSION = 0
def generate_random_sk() -> PrivateKey:
seed = bytes([randint(0, 255) for _ in range(32)])
return PopSchemeMPL.key_gen(seed)
return BasicSchemeMPL.key_gen(seed)
def view_to_bytes(view: View) -> bytes:
return view.to_bytes((view.bit_length() + 7) // 8, byteorder='little', signed=True)
@dataclass
class RandomBeacon:
version: int
context: View
entropy: Beacon
# TODO: Just the happy path beacons owns a proof, we can set the proof to empty bytes for now.
# Probably we should separate this into two kinds of beacons and group them under a single type later on.
proof: Proof
sig: Sig
def entropy(self) -> Entropy:
return self.sig
class NormalMode:
@staticmethod
def verify(beacon: RandomBeacon) -> bool:
def verify(beacon: RandomBeacon, pk: PublicKey, view: View) -> bool:
"""
:param proof: BLS signature
:param beacon: Beacon is signature for current view
:param view: View to verify beacon upon
:param beacon: the provided beacon
:param view: view to verify beacon upon
:param pk: public key of the issuer of the beacon
:return:
"""
# TODO: Actually verify that the message is propoerly signed
sig = G2Element.from_bytes(beacon.entropy)
proof = G1Element.from_bytes(beacon.proof)
return PopSchemeMPL.verify(proof, Util.hash256(str(beacon.context).encode()), sig)
sig = G2Element.from_bytes(beacon.sig)
return BasicSchemeMPL.verify(pk, view_to_bytes(view), sig)
@staticmethod
def generate_beacon(private_key: PrivateKey, view: View) -> Beacon:
return bytes(PopSchemeMPL.sign(private_key, Util.hash256(str(view).encode())))
def generate_beacon(private_key: PrivateKey, view: View) -> RandomBeacon:
return RandomBeacon(VERSION, bytes(BasicSchemeMPL.sign(private_key, view_to_bytes(view))))
class RecoveryMode:
@staticmethod
def verify(last_beacon: RandomBeacon, beacon: RandomBeacon) -> bool:
def verify(last_beacon: RandomBeacon, beacon: RandomBeacon, view: View) -> bool:
"""
:param last_beacon: Unhappy -> last working beacon (signature), Happy -> Hash of previous beacon and next view number
:param beacon:
:param view:
:param last_beacon: beacon for view - 1
:param beacon: beacon for view
:param view: the view to verify beacon upon
:return:
"""
b = sha256(last_beacon.entropy + str(beacon.context).encode()).digest()
return b == beacon.entropy
b = sha256(last_beacon.entropy() + view_to_bytes(view)).digest()
return b == beacon.entropy()
@staticmethod
def generate_beacon(last_beacon: Beacon, view: View) -> Beacon:
return sha256(last_beacon + str(view).encode()).digest()
def generate_beacon(last_beacon_entropy: Entropy, view: View) -> RandomBeacon:
return RandomBeacon(VERSION, sha256(last_beacon_entropy + view_to_bytes(view)).digest())
class RandomBeaconHandler:
@ -77,14 +78,14 @@ class RandomBeaconHandler:
"""
self.last_beacon: RandomBeacon = beacon
def verify_happy(self, new_beacon: RandomBeacon) -> bool:
if NormalMode.verify(new_beacon):
def verify_happy(self, new_beacon: RandomBeacon, pk: PublicKey, view: View) -> bool:
if NormalMode.verify(new_beacon, pk, view):
self.last_beacon = new_beacon
return True
return False
def verify_unhappy(self, new_beacon: RandomBeacon) -> bool:
if RecoveryMode.verify(self.last_beacon, new_beacon):
def verify_unhappy(self, new_beacon: RandomBeacon, view: View) -> bool:
if RecoveryMode.verify(self.last_beacon, new_beacon, view):
self.last_beacon = new_beacon
return True
return False

View File

@ -7,6 +7,8 @@ from overlay import EntropyOverlay
@dataclass
class BeaconizedBlock(Block):
beacon: RandomBeacon
# public key of the proposer
pk: PublicKey
class BeaconizedCarnot(Carnot):
@ -14,14 +16,9 @@ class BeaconizedCarnot(Carnot):
self.sk = sk
self.pk = bytes(self.sk.get_g1())
self.random_beacon = RandomBeaconHandler(
RandomBeacon(
version=0,
context=-1,
entropy=RecoveryMode.generate_beacon(entropy, -1),
proof=self.pk
RecoveryMode.generate_beacon(entropy, -1)
)
)
overlay.set_entropy(self.random_beacon.last_beacon.entropy)
overlay.set_entropy(self.random_beacon.last_beacon.entropy())
super().__init__(self.pk, overlay=overlay)
def approve_block(self, block: BeaconizedBlock, votes: Set[Vote]) -> Event:
@ -47,39 +44,28 @@ class BeaconizedCarnot(Carnot):
# root members send votes to next leader, we update our beacon first
if self.overlay.is_member_of_root_committee(self.id):
self.random_beacon.verify_happy(block.beacon)
self.overlay.set_entropy(self.random_beacon.last_beacon.entropy)
assert(self.random_beacon.verify_happy(block.beacon, block.pk, block.qc.view))
self.overlay.set_entropy(self.random_beacon.last_beacon.entropy())
return Send(to=self.overlay.leader(), payload=vote)
# otherwise we send to the parent committee and update the beacon second
return_event = Send(to=self.overlay.parent_committee(self.id), payload=vote)
self.random_beacon.verify_happy(block.beacon)
self.overlay.set_entropy(self.random_beacon.last_beacon.entropy)
assert(self.random_beacon.verify_happy(block.beacon, block.pk, block.qc.view))
self.overlay.set_entropy(self.random_beacon.last_beacon.entropy())
return return_event
def receive_timeout_qc(self, timeout_qc: TimeoutQc):
super().receive_timeout_qc(timeout_qc)
if timeout_qc.view < self.current_view:
return
entropy = RecoveryMode.generate_beacon(self.random_beacon.last_beacon.entropy, timeout_qc.view)
new_beacon = RandomBeacon(
version=0,
context=self.current_view,
entropy=entropy,
proof=b""
)
self.random_beacon.verify_unhappy(new_beacon)
self.overlay.set_entropy(self.random_beacon.last_beacon.entropy)
new_beacon = RecoveryMode.generate_beacon(self.random_beacon.last_beacon.entropy(), timeout_qc.view)
self.random_beacon.verify_unhappy(new_beacon, timeout_qc.view)
self.overlay.set_entropy(self.random_beacon.last_beacon.entropy())
def propose_block(self, view: View, quorum: Quorum) -> Event:
beacon = RandomBeacon(
version=0,
context=self.current_view,
entropy=NormalMode.generate_beacon(self.sk, self.current_view),
proof=self.pk
)
event: Event = super().propose_block(view, quorum)
block = event.payload
block = BeaconizedBlock(view=block.view, qc=block.qc, _id=block._id, beacon=beacon)
beacon = NormalMode.generate_beacon(self.sk, block.qc.view)
block = BeaconizedBlock(view=block.view, qc=block.qc, _id=block._id, beacon=beacon, pk = G1Element.from_bytes(self.pk))
event.payload = block
return event

View File

@ -8,66 +8,32 @@ from random import randint
class TestRandomBeaconVerification(TestCase):
@staticmethod
def happy_entropy_and_proof(view: View) -> Tuple[Beacon, Proof]:
def happy_beacon_and_pk(view: View) -> Tuple[RandomBeacon, PublicKey]:
sk = generate_random_sk()
beacon = NormalMode.generate_beacon(sk, view)
return bytes(beacon), bytes(sk.get_g1())
return beacon, sk.get_g1()
@staticmethod
def unhappy_entropy(last_beacon: Beacon, view: View) -> Beacon:
def unhappy_beacon(last_beacon: Entropy, view: View) -> RandomBeacon:
return RecoveryMode.generate_beacon(last_beacon, view)
def setUp(self):
entropy, proof = self.happy_entropy_and_proof(0)
self.beacon = RandomBeaconHandler(
beacon=RandomBeacon(
version=0,
context=0,
entropy=entropy,
proof=proof
)
)
beacon, pk = self.happy_beacon_and_pk(0)
self.beacon = RandomBeaconHandler(beacon)
def test_happy(self):
for i in range(3):
entropy, proof = self.happy_entropy_and_proof(i)
new_beacon = RandomBeacon(
version=0,
context=i,
entropy=entropy,
proof=proof
)
self.beacon.verify_happy(new_beacon)
self.assertEqual(self.beacon.last_beacon.context, 2)
new_beacon, pk = self.happy_beacon_and_pk(i)
self.beacon.verify_happy(new_beacon, pk, i)
def test_unhappy(self):
for i in range(1, 3):
entropy = self.unhappy_entropy(self.beacon.last_beacon.entropy, i)
new_beacon = RandomBeacon(
version=0,
context=i,
entropy=entropy,
proof=b""
)
self.beacon.verify_unhappy(new_beacon)
self.assertEqual(self.beacon.last_beacon.context, 2)
new_beacon = self.unhappy_beacon(self.beacon.last_beacon.entropy(), i)
self.beacon.verify_unhappy(new_beacon, i)
def test_mixed(self):
for i in range(1, 6, 2):
entropy, proof = self.happy_entropy_and_proof(i)
new_beacon = RandomBeacon(
version=0,
context=i,
entropy=entropy,
proof=proof
)
self.beacon.verify_happy(new_beacon)
entropy = self.unhappy_entropy(self.beacon.last_beacon.entropy, i+1)
new_beacon = RandomBeacon(
version=0,
context=i+1,
entropy=entropy,
proof=b""
)
self.beacon.verify_unhappy(new_beacon)
self.assertEqual(self.beacon.last_beacon.context, 6)
new_beacon, pk = self.happy_beacon_and_pk(i)
self.beacon.verify_happy(new_beacon, pk, i)
new_beacon = self.unhappy_beacon(self.beacon.last_beacon.entropy(), i+1)
self.beacon.verify_unhappy(new_beacon, i+1)

View File

@ -6,7 +6,7 @@ from blspy import PrivateKey
from carnot import Id, Carnot, Block, Overlay, Vote, StandardQc, NewView
from beacon import generate_random_sk, RandomBeacon, NormalMode
from beconized_carnot import BeaconizedCarnot, BeaconizedBlock
from beaconized_carnot import BeaconizedCarnot, BeaconizedBlock
from overlay import FlatOverlay, EntropyOverlay
from test_unhappy_path import parents_from_childs
@ -18,7 +18,7 @@ def gen_node(sk: PrivateKey, overlay: Overlay, entropy: bytes = b""):
def succeed(nodes: Dict[Id, BeaconizedCarnot], proposed_block: BeaconizedBlock) -> (List[Vote], EntropyOverlay):
overlay = FlatOverlay(list(nodes.keys()))
overlay.set_entropy(proposed_block.beacon.entropy)
overlay.set_entropy(proposed_block.beacon.entropy())
# broadcast the block
for node in nodes.values():
@ -94,23 +94,19 @@ def fail(nodes: Dict[Id, BeaconizedCarnot], proposed_block: BeaconizedBlock) ->
def add_genesis_block(carnot: BeaconizedCarnot, sk: PrivateKey) -> Block:
entropy = NormalMode.generate_beacon(sk, -1)
beacon = NormalMode.generate_beacon(sk, -1)
genesis_block = BeaconizedBlock(
view=0,
qc=StandardQc(block=b"", view=0),
_id=b"",
beacon=RandomBeacon(
version=0,
context=-1,
entropy=entropy,
proof=bytes(sk.get_g1())
)
beacon=beacon,
pk=sk.get_g1()
)
carnot.safe_blocks[genesis_block.id()] = genesis_block
carnot.receive_block(genesis_block)
carnot.local_high_qc = genesis_block.qc
carnot.current_view = 1
carnot.overlay.set_entropy(entropy)
carnot.overlay.set_entropy(beacon.entropy())
return genesis_block
@ -121,7 +117,7 @@ def initial_setup(test_case: TestCase, size: int) -> (Dict[Id, Carnot], Carnot,
nodes = dict(gen_node(key, FlatOverlay(nodes_ids), bytes(genesis_sk.get_g1())) for key in keys)
genesis_block = None
overlay = FlatOverlay(nodes_ids)
overlay.set_entropy(NormalMode.generate_beacon(genesis_sk, -1))
overlay.set_entropy(NormalMode.generate_beacon(genesis_sk, -1).entropy())
leader: Carnot = nodes[overlay.leader()]
for node in nodes.values():
genesis_block = add_genesis_block(node, genesis_sk)
@ -140,7 +136,7 @@ def initial_setup(test_case: TestCase, size: int) -> (Dict[Id, Carnot], Carnot,
proposed_block = leader.propose_block(1, genesis_votes).payload
test_case.assertIsNotNone(proposed_block)
overlay = FlatOverlay(nodes_ids)
overlay.set_entropy(NormalMode.generate_beacon(genesis_sk, -1))
overlay.set_entropy(genesis_block.beacon.entropy())
return nodes, leader, proposed_block, overlay