nimbus-eth1/nimbus/sync/beacon/beacon_impl.nim

406 lines
11 KiB
Nim

# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/tables,
chronicles,
chronos,
chronos/timer,
./worker_desc,
./skeleton_main,
./skeleton_utils,
./skeleton_db,
../../utils/utils,
../protocol,
../types
# Log records emitted from this module are tagged with the "beacon-impl" topic.
logScope:
  topics = "beacon-impl"

# Pushed for the routines that follow: they are treated as gcsafe and may not
# let exceptions escape unless a routine declares its own `raises` list.
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func makeGetBodyJob(header: BlockHeader, setHead: bool): BeaconJob =
  ## Build a `bjmGetBody` job for one `header`.
  ##
  ## The job stores the header itself, its `blockHash`, its `sumHash` and
  ## the caller's `setHead` flag.
  let payload = BeaconJobGetBody(
    headerHash: header.blockHash,
    sumHash: header.sumHash,
    header: header,
    setHead: setHead
  )
  BeaconJob(mode: bjmGetBody, getBodyJob: payload)
func makeGetBlocksJob(number, maxResults: uint64) : BeaconJob =
  ## Build a `bjmGetBlocks` job.
  ##
  ## `number` and `maxResults` are stored unchanged in the job payload.
  let payload = BeaconJobGetBlocks(number: number, maxResults: maxResults)
  BeaconJob(mode: bjmGetBlocks, getBlocksJob: payload)
func makeHeaderRequest(number: uint64, maxResults: uint64): BlocksRequest =
  ## Build a header request that starts at block `number` (addressed by
  ## number, not by hash), asks for at most `maxResults` headers, skips
  ## none and has `reverse` set.
  let start = HashOrNum(isHash: false, number: number.u256)
  BlocksRequest(
    startBlock: start,
    maxResults: maxResults.uint,
    skip: 0,
    reverse: true
  )
func makeGetBodiesJob(headers: sink seq[BlockHeader],
                      headerHash: sink seq[Hash256],
                      reqBodies: sink seq[bool]): BeaconJob =
  ## Build a `bjmGetBodies` job out of three sequences.
  ##
  ## All three arguments are `sink` parameters and are moved into the job
  ## payload with `system.move` instead of being copied field by field.
  result = BeaconJob(
    mode: bjmGetBodies,
    getBodiesJob: BeaconJobGetBodies(
      headers: system.move(headers),
      headerHash: system.move(headerHash),
      reqBodies: system.move(reqBodies)
    )
  )
proc requeue(buddy: BeaconBuddyRef, job: BeaconJob) =
  ## Hand `job` back for another attempt.
  ##
  ## Sets the context's `poolMode` flag and then appends `job` to this
  ## worker's `requeue` list.
  let ctx = buddy.ctx
  ctx.poolMode = true
  buddy.only.requeue.add(job)
proc jobDone(buddy: BeaconBuddyRef) =
  ## Clear this worker's current job slot (`buddy.only.job` becomes `nil`).
  buddy.only.job = nil
