Use one single async queue and loop for processing blocks. (#1487)

* Initial commit

* Fix compilation problem.

* Address review comments.
This commit is contained in:
Eugene Kabanov 2020-08-12 12:29:11 +03:00 committed by GitHub
parent 5da25e76be
commit 711f1f88ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 85 deletions

View File

@ -284,13 +284,9 @@ proc init*(T: type BeaconNode,
forkDigest: enrForkId.forkDigest,
topicBeaconBlocks: topicBeaconBlocks,
topicAggregateAndProofs: topicAggregateAndProofs,
blocksQueue: newAsyncQueue[SyncBlock](1),
)
res.requestManager = RequestManager.init(network,
proc(signedBlock: SignedBeaconBlock) =
onBeaconBlock(res, signedBlock)
)
res.requestManager = RequestManager.init(network, res.blocksQueue)
res.addLocalValidators()
# This merely configures the BeaconSync
@ -560,14 +556,45 @@ proc runOnSecondLoop(node: BeaconNode) {.async.} =
ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s)
debug "onSecond task completed", sleepTime, processingTime
proc runForwardSyncLoop(node: BeaconNode) {.async.} =
proc importBlock(node: BeaconNode,
sblock: SignedBeaconBlock): Result[void, BlockError] =
let sm1 = now(chronos.Moment)
let res = node.storeBlock(sblock)
let em1 = now(chronos.Moment)
if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}):
let sm2 = now(chronos.Moment)
discard node.updateHead(node.beaconClock.now().slotOrZero)
let em2 = now(chronos.Moment)
let storeBlockDuration = if res.isOk(): em1 - sm1 else: ZeroDuration
let updateHeadDuration = if res.isOk(): em2 - sm2 else: ZeroDuration
let overallDuration = if res.isOk(): em2 - sm1 else: ZeroDuration
let storeSpeed =
block:
let secs = float(chronos.seconds(1).nanoseconds)
if not(overallDuration.isZero()):
let v = secs / float(overallDuration.nanoseconds)
round(v * 10_000) / 10_000
else:
0.0
debug "Block got imported successfully",
local_head_slot = node.chainDag.head.slot, store_speed = storeSpeed,
block_root = shortLog(sblock.root),
block_slot = sblock.message.slot,
store_block_duration = $storeBlockDuration,
update_head_duration = $updateHeadDuration,
overall_duration = $overallDuration
ok()
else:
err(res.error())
proc startSyncManager(node: BeaconNode) =
func getLocalHeadSlot(): Slot =
result = node.chainDag.head.slot
node.chainDag.head.slot
proc getLocalWallSlot(): Slot {.gcsafe.} =
let epoch = node.beaconClock.now().slotOrZero.compute_epoch_at_slot() +
1'u64
result = epoch.compute_start_slot_at_epoch()
epoch.compute_start_slot_at_epoch()
func getFirstSlotAtFinalizedEpoch(): Slot {.gcsafe.} =
let fepoch = node.chainDag.headState.data.data.finalized_checkpoint.epoch
@ -581,49 +608,23 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
score_high_limit = PeerScoreHighLimit
except:
discard
result = false
false
else:
result = true
true
node.network.peerPool.setScoreCheck(scoreCheck)
node.syncManager = newSyncManager[Peer, PeerID](
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, chunkSize = 32
getFirstSlotAtFinalizedEpoch, node.blocksQueue, chunkSize = 32
)
node.syncManager.start()
proc runBlockProcessingLoop(node: BeaconNode) {.async.} =
## Incoming blocks processing loop.
while true:
let sblock = await node.syncManager.getBlock()
let sm1 = now(chronos.Moment)
let res = node.storeBlock(sblock.blk)
let em1 = now(chronos.Moment)
if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}):
let sm2 = now(chronos.Moment)
discard node.updateHead(node.beaconClock.now().slotOrZero)
let em2 = now(chronos.Moment)
sblock.done()
let duration1 = if res.isOk(): em1 - sm1 else: ZeroDuration
let duration2 = if res.isOk(): em2 - sm2 else: ZeroDuration
let duration = if res.isOk(): em2 - sm1 else: ZeroDuration
let storeSpeed =
block:
let secs = float(chronos.seconds(1).nanoseconds)
if not(duration.isZero()):
let v = secs / float(duration.nanoseconds)
round(v * 10_000) / 10_000
else:
0.0
debug "Block got imported successfully",
local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed,
block_root = shortLog(sblock.blk.root),
block_slot = sblock.blk.message.slot,
store_block_duration = $duration1,
update_head_duration = $duration2,
store_duration = $duration
else:
sblock.fail(res.error)
let sblock = await node.blocksQueue.popFirst()
sblock.complete(node.importBlock(sblock.blk))
proc currentSlot(node: BeaconNode): Slot =
node.beaconClock.now.slotOrZero
@ -866,9 +867,10 @@ proc run*(node: BeaconNode) =
asyncCheck node.onSlotStart(curSlot, nextSlot)
node.onSecondLoop = runOnSecondLoop(node)
node.forwardSyncLoop = runForwardSyncLoop(node)
node.blockProcessingLoop = runBlockProcessingLoop(node)
node.requestManager.start()
node.startSyncManager()
# main event loop
while status == BeaconNodeStatus.Running:

