Light forward sync mechanism (#6515)

* Initial commit.

* Add hybrid syncing.

* Compilation fixes.

* Cast custom event for our purposes.

* Instantiate AsyncEventQueue properly.

* Fix mistype.

* Further research on optimistic updates.

* Fixing circular deps.

* Add backfilling.

* Add block download feature.

* Add block store.

* Update backfill information before storing block.

* Use custom block verifier for backfilling sync.

* Skip signature verification in backfilling.

* Add one more generic reload to storeBackfillBlock().

* Add block verification debugging statements.

* Add more debugging

* Do not use database for backfilling, part 1.

* Fix for stash.

* Stash fixes part 2.

* Prepare for testing.

* Fix assertion.

* Fix post-restart syncing process.

* Update backfill loading log statement.
Use proper backfill slot callback for sync manager.

* Add handling of Duplicates.

* Fix store duration and block backfilled log statements.

* Add proper syncing state log statement.

* Add snappy compression to beaconchain_file.
Format syncing speed properly.

* Add blobs verification.

* Add `slot` number to file structure for easy navigation over stream of compressed objects.

* Change database filename.

* Fix structure size.

* Add more consistency properties.

* Fix checkRepair() issues.

* Preparation to state rebuild process.

* Add plain & compressed size.

* Debugging snappy encode process.

* Add one more debugging line.

* Dump blocks.

* One more filedump.

* Fix chunk corruption code.

* Fix detection issue.

* Some fixes in state rebuilding process.

* Add more clearance steps.

* Move updateHead() back to block_processor.

* Fix compilation issues.

* Make code more async friendly.

* Fix async issues.
Add more information when proposer verification failed.

* Fix 8192 slots issue.

* Fix Future double completion issue.

* Pass updateFlags to some of the core procedures.

* Fix tests.

* Improve initial sync handling mechanism.

* Fix checkStateTransition() performance improvements.

* Add some performance tuning and meters.

* Light client performance tuning.

* Remove debugging statement.

* Use single file descriptor for blockchain file.

* Attempt to fix LC.

* Fix timeleft calculation when untrusted sync backfilling started right after LC block received.

* Workaround for `chronicles` + `results` `error` issue.
Remove some compilation warnings.
Fix `CatchableError` leaks on Windows.

* Address review comments.

* Address review comments part 2.

* Address review comments part 1.

* Rebase and fix the issues.

* Address review comments part 3.

* Add tests and fix some issues in auto-repair mechanism.

* Add tests to all_tests.

* Rename binary test file to pass restrictions.

* Add `bin` extension to excluded list.
Recover binary test data.

* Rename fixture file to .bin again.

* Update AllTests.

* Address review comments part 4.

* Address review comments part 5 and fix tests.

* Address review comments part 6.

* Eliminate foldl and combine from blobs processing.
Add some tests to ensure that checkResponse() also checks for correct order.

* Fix forgotten place.

* Post rebase fixes.

* Add unique slots tests.

* Optimize updateHead() code.

* Add forgotten changes.

* Address review comments on state as argument.
This commit is contained in:
Eugene Kabanov 2024-10-30 07:38:53 +02:00 committed by GitHub
parent 03fe86f448
commit 18409a69e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 2737 additions and 177 deletions

View File

@ -254,7 +254,7 @@ jobs:
if: ${{ !cancelled() }} && github.event_name == 'pull_request'
run: |
excluded_files="config.yaml"
excluded_extensions="ans|cfg|json|json\\.template|md|png|service|ssz|txt|lock|nix"
excluded_extensions="ans|bin|cfg|json|json\\.template|md|png|service|ssz|txt|lock|nix"
current_year=$(date +"%Y")
problematic_files=()

View File

@ -70,6 +70,15 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
+ sanity check state diff roundtrip [Preset: mainnet] OK
```
OK: 29/29 Fail: 0/29 Skip: 0/29
## Beacon chain file test suite
```diff
+ Auto check/repair test (missing data) OK
+ Auto check/repair test (missing footer) OK
+ Auto check/repair test (missing last chunk) OK
+ Auto check/repair test (only header) OK
+ Fixture file validation OK
```
OK: 5/5 Fail: 0/5 Skip: 0/5
## Beacon state [Preset: mainnet]
```diff
+ Smoke test initialize_beacon_state_from_eth1 [Preset: mainnet] OK
@ -1128,4 +1137,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9
---TOTAL---
OK: 765/770 Fail: 0/770 Skip: 5/770
OK: 770/775 Fail: 0/775 Skip: 5/775

View File

@ -0,0 +1,950 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import results, snappy, stew/[io2, endians2]
import ./spec/[eth2_ssz_serialization, eth2_merkleization, forks]
from ./consensus_object_pools/block_pools_types import BlockData
export results
type
ChainFileHeader* = object
header: uint32
version: uint32
kind: uint64
comprSize: uint32
plainSize: uint32
slot: uint64
ChainFileFooter* = object
kind: uint64
comprSize: uint32
plainSize: uint32
slot: uint64
Chunk = object
header: ChainFileHeader
footer: ChainFileFooter
data: seq[byte]
ChainFileData* = object
head*: Opt[BlockData]
tail*: Opt[BlockData]
ChainFileHandle* = object
data*: ChainFileData
handle*: IoHandle
ChainFileErrorType* {.pure.} = enum
IoError, # OS input/output error
IncorrectSize, # Incorrect/unexpected size of chunk
IncompleteFooter, # Incomplete footer was read
IncompleteHeader, # Incomplete header was read
IncompleteData, # Incomplete data was read
FooterError, # Incorrect chunk's footer
HeaderError, # Incorrect chunk's header
MismatchError # Header and footer not from same chunk
ChainFileCheckResult* {.pure.} = enum
FileMissing,
FileEmpty,
FileOk,
FileRepaired,
FileCorrupted
ChainFileFlag* {.pure.} = enum
Repair,
OpenAlways
ChainFileError* = object
kind*: ChainFileErrorType
message*: string
const
ChainFileHeaderSize* = 32
ChainFileFooterSize* = 24
ChainFileVersion = 1'u32
ChainFileHeaderValue = 0x424D494E'u32
ChainFileBufferSize* = 4096
MaxChunkSize = int(GOSSIP_MAX_SIZE)
ChainFileHeaderArray = ChainFileHeaderValue.toBytesLE()
IncompleteWriteError = "Unable to write data to file, disk full?"
MaxForksCount* = 16384
BlockForkCodeRange =
int(ConsensusFork.Phase0) .. int(high(ConsensusFork))
BlobForkCodeRange =
MaxForksCount .. (MaxForksCount + int(high(ConsensusFork)) - int(ConsensusFork.Deneb))
func getBlockForkCode(fork: ConsensusFork): uint64 =
uint64(fork)
func getBlobForkCode(fork: ConsensusFork): uint64 =
case fork
of ConsensusFork.Deneb:
uint64(MaxForksCount)
of ConsensusFork.Electra:
uint64(MaxForksCount) + uint64(fork) - uint64(ConsensusFork.Deneb)
of ConsensusFork.Phase0 .. ConsensusFork.Capella:
raiseAssert "Blobs are not supported for the fork"
proc init(t: typedesc[ChainFileError], k: ChainFileErrorType,
m: string): ChainFileError =
ChainFileError(kind: k, message: m)
template init(t: typedesc[ChainFileHeader],
kind: uint64, clength, plength: uint32,
number: uint64): ChainFileHeader =
ChainFileHeader(
header: ChainFileHeaderValue,
version: ChainFileVersion,
kind: kind,
comprSize: clength,
plainSize: plength,
slot: number)
template init(t: typedesc[ChainFileFooter],
kind: uint64, clength, plength: uint32,
number: uint64): ChainFileFooter =
ChainFileFooter(
kind: kind,
comprSize: clength,
plainSize: plength,
slot: number)
template unmaskKind(k: uint64): uint64 =
k and not(0x8000_0000_0000_0000'u64)
template maskKind(k: uint64): uint64 =
k or 0x8000_0000_0000_0000'u64
template isLast(k: uint64): bool =
(k and 0x8000_0000_0000_0000'u64) != 0'u64
proc checkKind(kind: uint64): Result[void, string] =
let hkind =
block:
let res = unmaskKind(kind)
if res > uint64(high(int)):
return err("Unsuppoted chunk kind value")
int(res)
if (hkind in BlockForkCodeRange) or (hkind in BlobForkCodeRange):
ok()
else:
err("Unsuppoted chunk kind value")
proc check(a: ChainFileHeader): Result[void, string] =
if a.header != ChainFileHeaderValue:
return err("Incorrect chunk header [NIMB]")
if a.version != ChainFileVersion:
return err("Unsuppoted chunk version")
if a.comprSize > uint32(MaxChunkSize):
return err("Incorrect compressed size in chunk header")
if a.plainSize > uint32(MaxChunkSize):
return err("Incorrect plain size in chunk header")
? checkKind(a.kind)
ok()
proc check(a: ChainFileFooter): Result[void, string] =
if a.comprSize > uint32(MaxChunkSize):
return err("Incorrect compressed size in chunk header")
if a.plainSize > uint32(MaxChunkSize):
return err("Incorrect plain size in chunk header")
? a.kind.checkKind()
ok()
proc check(a: ChainFileFooter, b: ChainFileHeader): Result[void, string] =
if a.kind != b.kind:
return err("Footer and header reports different chunk kind")
if a.comprSize != b.comprSize:
return err("Footer and header reports different compressed size")
if a.plainSize != b.plainSize:
return err("Footer and header reports different plain size")
if a.slot != b.slot:
return err("Footer and header reports different slots")
ok()
proc init(t: typedesc[ChainFileHeader],
data: openArray[byte]): Result[ChainFileHeader, string] =
doAssert(len(data) >= ChainFileHeaderSize)
let header =
ChainFileHeader(
header: uint32.fromBytesLE(data.toOpenArray(0, 3)),
version: uint32.fromBytesLE(data.toOpenArray(4, 7)),
kind: uint64.fromBytesLE(data.toOpenArray(8, 15)),
comprSize: uint32.fromBytesLE(data.toOpenArray(16, 19)),
plainSize: uint32.fromBytesLE(data.toOpenArray(20, 23)),
slot: uint64.fromBytesLE(data.toOpenArray(24, 31)))
? check(header)
ok(header)
proc init(t: typedesc[ChainFileFooter],
data: openArray[byte]): Result[ChainFileFooter, string] =
doAssert(len(data) >= ChainFileFooterSize)
let footer =
ChainFileFooter(
kind: uint64.fromBytesLE(data.toOpenArray(0, 7)),
comprSize: uint32.fromBytesLE(data.toOpenArray(8, 11)),
plainSize: uint32.fromBytesLE(data.toOpenArray(12, 15)),
slot: uint64.fromBytesLE(data.toOpenArray(16, 23)))
? check(footer)
ok(footer)
template `[]=`(data: var openArray[byte], slice: Slice[int],
src: array[4, byte]) =
var k = 0
for i in slice:
data[i] = src[k]
inc(k)
template `[]=`(data: var openArray[byte], slice: Slice[int],
src: array[8, byte]) =
var k = 0
for i in slice:
data[i] = src[k]
inc(k)
proc store(a: ChainFileHeader, data: var openArray[byte]) =
doAssert(len(data) >= ChainFileHeaderSize)
data[0 .. 3] = a.header.toBytesLE()
data[4 .. 7] = a.version.toBytesLE()
data[8 .. 15] = a.kind.toBytesLE()
data[16 .. 19] = a.comprSize.toBytesLE()
data[20 .. 23] = a.plainSize.toBytesLE()
data[24 .. 31] = a.slot.toBytesLE()
proc store(a: ChainFileFooter, data: var openArray[byte]) =
doAssert(len(data) >= ChainFileFooterSize)
data[0 .. 7] = a.kind.toBytesLE()
data[8 .. 11] = a.comprSize.toBytesLE()
data[12 .. 15] = a.plainSize.toBytesLE()
data[16 .. 23] = a.slot.toBytesLE()
proc init(t: typedesc[Chunk], kind, slot: uint64, plainSize: uint32,
data: openArray[byte]): seq[byte] =
doAssert((len(data) < MaxChunkSize) and (plainSize < uint32(MaxChunkSize)))
var
dst = newSeq[byte](len(data) + ChainFileHeaderSize + ChainFileFooterSize)
let
header = ChainFileHeader.init(kind, uint32(len(data)), plainSize, slot)
footer = ChainFileFooter.init(kind, uint32(len(data)), plainSize, slot)
var offset = 0
header.store(dst.toOpenArray(offset, offset + ChainFileHeaderSize - 1))
offset += ChainFileHeaderSize
if len(data) > 0:
copyMem(addr dst[offset], unsafeAddr data[0], len(data))
offset += len(data)
footer.store(dst.toOpenArray(offset, offset + ChainFileFooterSize - 1))
dst
template getBlockChunkKind(kind: ConsensusFork, last: bool): uint64 =
if last:
maskKind(getBlockForkCode(kind))
else:
getBlockForkCode(kind)
template getBlobChunkKind(kind: ConsensusFork, last: bool): uint64 =
if last:
maskKind(getBlobForkCode(kind))
else:
getBlobForkCode(kind)
proc getBlockConsensusFork(header: ChainFileHeader): ConsensusFork =
let hkind = unmaskKind(header.kind)
if int(hkind) in BlockForkCodeRange:
cast[ConsensusFork](hkind)
else:
raiseAssert("Should not happen")
template isBlock(h: ChainFileHeader | ChainFileFooter): bool =
let hkind = unmaskKind(h.kind)
int(hkind) in BlockForkCodeRange
template isBlob(h: ChainFileHeader | ChainFileFooter): bool =
let hkind = unmaskKind(h.kind)
int(hkind) in BlobForkCodeRange
template isLast(h: ChainFileHeader | ChainFileFooter): bool =
h.kind.isLast()
template head*(chandle: ChainFileHandle): Opt[BlockData] =
chandle.data.head
template tail*(chandle: ChainFileHandle): Opt[BlockData] =
chandle.data.tail
proc setHead*(chandle: var ChainFileHandle, bdata: BlockData) =
chandle.data.head = Opt.some(bdata)
proc setTail*(chandle: var ChainFileHandle, bdata: BlockData) =
chandle.data.tail = Opt.some(bdata)
proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars]): Result[void, string] =
let origOffset =
updateFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).valueOr:
return err(ioErrorMsg(error))
block:
let
kind = getBlockChunkKind(signedBlock.kind, blobs.isNone())
(data, plainSize) =
withBlck(signedBlock):
let res = SSZ.encode(forkyBlck)
(snappy.encode(res), len(res))
slot = signedBlock.slot
buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data)
wrote = writeFile(chandle.handle, buffer).valueOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))
if wrote != uint(len(buffer)):
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(IncompleteWriteError)
if blobs.isSome():
let blobSidecars = blobs.get()
for index, blob in blobSidecars.pairs():
let
kind =
getBlobChunkKind(signedBlock.kind, (index + 1) == len(blobSidecars))
(data, plainSize) =
block:
let res = SSZ.encode(blob[])
(snappy.encode(res), len(res))
slot = blob[].signed_block_header.message.slot
buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data)
setFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))
let
wrote = writeFile(chandle.handle, buffer).valueOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))
if wrote != uint(len(buffer)):
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(IncompleteWriteError)
fsync(chandle.handle).isOkOr:
discard truncate(chandle.handle, origOffset)
return err(ioErrorMsg(error))
ok()
proc readChunkForward(handle: IoHandle,
dataRead: bool): Result[Opt[Chunk], ChainFileError] =
# This function only reads chunk header and footer, but does not read actual
# chunk data.
var
buffer = newSeq[byte](max(ChainFileHeaderSize, ChainFileFooterSize))
data: seq[byte]
bytesRead: uint
bytesRead =
readFile(handle, buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if bytesRead == 0'u:
# End of file.
return ok(Opt.none(Chunk))
if bytesRead != uint(ChainFileHeaderSize):
return err(
ChainFileError.init(ChainFileErrorType.IncompleteHeader,
"Unable to read chunk header data, incorrect file?"))
let
header = ChainFileHeader.init(
buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.HeaderError, error))
if not(dataRead):
setFilePos(handle, int64(header.comprSize),
SeekPosition.SeekCurrent).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
else:
# Safe conversion to `int`, because header.comprSize < MaxChunkSize
data.setLen(int(header.comprSize))
bytesRead =
readFile(handle, data.toOpenArray(0, len(data) - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if bytesRead != uint(header.comprSize):
return err(
ChainFileError.init(ChainFileErrorType.IncompleteData,
"Unable to read chunk data, incorrect file?"))
bytesRead =
readFile(handle, buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if bytesRead != uint(ChainFileFooterSize):
return err(
ChainFileError.init(ChainFileErrorType.IncompleteFooter,
"Unable to read chunk footer data, incorrect file?"))
let
footer = ChainFileFooter.init(
buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.FooterError, error))
check(footer, header).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.MismatchError, error))
if not(dataRead):
ok(Opt.some(Chunk(header: header, footer: footer)))
else:
ok(Opt.some(Chunk(header: header, footer: footer, data: data)))
proc readChunkBackward(handle: IoHandle,
dataRead: bool): Result[Opt[Chunk], ChainFileError] =
# This function only reads chunk header and footer, but does not read actual
# chunk data.
var
buffer = newSeq[byte](max(ChainFileHeaderSize, ChainFileFooterSize))
data: seq[byte]
bytesRead: uint
let offset = getFilePos(handle).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if offset == 0:
return ok(Opt.none(Chunk))
if offset <= (ChainFileHeaderSize + ChainFileFooterSize):
return err(
ChainFileError.init(ChainFileErrorType.IncorrectSize,
"File position is incorrect"))
setFilePos(handle, -ChainFileFooterSize, SeekPosition.SeekCurrent).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
bytesRead =
readFile(handle, buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if bytesRead != ChainFileFooterSize:
return err(
ChainFileError.init(ChainFileErrorType.IncompleteFooter,
"Unable to read chunk footer data, incorrect file?"))
let
footer = ChainFileFooter.init(
buffer.toOpenArray(0, ChainFileFooterSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.FooterError, error))
block:
let position =
-(ChainFileHeaderSize + ChainFileFooterSize + int64(footer.comprSize))
setFilePos(handle, position, SeekPosition.SeekCurrent).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
bytesRead =
readFile(handle, buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if bytesRead != ChainFileHeaderSize:
return err(
ChainFileError.init(ChainFileErrorType.IncompleteHeader,
"Unable to read chunk header data, incorrect file?"))
let
header = ChainFileHeader.init(
buffer.toOpenArray(0, ChainFileHeaderSize - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.HeaderError, error))
check(footer, header).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.MismatchError, error))
if not(dataRead):
let position = int64(-ChainFileHeaderSize)
setFilePos(handle, position, SeekPosition.SeekCurrent).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
else:
# Safe conversion to `int`, because header.comprSize < MaxChunkSize
data.setLen(int(header.comprSize))
bytesRead =
readFile(handle, data.toOpenArray(0, len(data) - 1)).valueOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if bytesRead != uint(header.comprSize):
return err(
ChainFileError.init(ChainFileErrorType.IncompleteData,
"Unable to read chunk data, incorrect file?"))
let position = -(ChainFileHeaderSize + int64(header.comprSize))
setFilePos(handle, position, SeekPosition.SeekCurrent).isOkOr:
return err(
ChainFileError.init(ChainFileErrorType.IoError, ioErrorMsg(error)))
if not(dataRead):
ok(Opt.some(Chunk(header: header, footer: footer)))
else:
ok(Opt.some(Chunk(header: header, footer: footer, data: data)))
proc decodeBlock(
header: ChainFileHeader,
data: openArray[byte]
): Result[ForkedSignedBeaconBlock, string] =
if header.plainSize > uint32(MaxChunkSize):
return err("Size of block is enormously big")
let
fork = header.getBlockConsensusFork()
decompressed = snappy.decode(data, uint32(header.plainSize))
blck =
try:
withConsensusFork(fork):
ForkedSignedBeaconBlock.init(
SSZ.decode(decompressed, consensusFork.SignedBeaconBlock))
except SerializationError:
return err("Incorrect block format")
ok(blck)
proc decodeBlob(
header: ChainFileHeader,
data: openArray[byte]
): Result[BlobSidecar, string] =
if header.plainSize > uint32(MaxChunkSize):
return err("Size of blob is enormously big")
let
decompressed = snappy.decode(data, uint32(header.plainSize))
blob =
try:
SSZ.decode(decompressed, BlobSidecar)
except SerializationError:
return err("Incorrect blob format")
ok(blob)
proc getChainFileTail*(handle: IoHandle): Result[Opt[BlockData], string] =
var sidecars: BlobSidecars
while true:
let chunk =
block:
let res = readChunkBackward(handle, true).valueOr:
return err(error.message)
if res.isNone():
if len(sidecars) == 0:
return ok(Opt.none(BlockData))
else:
return err("Blobs without block encountered, incorrect file?")
res.get()
if chunk.header.isBlob():
let blob = ? decodeBlob(chunk.header, chunk.data)
sidecars.add(newClone blob)
else:
let blck = ? decodeBlock(chunk.header, chunk.data)
return
if len(sidecars) == 0:
ok(Opt.some(BlockData(blck: blck)))
else:
ok(Opt.some(BlockData(blck: blck, blob: Opt.some(sidecars))))
proc getChainFileHead*(handle: IoHandle): Result[Opt[BlockData], string] =
var
offset: int64 = 0
endOfFile = false
let
blck =
block:
let chunk =
block:
let res = readChunkForward(handle, true).valueOr:
return err(error.message)
if res.isNone():
return ok(Opt.none(BlockData))
res.get()
if not(chunk.header.isBlock()):
return err("Unexpected blob chunk encountered")
? decodeBlock(chunk.header, chunk.data)
blob =
block:
var sidecars: BlobSidecars
block mainLoop:
while true:
offset = getFilePos(handle).valueOr:
return err(ioErrorMsg(error))
let chunk =
block:
let res = readChunkForward(handle, true).valueOr:
return err(error.message)
if res.isNone():
endOfFile = true
break mainLoop
res.get()
if chunk.header.isBlob():
let blob = ? decodeBlob(chunk.header, chunk.data)
sidecars.add(newClone blob)
else:
break mainLoop
if len(sidecars) > 0:
Opt.some(sidecars)
else:
Opt.none(BlobSidecars)
if not(endOfFile):
setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr:
return err(ioErrorMsg(error))
ok(Opt.some(BlockData(blck: blck, blob: blob)))
proc seekForSlotBackward*(handle: IoHandle,
slot: Slot): Result[Opt[int64], string] =
## Search from the beginning of the file for the first chunk of data
## identified by slot ``slot``.
## This procedure updates current file position to the beginning of the found
## chunk and returns this position as the result.
block:
let res = setFilePos(handle, 0, SeekPosition.SeekEnd)
if res.isErr():
return err(ioErrorMsg(res.error))
while true:
let chunk =
block:
let res = readChunkBackward(handle, false).valueOr:
return err(error.message)
if res.isNone():
return ok(Opt.none(int64))
res.get()
if chunk.header.slot == slot:
block:
let
position =
ChainFileHeaderSize + ChainFileFooterSize +
int64(chunk.header.comprSize)
res = setFilePos(handle, position, SeekPosition.SeekCurrent)
if res.isErr():
return err(ioErrorMsg(res.error))
block:
let res = getFilePos(handle)
if res.isErr():
return err(ioErrorMsg(res.error))
return ok(Opt.some(res.get()))
proc seekForSlotForward*(handle: IoHandle,
slot: Slot): Result[Opt[int64], string] =
## Search from the end of the file for the last chunk of data identified by
## slot ``slot``.
## This procedure updates current file position to the beginning of the found
## chunk and returns this position as the result.
block:
let res = setFilePos(handle, 0, SeekPosition.SeekBegin)
if res.isErr():
return err(ioErrorMsg(res.error))
while true:
let chunk =
block:
let res = readChunkForward(handle, false).valueOr:
return err(error.message)
if res.isNone():
return ok(Opt.none(int64))
res.get()
if chunk.header.slot == slot:
block:
let
position =
-(ChainFileHeaderSize + ChainFileFooterSize +
int64(chunk.header.comprSize))
res = setFilePos(handle, position, SeekPosition.SeekCurrent)
if res.isErr():
return err(ioErrorMsg(res.error))
block:
let res = getFilePos(handle)
if res.isErr():
return err(ioErrorMsg(res.error))
return ok(Opt.some(res.get()))
proc search(data: openArray[byte], srch: openArray[byte],
state: var int): Opt[int] =
doAssert(len(srch) > 0)
for index in countdown(len(data) - 1, 0):
if data[index] == srch[len(srch) - 1 - state]:
inc(state)
if state == len(srch):
return Opt.some(index)
else:
state = 0
Opt.none(int)
proc seekForChunkBackward(
handle: IoHandle,
bufferSize = ChainFileBufferSize
): Result[Opt[int64], string] =
var
state = 0
data = newSeq[byte](bufferSize)
bytesRead: uint = 0
while true:
let
position = getFilePos(handle).valueOr:
return err(ioErrorMsg(error))
offset = max(0'i64, position - int64(bufferSize))
setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr:
return err(ioErrorMsg(error))
bytesRead = readFile(handle, data).valueOr:
return err(ioErrorMsg(error))
let indexOpt = search(data.toOpenArray(0, int(bytesRead) - 1),
ChainFileHeaderArray, state)
if indexOpt.isNone():
setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr:
return err(ioErrorMsg(error))
continue
state = 0
let
chunkOffset = -(int64(bytesRead) - int64(indexOpt.get()))
chunkPos =
updateFilePos(handle, chunkOffset, SeekPosition.SeekCurrent).valueOr:
return err(ioErrorMsg(error))
chunk = readChunkForward(handle, false).valueOr:
# Incorrect chunk detected, so we start our searching again
setFilePos(handle, offset, SeekPosition.SeekBegin).isOkOr:
return err(ioErrorMsg(error))
if offset == 0'i64:
return ok(Opt.none(int64))
continue
if chunk.isNone():
return err("File has been changed, while repairing")
if chunk.get().header.isLast():
let finishOffset = getFilePos(handle).valueOr:
return err(ioErrorMsg(error))
return ok(Opt.some(finishOffset))
else:
if chunkPos == 0'i64:
return ok(Opt.none(int64))
setFilePos(handle, chunkPos, SeekPosition.SeekBegin).isOkOr:
return err(ioErrorMsg(error))
ok(Opt.none(int64))
proc checkRepair*(filename: string,
repair: bool): Result[ChainFileCheckResult, string] =
if not(isFile(filename)):
return ok(ChainFileCheckResult.FileMissing)
let
handle = openFile(filename, {OpenFlags.Read, OpenFlags.Write}).valueOr:
return err(ioErrorMsg(error))
filesize = getFileSize(handle).valueOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
if filesize == 0'i64:
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
return ok(ChainFileCheckResult.FileEmpty)
setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
let res = readChunkBackward(handle, false)
if res.isOk():
let chunk = res.get()
if chunk.isNone():
discard closeFile(handle)
return err("File was changed while reading")
if chunk.get().header.isLast():
# Last chunk being marked as last, everything is fine.
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
return ok(ChainFileCheckResult.FileOk)
# Last chunk was not marked properly, searching for the proper last chunk.
while true:
let nres = readChunkBackward(handle, false)
if nres.isErr():
discard closeFile(handle)
return err(nres.error.message)
let cres = nres.get()
if cres.isNone():
# We reached start of file.
return
if repair:
truncate(handle, 0).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileRepaired)
else:
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileCorrupted)
if cres.get().header.isLast():
return
if repair:
let
position = getFilePos(handle).valueOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
offset = position + int64(cres.get().header.comprSize) +
ChainFileHeaderSize + ChainFileFooterSize
truncate(handle, offset).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileRepaired)
else:
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileCorrupted)
ok(ChainFileCheckResult.FileCorrupted)
else:
setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
let position = seekForChunkBackward(handle).valueOr:
discard closeFile(handle)
return err(error)
if repair:
let newsize =
if position.isNone():
0'i64
else:
position.get()
truncate(handle, newsize).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileRepaired)
else:
closeFile(handle).isOkOr:
return err(ioErrorMsg(error))
ok(ChainFileCheckResult.FileCorrupted)
proc init*(t: typedesc[ChainFileHandle], filename: string,
flags: set[ChainFileFlag]): Result[ChainFileHandle, string] =
let
handle =
if not(isFile(filename)):
if ChainFileFlag.OpenAlways in flags:
let flags = {OpenFlags.Read, OpenFlags.Write, OpenFlags.Create}
openFile(filename, flags).valueOr:
return err(ioErrorMsg(error))
else:
return err("File not found")
else:
# If file exists we perform automatic check/repair procedure.
let res =
checkRepair(filename, ChainFileFlag.Repair in flags).valueOr:
return err(error)
if res notin {ChainFileCheckResult.FileMissing, FileEmpty,
FileOk, FileRepaired}:
return err("Chain file data is corrupted")
let flags = {OpenFlags.Read, OpenFlags.Write}
openFile(filename, flags).valueOr:
return err(ioErrorMsg(error))
head = getChainFileHead(handle).valueOr:
discard closeFile(handle)
return err(error)
setFilePos(handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard closeFile(handle)
return err(ioErrorMsg(error))
let tail = getChainFileTail(handle).valueOr:
discard closeFile(handle)
return err(error)
ok(ChainFileHandle(handle: handle,
data: ChainFileData(head: head, tail: tail)))
proc close*(ch: ChainFileHandle): Result[void, string] =
closeFile(ch.handle).isOkOr:
return err(ioErrorMsg(error))
ok()
proc seekForSlot*(ch: ChainFileHandle,
slot: Slot): Result[Opt[int64], string] =
if ch.head.isNone() or ch.tail.isNone():
return err("Attempt to seek for slot in empty file")
let
headRange =
block:
let headSlot = ch.head.get().blck.slot()
if headSlot >= slot:
headSlot - slot
else:
slot - headSlot
tailRange =
block:
let tailSlot = ch.tail.get().blck.slot()
if tailSlot >= slot:
tailSlot - slot
else:
slot - tailSlot
offset =
if headRange <= tailRange:
? seekForSlotForward(ch.handle, slot)
else:
? seekForSlotBackward(ch.handle, slot)
ok(offset)
proc clearFile*(filename: string): Result[void, string] =
removeFile(filename).isOkOr:
return err(ioErrorMsg(error))
ok()

View File

@ -22,10 +22,12 @@ import
./el/el_manager,
./consensus_object_pools/[
blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool],
data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool,
blockchain_list],
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
./sync/[sync_manager, request_manager],
./spec/signatures_batch,
./sync/[sync_manager, request_manager, sync_types],
./validators/[
action_tracker, message_router, validator_monitor, validator_pool,
keystore_management],
@ -38,7 +40,7 @@ export
eth2_network, el_manager, request_manager, sync_manager,
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
base, message_router, validator_monitor, validator_pool,
consensus_manager, dynamic_fee_recipients
consensus_manager, dynamic_fee_recipients, sync_types
type
EventBus* = object
@ -57,6 +59,7 @@ type
RestVersioned[ForkedLightClientFinalityUpdate]]
optUpdateQueue*: AsyncEventQueue[
RestVersioned[ForkedLightClientOptimisticUpdate]]
optFinHeaderUpdateQueue*: AsyncEventQueue[ForkedLightClientHeader]
BeaconNode* = ref object
nickname*: string
@ -71,6 +74,7 @@ type
.Raising([CancelledError])
lightClient*: LightClient
dag*: ChainDAGRef
list*: ChainListRef
quarantine*: ref Quarantine
blobQuarantine*: ref BlobQuarantine
dataColumnQuarantine*: ref DataColumnQuarantine
@ -88,8 +92,11 @@ type
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId]
untrustedManager*: SyncManager[Peer, PeerId]
syncOverseer*: SyncOverseerRef
genesisSnapshotContent*: string
processor*: ref Eth2Processor
batchVerifier*: ref BatchVerifier
blockProcessor*: ref BlockProcessor
consensusManager*: ref ConsensusManager
attachedValidatorBalanceTotal*: Gwei

