Cleaned up obsolete BeaconSync code; Added some open questions regarding fetchAncestorBlocks

This commit is contained in:
Zahary Karadjov 2019-09-08 23:21:59 -04:00 committed by zah
parent 9dec05f9c9
commit 4a54fb4103
2 changed files with 26 additions and 194 deletions

View File

@ -11,10 +11,16 @@ proc init*(T: type RequestManager, network: Eth2Node): T =
type type
FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.} FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.}
proc fetchAncestorBlocksFromPeer(peer: Peer, rec: FetchRecord, responseHandler: FetchAncestorsResponseHandler) {.async.} = proc fetchAncestorBlocksFromPeer(
# TODO: (zah) Why are we specifying `GENESIS_SLOT` here? peer: Peer,
# I'm not sure what this meant for the old code. rec: FetchRecord,
let blocks = await peer.getBeaconBlocksSpec(rec.root, GENESIS_SLOT, rec.historySlots, 0'u64, true) responseHandler: FetchAncestorsResponseHandler) {.async.} =
# TODO: It's not clear if this function follows the intention of the
# FetchRecord data type. Perhaps it is supposed to get a range of blocks
# instead. In order to do this, we'll need the slot number of the known
# block to be stored in the FetchRecord, so we can ask for a range of
# blocks starting N positions before this slot number.
let blocks = await peer.beaconBlocksByRoot([rec.root])
if blocks.isSome: if blocks.isSome:
for b in blocks.get: for b in blocks.get:
responseHandler(b) responseHandler(b)

View File

@ -36,8 +36,7 @@ type
slot: Slot slot: Slot
const const
MaxRootsToRequest = 512'u64 maxBlocksToRequest = 512'u64
MaxHeadersToRequest = MaxRootsToRequest
MaxAncestorBlocksResponse = 256 MaxAncestorBlocksResponse = 256
func toHeader(b: BeaconBlock): BeaconBlockHeader = func toHeader(b: BeaconBlock): BeaconBlockHeader =
@ -78,15 +77,14 @@ proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: o
res[^1].fromHeaderAndBody(headers[i], bodies[i]) res[^1].fromHeaderAndBody(headers[i], bodies[i])
some(res) some(res)
proc getBeaconBlocks*(peer: Peer, proc beaconBlocksByRange*(
blockRoot: Eth2Digest, peer: Peer,
slot: Slot, headBlockRoot: Eth2Digest,
maxBlocks, skipSlots: uint64, start_slot: Slot,
backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.} count: uint64,
step: uint64,
proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest, timeout: Duration = milliseconds(10000'i64)):
slot: Slot, maxBlocks, skipSlots: uint64, Future[Option[seq[BeaconBlock]]] {.gcsafe.}
backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.}
type type
HelloMsg = object HelloMsg = object
@ -124,14 +122,16 @@ proc handleInitialHello(peer: Peer,
var s = bestSlot + 1 var s = bestSlot + 1
while s <= h.bestSlot: while s <= h.bestSlot:
debug "Waiting for block headers", fromPeer = peer, remoteBestSlot = h.bestSlot, peer debug "Waiting for block headers", peer, remoteBestSlot = h.bestSlot
let headersLeft = uint64(h.bestSlot - s)
let blocks = await peer.getBeaconBlocksSpec(bestRoot, s, min(headersLeft, MaxHeadersToRequest), 0, false) let numBlocksToRequest = min(uint64(h.bestSlot - s), maxBlocksToRequest)
let blocks = await peer.beaconBlocksByRange(bestRoot, s,
numBlocksToRequest, 1'u64)
if blocks.isSome: if blocks.isSome:
if blocks.get.len == 0: if blocks.get.len == 0:
info "Got 0 blocks while syncing", peer info "Got 0 blocks while syncing", peer
break break
node.importBlocks(blocks.get) node.importBlocks blocks.get
let lastSlot = blocks.get[^1].slot let lastSlot = blocks.get[^1].slot
if lastSlot <= s: if lastSlot <= s:
info "Slot did not advance during sync", peer info "Slot did not advance during sync", peer
@ -210,7 +210,7 @@ p2pProtocol BeaconSync(version = 1,
proc beaconBlocksByRange( proc beaconBlocksByRange(
peer: Peer, peer: Peer,
headBlockRoot: Eth2Digest, headBlockRoot: Eth2Digest,
start_slot: uint64, start_slot: Slot,
count: uint64, count: uint64,
step: uint64) {. step: uint64) {.
libp2pProtocol("beacon_blocks_by_range", 1).} = libp2pProtocol("beacon_blocks_by_range", 1).} =
@ -258,177 +258,3 @@ p2pProtocol BeaconSync(version = 1,
peer: Peer, peer: Peer,
blocks: openarray[BeaconBlock]) blocks: openarray[BeaconBlock])
requestResponse:
proc getBeaconBlockRoots(
peer: Peer,
fromSlot: Slot,
maxRoots: uint64) {.
libp2pProtocol("beacon_block_roots", 1).} =
let maxRoots = min(MaxRootsToRequest, maxRoots)
var s = fromSlot
var roots = newSeqOfCap[BlockRootSlot](maxRoots)
let blockPool = peer.networkState.node.blockPool
let maxSlot = blockPool.head.blck.slot
while s <= maxSlot:
for r in blockPool.blockRootsForSlot(s):
roots.add BlockRootSlot(blockRoot: r, slot: s)
if roots.len == maxRoots.int: break
s += 1
await response.write(roots)
proc beaconBlockRoots(
peer: Peer,
roots: openarray[BlockRootSlot])
requestResponse:
proc getBeaconBlockHeaders(
peer: Peer,
blockRoot: Eth2Digest,
slot: Slot,
maxHeaders: uint64,
skipSlots: uint64,
backward: bool) {.
libp2pProtocol("beacon_block_headers", 1).} =
let maxHeaders = min(MaxHeadersToRequest, maxHeaders)
var headers: seq[BeaconBlockHeader]
let db = peer.networkState.db
if backward:
# TODO: implement skipSlots
var blockRoot = blockRoot
if slot != GENESIS_SLOT:
# TODO: Get block from the best chain by slot
# blockRoot = ...
discard
let blockPool = peer.networkState.node.blockPool
var br = blockPool.getRef(blockRoot)
var blockRefs = newSeqOfCap[BlockRef](maxHeaders)
while not br.isNil:
blockRefs.add(br)
if blockRefs.len == maxHeaders.int:
break
br = br.parent
headers = newSeqOfCap[BeaconBlockHeader](blockRefs.len)
for i in blockRefs.high .. 0:
headers.add(blockPool.get(blockRefs[i]).data.toHeader)
else:
# TODO: This branch has to be revisited and possibly somehow merged with the
# branch above once we can traverse the best chain forward
# TODO: implement skipSlots
headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
var s = slot
let blockPool = peer.networkState.node.blockPool
let maxSlot = blockPool.head.blck.slot
while s <= maxSlot:
for r in blockPool.blockRootsForSlot(s):
headers.add(db.getBlock(r).get().toHeader)
if headers.len == maxHeaders.int: break
s += 1
await response.send(headers)
proc beaconBlockHeaders(
peer: Peer,
blockHeaders: openarray[BeaconBlockHeader])
# TODO move this at the bottom, because it's not in the spec yet, but it will
# consume a `method_id`
requestResponse:
proc getAncestorBlocks(
peer: Peer,
needed: openarray[FetchRecord]) {.
libp2pProtocol("ancestor_blocks", 1).} =
let db = peer.networkState.db
var neededRoots = initSet[Eth2Digest]()
for rec in needed: neededRoots.incl(rec.root)
var resultsCounter = 0
for rec in needed:
if (var blck = db.getBlock(rec.root); blck.isSome()):
# TODO validate historySlots
let firstSlot = blck.get().slot - rec.historySlots
for i in 0..<rec.historySlots.int:
await response.write(blck.get())
inc resultsCounter
if resultsCounter >= MaxAncestorBlocksResponse:
break
if blck.get().parent_root in neededRoots:
# Don't send duplicate blocks, if neededRoots has roots that are
# in the same chain
break
if (blck = db.getBlock(blck.get().parent_root);
blck.isNone() or blck.get().slot < firstSlot):
break
if resultsCounter >= MaxAncestorBlocksResponse:
break
proc ancestorBlocks(
peer: Peer,
blocks: openarray[BeaconBlock])
requestResponse:
proc getBeaconBlockBodies(
peer: Peer,
blockRoots: openarray[Eth2Digest]) {.
libp2pProtocol("beacon_block_bodies", 1).} =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db
for r in blockRoots:
if (let blk = db.getBlock(r); blk.isSome):
await response.write(blk.get().body)
proc beaconBlockBodies(
peer: Peer,
blockBodies: openarray[BeaconBlockBody])
proc getBeaconBlocks*(peer: Peer,
blockRoot: Eth2Digest,
slot: Slot,
maxBlocks, skipSlots: uint64,
backward: bool): Future[Option[seq[BeaconBlock]]] {.async.} =
## Retrieve block headers and block bodies from the remote peer,
## merge them into blocks.
assert(maxBlocks <= MaxHeadersToRequest)
let headersResp = await peer.getBeaconBlockHeaders(blockRoot, slot, maxBlocks, skipSlots, backward)
if headersResp.isNone: return
let headers = headersResp.get
if headers.len == 0:
info "Peer has no headers", peer
var res: seq[BeaconBlock]
return some(res)
let bodiesRequest = headers.mapIt(signing_root(it))
debug "Block headers received. Requesting block bodies", peer
let bodiesResp = await peer.getBeaconBlockBodies(bodiesRequest)
if bodiesResp.isNone:
info "Did not receive bodies", peer
return
result = mergeBlockHeadersAndBodies(headers, bodiesResp.get)
# If result.isNone: disconnect with BreachOfProtocol?
proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest, slot: Slot,
maxBlocks, skipSlots: uint64,
backward: bool): Future[Option[seq[BeaconBlock]]] {.async.} =
## Retrieve blocks from the remote peer, according to new network
## specification.
doAssert(maxBlocks <= MaxHeadersToRequest)
var startSlot = uint64(slot) + skipSlots
var blocksResp = await peer.beaconBlocksByRange(blockRoot, startSlot,
maxBlocks, 1'u64)
if blocksResp.isSome:
let blocks = blocksResp.get
info "Peer returned blocks", peer, count = len(blocks)
return some(blocks)