View File

@ -36,7 +36,6 @@ type
graffitiBytes*: GraffitiBytes
network*: Eth2Node
netKeys*: KeyPair
requestManager*: RequestManager
db*: BeaconChainDB
config*: BeaconNodeConf
attachedValidators*: ValidatorPool
@ -47,10 +46,12 @@ type
beaconClock*: BeaconClock
rpcServer*: RpcServer
forkDigest*: ForkDigest
blocksQueue*: AsyncQueue[SyncBlock]
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
topicBeaconBlocks*: string
topicAggregateAndProofs*: string
forwardSyncLoop*: Future[void]
blockProcessingLoop*: Future[void]
onSecondLoop*: Future[void]
genesisSnapshotContent*: string

View File

@ -2,25 +2,25 @@ import options, sequtils, strutils
import chronos, chronicles
import spec/[datatypes, digest], eth2_network, beacon_node_types, sync_protocol,
sync_manager, ssz/merkleization
export sync_manager
logScope:
topics = "requman"
const
SYNC_MAX_REQUESTED_BLOCKS* = 4 # Spec allows up to MAX_REQUEST_BLOCKS.
## Maximum number of blocks which will be requested in each `beaconBlocksByRoot` invocation.
SYNC_MAX_REQUESTED_BLOCKS* = 32 # Spec allows up to MAX_REQUEST_BLOCKS.
## Maximum number of blocks which will be requested in each
## `beaconBlocksByRoot` invocation.
PARALLEL_REQUESTS* = 2
## Number of peers we using to resolve our request.
type
RequestManager* = object
network*: Eth2Node
queue*: AsyncQueue[FetchRecord]
responseHandler*: FetchAncestorsResponseHandler
inpQueue*: AsyncQueue[FetchRecord]
outQueue*: AsyncQueue[SyncBlock]
loopFuture: Future[void]
FetchAncestorsResponseHandler = proc (b: SignedBeaconBlock) {.gcsafe.}
func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
@ -28,10 +28,11 @@ func shortLog*(x: seq[FetchRecord]): string =
"[" & x.mapIt(shortLog(it.root)).join(", ") & "]"
proc init*(T: type RequestManager, network: Eth2Node,
responseCb: FetchAncestorsResponseHandler): T =
T(
network: network, queue: newAsyncQueue[FetchRecord](),
responseHandler: responseCb
outputQueue: AsyncQueue[SyncBlock]): RequestManager =
RequestManager(
network: network,
inpQueue: newAsyncQueue[FetchRecord](),
outQueue: outputQueue
)
proc checkResponse(roots: openArray[Eth2Digest],
@ -48,6 +49,15 @@ proc checkResponse(roots: openArray[Eth2Digest],
checks.del(res)
return true
proc validate(rman: RequestManager,
b: SignedBeaconBlock): Future[Result[void, BlockError]] {.async.} =
let sblock = SyncBlock(
blk: b,
resfut: newFuture[Result[void, BlockError]]("request.manager.validate")
)
await rman.outQueue.addLast(sblock)
return await sblock.resfut
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
items: seq[Eth2Digest]) {.async.} =
var peer: Peer
@ -60,9 +70,19 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
if blocks.isOk:
let ublocks = blocks.get()
if checkResponse(items, ublocks):
for b in ublocks:
rman.responseHandler(b)
peer.updateScore(PeerScoreGoodBlocks)
var res: Result[void, BlockError]
if len(ublocks) > 0:
for b in ublocks:
res = await rman.validate(b)
if not(res.isOk):
break
else:
res = Result[void, BlockError].ok()
if res.isOk():
peer.updateScore(PeerScoreGoodBlocks)
else:
peer.updateScore(PeerScoreBadBlocks)
else:
peer.updateScore(PeerScoreBadResponse)
else:
@ -85,12 +105,12 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
while true:
try:
rootList.setLen(0)
let req = await rman.queue.popFirst()
let req = await rman.inpQueue.popFirst()
rootList.add(req.root)
var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.queue))
var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.inpQueue))
while count > 0:
rootList.add(rman.queue.popFirstNoWait().root)
rootList.add(rman.inpQueue.popFirstNoWait().root)
dec(count)
let start = SyncMoment.now(Slot(0))
@ -111,7 +131,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
debug "Request manager tick", blocks_count = len(rootList),
succeed = succeed,
failed = (len(workers) - succeed),
queue_size = len(rman.queue),
queue_size = len(rman.inpQueue),
sync_speed = speed(start, finish)
except CatchableError as exc:
@ -119,7 +139,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
proc start*(rman: var RequestManager) =
## Start Request Manager's loop.
rman.loopFuture = requestManagerLoop(rman)
rman.loopFuture = rman.requestManagerLoop()
proc stop*(rman: RequestManager) =
## Stop Request Manager's loop.
@ -130,4 +150,4 @@ proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) =
## Enqueue list missing blocks roots ``roots`` for download by
## Request Manager ``rman``.
for item in roots:
rman.queue.addLastNoWait(item)
rman.inpQueue.addLastNoWait(item)

