implement simple PoA sealing engine

the goal of this module is to pass hive/smoke/clique test
and also support for hive/ethereum/rpc test

fixes #801
This commit is contained in:
jangko 2021-08-24 14:34:58 +07:00
parent cff5c5afc4
commit 7dbc44f88c
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
9 changed files with 281 additions and 109 deletions

View File

@ -50,6 +50,7 @@ const
GENESIS_EXTRA_DATA* = ""
GAS_LIMIT_MINIMUM* = 5000
GAS_LIMIT_MAXIMUM* = high(GasInt)
DEFAULT_GAS_LIMIT* = 8_000_000
BLANK_ROOT_HASH* = "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421".toDigest
EMPTY_SHA3* = "c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470".toDigest

View File

@ -21,7 +21,7 @@ import
config, genesis, rpc/[common, p2p, debug], p2p/chain,
eth/trie/db, metrics, metrics/[chronos_httpserver, chronicles_support],
graphql/ethapi,
"."/[utils, conf_utils]
"."/[utils, conf_utils, sealer, constants]
## TODO:
## * No IPv6 support
@ -41,6 +41,7 @@ type
state*: NimbusState
graphqlServer*: GraphqlHttpServerRef
wsRpcServer*: RpcWebSocketServer
sealingEngine*: SealingEngineRef
proc start(nimbus: NimbusNode) =
var conf = getConfiguration()
@ -163,6 +164,14 @@ proc start(nimbus: NimbusNode) =
nimbus.graphqlServer = setupGraphqlHttpServer(conf, chainDB, nimbus.ethNode)
nimbus.graphqlServer.start()
if conf.engineSigner != ZERO_ADDRESS:
let rs = validateSealer(chainRef)
if rs.isErr:
echo rs.error
quit(QuitFailure)
nimbus.sealingEngine = SealingEngineRef.new(chainRef)
nimbus.sealingEngine.start()
# metrics server
if conf.net.metricsServer:
let metricsAddress = "127.0.0.1"
@ -199,6 +208,8 @@ proc stop*(nimbus: NimbusNode) {.async, gcsafe.} =
nimbus.rpcServer.stop()
if conf.graphql.enabled:
await nimbus.graphqlServer.stop()
if conf.engineSigner != ZERO_ADDRESS:
await nimbus.sealingEngine.stop()
proc process*(nimbus: NimbusNode) =
# Main event loop

View File

@ -218,6 +218,13 @@ proc verifyFrom*(c: Chain): auto {.inline.} =
## Getter
c.verifyFrom
proc currentBlock*(c: Chain): BlockHeader
{.gcsafe, raises: [Defect,CatchableError].} =
## currentBlock retrieves the current head block of the canonical chain.
## Ideally the block should be retrieved from the blockchain's internal cache.
## but now it's enough to retrieve it from database
c.db.getCanonicalHead()
# ------------------------------------------------------------------------------
# Public `Chain` setters
# ------------------------------------------------------------------------------

View File

