Forward sync refactoring. (#1191)

* Forward sync refactoring.
Rename Quarantine.pending to Quarantine.orphans.
Removing "old" fields.

* Fix test's FetchRecord.

* Fix `checkResponse` to not allow duplicates in response.
This commit is contained in:
Eugene Kabanov 2020-06-18 13:03:36 +03:00 committed by GitHub
parent db870fead4
commit 4436c85ff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 105 deletions

View File

@ -213,7 +213,6 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
nickname: nickname,
network: network,
netKeys: netKeys,
requestManager: RequestManager.init(network),
db: db,
config: conf,
attachedValidators: ValidatorPool.init(),
@ -227,6 +226,11 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
topicAggregateAndProofs: topicAggregateAndProofs,
)
res.requestManager = RequestManager.init(network,
proc(signedBlock: SignedBeaconBlock) =
onBeaconBlock(res, signedBlock)
)
traceAsyncErrors res.addLocalValidators()
# This merely configures the BeaconSync
@ -501,21 +505,8 @@ proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.blockPool.checkMissing()
if missingBlocks.len > 0:
var left = missingBlocks.len
info "Requesting detected missing blocks", missingBlocks
node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: SignedBeaconBlock):
onBeaconBlock(node, b)
# TODO instead of waiting for a full second to try the next missing block
# fetching, we'll do it here again in case we get all blocks we asked
# for (there might be new parents to fetch). of course, this is not
# good because the onSecond fetching also kicks in regardless but
# whatever - this is just a quick fix for making the testnet easier
# work with while the sync problem is dealt with more systematically
# dec left
# if left == 0:
# discard setTimer(Moment.now()) do (p: pointer):
# handleMissingBlocks(node)
info "Requesting detected missing blocks", blocks = shortLog(missingBlocks)
node.requestManager.fetchAncestorBlocks(missingBlocks)
proc onSecond(node: BeaconNode) {.async.} =
## This procedure will be called once per second.
@ -815,6 +806,8 @@ proc run*(node: BeaconNode) =
node.onSecondLoop = runOnSecondLoop(node)
node.forwardSyncLoop = runForwardSyncLoop(node)
node.requestManager.start()
# main event loop
while status == BeaconNodeStatus.Running:
try:
@ -1163,4 +1156,3 @@ programMain:
config.depositContractAddress,
config.depositPrivateKey,
delayGenerator)

View File

@ -37,7 +37,7 @@ type
##
## Invalid blocks are dropped immediately.
pending*: Table[Eth2Digest, SignedBeaconBlock] ##\
orphans*: Table[Eth2Digest, SignedBeaconBlock] ##\
## Blocks that have passed validation but that we lack a link back to tail
## for - when we receive a "missing link", we can use this data to build
## an entire branch
@ -49,12 +49,10 @@ type
inAdd*: bool
MissingBlock* = object
slots*: uint64 # number of slots that are suspected missing
tries*: int
FetchRecord* = object
root*: Eth2Digest
historySlots*: uint64
CandidateChains* = ref object
## Pool of blocks responsible for keeping a DAG of resolved blocks.

View File

