Initial full sync impl

This commit is contained in:
Yuriy Glukhov 2019-02-18 12:34:39 +02:00
parent 68a39a21be
commit bed6510da3
No known key found for this signature in database
GPG Key ID: 733560674BB43E6C
5 changed files with 187 additions and 14 deletions

View File

@ -13,14 +13,16 @@ type
kHashToBlock kHashToBlock
kHeadBlock # Pointer to the most recent block seen kHeadBlock # Pointer to the most recent block seen
kTailBlock # Pointer to the earliest finalized block kTailBlock # Pointer to the earliest finalized block
kSlotToBlockRoots
func subkey(kind: DbKeyKind): array[1, byte] = func subkey(kind: DbKeyKind): array[1, byte] =
result[0] = byte ord(kind) result[0] = byte ord(kind)
func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]): func subkey[T](kind: DbKeyKind, key: T): auto =
array[N + 1, byte] = var res: array[sizeof(T) + 1, byte]
result[0] = byte ord(kind) res[0] = byte ord(kind)
result[1 .. ^1] = key copyMem(addr res[1], unsafeAddr key, sizeof(key))
return res
func subkey(kind: type BeaconState, key: Eth2Digest): auto = func subkey(kind: type BeaconState, key: Eth2Digest): auto =
subkey(kHashToState, key.data) subkey(kHashToState, key.data)
@ -32,6 +34,26 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB =
new result new result
result.backend = backend result.backend = backend
proc toSeq(v: openarray[byte], ofType: type): seq[ofType] =
if v.len != 0:
assert(v.len mod sizeof(ofType) == 0)
let sz = v.len div sizeof(ofType)
result = newSeq[ofType](sz)
copyMem(addr result[0], unsafeAddr v[0], v.len)
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
let slotKey = subkey(kSlotToBlockRoots, value.slot)
var blockRootsBytes = db.backend.get(slotKey)
var blockRoots = blockRootsBytes.toSeq(Eth2Digest)
if key notin blockRoots:
db.backend.put(subkey(type value, key), ssz.serialize(value))
blockRootsBytes.setLen(blockRootsBytes.len + sizeof(key))
copyMem(addr blockRootsBytes[^sizeof(key)], unsafeAddr key, sizeof(key))
db.backend.put(slotKey, blockRootsBytes)
proc putHead*(db: BeaconChainDB, key: Eth2Digest) =
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block?
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) = proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
# TODO: prune old states # TODO: prune old states
# TODO: it might be necessary to introduce the concept of a "last finalized # TODO: it might be necessary to introduce the concept of a "last finalized
@ -50,9 +72,6 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
proc putState*(db: BeaconChainDB, value: BeaconState) = proc putState*(db: BeaconChainDB, value: BeaconState) =
db.putState(hash_tree_root_final(value), value) db.putState(hash_tree_root_final(value), value)
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
db.backend.put(subkey(type value, key), ssz.serialize(value))
proc putBlock*(db: BeaconChainDB, value: BeaconBlock) = proc putBlock*(db: BeaconChainDB, value: BeaconBlock) =
db.putBlock(hash_tree_root_final(value), value) db.putBlock(hash_tree_root_final(value), value)
@ -81,6 +100,9 @@ proc getHeadBlock*(db: BeaconChainDB): Option[Eth2Digest] =
proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] = proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] =
db.get(subkey(kTailBlock), Eth2Digest) db.get(subkey(kTailBlock), Eth2Digest)
proc getBlockRootsForSlot*(db: BeaconChainDB, slot: uint64): seq[Eth2Digest] =
db.backend.get(subkey(kSlotToBlockRoots, slot)).toSeq(Eth2Digest)
proc containsBlock*( proc containsBlock*(
db: BeaconChainDB, key: Eth2Digest): bool = db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(BeaconBlock, key)) db.backend.contains(subkey(BeaconBlock, key))

View File

