From 4a54fb4103401f04b9aa6d58cd4d1051e15c625e Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Sun, 8 Sep 2019 23:21:59 -0400 Subject: [PATCH] Cleaned up obsolete BeaconSync code; Added some open questions regarding fetchAncestorBlocks --- beacon_chain/request_manager.nim | 14 ++- beacon_chain/sync_protocol.nim | 206 +++---------------------------- 2 files changed, 26 insertions(+), 194 deletions(-) diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index 21f688042..2588a934a 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -11,10 +11,16 @@ proc init*(T: type RequestManager, network: Eth2Node): T = type FetchAncestorsResponseHandler = proc (b: BeaconBlock) {.gcsafe.} -proc fetchAncestorBlocksFromPeer(peer: Peer, rec: FetchRecord, responseHandler: FetchAncestorsResponseHandler) {.async.} = - # TODO: (zah) Why are we specifying `GENESIS_SLOT` here? - # I'm not sure what this meant for the old code. - let blocks = await peer.getBeaconBlocksSpec(rec.root, GENESIS_SLOT, rec.historySlots, 0'u64, true) +proc fetchAncestorBlocksFromPeer( + peer: Peer, + rec: FetchRecord, + responseHandler: FetchAncestorsResponseHandler) {.async.} = + # TODO: It's not clear if this function follows the intention of the + # FetchRecord data type. Perhaps it is supposed to get a range of blocks + # instead. In order to do this, we'll need the slot number of the known + # block to be stored in the FetchRecord, so we can ask for a range of + # blocks starting N positions before this slot number. + let blocks = await peer.beaconBlocksByRoot([rec.root]) if blocks.isSome: for b in blocks.get: responseHandler(b) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 35ea45fcc..9f898a051 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -36,8 +36,7 @@ type slot: Slot const - MaxRootsToRequest = 512'u64 - MaxHeadersToRequest = MaxRootsToRequest + maxBlocksToRequest = 512'u64 MaxAncestorBlocksResponse = 256 func toHeader(b: BeaconBlock): BeaconBlockHeader = @@ -78,15 +77,14 @@ proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: o res[^1].fromHeaderAndBody(headers[i], bodies[i]) some(res) -proc getBeaconBlocks*(peer: Peer, - blockRoot: Eth2Digest, - slot: Slot, - maxBlocks, skipSlots: uint64, - backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.} - -proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest, - slot: Slot, maxBlocks, skipSlots: uint64, - backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.} +proc beaconBlocksByRange*( + peer: Peer, + headBlockRoot: Eth2Digest, + start_slot: Slot, + count: uint64, + step: uint64, + timeout: Duration = milliseconds(10000'i64)): + Future[Option[seq[BeaconBlock]]] {.gcsafe.} type HelloMsg = object @@ -124,14 +122,16 @@ proc handleInitialHello(peer: Peer, var s = bestSlot + 1 while s <= h.bestSlot: - debug "Waiting for block headers", fromPeer = peer, remoteBestSlot = h.bestSlot, peer - let headersLeft = uint64(h.bestSlot - s) - let blocks = await peer.getBeaconBlocksSpec(bestRoot, s, min(headersLeft, MaxHeadersToRequest), 0, false) + debug "Waiting for block headers", peer, remoteBestSlot = h.bestSlot + + let numBlocksToRequest = min(uint64(h.bestSlot - s), maxBlocksToRequest) + let blocks = await peer.beaconBlocksByRange(bestRoot, s, + numBlocksToRequest, 1'u64) if blocks.isSome: if blocks.get.len == 0: info "Got 0 blocks while syncing", peer break - node.importBlocks(blocks.get) + node.importBlocks blocks.get let lastSlot = blocks.get[^1].slot if lastSlot <= s: info "Slot did not advance during sync", peer @@ -210,7 +210,7 @@ p2pProtocol BeaconSync(version = 1, proc beaconBlocksByRange( peer: Peer, headBlockRoot: Eth2Digest, - start_slot: uint64, + start_slot: Slot, count: uint64, step: uint64) {. libp2pProtocol("beacon_blocks_by_range", 1).} = @@ -258,177 +258,3 @@ p2pProtocol BeaconSync(version = 1, peer: Peer, blocks: openarray[BeaconBlock]) - requestResponse: - proc getBeaconBlockRoots( - peer: Peer, - fromSlot: Slot, - maxRoots: uint64) {. - libp2pProtocol("beacon_block_roots", 1).} = - let maxRoots = min(MaxRootsToRequest, maxRoots) - var s = fromSlot - var roots = newSeqOfCap[BlockRootSlot](maxRoots) - let blockPool = peer.networkState.node.blockPool - let maxSlot = blockPool.head.blck.slot - while s <= maxSlot: - for r in blockPool.blockRootsForSlot(s): - roots.add BlockRootSlot(blockRoot: r, slot: s) - if roots.len == maxRoots.int: break - s += 1 - await response.write(roots) - - proc beaconBlockRoots( - peer: Peer, - roots: openarray[BlockRootSlot]) - - requestResponse: - proc getBeaconBlockHeaders( - peer: Peer, - blockRoot: Eth2Digest, - slot: Slot, - maxHeaders: uint64, - skipSlots: uint64, - backward: bool) {. - libp2pProtocol("beacon_block_headers", 1).} = - let maxHeaders = min(MaxHeadersToRequest, maxHeaders) - var headers: seq[BeaconBlockHeader] - let db = peer.networkState.db - - if backward: - # TODO: implement skipSlots - - var blockRoot = blockRoot - if slot != GENESIS_SLOT: - # TODO: Get block from the best chain by slot - # blockRoot = ... - discard - - let blockPool = peer.networkState.node.blockPool - var br = blockPool.getRef(blockRoot) - var blockRefs = newSeqOfCap[BlockRef](maxHeaders) - - while not br.isNil: - blockRefs.add(br) - if blockRefs.len == maxHeaders.int: - break - br = br.parent - - headers = newSeqOfCap[BeaconBlockHeader](blockRefs.len) - for i in blockRefs.high .. 0: - headers.add(blockPool.get(blockRefs[i]).data.toHeader) - else: - # TODO: This branch has to be revisited and possibly somehow merged with the - # branch above once we can traverse the best chain forward - # TODO: implement skipSlots - headers = newSeqOfCap[BeaconBlockHeader](maxHeaders) - var s = slot - let blockPool = peer.networkState.node.blockPool - let maxSlot = blockPool.head.blck.slot - while s <= maxSlot: - for r in blockPool.blockRootsForSlot(s): - headers.add(db.getBlock(r).get().toHeader) - if headers.len == maxHeaders.int: break - s += 1 - - await response.send(headers) - - proc beaconBlockHeaders( - peer: Peer, - blockHeaders: openarray[BeaconBlockHeader]) - - # TODO move this at the bottom, because it's not in the spec yet, but it will - # consume a `method_id` - requestResponse: - proc getAncestorBlocks( - peer: Peer, - needed: openarray[FetchRecord]) {. - libp2pProtocol("ancestor_blocks", 1).} = - let db = peer.networkState.db - var neededRoots = initSet[Eth2Digest]() - for rec in needed: neededRoots.incl(rec.root) - var resultsCounter = 0 - - for rec in needed: - if (var blck = db.getBlock(rec.root); blck.isSome()): - # TODO validate historySlots - let firstSlot = blck.get().slot - rec.historySlots - - for i in 0..= MaxAncestorBlocksResponse: - break - - if blck.get().parent_root in neededRoots: - # Don't send duplicate blocks, if neededRoots has roots that are - # in the same chain - break - - if (blck = db.getBlock(blck.get().parent_root); - blck.isNone() or blck.get().slot < firstSlot): - break - - if resultsCounter >= MaxAncestorBlocksResponse: - break - - proc ancestorBlocks( - peer: Peer, - blocks: openarray[BeaconBlock]) - - requestResponse: - proc getBeaconBlockBodies( - peer: Peer, - blockRoots: openarray[Eth2Digest]) {. - libp2pProtocol("beacon_block_bodies", 1).} = - # TODO: Validate blockRoots.len - var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len) - let db = peer.networkState.db - for r in blockRoots: - if (let blk = db.getBlock(r); blk.isSome): - await response.write(blk.get().body) - - proc beaconBlockBodies( - peer: Peer, - blockBodies: openarray[BeaconBlockBody]) - -proc getBeaconBlocks*(peer: Peer, - blockRoot: Eth2Digest, - slot: Slot, - maxBlocks, skipSlots: uint64, - backward: bool): Future[Option[seq[BeaconBlock]]] {.async.} = - ## Retrieve block headers and block bodies from the remote peer, - ## merge them into blocks. - assert(maxBlocks <= MaxHeadersToRequest) - let headersResp = await peer.getBeaconBlockHeaders(blockRoot, slot, maxBlocks, skipSlots, backward) - if headersResp.isNone: return - - let headers = headersResp.get - if headers.len == 0: - info "Peer has no headers", peer - var res: seq[BeaconBlock] - return some(res) - - let bodiesRequest = headers.mapIt(signing_root(it)) - - debug "Block headers received. Requesting block bodies", peer - let bodiesResp = await peer.getBeaconBlockBodies(bodiesRequest) - if bodiesResp.isNone: - info "Did not receive bodies", peer - return - - result = mergeBlockHeadersAndBodies(headers, bodiesResp.get) - # If result.isNone: disconnect with BreachOfProtocol? - -proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest, slot: Slot, - maxBlocks, skipSlots: uint64, - backward: bool): Future[Option[seq[BeaconBlock]]] {.async.} = - ## Retrieve blocks from the remote peer, according to new network - ## specification. - doAssert(maxBlocks <= MaxHeadersToRequest) - var startSlot = uint64(slot) + skipSlots - var blocksResp = await peer.beaconBlocksByRange(blockRoot, startSlot, - maxBlocks, 1'u64) - if blocksResp.isSome: - let blocks = blocksResp.get - info "Peer returned blocks", peer, count = len(blocks) - return some(blocks) -