proc mapBodiesToHeader(buddy: BeaconBuddyRef,
                       job: BeaconJob,
                       bodies: openArray[BlockBody],
                       reqBodies: openArray[bool]) {.raises: [].} =
  ## Pair the downloaded `bodies` with the headers stored in `job`.
  ##
  ## The job's header list is moved out and rebuilt, together with a
  ## parallel `bodies` list, by walking `reqBodies`:
  ## * `reqBodies[i] == false`: no body was requested for header `i`; the
  ##   header is kept with a default `BlockBody()`.
  ## * a received body whose `sumHash` equals the header's `sumHash` is
  ##   stored next to that header.
  ## * otherwise the header is left out of the job and a single
  ##   `bjmGetBody` job (`setHead = false`) is requeued for it.
  ##
  ## Handles both `bjmGetBlocks` and `bjmGetBodies` jobs; any mode other
  ## than `bjmGetBlocks` is treated as `bjmGetBodies`.
  ##
  ## `reqBodies` is indexed in lock-step with the job's headers, so it must
  ## not be longer than the header list.
  let isBlocksJob = job.mode == bjmGetBlocks

  # Take the headers from the payload that matches `job.mode`:
  # `executeGetBodiesJob` calls this proc with a `bjmGetBodies` job, whose
  # headers live in `getBodiesJob`, not in `getBlocksJob`.
  let headers =
    if isBlocksJob:
      system.move(job.getBlocksJob.headers)
    else:
      system.move(job.getBodiesJob.headers)

  # Position of every received body, keyed by the body's `sumHash`.
  var map = initTable[Hash256, int]()
  for i, x in bodies:
    let bodyHash = sumHash(x)
    map[bodyHash] = i

  for i, req in reqBodies:
    if not req:
      # Nothing was requested for this header: keep it with an empty body.
      if isBlocksJob:
        job.getBlocksJob.headers.add headers[i]
        job.getBlocksJob.bodies.add BlockBody()
      else:
        job.getBodiesJob.headers.add headers[i]
        job.getBodiesJob.bodies.add BlockBody()
      continue

    let bodyHash = sumHash(headers[i])
    let z = map.getOrDefault(bodyHash, MissingBody)
    if z == MissingBody:
      # missing or invalid body, request again
      buddy.requeue makeGetBodyJob(headers[i], setHead = false)
      continue

    if isBlocksJob:
      job.getBlocksJob.headers.add headers[i]
      job.getBlocksJob.bodies.add bodies[z]
    else:
      job.getBodiesJob.headers.add headers[i]
      job.getBodiesJob.bodies.add bodies[z]
proc putBlocks(ctx: BeaconCtxRef,
               skel: SkeletonRef,
               headers: openArray[BlockHeader],
               bodies: openArray[BlockBody]) =
  ## Store `bodies` and `headers` in the skeleton `skel`.
  ##
  ## Every body is written together with the header at the same index; a
  ## failed body write trips a `doAssert`. The headers are then passed to
  ## `skel.putBlocks`, and `skel.fillCanonicalChain` runs when the returned
  ## status contains `FillCanonical`. Errors from these two calls are only
  ## logged. `ctx` is not used by the body.
  for idx in 0 ..< bodies.len:
    let r = skel.putBody(headers[idx], bodies[idx])
    doAssert(r.isOk)

  let putRes = skel.putBlocks(headers)
  if putRes.isErr:
    error "putBlocks->putBlocks", msg=putRes.error
    return

  if FillCanonical notin putRes.get.status:
    return

  let fillRes = skel.fillCanonicalChain()
  if fillRes.isErr:
    error "putBlocks->fillCanonicalChain", msg=fillRes.error