@ -12,7 +12,7 @@ import
metrics, stew/results,
../ssz/merkleization, ../state_transition, ../extras,
../spec/[crypto, datatypes, digest, helpers, signatures],
block_pools_types, candidate_chains
block_pools_types, candidate_chains, quarantine
export results
@ -32,7 +32,7 @@ func getOrResolve*(dag: CandidateChains, quarantine: var Quarantine, root: Eth2D
result = dag.getRef(root)
if result.isNil:
quarantine.missing[root] = MissingBlock(slots: 1)
quarantine.missing[root] = MissingBlock()
proc add*(
dag: var CandidateChains, quarantine: var Quarantine,
@ -99,12 +99,12 @@ proc addResolvedBlock(
defer: quarantine.inAdd = false
var keepGoing = true
while keepGoing:
let retries = quarantine.pending
let retries = quarantine.orphans
for k, v in retries:
discard add(dag, quarantine, k, v)
# Keep going for as long as the pending dag is shrinking
# TODO inefficient! so what?
keepGoing = quarantine.pending.len < retries.len
keepGoing = quarantine.orphans.len < retries.len
blockRef
proc add*(
@ -165,9 +165,9 @@ proc add*(
return err Invalid
# The block might have been in either of pending or missing - we don't want
# any more work done on its behalf
quarantine.pending.del(blockRoot)
# The block might have been in either of `orphans` or `missing` - we don't
# want any more work done on its behalf
quarantine.orphans.del(blockRoot)
# The block is resolved, now it's time to validate it to ensure that the
# blocks we add to the database are clean for the given state
@ -209,7 +209,7 @@ proc add*(
# the pending dag calls this function back later in a loop, so as long
# as dag.add(...) requires a SignedBeaconBlock, easier to keep them in
# pending too.
quarantine.pending[blockRoot] = signedBlock
quarantine.add(dag, signedBlock, some(blockRoot))
# TODO possibly, it makes sense to check the database - that would allow sync
# to simply fill up the database with random blocks the other clients
@ -217,7 +217,7 @@ proc add*(
# junk that's not part of the block graph
if blck.parent_root in quarantine.missing or
blck.parent_root in quarantine.pending:
blck.parent_root in quarantine.orphans:
return err MissingParent
# This is an unresolved block - put its parent on the missing list for now...
@ -232,24 +232,11 @@ proc add*(
# filter.
# TODO when we receive the block, we don't know how many others we're missing
# from that branch, so right now, we'll just do a blind guess
let parentSlot = blck.slot - 1
quarantine.missing[blck.parent_root] = MissingBlock(
slots:
# The block is at least two slots ahead - try to grab whole history
if parentSlot > dag.head.blck.slot:
parentSlot - dag.head.blck.slot
else:
# It's a sibling block from a branch that we're missing - fetch one
# epoch at a time
max(1.uint64, SLOTS_PER_EPOCH.uint64 -
(parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64))
)
debug "Unresolved block (parent missing)",
blck = shortLog(blck),
blockRoot = shortLog(blockRoot),
pending = quarantine.pending.len,
orphans = quarantine.orphans.len,
missing = quarantine.missing.len,
cat = "filtering"
@ -345,8 +332,7 @@ proc isValidBeaconBlock*(
# not specific to this, but by the pending dag keying on the htr of the
# BeaconBlock, not SignedBeaconBlock, opens up certain spoofing attacks.
debug "parent unknown, putting block in quarantine"
quarantine.pending[hash_tree_root(signed_beacon_block.message)] =
signed_beacon_block
quarantine.add(dag, signed_beacon_block)
return err(MissingParent)
# The proposer signature, signed_beacon_block.signature, is valid with

View File

@ -6,13 +6,15 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
chronicles, tables,
chronicles, tables, options,
stew/bitops2,
metrics,
../spec/digest,
../spec/[datatypes, digest],
../ssz/merkleization,
block_pools_types
export options
logScope: topics = "quarant"
{.push raises: [Defect].}
@ -35,4 +37,19 @@ func checkMissing*(quarantine: var Quarantine): seq[FetchRecord] =
# simple (simplistic?) exponential backoff for retries..
for k, v in quarantine.missing.pairs():
if countOnes(v.tries.uint64) == 1:
result.add(FetchRecord(root: k, historySlots: v.slots))
result.add(FetchRecord(root: k))
func add*(quarantine: var Quarantine, dag: CandidateChains,
sblck: SignedBeaconBlock,
broot: Option[Eth2Digest] = none[Eth2Digest]()) =
## Adds block to quarantine's `orphans` and `missing` lists.
let blockRoot = if broot.isSome():
broot.get()
else:
hash_tree_root(sblck.message)
quarantine.orphans[blockRoot] = sblck
let parentRoot = sblck.message.parent_root
if parentRoot notin quarantine.missing:
quarantine.missing[parentRoot] = MissingBlock()

View File

@ -1,71 +1,133 @@
import
options, random,
chronos, chronicles,
spec/datatypes,
eth2_network, beacon_node_types, sync_protocol,
eth/async_utils
import options, sequtils, strutils
import chronos, chronicles
import spec/[datatypes, digest], eth2_network, beacon_node_types, sync_protocol,
sync_manager, ssz/merkleization
logScope:
topics = "requman"
const
MAX_REQUEST_BLOCKS* = 4 # Specification's value is 1024.
## Maximum number of blocks, which can be requested by beaconBlocksByRoot.
PARALLEL_REQUESTS* = 2
## Number of peers we using to resolve our request.
type
RequestManager* = object
network*: Eth2Node
queue*: AsyncQueue[FetchRecord]
responseHandler*: FetchAncestorsResponseHandler
loopFuture: Future[void]
proc init*(T: type RequestManager, network: Eth2Node): T =
T(network: network)
type
FetchAncestorsResponseHandler = proc (b: SignedBeaconBlock) {.gcsafe.}
proc fetchAncestorBlocksFromPeer(
peer: Peer,
rec: FetchRecord,
responseHandler: FetchAncestorsResponseHandler) {.async.} =
# TODO: It's not clear if this function follows the intention of the
# FetchRecord data type. Perhaps it is supposed to get a range of blocks
# instead. In order to do this, we'll need the slot number of the known
# block to be stored in the FetchRecord, so we can ask for a range of
# blocks starting N positions before this slot number.
try:
let blocks = await peer.beaconBlocksByRoot(BlockRootsList @[rec.root])
if blocks.isOk:
for b in blocks.get:
responseHandler(b)
except CatchableError as err:
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer = peer
func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
proc fetchAncestorBlocksFromNetwork(
network: Eth2Node,
rec: FetchRecord,
responseHandler: FetchAncestorsResponseHandler) {.async.} =
func shortLog*(x: seq[FetchRecord]): string =
"[" & x.mapIt(shortLog(it.root)).join(", ") & "]"
proc init*(T: type RequestManager, network: Eth2Node,
responseCb: FetchAncestorsResponseHandler): T =
T(
network: network, queue: newAsyncQueue[FetchRecord](),
responseHandler: responseCb
)
proc checkResponse(roots: openArray[Eth2Digest],
blocks: openArray[SignedBeaconBlock]): bool =
## This procedure checks peer's response.
var checks = @roots
if len(blocks) > len(roots):
return false
for blk in blocks:
let blockRoot = hash_tree_root(blk.message)
let res = checks.find(blockRoot)
if res == -1:
return false
else:
checks.del(res)
return true
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
items: seq[Eth2Digest]) {.async.} =
var peer: Peer
try:
peer = await network.peerPool.acquire()
let blocks = await peer.beaconBlocksByRoot(BlockRootsList @[rec.root])
peer = await rman.network.peerPool.acquire()
debug "Requesting blocks by root", peer = peer, blocks = shortLog(items),
peer_score = peer.getScore()
let blocks = await peer.beaconBlocksByRoot(BlockRootsList items)
if blocks.isOk:
for b in blocks.get:
responseHandler(b)
except CatchableError as err:
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer = peer
let ublocks = blocks.get()
if checkResponse(items, ublocks):
for b in ublocks:
rman.responseHandler(b)
peer.updateScore(PeerScoreGoodBlocks)
else:
peer.updateScore(PeerScoreBadResponse)
else:
peer.updateScore(PeerScoreNoBlocks)
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Error while fetching ancestor blocks", exc = exc.msg,
items = shortLog(items), peer = peer, peer_score = peer.getScore()
raise exc
finally:
if not(isNil(peer)):
network.peerPool.release(peer)
rman.network.peerPool.release(peer)
proc fetchAncestorBlocks*(requestManager: RequestManager,
roots: seq[FetchRecord],
responseHandler: FetchAncestorsResponseHandler) =
# TODO: we could have some fancier logic here:
#
# * Keeps track of what was requested
# (this would give a little bit of time for the asked peer to respond)
#
# * Keep track of the average latency of each peer
# (we can give priority to peers with better latency)
#
const ParallelRequests = 2
proc requestManagerLoop(rman: RequestManager) {.async.} =
var rootList = newSeq[Eth2Digest]()
var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
while true:
try:
rootList.setLen(0)
let req = await rman.queue.popFirst()
rootList.add(req.root)
for i in 0 ..< ParallelRequests:
traceAsyncErrors fetchAncestorBlocksFromNetwork(requestManager.network,
roots.sample(),
responseHandler)
var count = min(MAX_REQUEST_BLOCKS - 1, len(rman.queue))
while count > 0:
rootList.add(rman.queue.popFirstNoWait().root)
dec(count)
let start = SyncMoment.now(Slot(0))
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList)
# We do not care about
await allFutures(workers)
let finish = SyncMoment.now(Slot(0) + uint64(len(rootList)))
var succeed = 0
for worker in workers:
if worker.finished() and not(worker.failed()):
inc(succeed)
debug "Request manager tick", blocks_count = len(rootList),
succeed = succeed,
failed = (len(workers) - succeed),
queue_size = len(rman.queue),
sync_speed = speed(start, finish)
except CatchableError as exc:
debug "Got a problem in request manager", exc = exc.msg
proc start*(rman: var RequestManager) =
## Start Request Manager's loop.
rman.loopFuture = requestManagerLoop(rman)
proc stop*(rman: RequestManager) =
## Stop Request Manager's loop.
if not(isNil(rman.loopFuture)):
rman.loopFuture.cancel()
proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) =
## Enqueue list missing blocks roots ``roots`` for download by
## Request Manager ``rman``.
for item in roots:
rman.queue.addLastNoWait(item)

View File

@ -178,7 +178,7 @@ suiteReport "Block pool processing" & preset():
check:
pool.get(b2Root).isNone() # Unresolved, shouldn't show up
FetchRecord(root: b1Root, historySlots: 1) in pool.checkMissing()
FetchRecord(root: b1Root) in pool.checkMissing()
check: pool.add(b1Root, b1).isOk