diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index a515ded68..21363fd20 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -13,14 +13,16 @@ type kHashToBlock kHeadBlock # Pointer to the most recent block seen kTailBlock # Pointer to the earliest finalized block + kSlotToBlockRoots func subkey(kind: DbKeyKind): array[1, byte] = result[0] = byte ord(kind) -func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]): - array[N + 1, byte] = - result[0] = byte ord(kind) - result[1 .. ^1] = key +func subkey[T](kind: DbKeyKind, key: T): auto = + var res: array[sizeof(T) + 1, byte] + res[0] = byte ord(kind) + copyMem(addr res[1], unsafeAddr key, sizeof(key)) + return res func subkey(kind: type BeaconState, key: Eth2Digest): auto = subkey(kHashToState, key.data) @@ -32,6 +34,26 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB = new result result.backend = backend +proc toSeq(v: openarray[byte], ofType: type): seq[ofType] = + if v.len != 0: + assert(v.len mod sizeof(ofType) == 0) + let sz = v.len div sizeof(ofType) + result = newSeq[ofType](sz) + copyMem(addr result[0], unsafeAddr v[0], v.len) + +proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) = + let slotKey = subkey(kSlotToBlockRoots, value.slot) + var blockRootsBytes = db.backend.get(slotKey) + var blockRoots = blockRootsBytes.toSeq(Eth2Digest) + if key notin blockRoots: + db.backend.put(subkey(type value, key), ssz.serialize(value)) + blockRootsBytes.setLen(blockRootsBytes.len + sizeof(key)) + copyMem(addr blockRootsBytes[^sizeof(key)], unsafeAddr key, sizeof(key)) + db.backend.put(slotKey, blockRootsBytes) + +proc putHead*(db: BeaconChainDB, key: Eth2Digest) = + db.backend.put(subkey(kHeadBlock), key.data) # TODO head block? + proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) = # TODO: prune old states # TODO: it might be necessary to introduce the concept of a "last finalized @@ -50,9 +72,6 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) = proc putState*(db: BeaconChainDB, value: BeaconState) = db.putState(hash_tree_root_final(value), value) -proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) = - db.backend.put(subkey(type value, key), ssz.serialize(value)) - proc putBlock*(db: BeaconChainDB, value: BeaconBlock) = db.putBlock(hash_tree_root_final(value), value) @@ -81,6 +100,9 @@ proc getHeadBlock*(db: BeaconChainDB): Option[Eth2Digest] = proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] = db.get(subkey(kTailBlock), Eth2Digest) +proc getBlockRootsForSlot*(db: BeaconChainDB, slot: uint64): seq[Eth2Digest] = + db.backend.get(subkey(kSlotToBlockRoots, slot)).toSeq(Eth2Digest) + proc containsBlock*( db: BeaconChainDB, key: Eth2Digest): bool = db.backend.contains(subkey(BeaconBlock, key)) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index b75f76685..47aa1bae0 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -4,7 +4,7 @@ import spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time, state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras, attestation_pool, block_pool, - mainchain_monitor, sync_protocol, gossipsub_protocol, trusted_state_snapshots, + mainchain_monitor, gossipsub_protocol, trusted_state_snapshots, eth/trie/db, eth/trie/backends/rocksdb_backend type @@ -15,7 +15,7 @@ type keys*: KeyPair attachedValidators: ValidatorPool blockPool: BlockPool - state: StateData + state*: StateData attestationPool: AttestationPool mainchainMonitor: MainchainMonitor potentialHeads: seq[Eth2Digest] @@ -28,6 +28,12 @@ const topicAttestations = "ethereum/2.1/beacon_chain/attestations" topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch" + +proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.} + +import sync_protocol + + func shortValidatorKey(node: BeaconNode, validatorIdx: int): string = ($node.state.data.validator_registry[validatorIdx].pubkey)[0..7] @@ -86,6 +92,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): T = result.network = newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1) + let state = result.network.protocolState(BeaconSync) + state.node = result + state.db = result.db let head = result.blockPool.get(result.db.getHeadBlock().get()) diff --git a/beacon_chain/spec/crypto.nim b/beacon_chain/spec/crypto.nim index d82214a2e..135056631 100644 --- a/beacon_chain/spec/crypto.nim +++ b/beacon_chain/spec/crypto.nim @@ -128,15 +128,20 @@ proc readValue*(reader: var JsonReader, value: var ValidatorPrivKey) {.inline.} proc newPrivKey*(): ValidatorPrivKey = SigKey.random() +# RLP serialization (TODO: remove if no longer necessary) proc append*(writer: var RlpWriter, value: ValidatorPubKey) = writer.append value.getBytes() proc read*(rlp: var Rlp, T: type ValidatorPubKey): T {.inline.} = - ValidatorPubKey.init rlp.toBytes.toOpenArray + let r = rlp.read(seq[byte]) + if not init(result, r): + raise newException(Exception, "Could not init ValidatorPubKey from bytes") proc append*(writer: var RlpWriter, value: ValidatorSig) = writer.append value.getBytes() proc read*(rlp: var Rlp, T: type ValidatorSig): T {.inline.} = - ValidatorSig.init rlp.toBytes.toOpenArray + let r = rlp.read(seq[byte]) + if not init(result, r): + raise newException(Exception, "Could not init ValidatorSig from bytes") diff --git a/beacon_chain/spec/datatypes.nim b/beacon_chain/spec/datatypes.nim index b04153bff..de4c90283 100644 --- a/beacon_chain/spec/datatypes.nim +++ b/beacon_chain/spec/datatypes.nim @@ -339,6 +339,18 @@ type body*: BeaconBlockBody + BeaconBlockHeader* = object + ## Same as BeaconBlock, except `body` is the `hash_tree_root` of the + ## associated BeaconBlockBody. + # TODO: Dry it up with BeaconBlock + slot*: uint64 + parent_root*: Eth2Digest + state_root*: Eth2Digest + randao_reveal*: ValidatorSig + eth1_data*: Eth1Data + signature*: ValidatorSig + body*: Eth2Digest + # https://github.com/ethereum/eth2.0-specs/blob/v0.3.0/specs/core/0_beacon-chain.md#beaconblockbody BeaconBlockBody* = object proposer_slashings*: seq[ProposerSlashing] diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 9af7e2c6c..521dcc519 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -1,7 +1,8 @@ import - options, + options, tables, chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx, - spec/[datatypes, crypto, digest] + spec/[datatypes, crypto, digest], + beacon_node, beacon_chain_db, time, ssz type ValidatorChangeLogEntry* = object @@ -13,8 +14,132 @@ type ValidatorSet = seq[Validator] + BeaconSyncState* = ref object + node*: BeaconNode + db*: BeaconChainDB + +func toHeader(b: BeaconBlock): BeaconBlockHeader = + result.slot = b.slot + result.parent_root = b.parent_root + result.state_root = b.state_root + result.randao_reveal = b.randao_reveal + result.eth1_data = b.eth1_data + result.signature = b.signature + result.body = hash_tree_root_final(b.body) + +proc fromHeader(b: var BeaconBlock, h: BeaconBlockHeader) = + b.slot = h.slot + b.parent_root = h.parent_root + b.state_root = h.state_root + b.randao_reveal = h.randao_reveal + b.eth1_data = h.eth1_data + b.signature = h.signature + +proc importBlocks(node: BeaconNode, roots: openarray[(Eth2Digest, uint64)], headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]) = + var bodyMap = initTable[Eth2Digest, int]() + + for i, b in bodies: + bodyMap[hash_tree_root_final(b)] = i + + var goodBlocks, badBlocks = 0 + for h in headers: + let iBody = bodyMap.getOrDefault(h.body, -1) + if iBody >= 0: + var blk: BeaconBlock + blk.fromHeader(h) + blk.body = bodies[iBody] + node.onBeaconBlock(blk) + inc goodBlocks + else: + inc badBlocks + + info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len + + p2pProtocol BeaconSync(version = 1, - shortName = "bcs"): + shortName = "bcs", + networkState = BeaconSyncState): + + onPeerConnected do(peer: Peer): + const + protocolVersion = 1 # TODO: Spec doesn't specify this yet + networkId = 1 + let node = peer.networkState.node + + var + latestFinalizedRoot: Eth2Digest # TODO + latestFinalizedEpoch: uint64 = node.state.data.finalized_epoch + bestRoot: Eth2Digest # TODO + bestSlot: uint64 = node.state.data.slot + + await peer.status(protocolVersion, networkId, latestFinalizedRoot, latestFinalizedEpoch, + bestRoot, bestSlot) + + let m = await peer.nextMsg(BeaconSync.status) + let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) + if bestDiff == 0: + # Nothing to do? + trace "Nothing to sync", peer = peer.node + else: + # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the + # connection if it's too big. + let db = peer.networkState.db + + if bestDiff > 0: + # Send roots + # TODO: Currently we send all block roots in one "packet". Maybe + # they should be split to multiple packets. + type Root = (Eth2Digest, uint64) + var roots = newSeqOfCap[Root](128) + for i in m.bestSlot .. bestSlot: + for r in db.getBlockRootsForSlot(i): + roots.add((r, i)) + + await peer.beaconBlockRoots(roots) + else: + # Receive roots + let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) + let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) + var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) + for r in roots.roots: + bodiesRequest.add(r[0]) + let bodies = await peer.getBeaconBlockBodies(bodiesRequest) + node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) + + proc status(peer: Peer, protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest, + latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64) + + proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, uint64)]) + + requestResponse: + proc getBeaconBlockHeaders(peer: Peer, blockRoot: Eth2Digest, slot: uint64, maxHeaders: int, skipSlots: int) = + # TODO: validate maxHeaders + var s = slot + var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders) + let db = peer.networkState.db + while headers.len < maxHeaders: + let blkRoots = db.getBlockRootsForSlot(s) + for r in blkRoots: + headers.add(db.getBlock(r).get().toHeader) + if headers.len == maxHeaders: break + inc s + await peer.beaconBlockHeaders(reqId, headers) + + proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader]) + + requestResponse: + proc getBeaconBlockBodies(peer: Peer, blockRoots: openarray[Eth2Digest]) = + # TODO: Validate blockRoots.len + var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len) + let db = peer.networkState.db + for r in blockRoots: + if (let blk = db.getBlock(r); blk.isSome): + bodies.add(blk.get().body) + await peer.beaconBlockBodies(reqId, bodies) + + proc beaconBlockBodies(peer: Peer, blockBodies: openarray[BeaconBlockBody]) + + requestResponse: proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) = var bb: BeaconBlock