diff --git a/nimbus/beacon/api_handler/api_forkchoice.nim b/nimbus/beacon/api_handler/api_forkchoice.nim index 5687244d2..0acc62e04 100644 --- a/nimbus/beacon/api_handler/api_forkchoice.nim +++ b/nimbus/beacon/api_handler/api_forkchoice.nim @@ -118,9 +118,9 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef, com.syncReqNewHead(header) # Pass on finalised header - if com.haveSyncFinalisedBlockHash(): + if com.haveSyncFinalisedBlockHash() or true: let finalizedBlockHash = ethHash update.finalizedBlockHash - if finalizedBlockHash != default(common.Hash256): + if finalizedBlockHash != zeroHash32: com.syncFinalisedBlockHash(finalizedBlockHash) return simpleFCU(PayloadExecutionStatus.syncing) diff --git a/nimbus/sync/flare.nim b/nimbus/sync/flare.nim index 204f538f2..f8e1aec0b 100644 --- a/nimbus/sync/flare.nim +++ b/nimbus/sync/flare.nim @@ -13,6 +13,7 @@ import pkg/[chronicles, chronos, eth/p2p, results], pkg/stew/[interval_set, sorted_set], + ../core/chain, ./flare/[worker, worker_desc], "."/[sync_desc, sync_sched, protocol] @@ -22,82 +23,30 @@ logScope: type FlareSyncRef* = RunnerSyncRef[FlareCtxData,FlareBuddyData] -const - extraTraceMessages = false # or true - ## Enable additional logging noise - -# ------------------------------------------------------------------------------ -# Private logging helpers -# ------------------------------------------------------------------------------ - -template traceMsg(f, info: static[string]; args: varargs[untyped]) = - trace "Flare scheduler " & f & "() " & info, args - -template traceMsgCtx(f, info: static[string]; c: FlareCtxRef) = - when extraTraceMessages: - block: - let - poolMode {.inject.} = c.poolMode - daemon {.inject.} = c.daemon - f.traceMsg info, poolMode, daemon - -template traceMsgBuddy(f, info: static[string]; b: FlareBuddyRef) = - when extraTraceMessages: - block: - let - peer {.inject.} = b.peer - runState {.inject.} = b.ctrl.state - multiOk {.inject.} = b.ctrl.multiOk - poolMode {.inject.} = b.ctx.poolMode - daemon {.inject.} = b.ctx.daemon - f.traceMsg info, peer, runState, multiOk, poolMode, daemon - - -template tracerFrameCtx(f: static[string]; c: FlareCtxRef; code: untyped) = - f.traceMsgCtx "begin", c - code - f.traceMsgCtx "end", c - -template tracerFrameBuddy(f: static[string]; b: FlareBuddyRef; code: untyped) = - f.traceMsgBuddy "begin", b - code - f.traceMsgBuddy "end", b - # ------------------------------------------------------------------------------ # Virtual methods/interface, `mixin` functions # ------------------------------------------------------------------------------ proc runSetup(ctx: FlareCtxRef): bool = - tracerFrameCtx("runSetup", ctx): - result = worker.setup(ctx) + worker.setup(ctx) proc runRelease(ctx: FlareCtxRef) = - tracerFrameCtx("runRelease", ctx): - worker.release(ctx) + worker.release(ctx) proc runDaemon(ctx: FlareCtxRef) {.async.} = - tracerFrameCtx("runDaemon", ctx): - await worker.runDaemon(ctx) + await worker.runDaemon(ctx) proc runStart(buddy: FlareBuddyRef): bool = - tracerFrameBuddy("runStart", buddy): - result = worker.start(buddy) + worker.start(buddy) proc runStop(buddy: FlareBuddyRef) = - tracerFrameBuddy("runStop", buddy): - worker.stop(buddy) + worker.stop(buddy) proc runPool(buddy: FlareBuddyRef; last: bool; laps: int): bool = - tracerFrameBuddy("runPool", buddy): - result = worker.runPool(buddy, last, laps) + worker.runPool(buddy, last, laps) -proc runSingle(buddy: FlareBuddyRef) {.async.} = - tracerFrameBuddy("runSingle", buddy): - await worker.runSingle(buddy) - -proc runMulti(buddy: FlareBuddyRef) {.async.} = - tracerFrameBuddy("runMulti", buddy): - await worker.runMulti(buddy) +proc runPeer(buddy: FlareBuddyRef) {.async.} = + await worker.runPeer(buddy) # ------------------------------------------------------------------------------ # Public functions @@ -111,8 +60,10 @@ proc init*( chunkSize: int; ): T = var desc = T() - desc.initSync(ethNode, chain, maxPeers) + desc.initSync(ethNode, maxPeers) desc.ctx.pool.nBodiesBatch = chunkSize + # Initalise for `persistBlocks()` + desc.ctx.pool.chain = chain.com.newChain() desc proc start*(ctx: FlareSyncRef) = diff --git a/nimbus/sync/flare/TODO.md b/nimbus/sync/flare/TODO.md index 4c8326445..71a237e04 100644 --- a/nimbus/sync/flare/TODO.md +++ b/nimbus/sync/flare/TODO.md @@ -1 +1,4 @@ * Update/resolve code fragments which are tagged FIXME +* Update docu: + + flare_beacon_block_number -> flare_finalized ?? + + in general, check agaist glossary https://notes.status.im/nimbus-merge-first-el?both=#Glossary diff --git a/nimbus/sync/flare/worker.nim b/nimbus/sync/flare/worker.nim index 19b14a055..8ea0820d6 100644 --- a/nimbus/sync/flare/worker.nim +++ b/nimbus/sync/flare/worker.nim @@ -16,7 +16,7 @@ import pkg/eth/[common, p2p], pkg/stew/[interval_set, sorted_set], ../../common, - ./worker/[blocks_staged, db, headers_staged, headers_unproc, + ./worker/[blocks_staged, db, headers_staged, headers_unproc, helpers, start_stop, update], ./worker_desc @@ -88,14 +88,13 @@ proc start*(buddy: FlareBuddyRef): bool = debug info & " failed", peer=buddy.peer return false - buddy.ctrl.multiOk = true debug info, peer=buddy.peer true proc stop*(buddy: FlareBuddyRef) = ## Clean up this peer debug "RUNSTOP", peer=buddy.peer, nInvocations=buddy.only.nMultiLoop, - lastIdleGap=buddy.only.multiRunIdle.toStr(2) + lastIdleGap=buddy.only.multiRunIdle.toStr buddy.stopBuddy() # ------------------------------------------------------------------------------ @@ -139,30 +138,13 @@ proc runDaemon*(ctx: FlareCtxRef) {.async.} = ctx.updateMetrics() -proc runSingle*(buddy: FlareBuddyRef) {.async.} = - ## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk` - ## is set `false` which is the default mode. This flag is updated by the - ## worker when deemed appropriate. - ## * For all workers, there can be only one `runSingle()` function active - ## simultaneously for all worker peers. - ## * There will be no `runMulti()` function active for the same worker peer - ## simultaneously - ## * There will be no `runPool()` iterator active simultaneously. - ## - ## Note that this function runs in `async` mode. - ## - const info = "RUNSINGLE" - raiseAssert info & " should not be used: peer=" & $buddy.peer - - proc runPool*(buddy: FlareBuddyRef; last: bool; laps: int): bool = ## Once started, the function `runPool()` is called for all worker peers in - ## sequence as the body of an iteration as long as the function returns - ## `false`. There will be no other worker peer functions activated - ## simultaneously. + ## sequence as long as this function returns `false`. There will be no other + ## `runPeer()` functions activated while `runPool()` is active. ## ## This procedure is started if the global flag `buddy.ctx.poolMode` is set - ## `true` (default is `false`.) It will be automatically reset before the + ## `true` (default is `false`.) The flag will be automatically reset before ## the loop starts. Re-setting it again results in repeating the loop. The ## argument `laps` (starting with `0`) indicated the currend lap of the ## repeated loops. @@ -177,12 +159,11 @@ proc runPool*(buddy: FlareBuddyRef; last: bool; laps: int): bool = true # stop -proc runMulti*(buddy: FlareBuddyRef) {.async.} = - ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set - ## `true` which is typically done after finishing `runSingle()`. This - ## instance can be simultaneously active for all peer workers. +proc runPeer*(buddy: FlareBuddyRef) {.async.} = + ## This peer worker method is repeatedly invoked (exactly one per peer) while + ## the `buddy.ctrl.poolMode` flag is set `false`. ## - const info = "RUNMULTI" + const info = "RUNPEER" let peer = buddy.peer if 0 < buddy.only.nMultiLoop: # statistics/debugging @@ -190,7 +171,7 @@ proc runMulti*(buddy: FlareBuddyRef) {.async.} = buddy.only.nMultiLoop.inc # statistics/debugging trace info, peer, nInvocations=buddy.only.nMultiLoop, - lastIdleGap=buddy.only.multiRunIdle.toStr(2) + lastIdleGap=buddy.only.multiRunIdle.toStr # Update beacon header when needed. For the beacon header, a hash will be # auto-magically made available via RPC. The corresponding header is then diff --git a/nimbus/sync/flare/worker/blocks_staged.nim b/nimbus/sync/flare/worker/blocks_staged.nim index cbde62bf0..72f921405 100644 --- a/nimbus/sync/flare/worker/blocks_staged.nim +++ b/nimbus/sync/flare/worker/blocks_staged.nim @@ -12,9 +12,8 @@ import pkg/[chronicles, chronos], - pkg/eth/[common, p2p], + pkg/eth/common, pkg/stew/[interval_set, sorted_set], - ../../../common, ../../../core/chain, ../worker_desc, ./blocks_staged/bodies, @@ -44,7 +43,7 @@ proc fetchAndCheck( # Preset/append headers to be completed with bodies. Also collect block hashes # for fetching missing blocks. blk.blocks.setLen(offset + ivReq.len) - var blockHash = newSeq[Hash256](ivReq.len) + var blockHash = newSeq[Hash32](ivReq.len) for n in 1u ..< ivReq.len: let header = ctx.dbPeekHeader(ivReq.minPt + n).expect "stashed header" blockHash[n - 1] = header.parentHash @@ -52,7 +51,7 @@ proc fetchAndCheck( blk.blocks[offset].header = ctx.dbPeekHeader(ivReq.minPt).expect "stashed header" blockHash[ivReq.len - 1] = - rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccakHash + rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccak256 # Fetch bodies let bodies = block: @@ -70,7 +69,7 @@ proc fetchAndCheck( block loop: for n in 0 ..< nBodies: block checkTxLenOk: - if blk.blocks[offset + n].header.txRoot != EMPTY_ROOT_HASH: + if blk.blocks[offset + n].header.transactionsRoot != emptyRoot: if 0 < bodies[n].transactions.len: break checkTxLenOk else: diff --git a/nimbus/sync/flare/worker/blocks_staged/bodies.nim b/nimbus/sync/flare/worker/blocks_staged/bodies.nim index 6b5f72679..4fc2ac41d 100644 --- a/nimbus/sync/flare/worker/blocks_staged/bodies.nim +++ b/nimbus/sync/flare/worker/blocks_staged/bodies.nim @@ -13,21 +13,15 @@ import std/options, pkg/[chronicles, chronos, results], - pkg/eth/p2p, + pkg/eth/[common, p2p], pkg/stew/interval_set, - "../../.."/[protocol, types], - ../../worker_desc + ../../../protocol, + ../../worker_desc, + ../helpers logScope: topics = "flare bodies" -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -func toStr(a: chronos.Duration): string = - a.toStr(2) - # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -39,7 +33,7 @@ proc fetchRegisterError*(buddy: FlareBuddyRef) = proc bodiesFetch*( buddy: FlareBuddyRef; - blockHashes: seq[Hash256]; + blockHashes: seq[Hash32]; info: static[string]; ): Future[Result[seq[BlockBody],void]] {.async.} = diff --git a/nimbus/sync/flare/worker/blocks_unproc.nim b/nimbus/sync/flare/worker/blocks_unproc.nim index 44ac1b6f2..cb99ec9be 100644 --- a/nimbus/sync/flare/worker/blocks_unproc.nim +++ b/nimbus/sync/flare/worker/blocks_unproc.nim @@ -11,7 +11,7 @@ {.push raises:[].} import - pkg/eth/p2p, + pkg/eth/[common, p2p], pkg/results, pkg/stew/interval_set, ../worker_desc diff --git a/nimbus/sync/flare/worker/db.nim b/nimbus/sync/flare/worker/db.nim index a104e6c5c..50e5da8de 100644 --- a/nimbus/sync/flare/worker/db.nim +++ b/nimbus/sync/flare/worker/db.nim @@ -29,14 +29,14 @@ const type SavedDbStateSpecs = tuple number: BlockNumber - hash: Hash256 - parent: Hash256 + hash: Hash32 + parent: Hash32 # ------------------------------------------------------------------------------ # Private debugging & logging helpers # ------------------------------------------------------------------------------ -formatIt(Hash256): +formatIt(Hash32): it.data.toHex # ------------------------------------------------------------------------------ @@ -58,7 +58,7 @@ proc fetchSavedState(ctx: FlareCtxRef): Opt[SavedDbStateSpecs] = val.number = db.getSavedStateBlockNumber() if db.getBlockHash(val.number, val.hash): - var header: BlockHeader + var header: Header if db.getBlockHeader(val.hash, header): val.parent = header.parentHash return ok(val) @@ -127,7 +127,7 @@ proc dbLoadLinkedHChainsLayout*(ctx: FlareCtxRef) = proc dbStashHeaders*( ctx: FlareCtxRef; first: BlockNumber; - revBlobs: openArray[Blob]; + revBlobs: openArray[seq[byte]]; ) = ## Temporarily store header chain to persistent db (oblivious of the chain ## layout.) The headers should not be stashed if they are imepreted and @@ -149,19 +149,19 @@ proc dbStashHeaders*( kvt.put(key.toOpenArray, data).isOkOr: raiseAssert info & ": put() failed: " & $$error -proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[BlockHeader] = +proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[Header] = ## Retrieve some stashed header. let key = flareHeaderKey(num) rc = ctx.db.ctx.getKvt().get(key.toOpenArray) if rc.isOk: try: - return ok(rlp.decode(rc.value, BlockHeader)) + return ok(rlp.decode(rc.value, Header)) except RlpError: discard err() -proc dbPeekParentHash*(ctx: FlareCtxRef; num: BlockNumber): Opt[Hash256] = +proc dbPeekParentHash*(ctx: FlareCtxRef; num: BlockNumber): Opt[Hash32] = ## Retrieve some stashed parent hash. ok (? ctx.dbPeekHeader num).parentHash diff --git a/nimbus/sync/flare/worker/headers_staged.nim b/nimbus/sync/flare/worker/headers_staged.nim index 5073ad7d4..78962330d 100644 --- a/nimbus/sync/flare/worker/headers_staged.nim +++ b/nimbus/sync/flare/worker/headers_staged.nim @@ -60,14 +60,14 @@ proc headerStagedUpdateBeacon*( ) {.async.} = ## Fetch beacon header if there is an update available let ctx = buddy.ctx - if ctx.lhc.beacon.finalised != ZERO_HASH256: + if ctx.lhc.beacon.finalised != zeroHash32: const iv = BnRange.new(1u,1u) # dummy interval let finHash = ctx.lhc.beacon.finalised let rc = await buddy.headersFetchReversed(iv, finHash, info) if rc.isOk and ctx.lhc.beacon.header.number < rc.value[0].number: ctx.lhc.beacon.header = rc.value[0] ctx.lhc.beacon.changed = true - ctx.lhc.beacon.finalised = ZERO_HASH256 + ctx.lhc.beacon.finalised = zeroHash32 proc headersStagedCollect*( diff --git a/nimbus/sync/flare/worker/headers_staged/headers.nim b/nimbus/sync/flare/worker/headers_staged/headers.nim index dcca8f304..a73ca7f11 100644 --- a/nimbus/sync/flare/worker/headers_staged/headers.nim +++ b/nimbus/sync/flare/worker/headers_staged/headers.nim @@ -13,10 +13,12 @@ import std/options, pkg/[chronicles, chronos, results], - pkg/eth/p2p, + pkg/eth/[common, p2p], pkg/stew/interval_set, - "../../.."/[protocol, types], - ../../worker_desc + ../../../protocol, + ../../../protocol/eth/eth_types, + ../../worker_desc, + ../helpers logScope: topics = "flare headers" @@ -25,12 +27,6 @@ logScope: # Private functions # ------------------------------------------------------------------------------ -# For some reason neither `formatIt` nor `$` works as expected with logging -# the `elapsed` variable, below. This might be due to the fact that the -# `headersFetchReversed()` function is a generic one, i.e. a template. -func toStr(a: chronos.Duration): string = - a.toStr(2) - proc registerError(buddy: FlareBuddyRef) = buddy.only.nHdrRespErrors.inc if fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors: @@ -43,17 +39,17 @@ proc registerError(buddy: FlareBuddyRef) = proc headersFetchReversed*( buddy: FlareBuddyRef; ivReq: BnRange; - topHash: Hash256; + topHash: Hash32; info: static[string]; - ): Future[Result[seq[BlockHeader],void]] + ): Future[Result[seq[Header],void]] {.async.} = ## Get a list of headers in reverse order. let peer = buddy.peer - useHash = (topHash != EMPTY_ROOT_HASH) + useHash = (topHash != emptyRoot) req = block: if useHash: - BlocksRequest( + EthBlocksRequest( maxResults: ivReq.len.uint, skip: 0, reverse: true, @@ -61,7 +57,7 @@ proc headersFetchReversed*( isHash: true, hash: topHash)) else: - BlocksRequest( + EthBlocksRequest( maxResults: ivReq.len.uint, skip: 0, reverse: true, @@ -99,7 +95,7 @@ proc headersFetchReversed*( nRespErrors=buddy.only.nHdrRespErrors return err() - let h: seq[BlockHeader] = resp.get.headers + let h: seq[Header] = resp.get.headers if h.len == 0 or ivReq.len < h.len.uint64: buddy.registerError() trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, diff --git a/nimbus/sync/flare/worker/headers_staged/linked_hchain.nim b/nimbus/sync/flare/worker/headers_staged/linked_hchain.nim index 79d734563..906f3e7d5 100644 --- a/nimbus/sync/flare/worker/headers_staged/linked_hchain.nim +++ b/nimbus/sync/flare/worker/headers_staged/linked_hchain.nim @@ -21,7 +21,7 @@ import # ------------------------------------------------------------------------------ proc extendLinkedHChain*( - rev: seq[BlockHeader]; + rev: seq[Header]; buddy: FlareBuddyRef; topNumber: BlockNumber; lhc: ref LinkedHChain; # update in place @@ -41,13 +41,13 @@ proc extendLinkedHChain*( # Set up header with largest block number let blob0 = rlp.encode(rev[0]) - hash0 = blob0.keccakHash + hash0 = blob0.keccak256 lhc.revHdrs[offset] = blob0 if offset == 0: lhc.hash = hash0 # Verify top block hash (if any) - if lhc.parentHash != EMPTY_ROOT_HASH and hash0 != lhc.parentHash: + if lhc.parentHash != emptyRoot and hash0 != lhc.parentHash: lhc.revHdrs.setLen(offset) return false @@ -58,7 +58,7 @@ proc extendLinkedHChain*( return false lhc.revHdrs[offset + n] = rlp.encode(rev[n]) - let hashN = lhc.revHdrs[offset + n].keccakHash + let hashN = lhc.revHdrs[offset + n].keccak256 if rev[n-1].parentHash != hashN: lhc.revHdrs.setLen(offset) return false diff --git a/nimbus/sync/flare/worker/headers_unproc.nim b/nimbus/sync/flare/worker/headers_unproc.nim index 6722db624..c5c1fc3dd 100644 --- a/nimbus/sync/flare/worker/headers_unproc.nim +++ b/nimbus/sync/flare/worker/headers_unproc.nim @@ -11,7 +11,7 @@ {.push raises:[].} import - pkg/eth/p2p, + pkg/eth/[common, p2p], pkg/results, pkg/stew/interval_set, ../worker_desc diff --git a/nimbus/sync/flare/worker/helpers.nim b/nimbus/sync/flare/worker/helpers.nim new file mode 100644 index 000000000..bae7eee0f --- /dev/null +++ b/nimbus/sync/flare/worker/helpers.nim @@ -0,0 +1,23 @@ +# Nimbus +# Copyright (c) 2021-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises:[].} + +import + pkg/chronos, + pkg/eth/common + +proc bnStr*(w: BlockNumber): string = + "#" & $w + +func toStr*(a: chronos.Duration): string = + a.toString 2 + +# End diff --git a/nimbus/sync/flare/worker/start_stop.nim b/nimbus/sync/flare/worker/start_stop.nim index a916f864c..e20eb17be 100644 --- a/nimbus/sync/flare/worker/start_stop.nim +++ b/nimbus/sync/flare/worker/start_stop.nim @@ -12,6 +12,7 @@ import pkg/eth/[common, p2p], + ../../../core/chain, ../../protocol, ../worker_desc, "."/[blocks_staged, blocks_unproc, db, headers_staged, headers_unproc] @@ -51,9 +52,9 @@ when enableTicker: proc updateBeaconHeaderCB(ctx: FlareCtxRef): SyncFinalisedBlockHashCB = ## Update beacon header. This function is intended as a call back function ## for the RPC module. - return proc(h: Hash256) {.gcsafe, raises: [].} = - # Rpc checks empty header against `Hash256()` rather than `EMPTY_ROOT_HASH` - if ctx.lhc.beacon.finalised == ZERO_HASH256: + return proc(h: Hash32) {.gcsafe, raises: [].} = + # Rpc checks empty header against a zero hash rather than `emptyRoot` + if ctx.lhc.beacon.finalised == zeroHash32: ctx.lhc.beacon.finalised = h # ------------------------------------------------------------------------------ @@ -85,10 +86,6 @@ proc setupDatabase*(ctx: FlareCtxRef) = ctx.headersUnprocInit() ctx.blocksUnprocInit() - # Initalise for `persistBlocks()`. Note that the `ctx.chain` is of - # type `ForkedChainRef` while `ctx.pool.chain` is a `ChainRef` - ctx.pool.chain = ctx.chain.com.newChain() - # Load initial state from database if there is any ctx.dbLoadLinkedHChainsLayout() @@ -110,11 +107,11 @@ proc setupDatabase*(ctx: FlareCtxRef) = proc setupRpcMagic*(ctx: FlareCtxRef) = ## Helper for `setup()`: Enable external pivot update via RPC - ctx.chain.com.syncFinalisedBlockHash = ctx.updateBeaconHeaderCB + ctx.pool.chain.com.syncFinalisedBlockHash = ctx.updateBeaconHeaderCB proc destroyRpcMagic*(ctx: FlareCtxRef) = ## Helper for `release()` - ctx.chain.com.syncFinalisedBlockHash = SyncFinalisedBlockHashCB(nil) + ctx.pool.chain.com.syncFinalisedBlockHash = SyncFinalisedBlockHashCB(nil) # --------- diff --git a/nimbus/sync/flare/worker/start_stop/ticker.nim b/nimbus/sync/flare/worker/start_stop/ticker.nim index 7a307ae06..a3c3943fd 100644 --- a/nimbus/sync/flare/worker/start_stop/ticker.nim +++ b/nimbus/sync/flare/worker/start_stop/ticker.nim @@ -15,7 +15,7 @@ import std/strutils, pkg/[chronos, chronicles, eth/common, stint], ../../../../utils/prettify, - ../../../types + ../helpers logScope: topics = "ticker" @@ -72,22 +72,22 @@ proc flareTicker(t: TickerRef) {.gcsafe.} = if data != t.lastStats or tickerLogSuppressMax < (now - t.visited): let - T = data.stateTop.toStr - B = data.base.toStr - L = data.least.toStr - F = data.final.toStr - Z = data.beacon.toStr + T = data.stateTop.bnStr + B = data.base.bnStr + L = data.least.bnStr + F = data.final.bnStr + Z = data.beacon.bnStr hS = if data.nHdrStaged == 0: "n/a" - else: data.hdrStagedTop.toStr & "(" & $data.nHdrStaged & ")" + else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")" hU = if data.nHdrUnprocFragm == 0: "n/a" - else: data.hdrUnprocTop.toStr & "(" & + else: data.hdrUnprocTop.bnStr & "(" & data.nHdrUnprocessed.toSI & "," & $data.nHdrUnprocFragm & ")" bS = if data.nBlkStaged == 0: "n/a" - else: data.blkStagedBottom.toStr & "(" & $data.nBlkStaged & ")" + else: data.blkStagedBottom.bnStr & "(" & $data.nBlkStaged & ")" bU = if data.nBlkUnprocFragm == 0: "n/a" - else: data.blkUnprocTop.toStr & "(" & + else: data.blkUnprocTop.bnStr & "(" & data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")" reorg = data.reorg diff --git a/nimbus/sync/flare/worker/update.nim b/nimbus/sync/flare/worker/update.nim index 69cf8a2fe..918b8a225 100644 --- a/nimbus/sync/flare/worker/update.nim +++ b/nimbus/sync/flare/worker/update.nim @@ -16,7 +16,7 @@ import pkg/stew/sorted_set, ../worker_desc, ./update/metrics, - "."/[blocks_unproc, db, headers_staged, headers_unproc] + "."/[blocks_unproc, db, headers_staged, headers_unproc, helpers] logScope: topics = "flare update" @@ -72,7 +72,7 @@ proc updateBeaconChange(ctx: FlareCtxRef): bool = least: z, leastParent: ctx.lhc.beacon.header.parentHash, final: z, - finalHash: rlpHeader.keccakHash) + finalHash: rlpHeader.keccak256) # Save this header on the database so it needs not be fetched again from # somewhere else. @@ -115,7 +115,7 @@ proc mergeAdjacentChains(ctx: FlareCtxRef): bool = base: ctx.layout.final, # `B` baseHash: ctx.layout.finalHash, least: ctx.layout.final, # `L` - leastParent: ctx.dbPeekParentHash(ctx.layout.final).expect "Hash256", + leastParent: ctx.dbPeekParentHash(ctx.layout.final).expect "Hash32", final: ctx.layout.final, # `F` finalHash: ctx.layout.finalHash) diff --git a/nimbus/sync/flare/worker_desc.nim b/nimbus/sync/flare/worker_desc.nim index 389fd8f76..57188beb7 100644 --- a/nimbus/sync/flare/worker_desc.nim +++ b/nimbus/sync/flare/worker_desc.nim @@ -12,7 +12,9 @@ import pkg/chronos, + pkg/eth/common, pkg/stew/[interval_set, sorted_set], + ../../core/chain, ../sync_desc, ./worker_config @@ -38,9 +40,9 @@ type ## block number has the least index `0`. This makes it easier to grow the ## sequence with parent headers, i.e. decreasing block numbers. ## - hash*: Hash256 ## Hash of `headers[0]` - revHdrs*: seq[Blob] ## Encoded linked header chain - parentHash*: Hash256 ## Parent hash of `headers[^1]` + hash*: Hash32 ## Hash of `headers[0]` + revHdrs*: seq[seq[byte]] ## Encoded linked header chain + parentHash*: Hash32 ## Parent hash of `headers[^1]` StagedBlocksQueue* = SortedSet[BlockNumber,BlocksForImport] ## Blocks sorted by least block number. @@ -62,24 +64,24 @@ type ## base*: BlockNumber ## `B`, maximal block number of linked chain starting at Genesis `G` - baseHash*: Hash256 + baseHash*: Hash32 ## Hash of `B` least*: BlockNumber ## `L`, minimal block number of linked chain ending at `F` with `B <= L` - leastParent*: Hash256 + leastParent*: Hash32 ## Parent hash of `L` (similar to `parentHash` in `HeaderChainItemRef`) final*: BlockNumber ## `F`, some finalised block - finalHash*: Hash256 + finalHash*: Hash32 ## Hash of `F` (similar to `hash` in `HeaderChainItemRef`) BeaconHeader* = object ## Beacon state to be implicitely updated by RPC method changed*: bool ## Set a marker if something has changed - header*: BlockHeader ## Beacon chain, finalised header - finalised*: Hash256 ## From RPC, ghash of finalised header + header*: Header ## Beacon chain, finalised header + finalised*: Hash32 ## From RPC, ghash of finalised header LinkedHChainsSync* = object ## Sync state for linked header chains @@ -155,7 +157,7 @@ func layout*(ctx: FlareCtxRef): var LinkedHChainsLayout = func db*(ctx: FlareCtxRef): CoreDbRef = ## Getter - ctx.chain.com.db + ctx.pool.chain.db # ------------------------------------------------------------------------------ # Public logging/debugging helpers @@ -164,35 +166,6 @@ func db*(ctx: FlareCtxRef): CoreDbRef = proc `$`*(w: BnRange): string = if w.len == 1: $w.minPt else: $w.minPt & ".." & $w.maxPt -proc bnStr*(w: BlockNumber): string = - "#" & $w - -# Source: `nimbus_import.shortLog()` -func toStr*(a: chronos.Duration, parts: int): string = - ## Returns string representation of Duration ``a`` as nanoseconds value. - if a == nanoseconds(0): - return "0" - var - res = "" - v = a.nanoseconds() - parts = parts - - template f(n: string, T: Duration) = - if v >= T.nanoseconds(): - res.add($(uint64(v div T.nanoseconds()))) - res.add(n) - v = v mod T.nanoseconds() - dec parts - if v == 0 or parts <= 0: - return res - - f("s", Second) - f("ms", Millisecond) - f("us", Microsecond) - f("ns", Nanosecond) - - res - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/handlers/eth.nim b/nimbus/sync/handlers/eth.nim index 3bf9491c6..016d9c8d3 100644 --- a/nimbus/sync/handlers/eth.nim +++ b/nimbus/sync/handlers/eth.nim @@ -16,7 +16,7 @@ import stew/endians2, eth/p2p, eth/p2p/peer_pool, - ".."/[types, protocol], + ../protocol, ../protocol/eth/eth_types, ../../core/[chain, tx_pool] @@ -24,7 +24,7 @@ logScope: topics = "eth-wire" type - HashToTime = TableRef[Hash256, Time] + HashToTime = TableRef[Hash32, Time] EthWireRef* = ref object of EthWireBase db: CoreDbRef @@ -32,7 +32,7 @@ type txPool: TxPoolRef peerPool: PeerPool knownByPeer: Table[Peer, HashToTime] - pending: HashSet[Hash256] + pending: HashSet[Hash32] lastCleanup: Time const @@ -57,16 +57,16 @@ proc notImplemented(name: string) {.used.} = debug "Wire handler method not implemented", meth = name proc successorHeader(db: CoreDbRef, - h: BlockHeader, - output: var BlockHeader, + h: Header, + output: var Header, skip = 0'u): bool = let offset = 1 + skip.BlockNumber if h.number <= (not 0.BlockNumber) - offset: result = db.getBlockHeader(h.number + offset, output) proc ancestorHeader(db: CoreDbRef, - h: BlockHeader, - output: var BlockHeader, + h: Header, + output: var Header, skip = 0'u): bool = let offset = 1 + skip.BlockNumber if h.number >= offset: @@ -74,7 +74,7 @@ proc ancestorHeader(db: CoreDbRef, proc blockHeader(db: CoreDbRef, b: BlockHashOrNumber, - output: var BlockHeader): bool = + output: var Header): bool = if b.isHash: db.getBlockHeader(b.hash, output) else: @@ -96,7 +96,7 @@ when txpool_enabled: proc cleanupKnownByPeer(ctx: EthWireRef) = let now = getTime() - var tmp = HashSet[Hash256]() + var tmp = HashSet[Hash32]() for _, map in ctx.knownByPeer: for hash, time in map: if time - now >= POOLED_STORAGE_TIME_LIMIT: @@ -115,12 +115,12 @@ when txpool_enabled: ctx.lastCleanup = now - proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) = + proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash32], peer: Peer) = var map: HashToTime ctx.knownByPeer.withValue(peer, val) do: map = val[] do: - map = newTable[Hash256, Time]() + map = newTable[Hash32, Time]() ctx.knownByPeer[peer] = map for txHash in txHashes: @@ -128,17 +128,17 @@ when txpool_enabled: map[txHash] = getTime() proc addToKnownByPeer(ctx: EthWireRef, - txHashes: openArray[Hash256], + txHashes: openArray[Hash32], peer: Peer, - newHashes: var seq[Hash256]) = + newHashes: var seq[Hash32]) = var map: HashToTime ctx.knownByPeer.withValue(peer, val) do: map = val[] do: - map = newTable[Hash256, Time]() + map = newTable[Hash32, Time]() ctx.knownByPeer[peer] = map - newHashes = newSeqOfCap[Hash256](txHashes.len) + newHashes = newSeqOfCap[Hash32](txHashes.len) for txHash in txHashes: if txHash notin map: map[txHash] = getTime() @@ -150,12 +150,12 @@ when txpool_enabled: when txpool_enabled: proc sendNewTxHashes(ctx: EthWireRef, - txHashes: seq[Hash256], + txHashes: seq[Hash32], peers: seq[Peer]): Future[void] {.async.} = try: for peer in peers: # Add to known tx hashes and get hashes still to send to peer - var hashesToSend: seq[Hash256] + var hashesToSend: seq[Hash32] ctx.addToKnownByPeer(txHashes, peer, hashesToSend) # Broadcast to peer if at least 1 new tx hash to announce @@ -176,7 +176,7 @@ when txpool_enabled: debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg proc sendTransactions(ctx: EthWireRef, - txHashes: seq[Hash256], + txHashes: seq[Hash32], txs: seq[Transaction], peers: seq[Peer]): Future[void] {.async.} = try: @@ -191,7 +191,7 @@ when txpool_enabled: except CatchableError as e: debug "Exception in sendTransactions", exc = e.name, err = e.msg - proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} = + proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash32], peer: Peer): Future[void] {.async.} = debug "fetchTx: requesting txs", number = reqHashes.len @@ -220,7 +220,7 @@ when txpool_enabled: debug "Exception in fetchTransactions", exc = e.name, err = e.msg return - var newTxHashes = newSeqOfCap[Hash256](reqHashes.len) + var newTxHashes = newSeqOfCap[Hash32](reqHashes.len) for txHash in reqHashes: if ctx.txPool.inPoolAndOk(txHash): newTxHashes.add txHash @@ -237,7 +237,7 @@ when txpool_enabled: proc onPeerConnected(ctx: EthWireRef, peer: Peer) = when txpool_enabled: - var txHashes = newSeqOfCap[Hash256](ctx.txPool.numTxs) + var txHashes = newSeqOfCap[Hash32](ctx.txPool.numTxs) for txHash, item in okPairs(ctx.txPool): txHashes.add txHash @@ -315,12 +315,12 @@ method getStatus*(ctx: EthWireRef): Result[EthState, string] return err(exc.msg) method getReceipts*(ctx: EthWireRef, - hashes: openArray[Hash256]): + hashes: openArray[Hash32]): Result[seq[seq[Receipt]], string] {.gcsafe.} = try: let db = ctx.db - var header: BlockHeader + var header: Header var list: seq[seq[Receipt]] for blockHash in hashes: if db.getBlockHeader(blockHash, header): @@ -333,7 +333,7 @@ method getReceipts*(ctx: EthWireRef, return err(exc.msg) method getPooledTxs*(ctx: EthWireRef, - hashes: openArray[Hash256]): + hashes: openArray[Hash32]): Result[seq[PooledTransaction], string] {.gcsafe.} = @@ -352,7 +352,7 @@ method getPooledTxs*(ctx: EthWireRef, ok(list) method getBlockBodies*(ctx: EthWireRef, - hashes: openArray[Hash256]): + hashes: openArray[Hash32]): Result[seq[BlockBody], string] {.gcsafe.} = let db = ctx.db @@ -367,13 +367,13 @@ method getBlockBodies*(ctx: EthWireRef, return ok(list) method getBlockHeaders*(ctx: EthWireRef, - req: BlocksRequest): - Result[seq[BlockHeader], string] + req: EthBlocksRequest): + Result[seq[Header], string] {.gcsafe.} = try: let db = ctx.db - var foundBlock: BlockHeader - var list = newSeqOfCap[BlockHeader](req.maxResults) + var foundBlock: Header + var list = newSeqOfCap[Header](req.maxResults) if db.blockHeader(req.startBlock, foundBlock): list.add foundBlock @@ -407,7 +407,7 @@ method handleAnnouncedTxs*(ctx: EthWireRef, if ctx.lastCleanup - getTime() > POOLED_STORAGE_TIME_LIMIT: ctx.cleanupKnownByPeer() - var txHashes = newSeqOfCap[Hash256](txs.len) + var txHashes = newSeqOfCap[Hash32](txs.len) for tx in txs: txHashes.add rlpHash(tx) @@ -418,7 +418,7 @@ method handleAnnouncedTxs*(ctx: EthWireRef, continue ctx.txPool.add PooledTransaction(tx: tx) - var newTxHashes = newSeqOfCap[Hash256](txHashes.len) + var newTxHashes = newSeqOfCap[Hash32](txHashes.len) var validTxs = newSeqOfCap[Transaction](txHashes.len) for i, txHash in txHashes: # Nodes must not automatically broadcast blob transactions to @@ -447,9 +447,9 @@ method handleAnnouncedTxs*(ctx: EthWireRef, method handleAnnouncedTxsHashes*( ctx: EthWireRef; peer: Peer; - txTypes: Blob; + txTypes: seq[byte]; txSizes: openArray[int]; - txHashes: openArray[Hash256]; + txHashes: openArray[Hash32]; ): Result[void, string] = when extraTraceMessages: trace "Wire handler ignoring txs hashes", nHashes=txHashes.len diff --git a/nimbus/sync/protocol/eth/eth_types.nim b/nimbus/sync/protocol/eth/eth_types.nim index d0b8a5ea1..338ed786d 100644 --- a/nimbus/sync/protocol/eth/eth_types.nim +++ b/nimbus/sync/protocol/eth/eth_types.nim @@ -13,15 +13,14 @@ import chronicles, results, - eth/[common, p2p, p2p/private/p2p_types], - ../../types + eth/[common, p2p, p2p/private/p2p_types] logScope: topics = "eth-wire" type NewBlockHashesAnnounce* = object - hash*: Hash256 + hash*: Hash32 number*: BlockNumber ChainForkId* = object @@ -32,15 +31,20 @@ type EthState* = object totalDifficulty*: DifficultyInt - genesisHash*: Hash256 - bestBlockHash*: Hash256 + genesisHash*: Hash32 + bestBlockHash*: Hash32 forkId*: ChainForkId EthPeerState* = ref object of RootRef initialized*: bool - bestBlockHash*: Hash256 + bestBlockHash*: Hash32 bestDifficulty*: DifficultyInt + EthBlocksRequest* = object + startBlock*: BlockHashOrNumber + maxResults*, skip*: uint + reverse*: bool + const maxStateFetch* = 384 maxBodiesFetch* = 128 @@ -55,26 +59,26 @@ method getStatus*(ctx: EthWireBase): Result[EthState, string] notImplemented("getStatus") method getReceipts*(ctx: EthWireBase, - hashes: openArray[Hash256]): + hashes: openArray[Hash32]): Result[seq[seq[Receipt]], string] {.base, gcsafe.} = notImplemented("getReceipts") method getPooledTxs*(ctx: EthWireBase, - hashes: openArray[Hash256]): + hashes: openArray[Hash32]): Result[seq[PooledTransaction], string] {.base, gcsafe.} = notImplemented("getPooledTxs") method getBlockBodies*(ctx: EthWireBase, - hashes: openArray[Hash256]): + hashes: openArray[Hash32]): Result[seq[BlockBody], string] {.base, gcsafe.} = notImplemented("getBlockBodies") method getBlockHeaders*(ctx: EthWireBase, - req: BlocksRequest): - Result[seq[BlockHeader], string] + req: EthBlocksRequest): + Result[seq[Header], string] {.base, gcsafe.} = notImplemented("getBlockHeaders") @@ -96,9 +100,9 @@ method handleAnnouncedTxs*(ctx: EthWireBase, method handleAnnouncedTxsHashes*( ctx: EthWireBase; peer: Peer; - txTypes: Blob; + txTypes: seq[byte]; txSizes: openArray[int]; - txHashes: openArray[Hash256]; + txHashes: openArray[Hash32]; ): Result[void, string] {.base, gcsafe.} = notImplemented("handleAnnouncedTxsHashes/eth68") diff --git a/nimbus/sync/protocol/eth68.nim b/nimbus/sync/protocol/eth68.nim index a2fec0f1e..9cce9402b 100644 --- a/nimbus/sync/protocol/eth68.nim +++ b/nimbus/sync/protocol/eth68.nim @@ -24,7 +24,6 @@ import stew/byteutils, ./trace_config, ./eth/eth_types, - ../types, ../../utils/utils export @@ -152,8 +151,8 @@ p2pProtocol eth68(version = ethVersion, ethVersionArg: uint, networkId: NetworkId, totalDifficulty: DifficultyInt, - bestHash: Hash256, - genesisHash: Hash256, + bestHash: Hash32, + genesisHash: Hash32, forkId: ChainForkId) = trace trEthRecvReceived & "Status (0x00)", peer, networkId, totalDifficulty, bestHash=short(bestHash), genesisHash=short(genesisHash), @@ -181,7 +180,7 @@ p2pProtocol eth68(version = ethVersion, requestResponse: # User message 0x03: GetBlockHeaders. - proc getBlockHeaders(peer: Peer, request: BlocksRequest) = + proc getBlockHeaders(peer: Peer, request: EthBlocksRequest) = when trEthTracePacketsOk: trace trEthRecvReceived & "GetBlockHeaders (0x03)", peer, count=request.maxResults @@ -205,11 +204,11 @@ p2pProtocol eth68(version = ethVersion, await response.send(headers.get) # User message 0x04: BlockHeaders. - proc blockHeaders(p: Peer, headers: openArray[BlockHeader]) + proc blockHeaders(p: Peer, headers: openArray[Header]) requestResponse: # User message 0x05: GetBlockBodies. - proc getBlockBodies(peer: Peer, hashes: openArray[Hash256]) = + proc getBlockBodies(peer: Peer, hashes: openArray[Hash32]) = trace trEthRecvReceived & "GetBlockBodies (0x05)", peer, hashes=hashes.len if hashes.len > maxBodiesFetch: @@ -250,9 +249,9 @@ p2pProtocol eth68(version = ethVersion, # User message 0x08: NewPooledTransactionHashes. proc newPooledTransactionHashes( peer: Peer, - txTypes: Blob, + txTypes: seq[byte], txSizes: openArray[int], - txHashes: openArray[Hash256] + txHashes: openArray[Hash32] ) = when trEthTraceGossipOk: trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer, @@ -265,7 +264,7 @@ p2pProtocol eth68(version = ethVersion, requestResponse: # User message 0x09: GetPooledTransactions. - proc getPooledTransactions(peer: Peer, txHashes: openArray[Hash256]) = + proc getPooledTransactions(peer: Peer, txHashes: openArray[Hash32]) = trace trEthRecvReceived & "GetPooledTransactions (0x09)", peer, hashes=txHashes.len @@ -292,7 +291,7 @@ p2pProtocol eth68(version = ethVersion, requestResponse: # User message 0x0f: GetReceipts. - proc getReceipts(peer: Peer, hashes: openArray[Hash256]) = + proc getReceipts(peer: Peer, hashes: openArray[Hash32]) = trace trEthRecvReceived & "GetReceipts (0x0f)", peer, hashes=hashes.len diff --git a/nimbus/sync/protocol/snap/snap_types.nim b/nimbus/sync/protocol/snap/snap_types.nim index d4023aa48..480d787c6 100644 --- a/nimbus/sync/protocol/snap/snap_types.nim +++ b/nimbus/sync/protocol/snap/snap_types.nim @@ -22,10 +22,10 @@ logScope: type SnapAccount* = object - accHash*: Hash256 + accHash*: Hash32 accBody* {.rlpCustomSerialization.}: Account - SnapProof* = distinct Blob + SnapProof* = distinct seq[byte] ## Rlp coded node data, to be handled different from a generic `Blob` SnapProofNodes* = object @@ -33,12 +33,12 @@ type nodes*: seq[SnapProof] SnapStorage* = object - slotHash*: Hash256 - slotData*: Blob + slotHash*: Hash32 + slotData*: seq[byte] SnapTriePaths* = object - accPath*: Blob - slotPaths*: seq[Blob] + accPath*: seq[byte] + slotPaths*: seq[seq[byte]] SnapWireBase* = ref object of RootRef @@ -48,16 +48,16 @@ type # Public `SnapProof` type helpers # ------------------------------------------------------------------------------ -proc to*(data: Blob; T: type SnapProof): T = data.T -proc to*(node: SnapProof; T: type Blob): T = node.T +proc to*(data: seq[byte]; T: type SnapProof): T = data.T +proc to*(node: SnapProof; T: type seq[byte]): T = node.T proc hash*(sp: SnapProof): Hash = ## Mixin for Table/HashSet - sp.to(Blob).hash + sp.to(seq[byte]).hash proc `==`*(a,b: SnapProof): bool = ## Mixin for Table/HashSet - a.to(Blob) == b.to(Blob) + a.to(seq[byte]) == b.to(seq[byte]) # ------------------------------------------------------------------------------ # Public serialisation helpers @@ -148,7 +148,7 @@ proc snapAppend*(writer: var RlpWriter; spn: SnapProofNodes) = ## the serialised destination data type. writer.startList spn.nodes.len for w in spn.nodes: - writer.appendRawBytes w.to(Blob) + writer.appendRawBytes w.to(seq[byte]) # --------------------- @@ -163,10 +163,10 @@ proc snapRead*( var first = true for w in rlp.items: if first: - result.accPath = rlp.read(Blob) + result.accPath = rlp.read(seq[byte]) first = false else: - result.slotPaths.add rlp.read(Blob) + result.slotPaths.add rlp.read(seq[byte]) proc snapAppend*(writer: var RlpWriter; stn: SnapTriePaths) = ## RLP encoding @@ -184,7 +184,7 @@ proc notImplemented(name: string) = method getAccountRange*( ctx: SnapWireBase; - root: Hash256; + root: Hash32; origin: openArray[byte]; limit: openArray[byte]; replySizeMax: uint64; @@ -194,8 +194,8 @@ method getAccountRange*( method getStorageRanges*( ctx: SnapWireBase; - root: Hash256; - accounts: openArray[Hash256]; + root: Hash32; + accounts: openArray[Hash32]; origin: openArray[byte]; limit: openArray[byte]; replySizeMax: uint64; @@ -205,18 +205,18 @@ method getStorageRanges*( method getByteCodes*( ctx: SnapWireBase; - nodes: openArray[Hash256]; + nodes: openArray[Hash32]; replySizeMax: uint64; - ): Result[seq[Blob], string] + ): Result[seq[seq[byte]], string] {.base, gcsafe.} = notImplemented("getByteCodes") method getTrieNodes*( ctx: SnapWireBase; - root: Hash256; + root: Hash32; pathGroups: openArray[SnapTriePaths]; replySizeMax: uint64; - ): Result[seq[Blob], string] + ): Result[seq[seq[byte]], string] {.base, gcsafe.} = notImplemented("getTrieNodes") diff --git a/nimbus/sync/protocol/snap1.nim b/nimbus/sync/protocol/snap1.nim index e9f54a584..4ca15c638 100644 --- a/nimbus/sync/protocol/snap1.nim +++ b/nimbus/sync/protocol/snap1.nim @@ -88,7 +88,7 @@ p2pProtocol snap1(version = snapVersion, # User message 0x00: GetAccountRange. proc getAccountRange( peer: Peer; - root: Hash256; + root: Hash32; origin: openArray[byte]; limit: openArray[byte]; replySizeMax: uint64; @@ -127,8 +127,8 @@ p2pProtocol snap1(version = snapVersion, # User message 0x02: GetStorageRanges. proc getStorageRanges( peer: Peer; - root: Hash256; - accounts: openArray[Hash256]; + root: Hash32; + accounts: openArray[Hash32]; origin: openArray[byte]; limit: openArray[byte]; replySizeMax: uint64; @@ -169,7 +169,7 @@ p2pProtocol snap1(version = snapVersion, # User message 0x04: GetByteCodes. proc getByteCodes( peer: Peer; - nodes: openArray[Hash256]; + nodes: openArray[Hash32]; replySizeMax: uint64; ) = trace trSnapRecvReceived & "GetByteCodes (0x04)", peer, @@ -195,14 +195,14 @@ p2pProtocol snap1(version = snapVersion, # User message 0x05: ByteCodes. proc byteCodes( peer: Peer; - codes: openArray[Blob]) + codes: openArray[seq[byte]]) requestResponse: # User message 0x06: GetTrieNodes. proc getTrieNodes( peer: Peer; - root: Hash256; + root: Hash32; pathGroups: openArray[SnapTriePaths]; replySizeMax: uint64; ) = @@ -229,6 +229,6 @@ p2pProtocol snap1(version = snapVersion, # User message 0x07: TrieNodes. proc trieNodes( peer: Peer; - nodes: openArray[Blob]) + nodes: openArray[seq[byte]]) # End diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index acba614e0..8cae2da1b 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -16,12 +16,7 @@ {.push raises: [].} import - eth/p2p, - ../core/chain, - ./handlers/eth - -export - chain + eth/p2p type BuddyRunState* = enum @@ -33,7 +28,6 @@ type BuddyCtrlRef* = ref object ## Control and state settings runState: BuddyRunState ## Access with getters - multiOk*: bool ## Triggers `runSingle()` mode unless `true` BuddyRef*[S,W] = ref object ## Worker peer state descriptor. @@ -45,8 +39,6 @@ type CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) buddiesMax*: int ## Max number of buddies - ethWireCtx*: EthWireRef ## Eth protocol wire context (if available) - chain*: ForkedChainRef ## Block chain database (no need for `Peer`) poolMode*: bool ## Activate `runPool()` workers if set `true` daemon*: bool ## Enable global background job pool*: S ## Shared context for all worker peers diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index 6fed24f5d..a290b212d 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -27,8 +27,8 @@ ## Global background job that will be re-started as long as the variable ## `ctx.daemon` is set `true`. If that job was stopped due to re-setting ## `ctx.daemon` to `false`, it will be restarted next after it was reset -## as `true` not before there is some activity on the `runPool()`, -## `runSingle()`, or `runMulti()` functions. +## as `true` not before there is some activity on the `runPool()`, or +## `runPeer()` functions. ## ## ## *runStart(buddy: BuddyRef[S,W]): bool* @@ -40,12 +40,11 @@ ## ## *runPool(buddy: BuddyRef[S,W], last: bool; laps: int): bool* ## Once started, the function `runPool()` is called for all worker peers in -## sequence as the body of an iteration as long as the function returns -## `false`. There will be no other worker peer functions activated -## simultaneously. +## sequence as long as the function returns `false`. There will be no other +## `runPeer()` functions (see below) activated while `runPool()` is active. ## ## This procedure is started if the global flag `buddy.ctx.poolMode` is set -## `true` (default is `false`.) It will be automatically reset before the +## `true` (default is `false`.) The flag will be automatically reset before ## the loop starts. Re-setting it again results in repeating the loop. The ## argument `laps` (starting with `0`) indicated the currend lap of the ## repeated loops. To avoid continous looping, the number of `laps` is @@ -54,31 +53,14 @@ ## The argument `last` is set `true` if the last entry of the current loop ## has been reached. ## -## Note: -## + This function does *not* runs in `async` mode. -## + The flag `buddy.ctx.poolMode` has priority over the flag -## `buddy.ctrl.multiOk` which controls `runSingle()` and `runMulti()`. +## Note that this function does *not* run in `async` mode. ## ## -## *runSingle(buddy: BuddyRef[S,W]) {.async.}* -## This worker peer method is invoked if the peer-local flag -## `buddy.ctrl.multiOk` is set `false` which is the default mode. This flag -## is updated by the worker peer when deemed appropriate. -## + For all worker peerss, there can be only one `runSingle()` function -## active simultaneously. -## + There will be no `runMulti()` function active for the very same worker -## peer that runs the `runSingle()` function. -## + There will be no `runPool()` iterator active. +## *runPeer(buddy: BuddyRef[S,W]) {.async.}* +## This peer worker method is repeatedly invoked (exactly one per peer) while +## the `buddy.ctrl.poolMode` flag is set `false`. ## -## Note that this function runs in `async` mode. -## -## -## *runMulti(buddy: BuddyRef[S,W]) {.async.}* -## This worker peer method is invoked if the `buddy.ctrl.multiOk` flag is -## set `true` which is typically done after finishing `runSingle()`. This -## instance can be simultaneously active for all worker peers. -## -## Note that this function runs in `async` mode. +## These peer worker methods run concurrently in `async` mode. ## ## ## Additional import files needed when using this template: @@ -93,13 +75,9 @@ import std/hashes, chronos, - eth/[keys, p2p, p2p/peer_pool], + eth/[p2p, p2p/peer_pool], stew/keyed_queue, - "."/[handlers, sync_desc] - -static: - # type `EthWireRef` is needed in `initSync()` - type silenceUnusedhandlerComplaint {.used.} = EthWireRef # dummy directive + ./sync_desc type ActiveBuddies[S,W] = ##\ @@ -112,7 +90,6 @@ type pool: PeerPool ## For starting the system buddies: ActiveBuddies[S,W] ## LRU cache with worker descriptors daemonRunning: bool ## Run global background job - singleRunLock: bool ## Some single mode runner is activated monitorLock: bool ## Monitor mode is activated activeMulti: int ## Number of activated runners in multi-mode shutdown: bool ## Internal shut down flag @@ -192,7 +169,7 @@ proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = - mixin runMulti, runSingle, runPool, runStop + mixin runPeer, runPool, runStop let dsc = buddy.dsc ctx = dsc.ctx @@ -213,7 +190,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # Grab `monitorLock` (was `false` as checked above) and wait until # clear to run as the only activated instance. dsc.monitorLock = true - while 0 < dsc.activeMulti or dsc.singleRunLock: + while 0 < dsc.activeMulti: await sleepAsync execLoopPollingTime if worker.ctrl.stopped: dsc.monitorLock = false @@ -244,24 +221,11 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} = # end. So zombies will end up leftish. discard dsc.buddies.lruFetch peer.key - # Multi mode - if worker.ctrl.multiOk: - if not dsc.singleRunLock: - dsc.activeMulti.inc - # Continue doing something, work a bit - await worker.runMulti() - dsc.activeMulti.dec - - elif dsc.singleRunLock: - # Some other process is running single mode - discard # suspend some time at the end of loop body - - else: - # Start single instance mode by grabbing `singleRunLock` (was - # `false` as checked above). - dsc.singleRunLock = true - await worker.runSingle() - dsc.singleRunLock = false + # Peer mode + dsc.activeMulti.inc + # Continue doing something, work a bit + await worker.runPeer() + dsc.activeMulti.dec # Dispatch daemon sevice if needed if not dsc.daemonRunning and dsc.ctx.daemon: @@ -383,17 +347,15 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = proc initSync*[S,W]( dsc: RunnerSyncRef[S,W]; node: EthereumNode; - chain: ForkedChainRef, slots: int; ) = ## Constructor + # Leave one extra slot so that it can holds a *zombie* even if all slots # are full. The effect is that a re-connect on the latest zombie will be # rejected as long as its worker descriptor is registered. - dsc.ctx = CtxRef[S]( - ethWireCtx: cast[EthWireRef](node.protocolState protocol.eth), - buddiesMax: max(1, slots + 1), - chain: chain) + dsc.ctx = CtxRef[S](buddiesMax: max(1, slots + 1)) + dsc.pool = node.peerPool dsc.buddies.init(dsc.ctx.buddiesMax) diff --git a/nimbus/sync/types.nim b/nimbus/sync/types.nim deleted file mode 100644 index 6b3d84055..000000000 --- a/nimbus/sync/types.nim +++ /dev/null @@ -1,129 +0,0 @@ -# Nimbus -# Copyright (c) 2018-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or -# http://www.apache.org/licenses/LICENSE-2.0) -# * MIT license ([LICENSE-MIT](LICENSE-MIT) or -# http://opensource.org/licenses/MIT) -# at your option. This file may not be copied, modified, or -# distributed except according to those terms. - -{.push raises: [].} - -import - std/[math, hashes], - eth/common/eth_types_rlp, - results, - stew/byteutils - -type - BlockHash* = distinct Hash256 - ## Hash of a block, goes with `BlockNumber`. - ## - ## Note that the `ethXX` protocol driver always uses the - ## underlying `Hash256` type which needs to be converted to `BlockHash`. - - BlocksRequest* = object - startBlock*: BlockHashOrNumber - maxResults*, skip*: uint - reverse*: bool - -# ------------------------------------------------------------------------------ -# Public constructors -# ------------------------------------------------------------------------------ - -proc new*(T: type BlockHash): T = - default(Hash256).T - -# ------------------------------------------------------------------------------ -# Public (probably non-trivial) type conversions -# ------------------------------------------------------------------------------ - -proc to*(num: SomeInteger; T: type float): T = - ## Convert to float. Result an d argument are not strictly equivalent. Though - ## sort of `(num.to(float) + 0.5).int == num` might hold in many cases. - num.T - -proc to*(longNum: UInt256; T: type float): T = - ## Convert to float (see also comment at `num.to(float)`, above.) - let mantissaLen = 256 - longNum.leadingZeros - if mantissaLen <= 64: - longNum.truncate(uint64).T - else: - let exp = mantissaLen - 64 - (longNum shr exp).truncate(uint64).T * (2.0 ^ exp) - -proc to*(w: BlockHash; T: type Hash256): T = - ## Syntactic sugar - w.Hash256 - -proc to*(w: seq[BlockHash]; T: type seq[Hash256]): T = - ## Ditto - cast[seq[Hash256]](w) - -proc to*(bh: BlockHash; T: type BlockHashOrNumber): T = - ## Convert argument blocj hash `bh` to `BlockHashOrNumber` - T(isHash: true, hash: bh.Hash256) - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc read*(rlp: var Rlp, T: type BlockHash): T - {.gcsafe, raises: [RlpError]} = - ## RLP mixin reader - rlp.read(Hash256).T - -proc append*(writer: var RlpWriter; h: BlockHash) = - ## RLP mixin - append(writer, h.Hash256) - -proc `==`*(a: BlockHash; b: Hash256): bool = - a.Hash256 == b - -proc `==`*[T: BlockHash](a,b: T): bool = - a.Hash256 == b.Hash256 - -proc hash*(root: BlockHash): Hash = - ## Mixin for `Table` or `KeyedQueue` - root.Hash256.data.hash - -# ------------------------------------------------------------------------------ -# Public printing and pretty printing -# ------------------------------------------------------------------------------ - -func toHex*(hash: Hash256): string = - ## Shortcut for `byteutils.toHex(hash.data)` - hash.data.toHex - -func `$`*(h: BlockHash): string = - $h.Hash256.data.toHex - -func `$`*(blob: Blob): string = - blob.toHex - -func `$`*(hashOrNum: BlockHashOrNumber): string = - # It's always obvious which one from the visible length of the string. - if hashOrNum.isHash: $hashOrNum.hash - else: $hashOrNum.number - -func toStr*(n: BlockNumber): string = - ## Pretty print block number, explicitely format with a leading hash `#` - if n == high(BlockNumber): "high" else:"#" & $n - -func toStr*(n: Opt[BlockNumber]): string = - if n.isNone: "n/a" else: n.get.toStr - -# ------------------------------------------------------------------------------ -# Public debug printing helpers -# ------------------------------------------------------------------------------ - -func traceStep*(request: BlocksRequest): string = - var str = if request.reverse: "-" else: "+" - if request.skip < high(typeof(request.skip)): - return str & $(request.skip + 1) - return static($(high(typeof(request.skip)).u256 + 1)) - -# ------------------------------------------------------------------------------ -# End -# ------------------------------------------------------------------------------ diff --git a/tests/test_aristo/undump_desc.nim b/tests/test_aristo/undump_desc.nim index 1b883bb14..1f049d504 100644 --- a/tests/test_aristo/undump_desc.nim +++ b/tests/test_aristo/undump_desc.nim @@ -13,7 +13,7 @@ import eth/common, stint, - ../../nimbus/sync/[protocol, types] + ../../nimbus/sync/protocol ## Stripped down version of `sync/snap/range_desc` in order to decode the ## snap sync dump samples. @@ -46,12 +46,12 @@ type ## In fact, the `snap/1` driver returns the `Account` structure which is ## unwanted overhead, here. accKey*: NodeKey - accBlob*: Blob + accBlob*: seq[byte] AccountSlotsHeader* = object ## Storage root header accKey*: NodeKey ## Owner account, maybe unnecessary - storageRoot*: Hash256 ## Start of storage tree + storageRoot*: Hash32 ## Start of storage tree #subRange*: Opt[NodeTagRange] ## Sub-range of slot range covered AccountStorageRange* = object @@ -67,11 +67,11 @@ type data*: seq[SnapStorage] -proc to*(tag: NodeTag; T: type Hash256): T = +proc to*(tag: NodeTag; T: type Hash32): T = ## Convert to serialised equivalent result.data = tag.UInt256.toBytesBE -proc to*(key: Hash256; T: type NodeTag): T = +proc to*(key: Hash32; T: type NodeTag): T = ## Syntactic sugar key.data.NodeKey.to(T)