View File

@ -53,10 +53,10 @@ proc initLightClient*(
getBeaconTime, optimisticHandler)
shouldInhibitSync = func(): bool =
if node.syncManager != nil:
not node.syncManager.inProgress # No LC sync needed if DAG is in sync
else:
if isNil(node.syncOverseer):
false
else:
not node.syncOverseer.syncInProgress # No LC sync needed if DAG is in sync
lightClient = createLightClient(
node.network, rng, config, cfg, forkDigests, getBeaconTime,
genesis_validators_root, LightClientFinalizationMode.Strict,
@ -107,7 +107,16 @@ proc initLightClient*(
# The execution block hash is only available from Capella onward
info "Ignoring new LC optimistic header until Capella"
proc onFinalizedHeader(
lightClient: LightClient,
finalizedHeader: ForkedLightClientHeader) =
if not node.consensusManager[].shouldSyncOptimistically(node.currentSlot):
return
node.eventBus.optFinHeaderUpdateQueue.emit(finalizedHeader)
lightClient.onOptimisticHeader = onOptimisticHeader
lightClient.onFinalizedHeader = onFinalizedHeader
lightClient.trustedBlockRoot = config.trustedBlockRoot
elif config.trustedBlockRoot.isSome:

View File

@ -8,16 +8,15 @@
{.push raises: [].}
import
std/sequtils,
chronicles,
results,
stew/assign2,
../spec/[
beaconstate, forks, signatures, signatures_batch,
state_transition, state_transition_epoch],
"."/[block_dag, blockchain_dag, blockchain_dag_light_client]
from ../spec/datatypes/capella import asSigVerified, asTrusted, shortLog
from ../spec/datatypes/deneb import asSigVerified, asTrusted, shortLog
"."/[block_pools_types, block_dag, blockchain_dag,
blockchain_dag_light_client]
export results, signatures_batch, block_dag, blockchain_dag
@ -114,15 +113,18 @@ proc addResolvedHeadBlock(
blockRef
proc checkStateTransition(
dag: ChainDAGRef, signedBlock: ForkySigVerifiedSignedBeaconBlock,
cache: var StateCache): Result[void, VerifierError] =
dag: ChainDAGRef,
signedBlock: ForkySigVerifiedSignedBeaconBlock,
cache: var StateCache,
updateFlags: UpdateFlags,
): Result[void, VerifierError] =
## Ensure block can be applied on a state
func restore(v: var ForkedHashedBeaconState) =
assign(dag.clearanceState, dag.headState)
let res = state_transition_block(
dag.cfg, dag.clearanceState, signedBlock,
cache, dag.updateFlags, restore)
cache, updateFlags, restore)
if res.isErr():
info "Invalid block",
@ -150,7 +152,8 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
var
cache = StateCache()
info = ForkedEpochInfo()
dag.advanceSlots(dag.clearanceState, next, true, cache, info)
dag.advanceSlots(dag.clearanceState, next, true, cache, info,
dag.updateFlags)
debug "Prepared clearance state for next block",
next, updateStateDur = Moment.now() - startTick
@ -267,7 +270,7 @@ proc addHeadBlockWithParent*(
# onto which we can apply the new block
let clearanceBlock = BlockSlotId.init(parent.bid, signedBlock.message.slot)
if not updateState(
dag, dag.clearanceState, clearanceBlock, true, cache):
dag, dag.clearanceState, clearanceBlock, true, cache, dag.updateFlags):
# We should never end up here - the parent must be a block no older than and
# rooted in the finalized checkpoint, hence we should always be able to
# load its corresponding state
@ -297,7 +300,8 @@ proc addHeadBlockWithParent*(
let sigVerifyTick = Moment.now()
? checkStateTransition(dag, signedBlock.asSigVerified(), cache)
? checkStateTransition(dag, signedBlock.asSigVerified(), cache,
dag.updateFlags)
let stateVerifyTick = Moment.now()
# Careful, clearanceState.data has been updated but not blck - we need to
@ -449,3 +453,112 @@ proc addBackfillBlock*(
putBlockDur = putBlockTick - sigVerifyTick
ok()
template BlockAdded(kind: static ConsensusFork): untyped =
when kind == ConsensusFork.Electra:
OnElectraBlockAdded
elif kind == ConsensusFork.Deneb:
OnDenebBlockAdded
elif kind == ConsensusFork.Capella:
OnCapellaBlockAdded
elif kind == ConsensusFork.Bellatrix:
OnBellatrixBlockAdded
elif kind == ConsensusFork.Altair:
OnAltairBlockAdded
elif kind == ConsensusFork.Phase0:
OnPhase0BlockAdded
else:
static: raiseAssert "Unreachable"
proc verifyBlockProposer*(
verifier: var BatchVerifier,
fork: Fork,
genesis_validators_root: Eth2Digest,
immutableValidators: openArray[ImmutableValidatorData2],
blocks: openArray[ForkedSignedBeaconBlock]
): Result[void, string] =
var sigs: seq[SignatureSet]
? sigs.collectProposerSignatureSet(
blocks, immutableValidators, fork, genesis_validators_root)
if not verifier.batchVerify(sigs):
err("Block batch signature verification failed")
else:
ok()
proc addBackfillBlockData*(
dag: ChainDAGRef,
bdata: BlockData,
onStateUpdated: OnStateUpdated,
onBlockAdded: OnForkedBlockAdded
): Result[void, VerifierError] =
var cache = StateCache()
withBlck(bdata.blck):
let
parent = checkHeadBlock(dag, forkyBlck).valueOr:
if error == VerifierError.Duplicate:
return ok()
return err(error)
startTick = Moment.now()
parentBlock = dag.getForkedBlock(parent.bid.root).get()
trustedStateRoot =
withBlck(parentBlock):
forkyBlck.message.state_root
clearanceBlock = BlockSlotId.init(parent.bid, forkyBlck.message.slot)
updateFlags1 = dag.updateFlags + {skipLastStateRootCalculation}
if not updateState(dag, dag.clearanceState, clearanceBlock, true, cache,
updateFlags1):
error "Unable to load clearance state for parent block, " &
"database corrupt?", clearanceBlock = shortLog(clearanceBlock)
return err(VerifierError.MissingParent)
dag.clearanceState.setStateRoot(trustedStateRoot)
let proposerVerifyTick = Moment.now()
if not(isNil(onStateUpdated)):
? onStateUpdated(forkyBlck.message.slot)
let
stateDataTick = Moment.now()
updateFlags2 =
dag.updateFlags + {skipBlsValidation, skipStateRootValidation}
? checkStateTransition(dag, forkyBlck.asSigVerified(), cache, updateFlags2)
let stateVerifyTick = Moment.now()
if bdata.blob.isSome():
for blob in bdata.blob.get():
dag.db.putBlobSidecar(blob[])
type Trusted = typeof forkyBlck.asTrusted()
proc onBlockAddedHandler(
blckRef: BlockRef,
trustedBlock: Trusted,
epochRef: EpochRef,
unrealized: FinalityCheckpoints
) {.gcsafe, raises: [].} =
onBlockAdded(
blckRef,
ForkedTrustedSignedBeaconBlock.init(trustedBlock),
epochRef,
unrealized)
let blockHandler: BlockAdded(consensusFork) = onBlockAddedHandler
discard addResolvedHeadBlock(
dag, dag.clearanceState,
forkyBlck.asTrusted(),
true,
parent, cache,
blockHandler,
proposerVerifyTick - startTick,
stateDataTick - proposerVerifyTick,
stateVerifyTick - stateDataTick)
ok()

View File

@ -285,7 +285,11 @@ type
# balances, as used in fork choice
effective_balances_bytes*: seq[byte]
OnBlockAdded[T: ForkyTrustedSignedBeaconBlock] = proc(
BlockData* = object
blck*: ForkedSignedBeaconBlock
blob*: Opt[BlobSidecars]
OnBlockAdded*[T: ForkyTrustedSignedBeaconBlock] = proc(
blckRef: BlockRef, blck: T, epochRef: EpochRef,
unrealized: FinalityCheckpoints) {.gcsafe, raises: [].}
OnPhase0BlockAdded* = OnBlockAdded[phase0.TrustedSignedBeaconBlock]
@ -299,6 +303,13 @@ type
OnPhase0BlockAdded | OnAltairBlockAdded | OnBellatrixBlockAdded |
OnCapellaBlockAdded | OnDenebBlockAdded | OnElectraBlockAdded
OnForkedBlockAdded* = proc(
blckRef: BlockRef, blck: ForkedTrustedSignedBeaconBlock, epochRef: EpochRef,
unrealized: FinalityCheckpoints) {.gcsafe, raises: [].}
OnStateUpdated* = proc(
slot: Slot): Result[void, VerifierError] {.gcsafe, raises: [].}
HeadChangeInfoObject* = object
slot*: Slot
block_root* {.serializedFieldName: "block".}: Eth2Digest

View File

@ -66,7 +66,8 @@ proc putBlock*(
proc updateState*(
dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId,
save: bool, cache: var StateCache): bool {.gcsafe.}
save: bool, cache: var StateCache,
updateFlags: UpdateFlags): bool {.gcsafe.}
template withUpdatedState*(
dag: ChainDAGRef, stateParam: var ForkedHashedBeaconState,
@ -77,7 +78,7 @@ template withUpdatedState*(
block:
let bsi {.inject.} = bsiParam
var cache {.inject.} = StateCache()
if updateState(dag, stateParam, bsi, false, cache):
if updateState(dag, stateParam, bsi, false, cache, dag.updateFlags):
template bid(): BlockId {.inject, used.} = bsi.bid
template updatedState(): ForkedHashedBeaconState {.inject, used.} = stateParam
okBody
@ -931,7 +932,8 @@ proc putState(dag: ChainDAGRef, state: ForkedHashedBeaconState, bid: BlockId) =
proc advanceSlots*(
dag: ChainDAGRef, state: var ForkedHashedBeaconState, slot: Slot, save: bool,
cache: var StateCache, info: var ForkedEpochInfo) =
cache: var StateCache, info: var ForkedEpochInfo,
updateFlags: UpdateFlags) =
# Given a state, advance it zero or more slots by applying empty slot
# processing - the state must be positioned at or before `slot`
doAssert getStateField(state, slot) <= slot
@ -945,7 +947,7 @@ proc advanceSlots*(
process_slots(
dag.cfg, state, getStateField(state, slot) + 1, cache, info,
dag.updateFlags).expect("process_slots shouldn't fail when state slot is correct")
updateFlags).expect("process_slots shouldn't fail when state slot is correct")
if save:
dag.putState(state, stateBid)
@ -967,7 +969,8 @@ proc advanceSlots*(
proc applyBlock(
dag: ChainDAGRef, state: var ForkedHashedBeaconState, bid: BlockId,
cache: var StateCache, info: var ForkedEpochInfo): Result[void, cstring] =
cache: var StateCache, info: var ForkedEpochInfo,
updateFlags: UpdateFlags): Result[void, cstring] =
loadStateCache(dag, cache, bid, getStateField(state, slot).epoch)
discard case dag.cfg.consensusForkAtEpoch(bid.slot.epoch)
@ -976,37 +979,37 @@ proc applyBlock(
return err("Block load failed")
? state_transition(
dag.cfg, state, data, cache, info,
dag.updateFlags + {slotProcessed}, noRollback)
updateFlags + {slotProcessed}, noRollback)
of ConsensusFork.Altair:
let data = getBlock(dag, bid, altair.TrustedSignedBeaconBlock).valueOr:
return err("Block load failed")
? state_transition(
dag.cfg, state, data, cache, info,
dag.updateFlags + {slotProcessed}, noRollback)
updateFlags + {slotProcessed}, noRollback)
of ConsensusFork.Bellatrix:
let data = getBlock(dag, bid, bellatrix.TrustedSignedBeaconBlock).valueOr:
return err("Block load failed")
? state_transition(
dag.cfg, state, data, cache, info,
dag.updateFlags + {slotProcessed}, noRollback)
updateFlags + {slotProcessed}, noRollback)
of ConsensusFork.Capella:
let data = getBlock(dag, bid, capella.TrustedSignedBeaconBlock).valueOr:
return err("Block load failed")
? state_transition(
dag.cfg, state, data, cache, info,
dag.updateFlags + {slotProcessed}, noRollback)
updateFlags + {slotProcessed}, noRollback)
of ConsensusFork.Deneb:
let data = getBlock(dag, bid, deneb.TrustedSignedBeaconBlock).valueOr:
return err("Block load failed")
? state_transition(
dag.cfg, state, data, cache, info,
dag.updateFlags + {slotProcessed}, noRollback)
updateFlags + {slotProcessed}, noRollback)
of ConsensusFork.Electra:
let data = getBlock(dag, bid, electra.TrustedSignedBeaconBlock).valueOr:
return err("Block load failed")
? state_transition(
dag.cfg, state, data, cache, info,
dag.updateFlags + {slotProcessed}, noRollback)
updateFlags + {slotProcessed}, noRollback)
ok()
@ -1140,7 +1143,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
while headBlocks.len > 0:
dag.applyBlock(
dag.headState, headBlocks.pop().bid, cache,
info).expect("head blocks should apply")
info, dag.updateFlags).expect("head blocks should apply")
dag.head = headRef
dag.heads = @[headRef]
@ -1399,7 +1402,8 @@ proc getEpochRef*(
return err("Requesting EpochRef for non-canonical block")
var cache: StateCache
if not updateState(dag, dag.epochRefState, ancestor, false, cache):
if not updateState(dag, dag.epochRefState, ancestor, false, cache,
dag.updateFlags):
return err("Could not load requested state")
ok(dag.getEpochRef(dag.epochRefState, cache))
@ -1685,7 +1689,7 @@ proc getBlockRange*(
proc updateState*(
dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId,
save: bool, cache: var StateCache): bool =
save: bool, cache: var StateCache, updateFlags: UpdateFlags): bool =
## Rewind or advance state such that it matches the given block and slot -
## this may include replaying from an earlier snapshot if blck is on a
## different branch or has advanced to a higher slot number than slot
@ -1839,7 +1843,8 @@ proc updateState*(
# again. Also, because we're applying blocks that were loaded from the
# database, we can skip certain checks that have already been performed
# before adding the block to the database.
if (let res = dag.applyBlock(state, ancestors[i], cache, info); res.isErr):
if (let res = dag.applyBlock(state, ancestors[i], cache, info,
updateFlags); res.isErr):
warn "Failed to apply block from database",
blck = shortLog(ancestors[i]),
state_bid = shortLog(state.latest_block_id),
@ -1848,7 +1853,7 @@ proc updateState*(
return false
# ...and make sure to process empty slots as requested
dag.advanceSlots(state, bsi.slot, save, cache, info)
dag.advanceSlots(state, bsi.slot, save, cache, info, updateFlags)
# ...and make sure to load the state cache, if it exists
loadStateCache(dag, cache, bsi.bid, getStateField(state, slot).epoch)
@ -2387,7 +2392,7 @@ proc updateHead*(
# to use existing in-memory states to make this smooth
var cache: StateCache
if not updateState(
dag, dag.headState, newHead.bid.atSlot(), false, cache):
dag, dag.headState, newHead.bid.atSlot(), false, cache, dag.updateFlags):
# Advancing the head state should never fail, given that the tail is
# implicitly finalised, the head is an ancestor of the tail and we always
# store the tail state in the database, as well as every epoch slot state in
@ -2651,7 +2656,7 @@ proc getProposalState*(
# it now
if not dag.updateState(
state[], head.atSlot(slot - 1).toBlockSlotId().expect("not nil"),
false, cache):
false, cache, dag.updateFlags):
error "Cannot get proposal state - skipping block production, database corrupt?",
head = shortLog(head),
slot
@ -2840,7 +2845,8 @@ proc rebuildIndex*(dag: ChainDAGRef) =
# The slot check is needed to avoid re-applying a block
if bids.isProposed and getStateField(state[], latest_block_header).slot < bids.bid.slot:
let res = dag.applyBlock(state[], bids.bid, cache, info)
let res = dag.applyBlock(state[], bids.bid, cache, info,
dag.updateFlags)
if res.isErr:
error "Failed to apply block while building index",
state_bid = shortLog(state[].latest_block_id()),

View File

@ -35,7 +35,7 @@ proc updateExistingState(
dag: ChainDAGRef, state: var ForkedHashedBeaconState, bsi: BlockSlotId,
save: bool, cache: var StateCache): bool =
## Wrapper around `updateState` for states expected to exist.
let ok = dag.updateState(state, bsi, save, cache)
let ok = dag.updateState(state, bsi, save, cache, dag.updateFlags)
if not ok:
error "State failed to load unexpectedly",
bsi, tail = dag.tail.slot, backfill = shortLog(dag.backfill)

View File

@ -0,0 +1,247 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import std/sequtils, stew/io2, chronicles, chronos, metrics,
../spec/forks,
../[beacon_chain_file, beacon_clock],
../sszdump
from ./block_pools_types import VerifierError, BlockData
from ../spec/state_transition_block import validate_blobs
from std/os import `/`
export beacon_chain_file
const
ChainFileName = "nbc.bfdata"
type
ChainListRef* = ref object
path*: string
handle*: Opt[ChainFileHandle]
template chainFilePath*(directory: string): string =
directory / ChainFileName
template filePath*(clist: ChainListRef): string =
chainFilePath(clist.path)
proc init*(T: type ChainListRef, directory: string): ChainListRef =
let
filename = directory.chainFilePath()
handle =
if not(isFile(filename)):
Opt.none(ChainFileHandle)
else:
let
flags = {ChainFileFlag.Repair}
res = ChainFileHandle.init(filename, flags)
if res.isErr():
fatal "Unexpected failure while loading backfill data",
filename = filename, reason = res.error
quit 1
Opt.some(res.get())
ChainListRef(path: directory, handle: handle)
proc init*(T: type ChainListRef, directory: string,
slot: Slot): Result[ChainListRef, string] =
let
flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways}
filename = directory.chainFilePath()
handle = ? ChainFileHandle.init(filename, flags)
offset {.used.} = ? seekForSlot(handle, slot)
ok(ChainListRef(path: directory, handle: Opt.some(handle)))
proc seekForSlot*(clist: ChainListRef, slot: Slot): Result[void, string] =
if clist.handle.isNone():
let
flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways}
filename = clist.path.chainFilePath()
handle = ? ChainFileHandle.init(filename, flags)
clist.handle = Opt.some(handle)
let offset {.used.} = ? seekForSlot(clist.handle.get(), slot)
ok()
proc close*(clist: ChainListRef): Result[void, string] =
if clist.handle.isNone():
return ok()
? clist.handle.get().close()
ok()
proc clear*(clist: ChainListRef): Result[void, string] =
? clist.close()
? clearFile(clist.path.chainFilePath())
clist.handle = Opt.none(ChainFileHandle)
ok()
template slot*(data: BlockData): Slot =
data.blck.slot
template parent_root*(data: ForkedSignedBeaconBlock): Eth2Digest =
withBlck(data): forkyBlck.message.parent_root
template parent_root*(data: BlockData): Eth2Digest =
data.blck.parent_root()
template root*(data: BlockData): Eth2Digest =
withBlck(data.blck): forkyBlck.root
template shortLog*(x: BlockData): string =
let count = if x.blob.isSome(): $len(x.blob.get()) else: "0"
$(x.slot()) & "@" & shortLog(x.parent_root()) & "#" & count
template shortLog*(x: Opt[BlockData]): string =
if x.isNone():
"[none]"
else:
shortLog(x.get())
func tail*(clist: ChainListRef): Opt[BlockData] =
if clist.handle.isSome():
clist.handle.get().data.tail
else:
Opt.none(BlockData)
func head*(clist: ChainListRef): Opt[BlockData] =
if clist.handle.isSome():
clist.handle.get().data.head
else:
Opt.none(BlockData)
proc setHead*(clist: ChainListRef, bdata: BlockData) =
doAssert(clist.handle.isSome())
var handle = clist.handle.get()
handle.setHead(bdata)
clist.handle = Opt.some(handle)
proc setTail*(clist: ChainListRef, bdata: BlockData) =
doAssert(clist.handle.isSome())
var handle = clist.handle.get()
handle.setTail(bdata)
clist.handle = Opt.some(handle)
proc store*(clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars]): Result[void, string] =
if clist.handle.isNone():
let
filename = clist.path.chainFilePath()
flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways}
handle = ? ChainFileHandle.init(filename, flags)
clist.handle = Opt.some(handle)
store(handle, signedBlock, blobs)
else:
store(clist.handle.get(), signedBlock, blobs)
proc checkBlobs(signedBlock: ForkedSignedBeaconBlock,
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
withBlck(signedBlock):
when consensusFork >= ConsensusFork.Deneb:
if blobsOpt.isSome():
let blobs = blobsOpt.get()
template blob_kzg_commitments(): untyped =
forkyBlck.message.body.blob_kzg_commitments.asSeq
if len(blobs) > 0:
if len(blobs) != len(blob_kzg_commitments):
return err(VerifierError.Invalid)
let res =
validate_blobs(blob_kzg_commitments,
blobs.mapIt(KzgBlob(bytes: it.blob)),
blobs.mapIt(it.kzg_proof))
if res.isErr():
debug "Blob validation failed",
block_root = shortLog(forkyBlck.root),
blobs = shortLog(blobs),
blck = shortLog(forkyBlck.message),
kzg_commits = mapIt(blob_kzg_commitments, shortLog(it)),
signature = shortLog(forkyBlck.signature),
msg = res.error()
return err(VerifierError.Invalid)
ok()
proc addBackfillBlockData*(
clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock,
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
doAssert(not(isNil(clist)))
logScope:
backfill_tail = shortLog(clist.tail)
signed_block_slot = signedBlock.slot
signed_block_root = signedBlock.root
signed_block_parent_root = signedBlock.parent_root
let verifyBlockTick = Moment.now()
if clist.tail.isNone():
? checkBlobs(signedBlock, blobsOpt)
let storeBlockTick = Moment.now()
store(clist, signedBlock, blobsOpt).isOkOr:
fatal "Unexpected failure while trying to store data",
filename = chainFilePath(clist.path), reason = error
quit 1
let bdata = BlockData(blck: signedBlock, blob: blobsOpt)
clist.setTail(bdata)
if clist.head.isNone():
clist.setHead(bdata)
debug "Initial block backfilled",
verify_block_duration = shortLog(storeBlockTick - verifyBlockTick),
store_block_duration = shortLog(Moment.now() - storeBlockTick)
return ok()
let tail = clist.tail.get()
if signedBlock.slot == tail.slot:
if signedBlock.root == tail.root:
debug "Duplicate block"
return err(VerifierError.Duplicate)
else:
debug "Block from unviable fork"
return err(VerifierError.UnviableFork)
elif signedBlock.slot > tail.slot:
debug "Block from unviable fork"
return err(VerifierError.UnviableFork)
if tail.parent_root != signedBlock.root:
debug "Block does not match expected backfill root"
return err(VerifierError.MissingParent)
? checkBlobs(signedBlock, blobsOpt)
let storeBlockTick = Moment.now()
store(clist, signedBlock, blobsOpt).isOkOr:
fatal "Unexpected failure while trying to store data",
filename = chainFilePath(clist.path), reason = error
quit 1
debug "Block backfilled",
verify_block_duration = shortLog(storeBlockTick - verifyBlockTick),
store_block_duration = shortLog(Moment.now() - storeBlockTick)
clist.setTail(BlockData(blck: signedBlock, blob: blobsOpt))
ok()
proc untrustedBackfillVerifier*(
clist: ChainListRef,
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true).} =
let retFuture = newFuture[Result[void, VerifierError]]()
retFuture.complete(clist.addBackfillBlockData(signedBlock, blobs))
retFuture

View File

@ -107,7 +107,7 @@ type
## The slot at which we sent a payload to the execution client the last
## time
NewPayloadStatus {.pure.} = enum
NewPayloadStatus* {.pure.} = enum
valid
notValid
invalid
@ -123,7 +123,7 @@ type
proc new*(T: type BlockProcessor,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
rng: ref HmacDrbgContext, taskpool: TaskPoolPtr,
batchVerifier: ref BatchVerifier,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
blobQuarantine: ref BlobQuarantine,
@ -137,7 +137,7 @@ proc new*(T: type BlockProcessor,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier.init(rng, taskpool)
verifier: batchVerifier[]
)
# Sync callbacks
@ -416,6 +416,22 @@ proc enqueueBlock*(
except AsyncQueueFullError:
raiseAssert "unbounded queue"
proc updateHead*(
consensusManager: ref ConsensusManager,
getBeaconTimeFn: GetBeaconTimeFn,
): Result[void, string] =
let
attestationPool = consensusManager.attestationPool
wallTime = getBeaconTimeFn()
wallSlot = wallTime.slotOrZero()
newHead =
attestationPool[].selectOptimisticHead(wallSlot.start_beacon_time)
if newHead.isOk():
consensusManager[].updateHead(newHead.get.blck)
ok()
else:
err("Head selection failed, using previous head")
proc storeBlock(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock,

View File

@ -13,7 +13,8 @@ import
metrics, metrics/chronos_httpserver,
stew/[byteutils, io2],
eth/p2p/discoveryv5/[enr, random2],
./consensus_object_pools/[blob_quarantine, data_column_quarantine],
./consensus_object_pools/[
blob_quarantine, data_column_quarantine, blockchain_list],
./consensus_object_pools/vanity_logs/vanity_logs,
./networking/[topic_params, network_metadata_downloads],
./rpc/[rest_api, state_ttl_cache],
@ -21,7 +22,7 @@ import
./spec/[
deposit_snapshots, engine_authentication, weak_subjectivity,
eip7594_helpers],
./sync/[sync_protocol, light_client_protocol],
./sync/[sync_protocol, light_client_protocol, sync_overseer],
./validators/[keystore_management, beacon_validators],
"."/[
beacon_node, beacon_node_light_client, deposits,
@ -275,10 +276,19 @@ proc checkWeakSubjectivityCheckpoint(
from ./spec/state_transition_block import kzg_commitment_to_versioned_hash
proc isSlotWithinWeakSubjectivityPeriod(dag: ChainDagRef, slot: Slot): bool =
let
checkpoint = Checkpoint(
epoch: epoch(getStateField(dag.headState, slot)),
root: getStateField(dag.headState, latest_block_header).state_root)
is_within_weak_subjectivity_period(dag.cfg, slot,
dag.headState, checkpoint)
proc initFullNode(
node: BeaconNode,
rng: ref HmacDrbgContext,
dag: ChainDAGRef,
clist: ChainListRef,
taskpool: TaskPoolPtr,
getBeaconTime: GetBeaconTimeFn) {.async.} =
template config(): auto = node.config
@ -369,17 +379,18 @@ proc initFullNode(
else:
dag.tail.slot
func getUntrustedBackfillSlot(): Slot =
if clist.tail.isSome():
clist.tail.get().blck.slot
else:
dag.tail.slot
func getFrontfillSlot(): Slot =
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
proc isWithinWeakSubjectivityPeriod(): bool =
let
currentSlot = node.beaconClock.now().slotOrZero()
checkpoint = Checkpoint(
epoch: epoch(getStateField(node.dag.headState, slot)),
root: getStateField(node.dag.headState, latest_block_header).state_root)
is_within_weak_subjectivity_period(node.dag.cfg, currentSlot,
node.dag.headState, checkpoint)
isSlotWithinWeakSubjectivityPeriod(node.dag,
node.beaconClock.now().slotOrZero())
proc eventWaiter(): Future[void] {.async: (raises: [CancelledError]).} =
await node.shutdownEvent.wait()
@ -414,10 +425,12 @@ proc initFullNode(
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
node.dynamicFeeRecipientsStore, config.validatorsDir,
config.defaultFeeRecipient, config.suggestedGasLimit)
batchVerifier = BatchVerifier.new(rng, taskpool)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor,
batchVerifier, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
@ -427,6 +440,11 @@ proc initFullNode(
# that should probably be reimagined more holistically in the future.
blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
untrustedBlockVerifier =
proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars],
maybeFinalized: bool): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true).} =
clist.untrustedBackfillVerifier(signedBlock, blobs, maybeFinalized)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
@ -487,6 +505,20 @@ proc initFullNode(
dag.backfill.slot, blockVerifier, maxHeadAge = 0,
shutdownEvent = node.shutdownEvent,
flags = syncManagerFlags)
clistPivotSlot =
if clist.tail.isSome():
clist.tail.get().blck.slot()
else:
getLocalWallSlot()
untrustedManager = newSyncManager[Peer, PeerId](
node.network.peerPool,
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getUntrustedBackfillSlot,
getFrontfillSlot, isWithinWeakSubjectivityPeriod,
clistPivotSlot, untrustedBlockVerifier, maxHeadAge = 0,
shutdownEvent = node.shutdownEvent,
flags = syncManagerFlags)
router = (ref MessageRouter)(
processor: processor,
network: node.network)
@ -540,6 +572,7 @@ proc initFullNode(
dag.setReorgCb(onChainReorg)
node.dag = dag
node.list = clist
node.blobQuarantine = blobQuarantine
node.quarantine = quarantine
node.attestationPool = attestationPool
@ -547,11 +580,24 @@ proc initFullNode(
node.lightClientPool = lightClientPool
node.validatorChangePool = validatorChangePool
node.processor = processor
node.batchVerifier = batchVerifier
node.blockProcessor = blockProcessor
node.consensusManager = consensusManager
node.requestManager = requestManager
node.syncManager = syncManager
node.backfiller = backfiller
node.untrustedManager = untrustedManager
node.syncOverseer = SyncOverseerRef.new(node.consensusManager,
node.validatorMonitor,
config,
getBeaconTime,
node.list,
node.beaconClock,
node.eventBus.optFinHeaderUpdateQueue,
node.network.peerPool,
node.batchVerifier,
syncManager, backfiller,
untrustedManager)
node.router = router
await node.addValidators()
@ -627,11 +673,20 @@ proc init*(T: type BeaconNode,
checkpoint = Checkpoint(
epoch: epoch(getStateField(genesisState[], slot)),
root: getStateField(genesisState[], latest_block_header).state_root)
notice "Genesis state information",
genesis_fork = genesisState.kind,
is_post_altair = (cfg.ALTAIR_FORK_EPOCH == GENESIS_EPOCH)
if config.longRangeSync == LongRangeSyncMode.Light:
if not is_within_weak_subjectivity_period(metadata.cfg, currentSlot,
genesisState[], checkpoint):
fatal WeakSubjectivityLogMessage, current_slot = currentSlot
quit 1
# We do support any network which starts from Altair or later fork.
let metadata = config.loadEth2Network()
if metadata.cfg.ALTAIR_FORK_EPOCH != GENESIS_EPOCH:
fatal WeakSubjectivityLogMessage, current_slot = currentSlot,
altair_fork_epoch = metadata.cfg.ALTAIR_FORK_EPOCH
quit 1
try:
if config.numThreads < 0:
@ -669,7 +724,8 @@ proc init*(T: type BeaconNode,
finUpdateQueue: newAsyncEventQueue[
RestVersioned[ForkedLightClientFinalityUpdate]](),
optUpdateQueue: newAsyncEventQueue[
RestVersioned[ForkedLightClientOptimisticUpdate]]())
RestVersioned[ForkedLightClientOptimisticUpdate]](),
optFinHeaderUpdateQueue: newAsyncEventQueue[ForkedLightClientHeader]())
db = BeaconChainDB.new(config.databaseDir, cfg, inMemory = false)
if config.externalBeaconApiUrl.isSome and ChainDAGRef.isInitialized(db).isErr:
@ -843,6 +899,28 @@ proc init*(T: type BeaconNode,
getBeaconTime = beaconClock.getBeaconTimeFn()
let clist =
block:
let res = ChainListRef.init(config.databaseDir())
debug "Backfill database has been loaded", path = config.databaseDir(),
head = shortLog(res.head), tail = shortLog(res.tail)
if res.handle.isSome() and res.tail().isSome():
if not(isSlotWithinWeakSubjectivityPeriod(dag, res.tail.get().slot())):
notice "Backfill database is outdated " &
"(outside of weak subjectivity period), reseting database",
path = config.databaseDir(),
tail = shortLog(res.tail)
res.clear().isOkOr:
fatal "Unable to reset backfill database",
path = config.databaseDir(), reason = error
quit 1
res
info "Backfill database initialized", path = config.databaseDir(),
head = shortLog(clist.head), tail = shortLog(clist.tail)
if config.weakSubjectivityCheckpoint.isSome:
dag.checkWeakSubjectivityCheckpoint(
config.weakSubjectivityCheckpoint.get, beaconClock)
@ -970,7 +1048,7 @@ proc init*(T: type BeaconNode,
node.initLightClient(
rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root)
await node.initFullNode(rng, dag, taskpool, getBeaconTime)
await node.initFullNode(rng, dag, clist, taskpool, getBeaconTime)
node.updateLightClientFromDag()
@ -1682,26 +1760,29 @@ func formatNextConsensusFork(
$nextConsensusFork & ":" & $nextForkEpoch)
func syncStatus(node: BeaconNode, wallSlot: Slot): string =
let optimisticHead = not node.dag.head.executionValid
if node.syncManager.inProgress:
let
optimisticSuffix =
if optimisticHead:
"/opt"
else:
""
lightClientSuffix =
if node.consensusManager[].shouldSyncOptimistically(wallSlot):
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
else:
""
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
elif node.backfiller.inProgress:
"backfill: " & node.backfiller.syncStatus
elif optimisticHead:
"synced/opt"
else:
"synced"
node.syncOverseer.statusMsg.valueOr:
let optimisticHead = not node.dag.head.executionValid
if node.syncManager.inProgress:
let
optimisticSuffix =
if optimisticHead:
"/opt"
else:
""
lightClientSuffix =
if node.consensusManager[].shouldSyncOptimistically(wallSlot):
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
else:
""
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
elif node.untrustedManager.inProgress:
"untrusted: " & node.untrustedManager.syncStatus
elif node.backfiller.inProgress:
"backfill: " & node.backfiller.syncStatus
elif optimisticHead:
"synced/opt"
else:
"synced"
when defined(windows):
from winservice import establishWindowsService, reportServiceStatusSuccess
@ -1995,18 +2076,6 @@ proc stop(node: BeaconNode) =
node.db.close()
notice "Databases closed"
proc startBackfillTask(node: BeaconNode) {.async.} =
while node.dag.needsBackfill:
if not node.syncManager.inProgress:
# Only start the backfiller if it's needed _and_ head sync has completed -
# if we lose sync after having synced head, we could stop the backfilller,
# but this should be a fringe case - might as well keep the logic simple for
# now
node.backfiller.start()
return
await sleepAsync(chronos.seconds(2))
proc run(node: BeaconNode) {.raises: [CatchableError].} =
bnStatus = BeaconNodeStatus.Running
@ -2026,9 +2095,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} =
node.startLightClient()
node.requestManager.start()
node.syncManager.start()
if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask()
node.syncOverseer.start()
waitFor node.updateGossipStatus(wallSlot)

View File

@ -82,7 +82,7 @@ proc installRewardsApiHandlers*(router: var RestRouter, node: BeaconNode) =
tmpState = assignClone(node.dag.headState)
if not updateState(
node.dag, tmpState[], targetBlock, false, cache):
node.dag, tmpState[], targetBlock, false, cache, node.dag.updateFlags):
return RestApiResponse.jsonError(Http404, ParentBlockMissingStateError)
func rollbackProc(state: var ForkedHashedBeaconState) {.
@ -164,7 +164,7 @@ proc installRewardsApiHandlers*(router: var RestRouter, node: BeaconNode) =
tmpState = assignClone(node.dag.headState)
if not updateState(
node.dag, tmpState[], targetBlock, false, cache):
node.dag, tmpState[], targetBlock, false, cache, node.dag.updateFlags):
return RestApiResponse.jsonError(Http404, ParentBlockMissingStateError)
let response =

View File

@ -192,7 +192,8 @@ template withStateForBlockSlotId*(nodeParam: BeaconNode,
else:
assignClone(node.dag.headState)
if node.dag.updateState(stateToAdvance[], blockSlotId, false, cache):
if node.dag.updateState(stateToAdvance[], blockSlotId, false, cache,
node.dag.updateFlags):
if cachedState == nil and node.stateTtlCache != nil:
# This was not a cached state, we can cache it now
node.stateTtlCache.add(stateToAdvance)

View File

@ -229,6 +229,38 @@ func bls_to_execution_change_signature_set*(
SignatureSet.init(pubkey, signing_root, signature)
proc collectProposerSignatureSet*(
sigs: var seq[SignatureSet],
blocks: openArray[ForkedSignedBeaconBlock],
validatorKeys: openArray[ImmutableValidatorData2],
fork: Fork,
genesis_validators_root: Eth2Digest
): Result[void, string] =
mixin load
for forkedBlock in blocks:
let item =
withBlck(forkedBlock):
let
proposerKey =
validatorKeys.load(forkyBlck.message.proposer_index).valueOr:
let msg = "collectProposerSignatureSet: " &
"invalid proposer index (" &
$forkyBlck.message.proposer_index & ")"
return err(msg)
signature =
forkyBlck.signature.load().valueOr:
let msg = "collectProposerSignatureSet: " &
"cannot load signature (" &
$ forkyBlck.signature & ")"
return err(msg)
block_signature_set(
fork, genesis_validators_root,
forkyBlck.message.slot, forkyBlck.root,
proposerKey, signature)
sigs.add(item)
ok()
proc collectSignatureSets*(
sigs: var seq[SignatureSet],
signed_block: ForkySignedBeaconBlock,

View File

@ -72,7 +72,7 @@ type
rangeAge: uint64
chunkSize: uint64
queue: SyncQueue[A]
syncFut: Future[void]
syncFut: Future[void].Raising([CancelledError])
blockVerifier: BlockVerifier
inProgress*: bool
insSyncSpeed*: float
@ -88,7 +88,14 @@ type
BeaconBlocksRes =
NetRes[List[ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS]]
BlobSidecarsRes = NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]]
BlobSidecarsRes =
NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]]
SyncBlockData* = object
blocks*: seq[ref ForkedSignedBeaconBlock]
blobs*: Opt[seq[BlobSidecars]]
SyncBlockDataRes* = Result[SyncBlockData, string]
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slots: slots)
@ -172,7 +179,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
res
proc getBlocks[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.
req: SyncRequest[A]): Future[BeaconBlocksRes] {.
async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==`
@ -188,14 +195,19 @@ proc getBlocks[A, B](man: SyncManager[A, B], peer: A,
beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
let wallEpoch = man.getLocalWallSlot().epoch
e >= man.DENEB_FORK_EPOCH and
proc shouldGetBlobs[A, B](man: SyncManager[A, B], s: Slot): bool =
let
wallEpoch = man.getLocalWallSlot().epoch
epoch = s.epoch()
(epoch >= man.DENEB_FORK_EPOCH) and
(wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or
e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
epoch >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
proc shouldGetBlobs[A, B](man: SyncManager[A, B], r: SyncRequest[A]): bool =
man.shouldGetBlobs(r.slot) or man.shouldGetBlobs(r.slot + (r.count - 1))
proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BlobSidecarsRes]
req: SyncRequest[A]): Future[BlobSidecarsRes]
{.async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==`
@ -225,10 +237,10 @@ proc remainingSlots(man: SyncManager): uint64 =
else:
0'u64
func groupBlobs*[T](req: SyncRequest[T],
blocks: seq[ref ForkedSignedBeaconBlock],
blobs: seq[ref BlobSidecar]):
Result[seq[BlobSidecars], string] =
func groupBlobs*(
blocks: seq[ref ForkedSignedBeaconBlock],
blobs: seq[ref BlobSidecar]
): Result[seq[BlobSidecars], string] =
var
grouped = newSeq[BlobSidecars](len(blocks))
blob_cursor = 0
@ -269,8 +281,93 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
ok()
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
{.async: (raises: [CancelledError]).} =
proc getSyncBlockData*[T](
peer: T,
slot: Slot
): Future[SyncBlockDataRes] {.async: (raises: [CancelledError]).} =
mixin getScore
logScope:
slot = slot
peer_score = peer.getScore()
peer_speed = peer.netKbps()
topics = "syncman"
debug "Requesting block from peer"
let blocksRange =
block:
let res = await beaconBlocksByRange_v2(peer, slot, 1'u64, 1'u64)
if res.isErr():
peer.updateScore(PeerScoreNoValues)
return err("Failed to receive blocks on request [" & $res.error & "]")
res.get().asSeq
if len(blocksRange) == 0:
peer.updateScore(PeerScoreNoValues)
return err("An empty range of blocks was returned by peer")
if len(blocksRange) != 1:
peer.updateScore(PeerScoreBadResponse)
return err("Incorrect number of blocks was returned by peer, " &
$len(blocksRange))
debug "Received block on request"
if blocksRange[0][].slot != slot:
peer.updateScore(PeerScoreBadResponse)
return err("The received block is not in the requested range")
let (shouldGetBlob, blobsCount) =
withBlck(blocksRange[0][]):
when consensusFork >= ConsensusFork.Deneb:
let res = len(forkyBlck.message.body.blob_kzg_commitments)
if res > 0:
(true, res)
else:
(false, 0)
else:
(false, 0)
let blobsRange =
if shouldGetBlob:
let blobData =
block:
debug "Requesting blobs sidecars from peer"
let res = await blobSidecarsByRange(peer, slot, 1'u64)
if res.isErr():
peer.updateScore(PeerScoreNoValues)
return err(
"Failed to receive blobs on request, reason: " & $res.error)
res.get().asSeq()
if len(blobData) == 0:
peer.updateScore(PeerScoreNoValues)
return err("An empty range of blobs was returned by peer")
if len(blobData) != blobsCount:
peer.updateScore(PeerScoreBadResponse)
return err("Incorrect number of received blobs in the requested range")
debug "Received blobs on request", blobs_count = len(blobData)
let groupedBlobs = groupBlobs(blocksRange, blobData).valueOr:
peer.updateScore(PeerScoreNoValues)
return err("Received blobs sequence is inconsistent, reason: " & error)
groupedBlobs.checkBlobs().isOkOr:
peer.updateScore(PeerScoreBadResponse)
return err("Received blobs sequence is invalid, reason: " & error)
Opt.some(groupedBlobs)
else:
Opt.none(seq[BlobSidecars])
ok(SyncBlockData(blocks: blocksRange, blobs: blobsRange))
proc syncStep[A, B](
man: SyncManager[A, B], index: int, peer: A
) {.async: (raises: [CancelledError]).} =
logScope:
peer_score = peer.getScore()
peer_speed = peer.netKbps()
@ -409,21 +506,20 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
request = req, err = blocks.error
return
let blockData = blocks.get().asSeq()
let blockSmap = getShortMap(req, blockData)
debug "Received blocks on request", blocks_count = len(blockData),
blocks_map = blockSmap, request = req
blocks_map = getShortMap(req, blockData), request = req
let slots = mapIt(blockData, it[].slot)
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blocks sequence is not in requested range",
blocks_count = len(blockData), blocks_map = blockSmap,
request = req
blocks_count = len(blockData),
blocks_map = getShortMap(req, blockData), request = req
return
let shouldGetBlobs =
if not man.shouldGetBlobs(req.slot.epoch):
if not man.shouldGetBlobs(req):
false
else:
var hasBlobs = false
@ -435,12 +531,6 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
break
hasBlobs
func combine(acc: seq[Slot], cur: Slot): seq[Slot] =
var copy = acc
if copy[copy.len-1] != cur:
copy.add(cur)
copy
let blobData =
if shouldGetBlobs:
let blobs = await man.getBlobSidecars(peer, req)
@ -451,37 +541,37 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
request = req, err = blobs.error
return
let blobData = blobs.get().asSeq()
let blobSmap = getShortMap(req, blobData)
debug "Received blobs on request", blobs_count = len(blobData),
blobs_map = blobSmap, request = req
debug "Received blobs on request",
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData), request = req
if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is not in requested range",
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
request = req
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req
return
let groupedBlobs = groupBlobs(req, blockData, blobData)
if groupedBlobs.isErr():
let groupedBlobs = groupBlobs(blockData, blobData).valueOr:
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
info "Received blobs sequence is inconsistent",
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
blobs_map = getShortMap(req, blobData),
request = req, msg = error
return
if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
if (let checkRes = groupedBlobs.checkBlobs(); checkRes.isErr):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is invalid",
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req,
msg = checkRes.error
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req,
msg = checkRes.error
return
Opt.some(groupedBlobs.get())
Opt.some(groupedBlobs)
else:
Opt.none(seq[BlobSidecars])
@ -512,7 +602,9 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
await man.queue.push(req, blockData, blobData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} =
proc syncWorker[A, B](
man: SyncManager[A, B], index: int
) {.async: (raises: [CancelledError]).} =
mixin getKey, getScore, getHeadSlot
logScope:
@ -610,8 +702,9 @@ proc toTimeLeftString*(d: Duration): string =
res = res & "00m"
res
proc syncClose[A, B](man: SyncManager[A, B],
speedTaskFut: Future[void]) {.async.} =
proc syncClose[A, B](
man: SyncManager[A, B], speedTaskFut: Future[void]
) {.async: (raises: []).} =
var pending: seq[FutureBase]
if not(speedTaskFut.finished()):
pending.add(speedTaskFut.cancelAndWait())
@ -620,7 +713,10 @@ proc syncClose[A, B](man: SyncManager[A, B],
pending.add(worker.future.cancelAndWait())
await noCancel allFutures(pending)
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
proc syncLoop[A, B](
man: SyncManager[A, B]
) {.async: (raises: [CancelledError]).} =
logScope:
sync_ident = man.ident
direction = man.direction
@ -671,14 +767,27 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
let (map, sleeping, waiting, pending) = man.getWorkersStats()
debug "Current syncing state", workers_map = map,
sleeping_workers_count = sleeping,
waiting_workers_count = waiting,
pending_workers_count = pending,
wall_head_slot = wallSlot, local_head_slot = headSlot,
pause_time = $chronos.seconds(pauseTime),
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed
case man.queue.kind
of SyncQueueKind.Forward:
debug "Current syncing state", workers_map = map,
sleeping_workers_count = sleeping,
waiting_workers_count = waiting,
pending_workers_count = pending,
wall_head_slot = wallSlot,
local_head_slot = headSlot,
pause_time = $chronos.seconds(pauseTime),
avg_sync_speed = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4),
ins_sync_speed = man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4)
of SyncQueueKind.Backward:
debug "Current syncing state", workers_map = map,
sleeping_workers_count = sleeping,
waiting_workers_count = waiting,
pending_workers_count = pending,
wall_head_slot = wallSlot,
backfill_slot = man.getSafeSlot(),
pause_time = $chronos.seconds(pauseTime),
avg_sync_speed = man.avgSyncSpeed.formatBiggestFloat(ffDecimal, 4),
ins_sync_speed = man.insSyncSpeed.formatBiggestFloat(ffDecimal, 4)
let
pivot = man.progressPivot
progress =
@ -806,3 +915,18 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
proc start*[A, B](man: SyncManager[A, B]) =
## Starts SyncManager's main loop.
man.syncFut = man.syncLoop()
proc updatePivot*[A, B](man: SyncManager[A, B], pivot: Slot) =
## Update progress pivot slot.
man.progressPivot = pivot
proc join*[A, B](
man: SyncManager[A, B]
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
if man.syncFut.isNil():
let retFuture =
Future[void].Raising([CancelledError]).init("nimbus-eth2.join()")
retFuture.complete()
retFuture
else:
man.syncFut.join()

View File

@ -0,0 +1,501 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import std/[strutils, sequtils]
import stew/base10, chronos, chronicles, results
import
../consensus_object_pools/blockchain_list,
../spec/eth2_apis/rest_types,
../spec/[helpers, forks, network, forks_light_client, weak_subjectivity],
../networking/[peer_pool, peer_scores, eth2_network],
../gossip_processing/block_processor,
../[beacon_clock, beacon_node],
./[sync_types, sync_manager, sync_queue]
from ../consensus_object_pools/spec_cache import get_attesting_indices
export sync_types
logScope:
topics = "overseer"
const
PARALLEL_REQUESTS = 3
## Number of peers used to obtain the initial block.
BLOCKS_PROCESS_CHUNK_SIZE = 2
## Number of blocks sent to processing (CPU heavy task).
type
BlockDataRes = Result[BlockData, string]
proc init*(t: typedesc[BlockDataChunk],
stateCallback: OnStateUpdated,
data: openArray[BlockData]): BlockDataChunk =
BlockDataChunk(
blocks: @data,
onStateUpdatedCb: stateCallback,
resfut:
Future[Result[void, string]].Raising([CancelledError]).init(
"blockdata.chunk")
)
proc shortLog*(c: BlockDataChunk): string =
let
map =
(c.blocks.mapIt(shortLog(it.blck.root) & ":" & $it.blck.slot)).
join(", ")
futureState = if c.resfut.finished(): "pending" else: "completed"
"[" & map & "]:" & futureState
iterator chunks*(data: openArray[BlockData],
stateCallback: OnStateUpdated,
maxCount: Positive): BlockDataChunk =
for i in countup(0, len(data) - 1, maxCount):
yield BlockDataChunk.init(stateCallback,
data.toOpenArray(i, min(i + maxCount, len(data)) - 1))
proc getLatestBeaconHeader(
overseer: SyncOverseerRef
): Future[BeaconBlockHeader] {.async: (raises: [CancelledError]).} =
let eventKey = overseer.eventQueue.register()
defer:
overseer.eventQueue.unregister(eventKey)
let events =
try:
await overseer.eventQueue.waitEvents(eventKey)
except CancelledError as exc:
raise exc
except AsyncEventQueueFullError:
raiseAssert "AsyncEventQueueFullError should not happen!"
withForkyHeader(events[^1]):
when lcDataFork > LightClientDataFork.None:
forkyHeader.beacon
else:
raiseAssert "Should not happen"
proc getPeerBlock(
overseer: SyncOverseerRef,
slot: Slot,
): Future[BlockDataRes] {.async: (raises: [CancelledError]).} =
let peer = await overseer.pool.acquire()
try:
let
res = (await getSyncBlockData(peer, slot)).valueOr:
return err(error)
blob =
if res.blobs.isSome():
Opt.some(res.blobs.get()[0])
else:
Opt.none(BlobSidecars)
ok(BlockData(blck: res.blocks[0][], blob: blob))
finally:
overseer.pool.release(peer)
proc getBlock(
overseer: SyncOverseerRef,
slot: Slot,
blockHeader: BeaconBlockHeader
): Future[BlockData] {.async: (raises: [CancelledError]).} =
var workers:
array[PARALLEL_REQUESTS, Future[BlockDataRes].Raising([CancelledError])]
while true:
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = overseer.getPeerBlock(slot)
try:
await allFutures(workers)
except CancelledError as exc:
let pending =
workers.filterIt(not(it.finished())).mapIt(cancelAndWait(it))
await noCancel allFutures(pending)
raise exc
var results: seq[BlockData]
for i in 0 ..< PARALLEL_REQUESTS:
if workers[i].value.isOk:
results.add(workers[i].value.get())
if len(results) > 0:
for item in results:
withBlck(item.blck):
if forkyBlck.message.toBeaconBlockHeader() == blockHeader:
return item
# Wait for 2 seconds before trying one more time.
await sleepAsync(2.seconds)
proc isWithinWeakSubjectivityPeriod(
overseer: SyncOverseerRef, slot: Slot): bool =
let
dag = overseer.consensusManager.dag
currentSlot = overseer.beaconClock.now().slotOrZero()
checkpoint = Checkpoint(
epoch:
getStateField(dag.headState, slot).epoch(),
root:
getStateField(dag.headState, latest_block_header).state_root)
is_within_weak_subjectivity_period(
dag.cfg, currentSlot, dag.headState, checkpoint)
proc isUntrustedBackfillEmpty(clist: ChainListRef): bool =
clist.tail.isNone()
func speed(start, finish: Moment, entities: int): float =
if entities <= 0:
0.0
else:
float(entities) / toFloatSeconds(finish - start)
proc updatePerformance(overseer: SyncOverseerRef, startTick: Moment,
entities: int) =
let dag = overseer.consensusManager.dag
doAssert(overseer.clist.head.isSome() and overseer.clist.tail.isSome())
let
clistHeadSlot = overseer.clist.head.get().slot
clistTailSlot = overseer.clist.tail.get().slot
doAssert(clistHeadSlot >= dag.head.slot)
let slotsPerSec = speed(startTick, Moment.now(), entities)
inc(overseer.avgSpeedCounter)
overseer.avgSpeed = overseer.avgSpeed +
(slotsPerSec - overseer.avgSpeed) / float(overseer.avgSpeedCounter)
let
total = clistHeadSlot - clistTailSlot
progress = dag.head.slot - clistTailSlot
done = float(progress) / float(total)
remaining = total - progress
timeleft =
if overseer.avgSpeed >= 0.001:
Duration.fromFloatSeconds(remaining.float / overseer.avgSpeed)
else:
InfiniteDuration
# Update status string
overseer.statusMsg = Opt.some(
timeleft.toTimeLeftString() & " (" &
(done * 100).formatBiggestFloat(ffDecimal, 2) & "%) " &
overseer.avgSpeed.formatBiggestFloat(ffDecimal, 4) &
"slots/s (" & $dag.head.slot & ")")
proc blockProcessingLoop(overseer: SyncOverseerRef): Future[void] {.
async: (raises: [CancelledError]).} =
let
consensusManager = overseer.consensusManager
dag = consensusManager.dag
attestationPool = consensusManager.attestationPool
validatorMonitor = overseer.validatorMonitor
proc onBlockAdded(
blckRef: BlockRef, blck: ForkedTrustedSignedBeaconBlock, epochRef: EpochRef,
unrealized: FinalityCheckpoints) {.gcsafe, raises: [].} =
let wallTime = overseer.getBeaconTimeFn()
withBlck(blck):
attestationPool[].addForkChoice(
epochRef, blckRef, unrealized, forkyBlck.message, wallTime)
validatorMonitor[].registerBeaconBlock(
MsgSource.sync, wallTime, forkyBlck.message)
for attestation in forkyBlck.message.body.attestations:
for validator_index in
dag.get_attesting_indices(attestation, true):
validatorMonitor[].registerAttestationInBlock(
attestation.data, validator_index, forkyBlck.message.slot)
withState(dag[].clearanceState):
when (consensusFork >= ConsensusFork.Altair) and
(type(forkyBlck) isnot phase0.TrustedSignedBeaconBlock):
for i in forkyBlck.message.body.sync_aggregate.
sync_committee_bits.oneIndices():
validatorMonitor[].registerSyncAggregateInBlock(
forkyBlck.message.slot, forkyBlck.root,
forkyState.data.current_sync_committee.pubkeys.data[i])
block mainLoop:
while true:
let bchunk = await overseer.blocksQueue.popFirst()
block innerLoop:
for bdata in bchunk.blocks:
block:
let res = addBackfillBlockData(dag, bdata, bchunk.onStateUpdatedCb,
onBlockAdded)
if res.isErr():
let msg = "Unable to add block data to database [" &
$res.error & "]"
bchunk.resfut.complete(Result[void, string].err(msg))
break innerLoop
consensusManager.updateHead(overseer.getBeaconTimeFn).isOkOr:
bchunk.resfut.complete(Result[void, string].err(error))
break innerLoop
bchunk.resfut.complete(Result[void, string].ok())
proc verifyBlockProposer(
fork: Fork,
genesis_validators_root: Eth2Digest,
immutableValidators: openArray[ImmutableValidatorData2],
signedBlock: ForkedSignedBeaconBlock
): Result[void, cstring] =
withBlck(signedBlock):
let proposerKey =
immutableValidators.load(forkyBlck.message.proposer_index).valueOr:
return err("Unable to find proposer key")
if not(verify_block_signature(fork, genesis_validators_root,
forkyBlck.message.slot, forkyBlck.message,
proposerKey, forkyBlck.signature)):
return err("Signature verification failed")
ok()
proc rebuildState(overseer: SyncOverseerRef): Future[void] {.
async: (raises: [CancelledError]).} =
overseer.statusMsg = Opt.some("rebuilding state")
let
consensusManager = overseer.consensusManager
dag = consensusManager.dag
batchVerifier = overseer.batchVerifier
clist =
block:
overseer.clist.seekForSlot(dag.head.slot).isOkOr:
fatal "Unable to find slot in backfill data", reason = error,
path = overseer.clist.path
quit 1
overseer.clist
var
blocks: seq[BlockData]
currentEpoch: Epoch = FAR_FUTURE_EPOCH
let handle = clist.handle.get()
overseer.avgSpeed = 0.0
overseer.avgSpeedCounter = 0
# Set minimum slot number from which LC data is collected.
dag.lcDataStore.cache.tailSlot = clist.head.get().slot
block mainLoop:
while true:
let res = getChainFileTail(handle.handle)
if res.isErr():
fatal "Unable to read backfill data", reason = res.error
quit 1
let bres = res.get()
if bres.isNone():
return
let
data = bres.get()
blockEpoch = data.blck.slot.epoch()
if blockEpoch != currentEpoch:
if len(blocks) != 0:
let
startTick = Moment.now()
blocksOnly = blocks.mapIt(it.blck)
proc onStateUpdate(slot: Slot): Result[void, VerifierError] {.
gcsafe, raises: [].} =
if slot != blocksOnly[0].slot:
# We verify signatures only at the beginning of chunk/epoch, in
# such way we could verify whole epoch's proposer signatures in
# one batch.
return ok()
let
fork =
getStateField(dag.headState, fork)
genesis_validators_root =
getStateField(dag.headState, genesis_validators_root)
verifyBlockProposer(batchVerifier[], fork, genesis_validators_root,
dag.db.immutableValidators, blocksOnly).isOkOr:
for signedBlock in blocksOnly:
verifyBlockProposer(fork, genesis_validators_root,
dag.db.immutableValidators,
signedBlock).isOkOr:
fatal "Unable to verify block proposer",
blck = shortLog(signedBlock), reason = error
return err(VerifierError.Invalid)
ok()
for bchunk in blocks.chunks(onStateUpdate, BLOCKS_PROCESS_CHUNK_SIZE):
try:
overseer.blocksQueue.addLastNoWait(bchunk)
except AsyncQueueFullError:
raiseAssert "Should not happen with unbounded AsyncQueue"
let res = await bchunk.resfut
if res.isErr():
fatal "Unable to add block data to database", reason = res.error
quit 1
let updateTick = Moment.now()
debug "Number of blocks injected",
blocks_count = len(blocks),
head = shortLog(dag.head),
finalized = shortLog(getStateField(
dag.headState, finalized_checkpoint)),
store_update_time = updateTick - startTick
overseer.updatePerformance(startTick, len(blocks))
blocks.setLen(0)
currentEpoch = blockEpoch
if data.blck.slot != GENESIS_SLOT:
blocks.add(data)
proc initUntrustedSync(overseer: SyncOverseerRef): Future[void] {.
async: (raises: [CancelledError]).} =
overseer.statusMsg = Opt.some("awaiting light client")
let blockHeader = await overseer.getLatestBeaconHeader()
notice "Received light client block header",
beacon_header = shortLog(blockHeader),
current_slot = overseer.beaconClock.now().slotOrZero()
overseer.statusMsg = Opt.some("retrieving block")
let
blck = await overseer.getBlock(blockHeader.slot, blockHeader)
blobsCount = if blck.blob.isNone(): 0 else: len(blck.blob.get())
notice "Received beacon block", blck = shortLog(blck.blck),
blobs_count = blobsCount
overseer.statusMsg = Opt.some("storing block")
let res = overseer.clist.addBackfillBlockData(blck.blck, blck.blob)
if res.isErr():
warn "Unable to store initial block", reason = res.error
return
overseer.statusMsg = Opt.none(string)
notice "Initial block being stored",
blck = shortLog(blck.blck), blobs_count = blobsCount
proc startBackfillTask(overseer: SyncOverseerRef): Future[void] {.
async: (raises: []).} =
# This procedure performs delayed start of backfilling process.
while overseer.consensusManager.dag.needsBackfill:
if not(overseer.forwardSync.inProgress):
# Only start the backfiller if it's needed _and_ head sync has completed -
# if we lose sync after having synced head, we could stop the backfilller,
# but this should be a fringe case - might as well keep the logic simple
# for now.
overseer.backwardSync.start()
return
try:
await sleepAsync(chronos.seconds(2))
except CancelledError:
return
proc mainLoop*(
overseer: SyncOverseerRef
): Future[void] {.async: (raises: []).} =
let
dag = overseer.consensusManager.dag
clist = overseer.clist
currentSlot = overseer.beaconClock.now().slotOrZero()
if overseer.isWithinWeakSubjectivityPeriod(currentSlot):
# Starting forward sync manager/monitor.
overseer.forwardSync.start()
# Starting backfill/backward sync manager.
if dag.needsBackfill():
asyncSpawn overseer.startBackfillTask()
return
else:
if dag.needsBackfill():
# Checkpoint/Trusted state we have is too old.
error "Trusted node sync started too long time ago"
quit 1
if not(isUntrustedBackfillEmpty(clist)):
let headSlot = clist.head.get().slot
if not(overseer.isWithinWeakSubjectivityPeriod(headSlot)):
# Light forward sync file is too old.
warn "Light client sync was started too long time ago",
current_slot = currentSlot, backfill_data_slot = headSlot
if overseer.config.longRangeSync == LongRangeSyncMode.Lenient:
# Starting forward sync manager/monitor only.
overseer.forwardSync.start()
return
if overseer.config.longRangeSync == LongRangeSyncMode.Light:
let dagHead = dag.finalizedHead
if dagHead.slot < dag.cfg.ALTAIR_FORK_EPOCH.start_slot:
fatal "Light forward syncing requires a post-Altair state",
head_slot = dagHead.slot,
altair_start_slot = dag.cfg.ALTAIR_FORK_EPOCH.start_slot
quit 1
if isUntrustedBackfillEmpty(clist):
overseer.untrustedInProgress = true
try:
await overseer.initUntrustedSync()
except CancelledError:
return
# We need to update pivot slot to enable timeleft calculation.
overseer.untrustedSync.updatePivot(overseer.clist.tail.get().slot)
# Note: We should not start forward sync manager!
overseer.untrustedSync.start()
# Waiting until untrusted backfilling will not be complete
try:
await overseer.untrustedSync.join()
except CancelledError:
return
notice "Start state rebuilding process"
# We spawn block processing loop to keep async world happy, otherwise
# it could be single cpu heavy procedure call.
let blockProcessingFut = overseer.blockProcessingLoop()
try:
await overseer.rebuildState()
except CancelledError:
await cancelAndWait(blockProcessingFut)
return
clist.clear().isOkOr:
warn "Unable to remove backfill data file",
path = clist.path.chainFilePath(), reason = error
quit 1
overseer.untrustedInProgress = false
# When we finished state rebuilding process - we could start forward
# SyncManager which could perform finish sync.
overseer.forwardSync.start()
proc start*(overseer: SyncOverseerRef) =
overseer.loopFuture = overseer.mainLoop()
proc stop*(overseer: SyncOverseerRef) {.async: (raises: []).} =
doAssert(not(isNil(overseer.loopFuture)),
"SyncOverseer was not started yet")
if not(overseer.loopFuture.finished()):
await cancelAndWait(overseer.loopFuture)

View File

@ -0,0 +1,85 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import results, chronos,
".."/spec/[forks_light_client, signatures_batch],
".."/consensus_object_pools/[block_pools_types, blockchain_dag,
attestation_pool, blockchain_list,
consensus_manager],
".."/validators/validator_monitor,
".."/[beacon_clock, conf],
".."/networking/eth2_network,
"."/sync_manager
export results, chronos, block_pools_types, conf
type
BlockDataChunk* = ref object
resfut*: Future[Result[void, string]].Raising([CancelledError])
onStateUpdatedCb*: OnStateUpdated
blocks*: seq[BlockData]
SyncOverseer* = object
statusMsg*: Opt[string]
consensusManager*: ref ConsensusManager
validatorMonitor*: ref ValidatorMonitor
config*: BeaconNodeConf
getBeaconTimeFn*: GetBeaconTimeFn
clist*: ChainListRef
beaconClock*: BeaconClock
eventQueue*: AsyncEventQueue[ForkedLightClientHeader]
loopFuture*: Future[void].Raising([])
forwardSync*: SyncManager[Peer, PeerId]
backwardSync*: SyncManager[Peer, PeerId]
untrustedSync*: SyncManager[Peer, PeerId]
batchVerifier*: ref BatchVerifier
pool*: PeerPool[Peer, PeerId]
avgSpeedCounter*: int
avgSpeed*: float
blocksQueue*: AsyncQueue[BlockDataChunk]
untrustedInProgress*: bool
SyncOverseerRef* = ref SyncOverseer
proc new*(
t: typedesc[SyncOverseerRef],
cm: ref ConsensusManager,
vm: ref ValidatorMonitor,
configuration: BeaconNodeConf,
bt: GetBeaconTimeFn,
clist: ChainListRef,
clock: BeaconClock,
eq: AsyncEventQueue[ForkedLightClientHeader],
pool: PeerPool[Peer, PeerId],
batchVerifier: ref BatchVerifier,
forwardSync: SyncManager[Peer, PeerId],
backwardSync: SyncManager[Peer, PeerId],
untrustedSync: SyncManager[Peer, PeerId]
): SyncOverseerRef =
SyncOverseerRef(
consensusManager: cm,
validatorMonitor: vm,
config: configuration,
getBeaconTimeFn: bt,
clist: clist,
beaconClock: clock,
eventQueue: eq,
pool: pool,
batchVerifier: batchVerifier,
forwardSync: forwardSync,
backwardSync: backwardSync,
untrustedSync: untrustedSync,
untrustedInProgress: false,
blocksQueue: newAsyncQueue[BlockDataChunk]())
proc syncInProgress*(overseer: SyncOverseerRef): bool =
overseer.forwardSync.inProgress or
overseer.backwardSync.inProgress or
overseer.untrustedSync.inProgress or
overseer.untrustedInProgress

View File

@ -211,6 +211,7 @@ type
# expanded in the future.
gossip = "gossip"
api = "api"
sync = "sync"
template toGaugeValue(v: bool): int64 =
if v: 1 else: 0

View File

@ -295,7 +295,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
doAssert dag.updateState(
stateData[],
dag.atSlot(blockRefs[^1], blockRefs[^1].slot - 1).expect("not nil"),
false, cache)
false, cache, dag.updateFlags)
template processBlocks(blocks: auto) =
for b in blocks.mitems():
@ -612,7 +612,8 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
withTimer(timers[tState]):
var cache: StateCache
if not updateState(dag, tmpState[], eraBid, false, cache):
if not updateState(dag, tmpState[], eraBid, false, cache,
dag.updateFlags):
notice "Skipping era, state history not available", era, name
missingHistory = true
continue
@ -753,7 +754,7 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) =
doAssert dag.updateState(
state[],
dag.atSlot(blockRefs[^1], blockRefs[^1].slot - 1).expect("block found"),
false, cache)
false, cache, dag.updateFlags)
proc processEpoch() =
let
@ -1051,10 +1052,12 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
let slot = if startEpochSlot > 0: startEpochSlot - 1 else: 0.Slot
if blockRefs.len > 0:
discard dag.updateState(
tmpState[], dag.atSlot(blockRefs[^1], slot).expect("block"), false, cache)
tmpState[], dag.atSlot(blockRefs[^1], slot).expect("block"), false, cache,
dag.updateFlags)
else:
discard dag.updateState(
tmpState[], dag.getBlockIdAtSlot(slot).expect("block"), false, cache)
tmpState[], dag.getBlockIdAtSlot(slot).expect("block"), false, cache,
dag.updateFlags)
let savedValidatorsCount = outDb.getDbValidatorsCount
var validatorsCount = getStateField(tmpState[], validators).len
@ -1213,4 +1216,4 @@ when isMainModule:
of DbCmd.validatorPerf:
cmdValidatorPerf(conf, cfg)
of DbCmd.validatorDb:
cmdValidatorDb(conf, cfg)
cmdValidatorDb(conf, cfg)

