diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index bb6a69a9a..3c4a2ad02 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -213,7 +213,6 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async nickname: nickname, network: network, netKeys: netKeys, - requestManager: RequestManager.init(network), db: db, config: conf, attachedValidators: ValidatorPool.init(), @@ -227,6 +226,11 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async topicAggregateAndProofs: topicAggregateAndProofs, ) + res.requestManager = RequestManager.init(network, + proc(signedBlock: SignedBeaconBlock) = + onBeaconBlock(res, signedBlock) + ) + traceAsyncErrors res.addLocalValidators() # This merely configures the BeaconSync @@ -501,21 +505,8 @@ proc handleMissingBlocks(node: BeaconNode) = let missingBlocks = node.blockPool.checkMissing() if missingBlocks.len > 0: var left = missingBlocks.len - - info "Requesting detected missing blocks", missingBlocks - node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: SignedBeaconBlock): - onBeaconBlock(node, b) - - # TODO instead of waiting for a full second to try the next missing block - # fetching, we'll do it here again in case we get all blocks we asked - # for (there might be new parents to fetch). of course, this is not - # good because the onSecond fetching also kicks in regardless but - # whatever - this is just a quick fix for making the testnet easier - # work with while the sync problem is dealt with more systematically - # dec left - # if left == 0: - # discard setTimer(Moment.now()) do (p: pointer): - # handleMissingBlocks(node) + info "Requesting detected missing blocks", blocks = shortLog(missingBlocks) + node.requestManager.fetchAncestorBlocks(missingBlocks) proc onSecond(node: BeaconNode) {.async.} = ## This procedure will be called once per second. 
@@ -815,6 +806,8 @@ proc run*(node: BeaconNode) = node.onSecondLoop = runOnSecondLoop(node) node.forwardSyncLoop = runForwardSyncLoop(node) + node.requestManager.start() + # main event loop while status == BeaconNodeStatus.Running: try: @@ -1163,4 +1156,3 @@ programMain: config.depositContractAddress, config.depositPrivateKey, delayGenerator) - diff --git a/beacon_chain/block_pools/block_pools_types.nim b/beacon_chain/block_pools/block_pools_types.nim index b2e69406d..b83fbd3c1 100644 --- a/beacon_chain/block_pools/block_pools_types.nim +++ b/beacon_chain/block_pools/block_pools_types.nim @@ -37,7 +37,7 @@ type ## ## Invalid blocks are dropped immediately. - pending*: Table[Eth2Digest, SignedBeaconBlock] ##\ + orphans*: Table[Eth2Digest, SignedBeaconBlock] ##\ ## Blocks that have passed validation but that we lack a link back to tail ## for - when we receive a "missing link", we can use this data to build ## an entire branch @@ -49,12 +49,10 @@ type inAdd*: bool MissingBlock* = object - slots*: uint64 # number of slots that are suspected missing tries*: int FetchRecord* = object root*: Eth2Digest - historySlots*: uint64 CandidateChains* = ref object ## Pool of blocks responsible for keeping a DAG of resolved blocks. 
diff --git a/beacon_chain/block_pools/clearance.nim b/beacon_chain/block_pools/clearance.nim index f2798401b..6baf602f3 100644 --- a/beacon_chain/block_pools/clearance.nim +++ b/beacon_chain/block_pools/clearance.nim @@ -12,7 +12,7 @@ import metrics, stew/results, ../ssz/merkleization, ../state_transition, ../extras, ../spec/[crypto, datatypes, digest, helpers, signatures], - block_pools_types, candidate_chains + block_pools_types, candidate_chains, quarantine export results @@ -32,7 +32,7 @@ func getOrResolve*(dag: CandidateChains, quarantine: var Quarantine, root: Eth2D result = dag.getRef(root) if result.isNil: - quarantine.missing[root] = MissingBlock(slots: 1) + quarantine.missing[root] = MissingBlock() proc add*( dag: var CandidateChains, quarantine: var Quarantine, @@ -99,12 +99,12 @@ proc addResolvedBlock( defer: quarantine.inAdd = false var keepGoing = true while keepGoing: - let retries = quarantine.pending + let retries = quarantine.orphans for k, v in retries: discard add(dag, quarantine, k, v) # Keep going for as long as the pending dag is shrinking # TODO inefficient! so what? - keepGoing = quarantine.pending.len < retries.len + keepGoing = quarantine.orphans.len < retries.len blockRef proc add*( @@ -165,9 +165,9 @@ proc add*( return err Invalid - # The block might have been in either of pending or missing - we don't want - # any more work done on its behalf - quarantine.pending.del(blockRoot) + # The block might have been in either of `orphans` or `missing` - we don't + # want any more work done on its behalf + quarantine.orphans.del(blockRoot) # The block is resolved, now it's time to validate it to ensure that the # blocks we add to the database are clean for the given state @@ -209,7 +209,7 @@ proc add*( # the pending dag calls this function back later in a loop, so as long # as dag.add(...) requires a SignedBeaconBlock, easier to keep them in # pending too. 
- quarantine.pending[blockRoot] = signedBlock + quarantine.add(dag, signedBlock, some(blockRoot)) # TODO possibly, it makes sense to check the database - that would allow sync # to simply fill up the database with random blocks the other clients @@ -217,7 +217,7 @@ proc add*( # junk that's not part of the block graph if blck.parent_root in quarantine.missing or - blck.parent_root in quarantine.pending: + blck.parent_root in quarantine.orphans: return err MissingParent # This is an unresolved block - put its parent on the missing list for now... @@ -232,24 +232,11 @@ proc add*( # filter. # TODO when we receive the block, we don't know how many others we're missing # from that branch, so right now, we'll just do a blind guess - let parentSlot = blck.slot - 1 - - quarantine.missing[blck.parent_root] = MissingBlock( - slots: - # The block is at least two slots ahead - try to grab whole history - if parentSlot > dag.head.blck.slot: - parentSlot - dag.head.blck.slot - else: - # It's a sibling block from a branch that we're missing - fetch one - # epoch at a time - max(1.uint64, SLOTS_PER_EPOCH.uint64 - - (parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64)) - ) debug "Unresolved block (parent missing)", blck = shortLog(blck), blockRoot = shortLog(blockRoot), - pending = quarantine.pending.len, + orphans = quarantine.orphans.len, missing = quarantine.missing.len, cat = "filtering" @@ -345,8 +332,7 @@ proc isValidBeaconBlock*( # not specific to this, but by the pending dag keying on the htr of the # BeaconBlock, not SignedBeaconBlock, opens up certain spoofing attacks. 
debug "parent unknown, putting block in quarantine" - quarantine.pending[hash_tree_root(signed_beacon_block.message)] = - signed_beacon_block + quarantine.add(dag, signed_beacon_block) return err(MissingParent) # The proposer signature, signed_beacon_block.signature, is valid with diff --git a/beacon_chain/block_pools/quarantine.nim b/beacon_chain/block_pools/quarantine.nim index d224d8ed0..08851a973 100644 --- a/beacon_chain/block_pools/quarantine.nim +++ b/beacon_chain/block_pools/quarantine.nim @@ -6,13 +6,15 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. import - chronicles, tables, + chronicles, tables, options, stew/bitops2, metrics, - ../spec/digest, - + ../spec/[datatypes, digest], + ../ssz/merkleization, block_pools_types +export options + logScope: topics = "quarant" {.push raises: [Defect].} @@ -35,4 +37,19 @@ func checkMissing*(quarantine: var Quarantine): seq[FetchRecord] = # simple (simplistic?) exponential backoff for retries.. for k, v in quarantine.missing.pairs(): if countOnes(v.tries.uint64) == 1: - result.add(FetchRecord(root: k, historySlots: v.slots)) + result.add(FetchRecord(root: k)) + +func add*(quarantine: var Quarantine, dag: CandidateChains, + sblck: SignedBeaconBlock, + broot: Option[Eth2Digest] = none[Eth2Digest]()) = + ## Adds block to quarantine's `orphans` and `missing` lists. 
+ let blockRoot = if broot.isSome(): + broot.get() + else: + hash_tree_root(sblck.message) + + quarantine.orphans[blockRoot] = sblck + + let parentRoot = sblck.message.parent_root + if parentRoot notin quarantine.missing: + quarantine.missing[parentRoot] = MissingBlock() diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index 9ad7ec795..24e70d352 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -1,71 +1,133 @@ -import - options, random, - chronos, chronicles, - spec/datatypes, - eth2_network, beacon_node_types, sync_protocol, - eth/async_utils +import options, sequtils, strutils +import chronos, chronicles +import spec/[datatypes, digest], eth2_network, beacon_node_types, sync_protocol, + sync_manager, ssz/merkleization + +logScope: + topics = "requman" + +const + MAX_REQUEST_BLOCKS* = 4 # Specification's value is 1024. + ## Maximum number of blocks, which can be requested by beaconBlocksByRoot. + PARALLEL_REQUESTS* = 2 + ## Number of peers we are using to resolve our request. type RequestManager* = object network*: Eth2Node + queue*: AsyncQueue[FetchRecord] + responseHandler*: FetchAncestorsResponseHandler + loopFuture: Future[void] -proc init*(T: type RequestManager, network: Eth2Node): T = - T(network: network) - -type FetchAncestorsResponseHandler = proc (b: SignedBeaconBlock) {.gcsafe.} -proc fetchAncestorBlocksFromPeer( - peer: Peer, - rec: FetchRecord, - responseHandler: FetchAncestorsResponseHandler) {.async.} = - # TODO: It's not clear if this function follows the intention of the - # FetchRecord data type. Perhaps it is supposed to get a range of blocks - # instead. In order to do this, we'll need the slot number of the known - # block to be stored in the FetchRecord, so we can ask for a range of - # blocks starting N positions before this slot number. 
- try: - let blocks = await peer.beaconBlocksByRoot(BlockRootsList @[rec.root]) - if blocks.isOk: - for b in blocks.get: - responseHandler(b) - except CatchableError as err: - debug "Error while fetching ancestor blocks", - err = err.msg, root = rec.root, peer = peer +func shortLog*(x: seq[Eth2Digest]): string = + "[" & x.mapIt(shortLog(it)).join(", ") & "]" -proc fetchAncestorBlocksFromNetwork( - network: Eth2Node, - rec: FetchRecord, - responseHandler: FetchAncestorsResponseHandler) {.async.} = +func shortLog*(x: seq[FetchRecord]): string = + "[" & x.mapIt(shortLog(it.root)).join(", ") & "]" + +proc init*(T: type RequestManager, network: Eth2Node, + responseCb: FetchAncestorsResponseHandler): T = + T( + network: network, queue: newAsyncQueue[FetchRecord](), + responseHandler: responseCb + ) + +proc checkResponse(roots: openArray[Eth2Digest], + blocks: openArray[SignedBeaconBlock]): bool = + ## This procedure checks peer's response. + var checks = @roots + if len(blocks) > len(roots): + return false + for blk in blocks: + let blockRoot = hash_tree_root(blk.message) + let res = checks.find(blockRoot) + if res == -1: + return false + else: + checks.del(res) + return true + +proc fetchAncestorBlocksFromNetwork(rman: RequestManager, + items: seq[Eth2Digest]) {.async.} = var peer: Peer try: - peer = await network.peerPool.acquire() - let blocks = await peer.beaconBlocksByRoot(BlockRootsList @[rec.root]) + peer = await rman.network.peerPool.acquire() + debug "Requesting blocks by root", peer = peer, blocks = shortLog(items), + peer_score = peer.getScore() + + let blocks = await peer.beaconBlocksByRoot(BlockRootsList items) if blocks.isOk: - for b in blocks.get: - responseHandler(b) - except CatchableError as err: - debug "Error while fetching ancestor blocks", - err = err.msg, root = rec.root, peer = peer + let ublocks = blocks.get() + if checkResponse(items, ublocks): + for b in ublocks: + rman.responseHandler(b) + peer.updateScore(PeerScoreGoodBlocks) + else: + 
peer.updateScore(PeerScoreBadResponse) + else: + peer.updateScore(PeerScoreNoBlocks) + + except CancelledError as exc: + raise exc + except CatchableError as exc: + debug "Error while fetching ancestor blocks", exc = exc.msg, + items = shortLog(items), peer = peer, peer_score = peer.getScore() + raise exc finally: if not(isNil(peer)): - network.peerPool.release(peer) + rman.network.peerPool.release(peer) -proc fetchAncestorBlocks*(requestManager: RequestManager, - roots: seq[FetchRecord], - responseHandler: FetchAncestorsResponseHandler) = - # TODO: we could have some fancier logic here: - # - # * Keeps track of what was requested - # (this would give a little bit of time for the asked peer to respond) - # - # * Keep track of the average latency of each peer - # (we can give priority to peers with better latency) - # - const ParallelRequests = 2 +proc requestManagerLoop(rman: RequestManager) {.async.} = + var rootList = newSeq[Eth2Digest]() + var workers = newSeq[Future[void]](PARALLEL_REQUESTS) + while true: + try: + rootList.setLen(0) + let req = await rman.queue.popFirst() + rootList.add(req.root) - for i in 0 ..< ParallelRequests: - traceAsyncErrors fetchAncestorBlocksFromNetwork(requestManager.network, - roots.sample(), - responseHandler) + var count = min(MAX_REQUEST_BLOCKS - 1, len(rman.queue)) + while count > 0: + rootList.add(rman.queue.popFirstNoWait().root) + dec(count) + let start = SyncMoment.now(Slot(0)) + + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList) + + # We do not care about individual worker failures here; they are only counted below. + await allFutures(workers) + + let finish = SyncMoment.now(Slot(0) + uint64(len(rootList))) + + var succeed = 0 + for worker in workers: + if worker.finished() and not(worker.failed()): + inc(succeed) + + debug "Request manager tick", blocks_count = len(rootList), + succeed = succeed, + failed = (len(workers) - succeed), + queue_size = len(rman.queue), + sync_speed = speed(start, finish) + + except CatchableError as exc: + 
debug "Got a problem in request manager", exc = exc.msg + +proc start*(rman: var RequestManager) = + ## Start Request Manager's loop. + rman.loopFuture = requestManagerLoop(rman) + +proc stop*(rman: RequestManager) = + ## Stop Request Manager's loop. + if not(isNil(rman.loopFuture)): + rman.loopFuture.cancel() + +proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) = + ## Enqueue the list of missing block roots ``roots`` for download by + ## Request Manager ``rman``. + for item in roots: + rman.queue.addLastNoWait(item) diff --git a/tests/test_block_pool.nim b/tests/test_block_pool.nim index 53f1509be..d5642bc9b 100644 --- a/tests/test_block_pool.nim +++ b/tests/test_block_pool.nim @@ -178,7 +178,7 @@ suiteReport "Block pool processing" & preset(): check: pool.get(b2Root).isNone() # Unresolved, shouldn't show up - FetchRecord(root: b1Root, historySlots: 1) in pool.checkMissing() + FetchRecord(root: b1Root) in pool.checkMissing() check: pool.add(b1Root, b1).isOk