proc setupTally*(ctx: BeaconCtxRef) =
  ## Rebuild the `mask` / `pulled` bookkeeping from the skeleton.
  ##
  ## `mask` first receives the range `1 .. skel.last.head`; after that each
  ## subchain's `tail .. head` range is reduced from `mask` and merged into
  ## `pulled`.
  let skel = ctx.pool.skeleton
  discard ctx.pool.mask.merge(1'u64, skel.last.head)

  for sc in skel.subchains:
    discard ctx.pool.mask.reduce(sc.tail, sc.head)
    discard ctx.pool.pulled.merge(sc.tail, sc.head)
proc mergeTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
  ## Merge the range `least .. last` into `mask`.
  ## The value returned by `merge` is ignored.
  discard ctx.pool.mask.merge(least, last)
proc reduceTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
  ## Move the range `least .. last` from `mask` to `pulled`: it is reduced
  ## from `mask` first and then merged into `pulled`.
  ## The values returned by both calls are ignored.
  discard ctx.pool.mask.reduce(least, last)
  discard ctx.pool.pulled.merge(least, last)
proc downloaded*(ctx: BeaconCtxRef, head: uint64): bool =
  ## True when `pulled` reports non-zero coverage for the single block
  ## number `head`.
  let hits = ctx.pool.pulled.covered(head, head)
  result = hits > 0'u64
proc headTally(ctx: BeaconCtxRef, head: uint64) =
  ## Register `head` as pulled and adjust `mask` for it.
  ##
  ## * `mask.le()` yields nothing: `mask` is initialised with `1 .. head`
  ##   and `head` itself is then reduced from it.
  ## * `mask.le()` yields an interval and `head` lies above its `maxPt`:
  ##   the range `maxPt+1 .. head-1` is merged into `mask`.
  ## * otherwise `mask` is left untouched.
  discard ctx.pool.pulled.merge(head, head)

  let top = ctx.pool.mask.le()
  if not top.isSome:
    # initialize
    discard ctx.pool.mask.merge(1'u64, head)
    discard ctx.pool.mask.reduce(head, head)
    return

  let maxPt = top.get().maxPt
  if head > maxPt:
    # new head
    discard ctx.pool.mask.merge(maxPt+1, head-1)
proc popFirst(x: var TargetQueue): BlockHeader =
  ## Take one entry off `x` with `shift()` and return its `data`.
  ##
  ## The caller is expected to have checked `x.len > 0` beforehand; the
  ## result of `shift()` is unwrapped without a check.
  let entry = x.shift().get()
  entry.data
proc addLast(x: var TargetQueue, h: BlockHeader) =
  ## Insert `h` into `x`, keyed by `h.blockHash`. The value returned by the
  ## insertion is ignored.
  ##
  ## NOTE(review): the insertion is done with `prepend` although the proc
  ## is named `addLast` -- confirm the intended queue order.
  let key = h.blockHash
  discard x.prepend(key, h)
# ------------------------------------------------------------------------------
# Synchronizer will produce jobs for workers
# ------------------------------------------------------------------------------
proc resumeSync*(ctx: BeaconCtxRef): Future[bool] {.async.} =
  ## Re-initialise syncing from the state already held by the skeleton.
  ##
  ## Returns `true` when there is nothing to resume (empty skeleton, or no
  ## header stored for the last subchain's head) and when the resume
  ## succeeds. Returns `false`, after logging, when `getHeader`, `initSync`
  ## or `fillCanonicalChain` reports an error.
  let skel = ctx.pool.skeleton
  if skel.len == 0:
    return true

  let headRes = skel.getHeader(skel.last.head)
  if headRes.isErr:
    error "resumeSync->getHeader", msg=headRes.error
    return false

  let maybeHeader = headRes.get
  if maybeHeader.isNone:
    return true

  let initRes = skel.initSync(maybeHeader.get)
  if initRes.isErr:
    error "resumeSync->initSync", msg=initRes.error
    return false

  if FillCanonical in initRes.get.status:
    let fillRes = skel.fillCanonicalChain()
    if fillRes.isErr:
      error "resumeSync->fillCanonicalChain", msg=fillRes.error
      return false

  # collect gaps of skeleton, excluding genesis
  ctx.setupTally()
  return true
proc appendSyncTarget*(ctx: BeaconCtxRef, h: BlockHeader): Future[void] {.async.} =
  ## Offer header `h` as a new sync target.
  ##
  ## Polls in 10 ms steps until `bmShiftTarget` is no longer set, then
  ## works under the `bmAppendTarget` flag: if `h.u64` is not yet reported
  ## as downloaded, it is tallied via `headTally` and `h` is added to the
  ## target queue. Finally `ctx.daemon` is set to `true`, whether or not
  ## the header was queued.
  while bmShiftTarget in ctx.pool.mode:
    await sleepAsync timer.milliseconds(10)

  let blockNum = h.u64

  ctx.pool.mode.incl bmAppendTarget
  if not ctx.downloaded(blockNum):
    ctx.headTally(blockNum)
    ctx.pool.target.addLast(h)
  ctx.pool.mode.excl bmAppendTarget

  ctx.daemon = true
proc shiftSyncTarget*(ctx: BeaconCtxRef): Future[BlockHeader] {.async.} =
  ## Remove one header from the target queue and return it.
  ##
  ## The queue must not be empty on entry (checked with `doAssert`). Polls
  ## in 10 ms steps until `bmAppendTarget` is no longer set, then pops the
  ## header while `bmShiftTarget` is set.
  doAssert(ctx.pool.target.len > 0)

  while bmAppendTarget in ctx.pool.mode:
    await sleepAsync timer.milliseconds(10)

  ctx.pool.mode.incl bmShiftTarget
  let header = ctx.pool.target.popFirst()
  ctx.pool.mode.excl bmShiftTarget

  return header
proc setSyncTarget*(ctx: BeaconCtxRef): Future[void] {.async.} =
  ## Take the next sync target off the target queue and append a
  ## `bjmGetBody` job for it, with `setHead = true`, to the job queue.
  let target = await ctx.shiftSyncTarget()
  ctx.pool.jobs.addLast makeGetBodyJob(target, setHead = true)
proc fillBlocksGaps*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
  ## Queue `bjmGetBlocks` jobs that download the gap `least .. last`.
  ##
  ## * Gap shorter than `MaxGetBlocks`: the interval `least .. last` is
  ##   tallied and one job is queued that starts at `last` and asks for
  ##   `last-least+1` headers.
  ## * Otherwise the gap is cut into jobs of `MaxGetBlocks` headers each,
  ##   walking downwards from `last`. Queueing stops early once the job
  ##   queue holds more than `MaxJobsQueue` entries. When the walk reaches
  ##   a position at or below `MaxGetBlocks` (and above 1), one final job
  ##   covering `1 .. max` is queued.
  ##
  ## `last` must not be smaller than `least`; the unsigned subtraction
  ## would wrap around otherwise.
  ##
  ## NOTE(review): the chunked path walks down towards block 1 and never
  ## consults `least` -- confirm that this is intended.
  if last - least < MaxGetBlocks:
    # Tally exactly the interval the job below downloads: it starts at
    # `last` and requests `last-least+1` headers, i.e. `least .. last`.
    ctx.reduceTally(least, last)
    let job = makeGetBlocksJob(last, last-least+1)
    ctx.pool.jobs.addLast(job)
    return

  var
    max = last

  while true:
    ctx.reduceTally(max-MaxGetBlocks, max)
    let job = makeGetBlocksJob(max, MaxGetBlocks)
    ctx.pool.jobs.addLast(job)
    if ctx.pool.jobs.len > MaxJobsQueue:
      # Queue is full enough; remaining gaps stay in the tally.
      return
    max = max-MaxGetBlocks
    if max <= MaxGetBlocks:
      break

  if max > 1:
    # Remainder that is not larger than one full chunk.
    ctx.reduceTally(1, max)
    let job = makeGetBlocksJob(max, max)
    ctx.pool.jobs.addLast(job)
# ------------------------------------------------------------------------------
# Worker will consume available jobs
# ------------------------------------------------------------------------------
proc executeGetBodyJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
  ## Run a `bjmGetBody` job: fetch one block body from the peer, check it
  ## against the header and store both in the skeleton.
  ##
  ## The job is requeued when the peer returns nothing, returns an empty
  ## list, or returns a body whose `sumHash` differs from the one recorded
  ## in the job. Depending on `setHead`, the header is applied with
  ## `skel.setHead` or `skel.putBlocks`; an error there, or in
  ## `fillCanonicalChain`, is logged and the proc returns without marking
  ## the job done and without requeueing it.
  let
    ctx = buddy.ctx
    peer = buddy.peer
    skel = ctx.pool.skeleton

  let reply = await peer.getBlockBodies([job.getBodyJob.headerHash])
  if reply.isNone:
    debug "executeGetBodyJob->getBodies none"
    # retry with other peer
    buddy.requeue job
    return

  let received = reply.get
  if received.blocks.len == 0:
    debug "executeGetBodyJob->getBodies isZero"
    # retry with other peer
    buddy.requeue job
    return

  # Only the first returned body is used; exactly one hash was requested.
  job.getBodyJob.body = received.blocks[0]
  let bodySumHash = sumHash(job.getBodyJob.body)
  if bodySumHash != job.getBodyJob.sumHash:
    # retry with other peer
    debug "executeGetBodyJob->sumHash",
      expect=job.getBodyJob.sumHash.short,
      get=bodySumHash.short
    buddy.requeue job
    return

  var status: set[SkeletonStatus]
  if job.getBodyJob.setHead:
    let headRes = skel.setHead(job.getBodyJob.header)
    if headRes.isErr:
      error "executeGetBodyJob->setHead", msg=headRes.error
      # something wrong
      return
    status = headRes.get().status
  else:
    let putRes = skel.putBlocks([job.getBodyJob.header])
    if putRes.isErr:
      error "executeGetBodyJob->putBlocks", msg=putRes.error
      return
    status = putRes.get().status

  let r = skel.putBody(job.getBodyJob.header, job.getBodyJob.body)
  doAssert(r.isOk)

  if FillCanonical in status:
    let fillRes = skel.fillCanonicalChain()
    if fillRes.isErr:
      error "executeGetBodyJob->fillCanonicalChain", msg=fillRes.error
      return

  buddy.jobDone()
proc executeGetBlocksJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
  ## Run a `bjmGetBlocks` job: fetch a run of headers, then the bodies of
  ## those headers for which `hasBody` is true, and store everything in
  ## the skeleton.
  ##
  ## * No header reply: the job itself is requeued.
  ## * No header needs a body: every header gets a default `BlockBody()`.
  ## * No body reply: a `bjmGetBodies` job carrying the fetched headers is
  ##   requeued in place of this job.
  let
    ctx = buddy.ctx
    peer = buddy.peer
    skel = ctx.pool.skeleton
    request = makeHeaderRequest(job.getBlocksJob.number, job.getBlocksJob.maxResults)

  let headerReply = await peer.getBlockHeaders(request)
  if headerReply.isNone:
    # retry with other peer
    error "executeGetBlocksJob->getBlockHeaders none"
    buddy.requeue job
    return

  job.getBlocksJob.headers = headerReply.get().headers
  let numHeaders = job.getBlocksJob.headers.len

  # `reqBodies` has one flag per header; `headerHashes` holds only the
  # hashes of the headers whose flag is true.
  var
    headerHashes = newSeqOfCap[Hash256](numHeaders)
    reqBodies = newSeqOfCap[bool](numHeaders)

  for hdr in job.getBlocksJob.headers:
    let wanted = hdr.hasBody
    reqBodies.add wanted
    if wanted:
      headerHashes.add hdr.blockHash

  if headerHashes.len == 0:
    # all bodies are empty
    for _ in 0..<numHeaders:
      job.getBlocksJob.bodies.add BlockBody()
  else:
    let bodyReply = await peer.getBlockBodies(headerHashes)
    if bodyReply.isNone:
      debug "executeGetBlocksJob->getBodies none"
      # retry with other peer
      buddy.requeue makeGetBodiesJob(job.getBlocksJob.headers,
        headerHashes, reqBodies)
      return
    buddy.mapBodiesToHeader(job, bodyReply.get().blocks, reqBodies)

  ctx.putBlocks(skel, job.getBlocksJob.headers, job.getBlocksJob.bodies)
  buddy.jobDone()
proc executeGetBodiesJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
  ## Run a `bjmGetBodies` job: fetch the bodies for the hashes recorded in
  ## the job, pair them with the job's headers and store the result in the
  ## skeleton.
  ##
  ## The job is requeued unchanged when the peer returns nothing.
  let
    ctx = buddy.ctx
    peer = buddy.peer
    skel = ctx.pool.skeleton

  let reply = await peer.getBlockBodies(job.getBodiesJob.headerHash)
  if reply.isNone:
    debug "executeGetBodiesJob->getBodies none"
    # retry with other peer
    buddy.requeue job
    return

  buddy.mapBodiesToHeader(job, reply.get().blocks, job.getBodiesJob.reqBodies)
  ctx.putBlocks(skel, job.getBodiesJob.headers, job.getBodiesJob.bodies)
  buddy.jobDone()
proc executeJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
  ## Dispatch `job` to the executor that matches `job.mode`.
  ##
  ## A `TransportError` is logged only. Any other `CatchableError` is
  ## logged and the job is requeued.
  try:
    case job.mode
    of bjmGetBody:
      await buddy.executeGetBodyJob(job)
    of bjmGetBlocks:
      await buddy.executeGetBlocksJob(job)
    of bjmGetBodies:
      await buddy.executeGetBodiesJob(job)
  except TransportError as ex:
    error "executeJob->TransportError", msg=ex.msg
  except CatchableError as ex:
    error "executeJob->OtherError", msg=ex.msg
    # retry with other peer
    buddy.requeue job