@ -217,6 +217,10 @@ when enableCliqueAsyncLock:
action
c.unlock
else:
template doExclusively*(c: Clique; action: untyped) =
action
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -23,8 +23,7 @@
import
std/[sequtils, tables, times],
../../constants,
../../db/[db_chain, state_db],
../../utils,
../../utils/ec_recover,
./clique_cfg,
./clique_defs,
./clique_desc,
@ -34,11 +33,7 @@ import
./snapshot/[ballot, snapshot_desc],
chronicles,
chronos,
eth/[common, keys, rlp],
nimcrypto
when not enableCliqueAsyncLock:
{.fatal: "Async locks must be enabled in clique_desc, try: -d:clique_async_lock"}
eth/[common, keys, rlp]
{.push raises: [Defect].}
@ -114,7 +109,7 @@ proc recentBlockNumber*(s: Snapshot;
# ------------------------------------------------------------------------------
# clique/clique.go(212): func (c *Clique) Author(header [..]
proc author*(c: Clique; header: BlockHeader): Result[EthAddress,CliqueError]
proc author*(c: Clique; header: BlockHeader): Result[EthAddress,UtilsError]
{.gcsafe, raises: [Defect,CatchableError].} =
## For the Consensus Engine, `author()` retrieves the Ethereum address of the
## account that minted the given block, which may be different from the
@ -171,7 +166,7 @@ proc verifyUncles*(c: Clique; ethBlock: EthBlock): CliqueOkResult =
# clique/clique.go(506): func (c *Clique) Prepare(chain [..]
proc prepare*(c: Clique; header: var BlockHeader): CliqueOkResult
proc prepare*(c: Clique; parent: BlockHeader, header: var BlockHeader): CliqueOkResult
{.gcsafe, raises: [Defect,CatchableError].} =
## For the Consensus Engine, `prepare()` initializes the consensus fields
## of a block header according to the rules of a particular engine. The
@ -216,58 +211,11 @@ proc prepare*(c: Clique; header: var BlockHeader): CliqueOkResult
header.mixDigest.reset
# Ensure the timestamp has the correct delay
var parent: BlockHeader
if not c.db.getBlockHeader(header.blockNumber-1, parent):
return err((errUnknownAncestor,""))
header.timestamp = parent.timestamp + c.cfg.period
if header.timestamp < getTime():
header.timestamp = getTime()
return ok()
# clique/clique.go(571): func (c *Clique) Finalize(chain [..]
proc finalize*(c: Clique; header: BlockHeader; db: AccountStateDB) =
## For the Consensus Engine, `finalize()` runs any post-transaction state
## modifications (e.g. block rewards) but does not assemble the block.
##
## Note: The block header and state database might be updated to reflect any
## consensus rules that happen at finalization (e.g. block rewards).
##
## Not implemented here, raises `AssertionDefect`
raiseAssert "Not implemented"
#
# ## This implementation ensures no uncles are set, nor block rewards given.
# # No block rewards in PoA, so the state remains as is and uncles are dropped
# let deleteEmptyObjectsOk = c.cfg.config.eip158block <= header.blockNumber
# header.stateRoot = db.intermediateRoot(deleteEmptyObjectsOk)
# header.ommersHash = EMPTY_UNCLE_HASH
# clique/clique.go(579): func (c *Clique) FinalizeAndAssemble(chain [..]
proc finalizeAndAssemble*(c: Clique; header: BlockHeader;
db: AccountStateDB; txs: openArray[Transaction];
receipts: openArray[Receipt]):
Result[EthBlock,CliqueError] =
## For the Consensus Engine, `finalizeAndAssemble()` runs any
## post-transaction state modifications (e.g. block rewards) and assembles
## the final block.
##
## Note: The block header and state database might be updated to reflect any
## consensus rules that happen at finalization (e.g. block rewards).
##
## Not implemented here, raises `AssertionDefect`
raiseAssert "Not implemented"
# ## Ensuring no uncles are set, nor block rewards given, and returns the
# ## final block.
#
# # Finalize block
# c.finalize(header, state, txs, uncles)
#
# # Assemble and return the final block for sealing
# return types.NewBlock(header, txs, nil, receipts,
# trie.NewStackTrie(nil)), nil
ok()
# clique/clique.go(589): func (c *Clique) Authorize(signer [..]
proc authorize*(c: Clique; signer: EthAddress; signFn: CliqueSignerFn) =
@ -301,8 +249,9 @@ proc sealHash*(header: BlockHeader): Hash256 =
# clique/clique.go(599): func (c *Clique) Seal(chain [..]
proc seal*(c: Clique; ethBlock: EthBlock):
Future[Result[EthBlock,CliqueError]] {.async,gcsafe.} =
proc seal*(c: Clique; ethBlock: var EthBlock):
Result[void,CliqueError] {.gcsafe,
raises: [Defect,CatchableError].} =
## For the Consensus Engine, `seal()` generates a new sealing request for
## the given input block and pushes the result into the given channel.
##
@ -312,19 +261,18 @@ proc seal*(c: Clique; ethBlock: EthBlock):
## This implementation attempts to create a sealed block using the local
## signing credentials. If running in the background, the process can be
## stopped by calling the `stopSeal()` function.
c.doExclusively:
c.stopSealReq = false
var header = ethBlock.header
# Sealing the genesis block is not supported
if header.blockNumber.isZero:
return err((errUnknownBlock,""))
return err((errUnknownBlock, ""))
# For 0-period chains, refuse to seal empty blocks (no reward but would spin
# sealing)
if c.cfg.period.isZero and ethBlock.txs.len == 0:
info $nilCliqueSealNoBlockYet
return err((nilCliqueSealNoBlockYet,""))
return err((nilCliqueSealNoBlockYet, ""))
# Don't hold the signer fields for the entire sealing procedure
c.doExclusively:
@ -337,7 +285,7 @@ proc seal*(c: Clique; ethBlock: EthBlock):
if rc.isErr:
return err(rc.error)
if not c.snapshot.isSigner(signer):
return err((errUnauthorizedSigner,""))
return err((errUnauthorizedSigner, ""))
# If we're amongst the recent signers, wait for the next block
let seen = c.snapshot.recentBlockNumber(signer)
@ -346,7 +294,7 @@ proc seal*(c: Clique; ethBlock: EthBlock):
# shift it out
if header.blockNumber < seen.value + c.snapshot.signersThreshold.u256:
info $nilCliqueSealSignedRecently
return err((nilCliqueSealSignedRecently,""))
return err((nilCliqueSealSignedRecently, ""))
# Sweet, the protocol permits us to sign the block, wait for our time
var delay = header.timestamp - getTime()
@ -365,42 +313,19 @@ proc seal*(c: Clique; ethBlock: EthBlock):
wiggle = $wiggle
# Sign all the things!
let sigHash = signFn(signer,header.cliqueRlp)
if sigHash.isErr:
return err((errCliqueSealSigFn,$sigHash.error))
let extraLen = header.extraData.len
if EXTRA_SEAL < extraLen:
header.extraData.setLen(extraLen - EXTRA_SEAL)
header.extraData.add sigHash.value.data
# Wait until sealing is terminated or delay timeout.
trace "Waiting for slot to sign and propagate",
delay = $delay
# FIXME: double check
let timeOutTime = getTime() + delay
while getTime() < timeOutTime:
c.doExclusively:
let isStopRequest = c.stopVHeaderReq
if isStopRequest:
warn "Sealing result is not read by miner",
sealhash = sealHash(header)
return err((errCliqueStopped,""))
poll()
c.doExclusively:
c.stopSealReq = false
return ok(ethBlock.withHeader(header))
proc stopSeal*(c: Clique): bool {.discardable.} =
## Activate the stop flag for running `seal()` function.
## Returns `true` if the stop flag could be activated.
syncExceptionWrap:
c.doExclusively:
if not c.stopSealReq:
c.stopSealReq = true
result =true
try:
let signature = signFn(signer,header.cliqueRlp)
if signature.isErr:
return err((errCliqueSealSigFn,$signature.error))
let extraLen = header.extraData.len
if EXTRA_SEAL < extraLen:
header.extraData.setLen(extraLen - EXTRA_SEAL)
header.extraData.add signature.value
except Exception as exc:
return err((errCliqueSealSigFn, "Error when signing block header"))
ethBlock = ethBlock.withHeader(header)
ok()
# clique/clique.go(673): func (c *Clique) CalcDifficulty(chain [..]
proc calcDifficulty(c: Clique;
@ -418,11 +343,6 @@ proc calcDifficulty(c: Clique;
return ok(c.snapshot.calcDifficulty(c.signer))
# # clique/clique.go(710): func (c *Clique) SealHash(header [..]
# proc sealHash(c: Clique; header: BlockHeader): Hash256 =
# ## SealHash returns the hash of a block prior to it being sealed.
# header.encodeSigHeader.keccakHash
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -45,13 +45,13 @@ proc validateGasLimit(header: BlockHeader; limit: GasInt): Result[void, string]
let upperLimit = limit div GAS_LIMIT_ADJUSTMENT_FACTOR
if diff >= upperLimit:
return err("invalid gas limit: have {header.gasLimit}, want {limit} +-= {upperLimit-1}")
return err(&"invalid gas limit: have {header.gasLimit}, want {limit} +-= {upperLimit-1}")
if header.gasLimit < GAS_LIMIT_MINIMUM:
return err("invalid gas limit below 5000")
ok()
proc validateGasLimit(c: BaseChainDB; header: BlockHeader): Result[void, string] {.
gcsafe, raises: [Defect,RlpError,BlockNotFound].} =
gcsafe, raises: [Defect,RlpError,BlockNotFound,ValueError].} =
let parent = c.getBlockHeader(header.parentHash)
header.validateGasLimit(parent.gasLimit)
@ -64,7 +64,7 @@ proc isLondonOrLater*(c: ChainConfig; number: BlockNumber): bool =
c.toFork(number) >= FkLondon
# consensus/misc/eip1559.go(55): func CalcBaseFee(config [..]
proc calcEip1599BaseFee(c: ChainConfig; parent: BlockHeader): UInt256 =
proc calcEip1599BaseFee*(c: ChainConfig; parent: BlockHeader): UInt256 =
## calculates the basefee of the header.
# If the current block is the first EIP-1559 block, return the

190
nimbus/sealer.nim Normal file
View File

@ -0,0 +1,190 @@
# Nimbus
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
import
std/[times, tables],
pkg/[chronos, eth/common, eth/keys, stew/results, chronicles],
"."/[config, db/db_chain, p2p/chain, constants, utils/header],
"."/p2p/clique/[clique_defs,
clique_desc,
clique_cfg,
clique_sealer,
clique_snapshot],
./p2p/gaslimit,
./chain_config,
./utils
type
EngineState = enum
EngineStopped, EngineRunning
SealingEngineRef* = ref SealingEngineObj
SealingEngineObj = object of RootObj
state: EngineState
engineLoop: Future[void]
chain: Chain
proc validateSealer*(chain: Chain): Result[void, string] =
let conf = getConfiguration()
if conf.engineSigner == ZERO_ADDRESS:
return err("signer address should not zero, use --engine-signer to set signer address")
if conf.engineSigner notin conf.accounts:
return err("signer address not in registered accounts, use --import-key/account to register the account")
let acc = conf.accounts[conf.engineSigner]
if not acc.unlocked:
return err("signer account not unlocked, please unlock it first via rpc/password file")
let chainConf = chain.db.config
if not chainConf.poaEngine:
return err("currently only PoA engine is supported")
if chainConf.cliquePeriod <= 0:
return err("clique period should be greater than 0")
if chainConf.cliqueEpoch <= 0:
return err("clique epoch should be greater than 0")
ok()
proc isLondon(c: ChainConfig, number: BlockNumber): bool {.inline.} =
number >= c.londonBlock
proc prepareHeader(engine: SealingEngineRef, parent: BlockHeader, time: Time): Result[BlockHeader, string] =
let timestamp = if parent.timestamp >= time:
parent.timestamp + 1.seconds
else:
time
var header = BlockHeader(
parentHash : parent.blockHash,
blockNumber: parent.blockNumber + 1.toBlockNumber,
# TODO: gasFloor and gasCeil can be configured by user
gasLimit : computeGasLimit(
parent.gasUsed,
parent.gasLimit,
gasFloor = DEFAULT_GAS_LIMIT,
gasCeil = DEFAULT_GAS_LIMIT),
# TODO: extraData can be configured via cli
#extraData : engine.extra,
timestamp : timestamp,
ommersHash : EMPTY_UNCLE_HASH,
stateRoot : parent.stateRoot,
txRoot : BLANK_ROOT_HASH,
receiptRoot: BLANK_ROOT_HASH
)
# Set baseFee and GasLimit if we are on an EIP-1559 chain
let conf = engine.chain.db.config
if isLondon(conf, header.blockNumber):
header.baseFee = calcEip1599BaseFee(conf, parent)
var parentGasLimit = parent.gasLimit
if not isLondon(conf, parent.blockNumber):
# Bump by 2x
parentGasLimit = parent.gasLimit * EIP1559_ELASTICITY_MULTIPLIER
# TODO: desiredLimit can be configured by user, gasCeil
header.gasLimit = calcGasLimit1559(parentGasLimit, desiredLimit = DEFAULT_GAS_LIMIT)
let clique = engine.chain.clique
let res = clique.prepare(parent, header)
if res.isErr:
return err($res.error)
ok(header)
proc generateBlock(engine: SealingEngineRef, ethBlock: var EthBlock): Result[void,string] =
# deviation from standard block generator
# - no local and remote transactions inclusion(need tx pool)
# - no receipts from tx
# - no DAO hard fork
# - no local and remote uncles inclusion
let clique = engine.chain.clique
let parent = engine.chain.currentBlock()
let time = getTime()
let res = prepareHeader(engine, parent, time)
if res.isErr:
return err("error prepare header")
ethBlock = EthBlock(
header: res.get()
)
let sealRes = clique.seal(ethBlock)
if sealRes.isErr:
return err("error sealing block header: " & $sealRes.error)
ok()
proc signerFunc(signer: EthAddress, message: openArray[byte]):
Result[RawSignature, cstring] {.gcsafe.} =
let
hashData = keccakHash(message)
conf = getConfiguration()
acc = conf.accounts[signer]
rawSign = sign(acc.privateKey, SkMessage(hashData.data)).toRaw
ok(rawSign)
proc sealingLoop(engine: SealingEngineRef): Future[void] {.async.} =
let clique = engine.chain.clique
let conf = getConfiguration()
clique.authorize(conf.engineSigner, signerFunc)
# convert times.Duration to chronos.Duration
let period = chronos.seconds(clique.cfg.period.inSeconds)
while engine.state == EngineRunning:
# the sealing engine will tick every `cliquePeriod` seconds
await sleepAsync(period)
if engine.state != EngineRunning:
break
# deviation from 'correct' sealing engine:
# - no queue for chain reorgs
# - no async lock/guard against race with sync algo
var blk: EthBlock
let blkRes = engine.generateBlock(blk)
if blkRes.isErr:
error "sealing engine generateBlock error", msg=blkRes.error
break
let res = engine.chain.persistBlocks([blk.header], [
BlockBody(transactions: blk.txs, uncles: blk.uncles)
])
if res == ValidationResult.Error:
error "sealing engine: persistBlocks error"
break
info "block generated", number=blk.header.blockNumber
proc new*(_: type SealingEngineRef, chain: Chain): SealingEngineRef =
SealingEngineRef(
chain: chain
)
proc start*(engine: SealingEngineRef) =
## Starts sealing engine.
if engine.state == EngineStopped:
engine.state = EngineRunning
engine.engineLoop = sealingLoop(engine)
info "sealing engine started"
proc stop*(engine: SealingEngineRef) {.async.} =
## Stop sealing engine from producing more blocks.
if engine.state == EngineRunning:
engine.state = EngineStopped
await engine.engineLoop.cancelAndWait()
info "sealing engine stopped"

View File

@ -25,6 +25,9 @@ import
stew/results,
stint
export
utils_defs
const
INMEMORY_SIGNATURES* = ##\
## Number of recent block signatures to keep in memory

View File

@ -57,6 +57,42 @@ proc computeGasLimit*(parent: BlockHeader, gasLimitFloor: GasInt): GasInt =
else:
return gasLimit
# CalcGasLimit computes the gas limit of the next block after parent. It aims
# to keep the baseline gas above the provided floor, and increase it towards the
# ceil if the blocks are full. If the ceil is exceeded, it will always decrease
# the gas allowance.
func computeGasLimit*(parentGasUsed, parentGasLimit, gasFloor, gasCeil: GasInt): GasInt =
# contrib = (parentGasUsed * 3 / 2) / 1024
let contrib = (parentGasUsed + parentGasUsed div 2) div GAS_LIMIT_ADJUSTMENT_FACTOR
# decay = parentGasLimit / 1024 -1
let decay = parentGasLimit div GAS_LIMIT_ADJUSTMENT_FACTOR - 1
#[
strategy: gasLimit of block-to-mine is set based on parent's
gasUsed value. if parentGasUsed > parentGasLimit * (2/3) then we
increase it, otherwise lower it (or leave it unchanged if it's right
at that usage) the amount increased/decreased depends on how far away
from parentGasLimit * (2/3) parentGasUsed is.
]#
var limit = parentGasLimit - decay + contrib
if limit < GAS_LIMIT_MINIMUM:
limit = GAS_LIMIT_MINIMUM
# If we're outside our allowed gas range, we try to hone towards them
if limit < gasFloor:
limit = parentGasLimit + decay
if limit > gasFloor:
limit = gasFloor
elif limit > gasCeil:
limit = parentGasLimit - decay
if limit < gasCeil:
limit = gasCeil
return limit
proc generateHeaderFromParentHeader*(config: ChainConfig, parent: BlockHeader,
coinbase: EthAddress, timestamp: Option[EthTime],
gasLimit: Option[GasInt], extraData: Blob, baseFee: Option[Uint256]): BlockHeader =