View File

@ -440,7 +440,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
var res: Result[void, BlockError]
if len(item.data) > 0:
for blk in item.data:
debug "Pushing block", block_root = blk.root,
trace "Pushing block", block_root = blk.root,
block_slot = blk.message.slot
res = await sq.validate(blk)
if not(res.isOk):
@ -587,6 +587,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback,
getFinalizedSlotCb: GetSlotCallback,
outputQueue: AsyncQueue[SyncBlock],
maxWorkers = 10,
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
@ -594,12 +595,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
int(SECONDS_PER_SLOT)).seconds,
chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1),
maxRecurringFailures = 3,
outputQueueSize = 1,
maxRecurringFailures = 3
): SyncManager[A, B] =
var outputQueue = newAsyncQueue[SyncBlock](outputQueueSize)
let queue = SyncQueue.init(A, getFinalizedSlotCb(), getLocalWallSlotCb(),
chunkSize, getFinalizedSlotCb, outputQueue, 1)
@ -982,16 +980,17 @@ proc start*[A, B](man: SyncManager[A, B]) =
## Starts SyncManager's main loop.
man.syncFut = man.syncLoop()
proc getBlock*[A, B](man: SyncManager[A, B]): Future[SyncBlock] =
## Get the block that was received during synchronization.
man.outQueue.popFirst()
proc done*(blk: SyncBlock) =
## Send signal to SyncManager that the block ``blk`` has passed
## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
## verification successfully.
blk.resfut.complete(Result[void, BlockError].ok())
proc fail*(blk: SyncBlock, error: BlockError) =
## Send signal to SyncManager that the block ``blk`` has NOT passed
## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
## verification with specific ``error``.
blk.resfut.complete(Result[void, BlockError].err(error))
proc complete*(blk: SyncBlock, res: Result[void, BlockError]) {.inline.} =
## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
## verification.
blk.resfut.complete(res)

View File

@ -80,7 +80,7 @@ suiteReport "PeerPool testing suite":
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
peer0.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut1.finished == false)
doAssert(fut2.finished == true and fut2.failed == false)
result = true
@ -102,11 +102,11 @@ suiteReport "PeerPool testing suite":
doAssert(fut2.finished == false)
doAssert(fut3.finished == false)
peer0.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == false)
peer1.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut3.finished == true and fut3.failed == false)
result = true
@ -128,11 +128,11 @@ suiteReport "PeerPool testing suite":
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == false)
peer0.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut3.finished == false)
peer2.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut3.finished == true and fut3.failed == false)
result = true
@ -160,21 +160,21 @@ suiteReport "PeerPool testing suite":
doAssert(fut4.finished == false)
doAssert(fut5.finished == false)
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut3.finished == false)
doAssert(fut4.finished == false)
doAssert(fut5.finished == false)
peer0.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut3.finished == true and fut3.failed == false)
doAssert(fut4.finished == false)
doAssert(fut5.finished == false)
peer1.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut4.finished == true and fut4.failed == false)
doAssert(fut5.finished == false)
peer2.close()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
doAssert(fut5.finished == true and fut5.failed == false)
result = true
@ -442,7 +442,7 @@ suiteReport "PeerPool testing suite":
proc testConsumer() {.async.} =
var p = await pool.acquire()
await sleepAsync(10.milliseconds)
await sleepAsync(100.milliseconds)
pool.release(p)
proc testClose(): Future[bool] {.async.} =