Proto changes to facilitate backward sync (#271)

* Proto changes to facilitate backward sync

* Update to latest spec types in sync proto

* Use blockpool for more straightforward block headers collection

* Added BlockPool.getRef

* Update beacon_chain/sync_protocol.nim

Co-Authored-By: Jacek Sieka <arnetheduck@gmail.com>
This commit is contained in:
Yuriy Glukhov 2019-06-10 14:13:53 +03:00 committed by Jacek Sieka
parent 252819a84d
commit 10c7920b27
4 changed files with 125 additions and 106 deletions

View File

@ -292,6 +292,10 @@ proc add*(
(parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64)) (parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64))
) )
proc getRef*(pool: BlockPool, root: Eth2Digest): BlockRef =
## Retrieve a resolved block reference, if available
result = pool.blocks.getOrDefault(root)
proc get*(pool: BlockPool, blck: BlockRef): BlockData = proc get*(pool: BlockPool, blck: BlockRef): BlockData =
## Retrieve the associated block body of a block reference ## Retrieve the associated block body of a block reference
doAssert (not blck.isNil), "Trying to get nil BlockRef" doAssert (not blck.isNil), "Trying to get nil BlockRef"
@ -303,7 +307,7 @@ proc get*(pool: BlockPool, blck: BlockRef): BlockData =
proc get*(pool: BlockPool, root: Eth2Digest): Option[BlockData] = proc get*(pool: BlockPool, root: Eth2Digest): Option[BlockData] =
## Retrieve a resolved block reference and its associated body, if available ## Retrieve a resolved block reference and its associated body, if available
let refs = pool.blocks.getOrDefault(root) let refs = pool.getRef(root)
if not refs.isNil: if not refs.isNil:
some(pool.get(refs)) some(pool.get(refs))
@ -313,7 +317,7 @@ proc get*(pool: BlockPool, root: Eth2Digest): Option[BlockData] =
proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef = proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef =
## Fetch a block ref, or nil if not found (will be added to list of ## Fetch a block ref, or nil if not found (will be added to list of
## blocks-to-resolve) ## blocks-to-resolve)
result = pool.blocks.getOrDefault(root) result = pool.getRef(root)
if result.isNil: if result.isNil:
pool.missing[root] = MissingBlock(slots: 1) pool.missing[root] = MissingBlock(slots: 1)

View File