View File

@ -532,8 +532,8 @@ cli do(slots = SLOTS_PER_EPOCH * 7,
var cache = StateCache()
doAssert dag.updateState(
replayState[], dag.getBlockIdAtSlot(Slot(slots)).expect("block"),
false, cache)
false, cache, dag.updateFlags)
echo "Done!"
printTimers(dag.headState, attesters, true, timers)
printTimers(dag.headState, attesters, true, timers)

View File

@ -61,9 +61,10 @@ import # Unit test
./slashing_protection/test_fixtures,
./slashing_protection/test_slashing_protection_db,
./test_validator_client,
./test_beacon_validators
./test_beacon_validators,
./test_beacon_chain_file
when not defined(windows):
import ./test_keymanager_api
summarizeLongTests("AllTests")
summarizeLongTests("AllTests")

View File

@ -196,7 +196,8 @@ proc stepOnBlock(
state,
dag.getBlockIdAtSlot(time.slotOrZero).expect("block exists"),
save = false,
stateCache
stateCache,
dag.updateFlags
)
# 3. Add block to DAG

BIN
tests/fixtures/bfdata-test.bin vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,342 @@
# beacon_chain
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
{.used.}
import
results, unittest2, stew/io2, nimcrypto/hash,
../beacon_chain/spec/forks,
../beacon_chain/beacon_chain_file
template onDiskChunkSize(data: int): int =
sizeof(ChainFileFooter) + sizeof(ChainFileHeader) + data
const
FixtureFile =
currentSourcePath().dirname() & DirSep & "fixtures" & DirSep &
"bfdata-test.bin"
Block0Root =
"4bbd1c7468626d6520e27a534ce9f3ee305160860367431528404697c60ce222".toDigest
Block0BlobsCount = 1
Block0BlockChunkSize = 45127
Block0Blob0ChunkSize = 7043
Block1Root =
"133a92629a94cb9664eea57a649ee2d4a16fa48cac93aa5ccc0e9df727b5d9bd".toDigest
Block1BlobsCount = 3
Block1BlockChunkSize = 36321
Block1Blob0ChunkSize = 7090
Block1Blob1ChunkSize = 7016
Block1Blob2ChunkSize = 131886
Block2Root =
"f92b453230c5b1914c5b8f868bdd9692d38b5231b8e365f2b8049b1d22cca396".toDigest
Block2BlobsCount = 3
Block2BlockChunkSize = 36248
Block2Blob0ChunkSize = 7090
Block2Blob1ChunkSize = 7090
Block2Blob2ChunkSize = 7056
Block0FullSize = onDiskChunkSize(Block0BlockChunkSize) +
onDiskChunkSize(Block0Blob0ChunkSize)
Block1FullSize = onDiskChunkSize(Block1BlockChunkSize) +
onDiskChunkSize(Block1Blob0ChunkSize) +
onDiskChunkSize(Block1Blob1ChunkSize) +
onDiskChunkSize(Block1Blob2ChunkSize)
Block2FullSize = onDiskChunkSize(Block2BlockChunkSize) +
onDiskChunkSize(Block2Blob0ChunkSize) +
onDiskChunkSize(Block2Blob1ChunkSize) +
onDiskChunkSize(Block2Blob2ChunkSize)
type
AutoRepairObject = object
data: ChainFileData
size: int64
suite "Beacon chain file test suite":
var fixtureData: seq[byte]
proc doAutoCheckRepairTest(id, size: int): Result[AutoRepairObject, string] =
let path =
block:
let res = getTempPath().valueOr:
return err(ioErrorMsg(error))
res & DirSep & "tmp_" & $id & "_" & $size & ".tmp"
discard removeFile(path)
io2.writeFile(path, fixtureData.toOpenArray(0, size - 1)).isOkOr:
return err(ioErrorMsg(error))
let
flags = {ChainFileFlag.Repair}
fres = ? ChainFileHandle.init(path, flags)
closeFile(fres.handle).isOkOr:
return err(ioErrorMsg(error))
let filesize = getFileSize(path).valueOr:
return err(ioErrorMsg(error))
removeFile(path).isOkOr:
return err(ioErrorMsg(error))
ok(AutoRepairObject(data: fres.data, size: filesize))
template check01(adata: untyped): untyped =
check:
adata.data.head.isSome()
adata.data.tail.isSome()
let
head = adata.data.head.get()
tail = adata.data.tail.get()
headRoot = withBlck(head.blck): forkyBlck.root
tailRoot = withBlck(tail.blck): forkyBlck.root
check:
head.blob.isSome()
tail.blob.isSome()
headRoot == Block0Root
tailRoot == Block1Root
len(head.blob.get()) == Block0BlobsCount
len(tail.blob.get()) == Block1BlobsCount
adata.size == Block0FullSize + Block1FullSize
template check0(adata: untyped): untyped =
check:
adata.data.head.isSome()
adata.data.tail.isSome()
let
head = adata.data.head.get()
tail = adata.data.tail.get()
headRoot = withBlck(head.blck): forkyBlck.root
tailRoot = withBlck(tail.blck): forkyBlck.root
check:
head.blob.isSome()
tail.blob.isSome()
headRoot == Block0Root
tailRoot == Block0Root
len(head.blob.get()) == Block0BlobsCount
len(tail.blob.get()) == Block0BlobsCount
adata.size == Block0FullSize
test "Fixture file validation":
check isFile(FixtureFile) == true
fixtureData = readAllBytes(FixtureFile).valueOr:
default(seq[byte])
check len(fixtureData) > 0
let hres = ChainFileHandle.init(FixtureFile, {})
check hres.isOk()
let handle = hres.get()
check:
handle.head.isSome()
handle.tail.isSome()
let
head = handle.head.get()
tail = handle.tail.get()
headRoot = withBlck(head.blck): forkyBlck.root
tailRoot = withBlck(tail.blck): forkyBlck.root
check:
head.blob.isSome()
tail.blob.isSome()
headRoot == Block0Root
tailRoot == Block2Root
len(head.blob.get()) == Block0BlobsCount
len(tail.blob.get()) == Block2BlobsCount
let cres = close(handle)
check cres.isOk()
test "Auto check/repair test (missing footer)":
let
hiLimit = len(fixtureData) - 1
loLimit = len(fixtureData) - sizeof(ChainFileFooter)
var counter = 1
for size in countdown(hiLimit, loLimit):
let tres = doAutoCheckRepairTest(counter, size)
check tres.isOk()
let adata = tres.get()
check01(adata)
inc(counter)
test "Auto check/repair test (missing last chunk)":
var size = len(fixtureData)
block:
size -= onDiskChunkSize(Block2Blob2ChunkSize)
let tres = doAutoCheckRepairTest(1, size)
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block2Blob1ChunkSize)
let tres = doAutoCheckRepairTest(2, size)
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block2Blob0ChunkSize)
let tres = doAutoCheckRepairTest(3, size)
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block2BlockChunkSize)
let tres = doAutoCheckRepairTest(4, size)
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block1Blob2ChunkSize)
let tres = doAutoCheckRepairTest(5, size)
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block1Blob1ChunkSize)
let tres = doAutoCheckRepairTest(6, size)
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block1Blob0ChunkSize)
let tres = doAutoCheckRepairTest(7, size)
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block1BlockChunkSize)
let tres = doAutoCheckRepairTest(8, size)
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block0Blob0ChunkSize)
let tres = doAutoCheckRepairTest(9, size)
check tres.isOk()
let adata = tres.get()
check:
adata.data.head.isNone()
adata.data.tail.isNone()
adata.size == 0
block:
size -= onDiskChunkSize(Block0BlockChunkSize)
let tres = doAutoCheckRepairTest(10, size)
check tres.isOk()
let adata = tres.get()
check:
adata.data.head.isNone()
adata.data.tail.isNone()
adata.size == 0
test "Auto check/repair test (only header)":
var size = len(fixtureData)
block:
size -= onDiskChunkSize(Block2Blob2ChunkSize)
let tres = doAutoCheckRepairTest(1, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block2Blob1ChunkSize)
let tres = doAutoCheckRepairTest(2, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block2Blob0ChunkSize)
let tres = doAutoCheckRepairTest(3, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block2BlockChunkSize)
let tres = doAutoCheckRepairTest(4, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check01(adata)
block:
size -= onDiskChunkSize(Block1Blob2ChunkSize)
let tres = doAutoCheckRepairTest(5, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block1Blob1ChunkSize)
let tres = doAutoCheckRepairTest(6, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block1Blob0ChunkSize)
let tres = doAutoCheckRepairTest(7, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block1BlockChunkSize)
let tres = doAutoCheckRepairTest(8, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check0(adata)
block:
size -= onDiskChunkSize(Block0Blob0ChunkSize)
let tres = doAutoCheckRepairTest(9, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check:
adata.data.head.isNone()
adata.data.tail.isNone()
adata.size == 0
block:
size -= onDiskChunkSize(Block0BlockChunkSize)
let tres = doAutoCheckRepairTest(10, size + sizeof(ChainFileHeader))
check tres.isOk()
let adata = tres.get()
check:
adata.data.head.isNone()
adata.data.tail.isNone()
adata.size == 0
test "Auto check/repair test (missing data)":
let
limit1 = Block0FullSize + Block1FullSize + Block2FullSize
limit2 = Block0FullSize + Block1FullSize
limit3 = Block0FullSize
var
size = len(fixtureData)
counter = 1
while size > 0:
size = max(0, size - 4096)
let tres = doAutoCheckRepairTest(counter, size)
check tres.isOk()
let adata = tres.get()
if (size < limit1) and (size >= limit2):
check01(adata)
elif (size < limit2) and (size >= limit3):
check0(adata)
else:
check:
adata.data.head.isNone()
adata.data.tail.isNone()
adata.size == 0
inc(counter)

View File

@ -55,8 +55,9 @@ suite "Block processor" & preset():
b1 = addTestBlock(state[], cache).phase0Data
b2 = addTestBlock(state[], cache).phase0Data
getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time()
batchVerifier = BatchVerifier.new(rng, taskpool)
processor = BlockProcessor.new(
false, "", "", rng, taskpool, consensusManager,
false, "", "", batchVerifier, consensusManager,
validatorMonitor, blobQuarantine, getTimeFn)
processorFut = processor.runQueueProcessingLoop()

View File

@ -256,39 +256,41 @@ suite "Block pool processing" & preset():
# move to specific block
var cache = StateCache()
check:
dag.updateState(tmpState[], bs1, false, cache)
dag.updateState(tmpState[], bs1, false, cache, dag.updateFlags)
tmpState[].latest_block_root == b1Add[].root
getStateField(tmpState[], slot) == bs1.slot
# Skip slots
check:
dag.updateState(tmpState[], bs1_3, false, cache) # skip slots
dag.updateState(tmpState[], bs1_3, false, cache, dag.updateFlags) # skip slots
tmpState[].latest_block_root == b1Add[].root
getStateField(tmpState[], slot) == bs1_3.slot
# Move back slots, but not blocks
check:
dag.updateState(
tmpState[], dag.parent(bs1_3.bid).expect("block").atSlot(), false, cache)
tmpState[], dag.parent(bs1_3.bid).expect("block").atSlot(), false,
cache, dag.updateFlags)
tmpState[].latest_block_root == b1Add[].parent.root
getStateField(tmpState[], slot) == b1Add[].parent.slot
# Move to different block and slot
check:
dag.updateState(tmpState[], bs2_3, false, cache)
dag.updateState(tmpState[], bs2_3, false, cache, dag.updateFlags)
tmpState[].latest_block_root == b2Add[].root
getStateField(tmpState[], slot) == bs2_3.slot
# Move back slot and block
check:
dag.updateState(tmpState[], bs1, false, cache)
dag.updateState(tmpState[], bs1, false, cache, dag.updateFlags)
tmpState[].latest_block_root == b1Add[].root
getStateField(tmpState[], slot) == bs1.slot
# Move back to genesis
check:
dag.updateState(
tmpState[], dag.parent(bs1.bid).expect("block").atSlot(), false, cache)
tmpState[], dag.parent(bs1.bid).expect("block").atSlot(), false, cache,
dag.updateFlags)
tmpState[].latest_block_root == b1Add[].parent.root
getStateField(tmpState[], slot) == b1Add[].parent.slot
@ -500,7 +502,7 @@ suite "chain DAG finalization tests" & preset():
check: updateState(
dag, tmpStateData[],
dag.head.atSlot(dag.head.slot).toBlockSlotId().expect("not nil"),
false, cache)
false, cache, dag.updateFlags)
check:
dag.head.slot.epoch in cache.shuffled_active_validator_indices
@ -623,7 +625,8 @@ suite "chain DAG finalization tests" & preset():
while cur != nil: # Go all the way to dag.finalizedHead
assign(tmpStateData[], dag.headState)
check:
dag.updateState(tmpStateData[], cur.bid.atSlot(), false, cache)
dag.updateState(tmpStateData[], cur.bid.atSlot(), false, cache,
dag.updateFlags)
dag.getForkedBlock(cur.bid).get().phase0Data.message.state_root ==
getStateRoot(tmpStateData[])
getStateRoot(tmpStateData[]) == hash_tree_root(

View File

@ -1059,9 +1059,12 @@ suite "SyncManager test suite":
req1.contains(Slot(15)) == false
test "[SyncQueue] checkResponse() test":
let chain = createChain(Slot(10), Slot(20))
let r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64)
let r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64)
let
chain = createChain(Slot(10), Slot(20))
r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64)
r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64)
r3 = SyncRequest[SomeTPeer](slot: Slot(11), count: 3'u64)
let slots = mapIt(chain, it[].slot)
check:
@ -1083,12 +1086,39 @@ suite "SyncManager test suite":
checkResponse(r21, @[slots[2], slots[3]]) == false
checkResponse(r21, @[slots[3]]) == false
test "[SyncManager] groupBlobs() test":
var blocks = createChain(Slot(10), Slot(15))
var blobs = createBlobs(blocks, @[Slot(11), Slot(11), Slot(12), Slot(14)])
checkResponse(r21, @[slots[1], slots[1]]) == false
checkResponse(r21, @[slots[2], slots[2]]) == false
let req = SyncRequest[SomeTPeer](slot: Slot(10))
let groupedRes = groupBlobs(req, blocks, blobs)
checkResponse(r3, @[slots[1]]) == true
checkResponse(r3, @[slots[2]]) == true
checkResponse(r3, @[slots[3]]) == true
checkResponse(r3, @[slots[1], slots[2]]) == true
checkResponse(r3, @[slots[1], slots[3]]) == true
checkResponse(r3, @[slots[2], slots[3]]) == true
checkResponse(r3, @[slots[1], slots[3], slots[2]]) == false
checkResponse(r3, @[slots[2], slots[3], slots[1]]) == false
checkResponse(r3, @[slots[3], slots[2], slots[1]]) == false
checkResponse(r3, @[slots[3], slots[1]]) == false
checkResponse(r3, @[slots[3], slots[2]]) == false
checkResponse(r3, @[slots[2], slots[1]]) == false
checkResponse(r3, @[slots[1], slots[1], slots[1]]) == false
checkResponse(r3, @[slots[1], slots[2], slots[2]]) == false
checkResponse(r3, @[slots[1], slots[3], slots[3]]) == false
checkResponse(r3, @[slots[2], slots[3], slots[3]]) == false
checkResponse(r3, @[slots[1], slots[1], slots[1]]) == false
checkResponse(r3, @[slots[2], slots[2], slots[2]]) == false
checkResponse(r3, @[slots[3], slots[3], slots[3]]) == false
checkResponse(r3, @[slots[1], slots[1]]) == false
checkResponse(r3, @[slots[2], slots[2]]) == false
checkResponse(r3, @[slots[3], slots[3]]) == false
test "[SyncManager] groupBlobs() test":
var
blocks = createChain(Slot(10), Slot(15))
blobs = createBlobs(blocks, @[Slot(11), Slot(11), Slot(12), Slot(14)])
let groupedRes = groupBlobs(blocks, blobs)
check:
groupedRes.isOk()
@ -1118,7 +1148,7 @@ suite "SyncManager test suite":
let block17 = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb)
block17[].denebData.message.slot = Slot(17)
blocks.add(block17)
let groupedRes2 = groupBlobs(req, blocks, blobs)
let groupedRes2 = groupBlobs(blocks, blobs)
check:
groupedRes2.isOk()
@ -1130,7 +1160,7 @@ suite "SyncManager test suite":
let blob18 = new (ref BlobSidecar)
blob18[].signed_block_header.message.slot = Slot(18)
blobs.add(blob18)
let groupedRes3 = groupBlobs(req, blocks, blobs)
let groupedRes3 = groupBlobs(blocks, blobs)
check:
groupedRes3.isErr()