From 10c7920b274f61e70531a5318e1ea0e4f61e65f2 Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Mon, 10 Jun 2019 14:13:53 +0300 Subject: [PATCH] Proto changes to facilitate backward sync (#271) * Proto changes to facilitate backward sync * Update to latest spec types in sync proto * Use blockpool for more straightforward block headers collection * Added BlockPool.getRef * Update beacon_chain/sync_protocol.nim Co-Authored-By: Jacek Sieka --- beacon_chain/block_pool.nim | 8 +- beacon_chain/request_manager.nim | 21 ++-- beacon_chain/spec/datatypes.nim | 15 --- beacon_chain/sync_protocol.nim | 187 ++++++++++++++++++------------- 4 files changed, 125 insertions(+), 106 deletions(-) diff --git a/beacon_chain/block_pool.nim b/beacon_chain/block_pool.nim index fcb4055f1..be5d9fcf0 100644 --- a/beacon_chain/block_pool.nim +++ b/beacon_chain/block_pool.nim @@ -292,6 +292,10 @@ proc add*( (parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64)) ) +proc getRef*(pool: BlockPool, root: Eth2Digest): BlockRef = + ## Retrieve a resolved block reference, if available + result = pool.blocks.getOrDefault(root) + proc get*(pool: BlockPool, blck: BlockRef): BlockData = ## Retrieve the associated block body of a block reference doAssert (not blck.isNil), "Trying to get nil BlockRef" @@ -303,7 +307,7 @@ proc get*(pool: BlockPool, blck: BlockRef): BlockData = proc get*(pool: BlockPool, root: Eth2Digest): Option[BlockData] = ## Retrieve a resolved block reference and its associated body, if available - let refs = pool.blocks.getOrDefault(root) + let refs = pool.getRef(root) if not refs.isNil: some(pool.get(refs)) @@ -313,7 +317,7 @@ proc get*(pool: BlockPool, root: Eth2Digest): Option[BlockData] = proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef = ## Fetch a block ref, or nil if not found (will be added to list of ## blocks-to-resolve) - result = pool.blocks.getOrDefault(root) + result = pool.getRef(root) if result.isNil: pool.missing[root] = MissingBlock(slots: 1) diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index 3f2e9bf7e..c4ffa9120 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -1,8 +1,9 @@ import - options, + options, random, chronos, chronicles, spec/datatypes, - eth2_network, beacon_node_types, sync_protocol + eth2_network, beacon_node_types, sync_protocol, + eth/async_utils proc init*(T: type RequestManager, network: EthereumNode): T = T(network: network) @@ -10,6 +11,12 @@ proc init*(T: type RequestManager, network: EthereumNode): T = type FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.} +proc fetchAncestorBlocksFromPeer(peer: Peer, rec: FetchRecord, responseHandler: FetchAncestorsResponseHandler) {.async.} = + let blocks = await peer.getBeaconBlocks(rec.root, GENESIS_SLOT, rec.historySlots.int, 0, 1) + if blocks.isSome: + for b in blocks.get: + responseHandler(b) + proc fetchAncestorBlocks*(requestManager: RequestManager, roots: seq[FetchRecord], responseHandler: FetchAncestorsResponseHandler) = @@ -26,12 +33,4 @@ proc fetchAncestorBlocks*(requestManager: RequestManager, var fetchComplete = false for peer in requestManager.network.randomPeers(ParallelRequests, BeaconSync): - closureScope: - let response = peer.getAncestorBlocks(roots) - response.addCallback do(arg: pointer): - if not response.failed and response.read.isSome and not fetchComplete: - fetchComplete = true - for blk in response.read.get.blocks: - responseHandler(blk) - else: - debug "Failed to obtain ancestor blocks from peer", peer + traceAsyncErrors peer.fetchAncestorBlocksFromPeer(roots.rand(), responseHandler) diff --git a/beacon_chain/spec/datatypes.nim b/beacon_chain/spec/datatypes.nim index 0085ad4bc..2fffc9440 100644 --- a/beacon_chain/spec/datatypes.nim +++ b/beacon_chain/spec/datatypes.nim @@ -235,21 +235,6 @@ type block_body_root*: Eth2Digest signature*: ValidatorSig - BeaconBlockHeaderRLP* = object - ## Same as BeaconBlock, except `body` is the `hash_tree_root` of the - ## associated BeaconBlockBody. - # TODO: Dry it up with BeaconBlock - # TODO: As a first step, don't change RLP output; only previous user, - # but as with others, randao_reveal and eth1_data move to body. - # This is from before spec had a version. - slot*: uint64 - parent_root*: Eth2Digest - state_root*: Eth2Digest - randao_reveal*: ValidatorSig - eth1_data*: Eth1Data - signature*: ValidatorSig - body*: Eth2Digest - # https://github.com/ethereum/eth2.0-specs/blob/v0.6.3/specs/core/0_beacon-chain.md#beaconblockbody BeaconBlockBody* = object randao_reveal*: ValidatorSig diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index f557f7ad4..3688e68ab 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -1,5 +1,5 @@ import - options, tables, + options, tables, sequtils, algorithm, chronicles, chronos, ranges/bitranges, spec/[datatypes, crypto, digest, helpers], eth/rlp, beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz @@ -28,48 +28,46 @@ const MaxHeadersToRequest = MaxRootsToRequest MaxAncestorBlocksResponse = 256 -func toHeader(b: BeaconBlock): BeaconBlockHeaderRLP = - BeaconBlockHeaderRLP( - slot: b.slot.uint64, - parent_root: b.previous_block_root, +func toHeader(b: BeaconBlock): BeaconBlockHeader = + BeaconBlockHeader( + slot: b.slot, + previous_block_root: b.previous_block_root, state_root: b.state_root, - randao_reveal: b.body.randao_reveal, - eth1_data : b.body.eth1_data, - signature: b.signature, - body: hash_tree_root(b.body) + block_body_root: hash_tree_root(b.body), + signature: b.signature ) -proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeaderRLP, body: BeaconBlockBody) = - doAssert(hash_tree_root(body) == h.body) - b.slot = h.slot.Slot - b.previous_block_root = h.parent_root +proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeader, body: BeaconBlockBody) = + doAssert(hash_tree_root(body) == h.block_body_root) + b.slot = h.slot + b.previous_block_root = h.previous_block_root b.state_root = h.state_root - b.body.randao_reveal = h.randao_reveal - b.body.eth1_data = h.eth1_data - b.signature = h.signature b.body = body + b.signature = h.signature proc importBlocks(node: BeaconNode, - roots: openarray[(Eth2Digest, Slot)], - headers: openarray[BeaconBlockHeaderRLP], - bodies: openarray[BeaconBlockBody]) = - var bodyMap = initTable[Eth2Digest, int]() + blocks: openarray[BeaconBlock]) = + for blk in blocks: + node.onBeaconBlock(blk) + info "Forward sync imported blocks", len = blocks.len - for i, b in bodies: - bodyMap[hash_tree_root(b)] = i +proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] = + if bodies.len != headers.len: + info "Cannot merge bodies and headers. Length mismatch.", bodies = bodies.len, headers = headers.len + return - var goodBlocks, badBlocks = 0 - for h in headers: - let iBody = bodyMap.getOrDefault(h.body, -1) - if iBody >= 0: - var blk: BeaconBlock - blk.fromHeaderAndBody(h, bodies[iBody]) - node.onBeaconBlock(blk) - inc goodBlocks - else: - inc badBlocks + var res: seq[BeaconBlock] + for i in 0 ..< headers.len: + if hash_tree_root(bodies[i]) != headers[i].block_body_root: + info "Block body is wrong for header" + return + + res.setLen(res.len + 1) + res[^1].fromHeaderAndBody(headers[i], bodies[i]) + some(res) + +proc getBeaconBlocks*(peer: Peer, blockRoot: Eth2Digest, slot: Slot, maxBlocks, skipSlots: int, backward: uint8): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.} - info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len p2pProtocol BeaconSync(version = 1, shortName = "bcs", @@ -112,40 +110,20 @@ p2pProtocol BeaconSync(version = 1, var s = bestSlot + 1 while s <= m.bestSlot: - debug "Waiting for block roots", fromPeer = peer, remoteBestSlot = m.bestSlot, peer - let r = await peer.getBeaconBlockRoots(s, MaxRootsToRequest) - if not r.isSome: - debug "Block roots not received", peer - break - let roots = r.get.roots - debug "Received block roots", len = roots.len, peer - if roots.len != 0: - if roots.len > MaxRootsToRequest: - # Attack? - await peer.disconnect(BreachOfProtocol, true) + debug "Waiting for block headers", fromPeer = peer, remoteBestSlot = m.bestSlot, peer + let headersLeft = int(m.bestSlot - s) + let blocks = await peer.getBeaconBlocks(bestRoot, s, min(headersLeft, MaxHeadersToRequest), 0, 0) + if blocks.isSome: + if blocks.get.len == 0: + info "Got 0 blocks while syncing", peer break - - let headers = await peer.getBeaconBlockHeaders(bestRoot, s, roots.len, 0) - var bodiesRequest = newSeqOfCap[Eth2Digest](roots.len) - for r in roots: - bodiesRequest.add(r[0]) - - debug "Block headers received. Requesting block bodies", peer - let bodies = await peer.getBeaconBlockBodies(bodiesRequest) - node.importBlocks(roots, headers.get.blockHeaders, bodies.get.blockBodies) - - let lastSlot = roots[^1][1] - if roots.len == MaxRootsToRequest: - # Next batch of roots starts with the last slot of the current one - # to make sure we did not miss any roots with this slot that did - # not fit into the response. - - if s == lastSlot: - info "Too many roots for a single slot while syncing" - break - s = lastSlot - else: - s = lastSlot + 1 + node.importBlocks(blocks.get) + let lastSlot = blocks.get[^1].slot + if lastSlot <= s: + info "Slot did not advance during sync", peer + break + + s = lastSlot + 1 else: break @@ -182,22 +160,51 @@ p2pProtocol BeaconSync(version = 1, blockRoot: Eth2Digest, slot: Slot, maxHeaders: int, - skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = - # TODO: validate implement slipSlots + skipSlots: int, + backward: uint8) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = let maxHeaders = min(MaxHeadersToRequest, maxHeaders) - var s = slot - var headers = newSeqOfCap[BeaconBlockHeaderRLP](maxHeaders) + var headers: seq[BeaconBlockHeader] let db = peer.networkState.db - let blockPool = peer.networkState.node.blockPool - let maxSlot = blockPool.head.blck.slot - while s <= maxSlot: - for r in blockPool.blockRootsForSlot(s): - headers.add(db.getBlock(r).get().toHeader) - if headers.len == maxHeaders: break - s += 1 + + if backward != 0: + # TODO: implement skipSlots + + var blockRoot = blockRoot + if slot != GENESIS_SLOT: + # TODO: Get block from the best chain by slot + # blockRoot = ... + discard + + let blockPool = peer.networkState.node.blockPool + var br = blockPool.getRef(blockRoot) + var blockRefs = newSeqOfCap[BlockRef](maxHeaders) + + while not br.isNil: + blockRefs.add(br) + if blockRefs.len == maxHeaders: + break + br = br.parent + + headers = newSeqOfCap[BeaconBlockHeader](blockRefs.len) + for i in blockRefs.high .. 0: + headers.add(blockPool.get(blockRefs[i]).data.toHeader) + else: + # TODO: This branch has to be revisited and possibly somehow merged with the + # branch above once we can traverse the best chain forward + # TODO: implement skipSlots + headers = newSeqOfCap[BeaconBlockHeader](maxHeaders) + var s = slot + let blockPool = peer.networkState.node.blockPool + let maxSlot = blockPool.head.blck.slot + while s <= maxSlot: + for r in blockPool.blockRootsForSlot(s): + headers.add(db.getBlock(r).get().toHeader) + if headers.len == maxHeaders: break + s += 1 + await response.send(headers) - proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP]) + proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader]) requestResponse: proc getAncestorBlocks( @@ -244,9 +251,33 @@ p2pProtocol BeaconSync(version = 1, for r in blockRoots: if (let blk = db.getBlock(r); blk.isSome): bodies.add(blk.get().body) + else: + bodies.setLen(bodies.len + 1) # According to wire spec. Pad with zero body. await response.send(bodies) proc beaconBlockBodies( peer: Peer, blockBodies: openarray[BeaconBlockBody]) +proc getBeaconBlocks*(peer: Peer, blockRoot: Eth2Digest, slot: Slot, maxBlocks, skipSlots: int, backward: uint8): Future[Option[seq[BeaconBlock]]] {.async.} = + ## Retrieve block headers and block bodies from the remote peer, merge them into blocks. + assert(maxBlocks <= MaxHeadersToRequest) + let headersResp = await peer.getBeaconBlockHeaders(blockRoot, slot, maxBlocks, skipSlots, backward) + if headersResp.isNone: return + + let headers = headersResp.get.blockHeaders + if headers.len == 0: + info "Peer has no headers", peer + var res: seq[BeaconBlock] + return some(res) + + let bodiesRequest = headers.mapIt(signing_root(it)) + + debug "Block headers received. Requesting block bodies", peer + let bodiesResp = await peer.getBeaconBlockBodies(bodiesRequest) + if bodiesResp.isNone: + info "Did not receive bodies", peer + return + + result = mergeBlockHeadersAndBodies(headers, bodiesResp.get.blockBodies) + # If result.isNone: disconnect with BreachOfProtocol?