@ -1,8 +1,9 @@
import import
options, options, random,
chronos, chronicles, chronos, chronicles,
spec/datatypes, spec/datatypes,
eth2_network, beacon_node_types, sync_protocol eth2_network, beacon_node_types, sync_protocol,
eth/async_utils
proc init*(T: type RequestManager, network: EthereumNode): T = proc init*(T: type RequestManager, network: EthereumNode): T =
T(network: network) T(network: network)
@ -10,6 +11,12 @@ proc init*(T: type RequestManager, network: EthereumNode): T =
type type
FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.} FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.}
proc fetchAncestorBlocksFromPeer(peer: Peer, rec: FetchRecord, responseHandler: FetchAncestorsResponseHandler) {.async.} =
let blocks = await peer.getBeaconBlocks(rec.root, GENESIS_SLOT, rec.historySlots.int, 0, 1)
if blocks.isSome:
for b in blocks.get:
responseHandler(b)
proc fetchAncestorBlocks*(requestManager: RequestManager, proc fetchAncestorBlocks*(requestManager: RequestManager,
roots: seq[FetchRecord], roots: seq[FetchRecord],
responseHandler: FetchAncestorsResponseHandler) = responseHandler: FetchAncestorsResponseHandler) =
@ -26,12 +33,4 @@ proc fetchAncestorBlocks*(requestManager: RequestManager,
var fetchComplete = false var fetchComplete = false
for peer in requestManager.network.randomPeers(ParallelRequests, BeaconSync): for peer in requestManager.network.randomPeers(ParallelRequests, BeaconSync):
closureScope: traceAsyncErrors peer.fetchAncestorBlocksFromPeer(roots.rand(), responseHandler)
let response = peer.getAncestorBlocks(roots)
response.addCallback do(arg: pointer):
if not response.failed and response.read.isSome and not fetchComplete:
fetchComplete = true
for blk in response.read.get.blocks:
responseHandler(blk)
else:
debug "Failed to obtain ancestor blocks from peer", peer

View File

@ -235,21 +235,6 @@ type
block_body_root*: Eth2Digest block_body_root*: Eth2Digest
signature*: ValidatorSig signature*: ValidatorSig
BeaconBlockHeaderRLP* = object
## Same as BeaconBlock, except `body` is the `hash_tree_root` of the
## associated BeaconBlockBody.
# TODO: Dry it up with BeaconBlock
# TODO: As a first step, don't change RLP output; only previous user,
# but as with others, randao_reveal and eth1_data move to body.
# This is from before spec had a version.
slot*: uint64
parent_root*: Eth2Digest
state_root*: Eth2Digest
randao_reveal*: ValidatorSig
eth1_data*: Eth1Data
signature*: ValidatorSig
body*: Eth2Digest
# https://github.com/ethereum/eth2.0-specs/blob/v0.6.3/specs/core/0_beacon-chain.md#beaconblockbody # https://github.com/ethereum/eth2.0-specs/blob/v0.6.3/specs/core/0_beacon-chain.md#beaconblockbody
BeaconBlockBody* = object BeaconBlockBody* = object
randao_reveal*: ValidatorSig randao_reveal*: ValidatorSig

View File

@ -1,5 +1,5 @@
import import
options, tables, options, tables, sequtils, algorithm,
chronicles, chronos, ranges/bitranges, chronicles, chronos, ranges/bitranges,
spec/[datatypes, crypto, digest, helpers], eth/rlp, spec/[datatypes, crypto, digest, helpers], eth/rlp,
beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz
@ -28,48 +28,46 @@ const
MaxHeadersToRequest = MaxRootsToRequest MaxHeadersToRequest = MaxRootsToRequest
MaxAncestorBlocksResponse = 256 MaxAncestorBlocksResponse = 256
func toHeader(b: BeaconBlock): BeaconBlockHeaderRLP = func toHeader(b: BeaconBlock): BeaconBlockHeader =
BeaconBlockHeaderRLP( BeaconBlockHeader(
slot: b.slot.uint64, slot: b.slot,
parent_root: b.previous_block_root, previous_block_root: b.previous_block_root,
state_root: b.state_root, state_root: b.state_root,
randao_reveal: b.body.randao_reveal, block_body_root: hash_tree_root(b.body),
eth1_data : b.body.eth1_data, signature: b.signature
signature: b.signature,
body: hash_tree_root(b.body)
) )
proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeaderRLP, body: BeaconBlockBody) = proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeader, body: BeaconBlockBody) =
doAssert(hash_tree_root(body) == h.body) doAssert(hash_tree_root(body) == h.block_body_root)
b.slot = h.slot.Slot b.slot = h.slot
b.previous_block_root = h.parent_root b.previous_block_root = h.previous_block_root
b.state_root = h.state_root b.state_root = h.state_root
b.body.randao_reveal = h.randao_reveal
b.body.eth1_data = h.eth1_data
b.signature = h.signature
b.body = body b.body = body
b.signature = h.signature
proc importBlocks(node: BeaconNode, proc importBlocks(node: BeaconNode,
roots: openarray[(Eth2Digest, Slot)], blocks: openarray[BeaconBlock]) =
headers: openarray[BeaconBlockHeaderRLP], for blk in blocks:
bodies: openarray[BeaconBlockBody]) = node.onBeaconBlock(blk)
var bodyMap = initTable[Eth2Digest, int]() info "Forward sync imported blocks", len = blocks.len
for i, b in bodies: proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] =
bodyMap[hash_tree_root(b)] = i if bodies.len != headers.len:
info "Cannot merge bodies and headers. Length mismatch.", bodies = bodies.len, headers = headers.len
return
var goodBlocks, badBlocks = 0 var res: seq[BeaconBlock]
for h in headers: for i in 0 ..< headers.len:
let iBody = bodyMap.getOrDefault(h.body, -1) if hash_tree_root(bodies[i]) != headers[i].block_body_root:
if iBody >= 0: info "Block body is wrong for header"
var blk: BeaconBlock return
blk.fromHeaderAndBody(h, bodies[iBody])
node.onBeaconBlock(blk) res.setLen(res.len + 1)
inc goodBlocks res[^1].fromHeaderAndBody(headers[i], bodies[i])
else: some(res)
inc badBlocks
proc getBeaconBlocks*(peer: Peer, blockRoot: Eth2Digest, slot: Slot, maxBlocks, skipSlots: int, backward: uint8): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.}
info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len
p2pProtocol BeaconSync(version = 1, p2pProtocol BeaconSync(version = 1,
shortName = "bcs", shortName = "bcs",
@ -112,40 +110,20 @@ p2pProtocol BeaconSync(version = 1,
var s = bestSlot + 1 var s = bestSlot + 1
while s <= m.bestSlot: while s <= m.bestSlot:
debug "Waiting for block roots", fromPeer = peer, remoteBestSlot = m.bestSlot, peer debug "Waiting for block headers", fromPeer = peer, remoteBestSlot = m.bestSlot, peer
let r = await peer.getBeaconBlockRoots(s, MaxRootsToRequest) let headersLeft = int(m.bestSlot - s)
if not r.isSome: let blocks = await peer.getBeaconBlocks(bestRoot, s, min(headersLeft, MaxHeadersToRequest), 0, 0)
debug "Block roots not received", peer if blocks.isSome:
break if blocks.get.len == 0:
let roots = r.get.roots info "Got 0 blocks while syncing", peer
debug "Received block roots", len = roots.len, peer
if roots.len != 0:
if roots.len > MaxRootsToRequest:
# Attack?
await peer.disconnect(BreachOfProtocol, true)
break break
node.importBlocks(blocks.get)
let headers = await peer.getBeaconBlockHeaders(bestRoot, s, roots.len, 0) let lastSlot = blocks.get[^1].slot
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.len) if lastSlot <= s:
for r in roots: info "Slot did not advance during sync", peer
bodiesRequest.add(r[0]) break
debug "Block headers received. Requesting block bodies", peer s = lastSlot + 1
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
node.importBlocks(roots, headers.get.blockHeaders, bodies.get.blockBodies)
let lastSlot = roots[^1][1]
if roots.len == MaxRootsToRequest:
# Next batch of roots starts with the last slot of the current one
# to make sure we did not miss any roots with this slot that did
# not fit into the response.
if s == lastSlot:
info "Too many roots for a single slot while syncing"
break
s = lastSlot
else:
s = lastSlot + 1
else: else:
break break
@ -182,22 +160,51 @@ p2pProtocol BeaconSync(version = 1,
blockRoot: Eth2Digest, blockRoot: Eth2Digest,
slot: Slot, slot: Slot,
maxHeaders: int, maxHeaders: int,
skipSlots: int) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = skipSlots: int,
# TODO: validate implement slipSlots backward: uint8) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} =
let maxHeaders = min(MaxHeadersToRequest, maxHeaders) let maxHeaders = min(MaxHeadersToRequest, maxHeaders)
var s = slot var headers: seq[BeaconBlockHeader]
var headers = newSeqOfCap[BeaconBlockHeaderRLP](maxHeaders)
let db = peer.networkState.db let db = peer.networkState.db
let blockPool = peer.networkState.node.blockPool
let maxSlot = blockPool.head.blck.slot if backward != 0:
while s <= maxSlot: # TODO: implement skipSlots
for r in blockPool.blockRootsForSlot(s):
headers.add(db.getBlock(r).get().toHeader) var blockRoot = blockRoot
if headers.len == maxHeaders: break if slot != GENESIS_SLOT:
s += 1 # TODO: Get block from the best chain by slot
# blockRoot = ...
discard
let blockPool = peer.networkState.node.blockPool
var br = blockPool.getRef(blockRoot)
var blockRefs = newSeqOfCap[BlockRef](maxHeaders)
while not br.isNil:
blockRefs.add(br)
if blockRefs.len == maxHeaders:
break
br = br.parent
headers = newSeqOfCap[BeaconBlockHeader](blockRefs.len)
for i in blockRefs.high .. 0:
headers.add(blockPool.get(blockRefs[i]).data.toHeader)
else:
# TODO: This branch has to be revisited and possibly somehow merged with the
# branch above once we can traverse the best chain forward
# TODO: implement skipSlots
headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
var s = slot
let blockPool = peer.networkState.node.blockPool
let maxSlot = blockPool.head.blck.slot
while s <= maxSlot:
for r in blockPool.blockRootsForSlot(s):
headers.add(db.getBlock(r).get().toHeader)
if headers.len == maxHeaders: break
s += 1
await response.send(headers) await response.send(headers)
proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeaderRLP]) proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader])
requestResponse: requestResponse:
proc getAncestorBlocks( proc getAncestorBlocks(
@ -244,9 +251,33 @@ p2pProtocol BeaconSync(version = 1,
for r in blockRoots: for r in blockRoots:
if (let blk = db.getBlock(r); blk.isSome): if (let blk = db.getBlock(r); blk.isSome):
bodies.add(blk.get().body) bodies.add(blk.get().body)
else:
bodies.setLen(bodies.len + 1) # According to wire spec. Pad with zero body.
await response.send(bodies) await response.send(bodies)
proc beaconBlockBodies( proc beaconBlockBodies(
peer: Peer, peer: Peer,
blockBodies: openarray[BeaconBlockBody]) blockBodies: openarray[BeaconBlockBody])
proc getBeaconBlocks*(peer: Peer, blockRoot: Eth2Digest, slot: Slot, maxBlocks, skipSlots: int, backward: uint8): Future[Option[seq[BeaconBlock]]] {.async.} =
## Retrieve block headers and block bodies from the remote peer, merge them into blocks.
assert(maxBlocks <= MaxHeadersToRequest)
let headersResp = await peer.getBeaconBlockHeaders(blockRoot, slot, maxBlocks, skipSlots, backward)
if headersResp.isNone: return
let headers = headersResp.get.blockHeaders
if headers.len == 0:
info "Peer has no headers", peer
var res: seq[BeaconBlock]
return some(res)
let bodiesRequest = headers.mapIt(signing_root(it))
debug "Block headers received. Requesting block bodies", peer
let bodiesResp = await peer.getBeaconBlockBodies(bodiesRequest)
if bodiesResp.isNone:
info "Did not receive bodies", peer
return
result = mergeBlockHeadersAndBodies(headers, bodiesResp.get.blockBodies)
# If result.isNone: disconnect with BreachOfProtocol?