@ -4,7 +4,7 @@ import
spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time, spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time,
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras, state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
attestation_pool, block_pool, attestation_pool, block_pool,
mainchain_monitor, sync_protocol, gossipsub_protocol, trusted_state_snapshots, mainchain_monitor, gossipsub_protocol, trusted_state_snapshots,
eth/trie/db, eth/trie/backends/rocksdb_backend eth/trie/db, eth/trie/backends/rocksdb_backend
type type
@ -15,7 +15,7 @@ type
keys*: KeyPair keys*: KeyPair
attachedValidators: ValidatorPool attachedValidators: ValidatorPool
blockPool: BlockPool blockPool: BlockPool
state: StateData state*: StateData
attestationPool: AttestationPool attestationPool: AttestationPool
mainchainMonitor: MainchainMonitor mainchainMonitor: MainchainMonitor
potentialHeads: seq[Eth2Digest] potentialHeads: seq[Eth2Digest]
@ -28,6 +28,12 @@ const
topicAttestations = "ethereum/2.1/beacon_chain/attestations" topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch" topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch"
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
import sync_protocol
func shortValidatorKey(node: BeaconNode, validatorIdx: int): string = func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
($node.state.data.validator_registry[validatorIdx].pubkey)[0..7] ($node.state.data.validator_registry[validatorIdx].pubkey)[0..7]
@ -86,6 +92,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): T =
result.network = result.network =
newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1) newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1)
let state = result.network.protocolState(BeaconSync)
state.node = result
state.db = result.db
let let
head = result.blockPool.get(result.db.getHeadBlock().get()) head = result.blockPool.get(result.db.getHeadBlock().get())

View File

@ -128,15 +128,20 @@ proc readValue*(reader: var JsonReader, value: var ValidatorPrivKey) {.inline.}
proc newPrivKey*(): ValidatorPrivKey = SigKey.random() proc newPrivKey*(): ValidatorPrivKey = SigKey.random()
# RLP serialization (TODO: remove if no longer necessary)
proc append*(writer: var RlpWriter, value: ValidatorPubKey) = proc append*(writer: var RlpWriter, value: ValidatorPubKey) =
writer.append value.getBytes() writer.append value.getBytes()
proc read*(rlp: var Rlp, T: type ValidatorPubKey): T {.inline.} = proc read*(rlp: var Rlp, T: type ValidatorPubKey): T {.inline.} =
ValidatorPubKey.init rlp.toBytes.toOpenArray let r = rlp.read(seq[byte])
if not init(result, r):
raise newException(Exception, "Could not init ValidatorPubKey from bytes")
proc append*(writer: var RlpWriter, value: ValidatorSig) = proc append*(writer: var RlpWriter, value: ValidatorSig) =
writer.append value.getBytes() writer.append value.getBytes()
proc read*(rlp: var Rlp, T: type ValidatorSig): T {.inline.} = proc read*(rlp: var Rlp, T: type ValidatorSig): T {.inline.} =
ValidatorSig.init rlp.toBytes.toOpenArray let r = rlp.read(seq[byte])
if not init(result, r):
raise newException(Exception, "Could not init ValidatorSig from bytes")

View File

@ -339,6 +339,18 @@ type
body*: BeaconBlockBody body*: BeaconBlockBody
BeaconBlockHeader* = object
## Same as BeaconBlock, except `body` is the `hash_tree_root` of the
## associated BeaconBlockBody.
# TODO: Dry it up with BeaconBlock
slot*: uint64
parent_root*: Eth2Digest
state_root*: Eth2Digest
randao_reveal*: ValidatorSig
eth1_data*: Eth1Data
signature*: ValidatorSig
body*: Eth2Digest
# https://github.com/ethereum/eth2.0-specs/blob/v0.3.0/specs/core/0_beacon-chain.md#beaconblockbody # https://github.com/ethereum/eth2.0-specs/blob/v0.3.0/specs/core/0_beacon-chain.md#beaconblockbody
BeaconBlockBody* = object BeaconBlockBody* = object
proposer_slashings*: seq[ProposerSlashing] proposer_slashings*: seq[ProposerSlashing]

View File

@ -1,7 +1,8 @@
import import
options, options, tables,
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx, chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx,
spec/[datatypes, crypto, digest] spec/[datatypes, crypto, digest],
beacon_node, beacon_chain_db, time, ssz
type type
ValidatorChangeLogEntry* = object ValidatorChangeLogEntry* = object
@ -13,8 +14,132 @@ type
ValidatorSet = seq[Validator] ValidatorSet = seq[Validator]
BeaconSyncState* = ref object
node*: BeaconNode
db*: BeaconChainDB
func toHeader(b: BeaconBlock): BeaconBlockHeader =
result.slot = b.slot
result.parent_root = b.parent_root
result.state_root = b.state_root
result.randao_reveal = b.randao_reveal
result.eth1_data = b.eth1_data
result.signature = b.signature
result.body = hash_tree_root_final(b.body)
proc fromHeader(b: var BeaconBlock, h: BeaconBlockHeader) =
b.slot = h.slot
b.parent_root = h.parent_root
b.state_root = h.state_root
b.randao_reveal = h.randao_reveal
b.eth1_data = h.eth1_data
b.signature = h.signature
proc importBlocks(node: BeaconNode, roots: openarray[(Eth2Digest, uint64)], headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]) =
var bodyMap = initTable[Eth2Digest, int]()
for i, b in bodies:
bodyMap[hash_tree_root_final(b)] = i
var goodBlocks, badBlocks = 0
for h in headers:
let iBody = bodyMap.getOrDefault(h.body, -1)
if iBody >= 0:
var blk: BeaconBlock
blk.fromHeader(h)
blk.body = bodies[iBody]
node.onBeaconBlock(blk)
inc goodBlocks
else:
inc badBlocks
info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len
p2pProtocol BeaconSync(version = 1, p2pProtocol BeaconSync(version = 1,
shortName = "bcs"): shortName = "bcs",
networkState = BeaconSyncState):
onPeerConnected do(peer: Peer):
const
protocolVersion = 1 # TODO: Spec doesn't specify this yet
networkId = 1
let node = peer.networkState.node
var
latestFinalizedRoot: Eth2Digest # TODO
latestFinalizedEpoch: uint64 = node.state.data.finalized_epoch
bestRoot: Eth2Digest # TODO
bestSlot: uint64 = node.state.data.slot
await peer.status(protocolVersion, networkId, latestFinalizedRoot, latestFinalizedEpoch,
bestRoot, bestSlot)
let m = await peer.nextMsg(BeaconSync.status)
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
if bestDiff == 0:
# Nothing to do?
trace "Nothing to sync", peer = peer.node
else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big.
let db = peer.networkState.db
if bestDiff > 0:
# Send roots
# TODO: Currently we send all block roots in one "packet". Maybe
# they should be split to multiple packets.
type Root = (Eth2Digest, uint64)
var roots = newSeqOfCap[Root](128)
for i in m.bestSlot .. bestSlot:
for r in db.getBlockRootsForSlot(i):
roots.add((r, i))
await peer.beaconBlockRoots(roots)
else:
# Receive roots
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots)
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0)
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len)
for r in roots.roots:
bodiesRequest.add(r[0])
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies)
proc status(peer: Peer, protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest,
latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64)
proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, uint64)])
requestResponse:
proc getBeaconBlockHeaders(peer: Peer, blockRoot: Eth2Digest, slot: uint64, maxHeaders: int, skipSlots: int) =
# TODO: validate maxHeaders
var s = slot
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
let db = peer.networkState.db
while headers.len < maxHeaders:
let blkRoots = db.getBlockRootsForSlot(s)
for r in blkRoots:
headers.add(db.getBlock(r).get().toHeader)
if headers.len == maxHeaders: break
inc s
await peer.beaconBlockHeaders(reqId, headers)
proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader])
requestResponse:
proc getBeaconBlockBodies(peer: Peer, blockRoots: openarray[Eth2Digest]) =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db
for r in blockRoots:
if (let blk = db.getBlock(r); blk.isSome):
bodies.add(blk.get().body)
await peer.beaconBlockBodies(reqId, bodies)
proc beaconBlockBodies(peer: Peer, blockBodies: openarray[BeaconBlockBody])
requestResponse: requestResponse:
proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) = proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) =
var bb: BeaconBlock var bb: BeaconBlock