initial wire protocol transformation

rework on the eth wire protocol handlers.
curently still missing 4 handlers implementation.
but the framework is ready for eexpansion.
This commit is contained in:
jangko 2022-10-10 09:31:28 +07:00
parent 8c7d91512b
commit 3fa1b012e6
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
32 changed files with 1617 additions and 319 deletions

View File

@ -17,7 +17,6 @@ import
eth/[keys, net/nat, trie/db],
eth/common as eth_common,
eth/p2p as eth_p2p,
eth/p2p/[rlpx_protocols/les_protocol],
json_rpc/rpcserver,
metrics,
metrics/[chronos_httpserver, chronicles_support],
@ -28,7 +27,7 @@ import
./graphql/ethapi,
./p2p/[chain, clique/clique_desc, clique/clique_sealer],
./rpc/[common, debug, engine_api, jwt_auth, p2p, cors],
./sync/[fast, full, protocol, snap],
./sync/[fast, full, protocol, snap, protocol/les_protocol, handlers],
./utils/tx_pool
when defined(evmc_enabled):
@ -67,6 +66,21 @@ proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
else:
quit(QuitSuccess)
proc basicServices(nimbus: NimbusNode,
conf: NimbusConf,
chainDB: BaseChainDB) =
# app wide TxPool singleton
# TODO: disable some of txPool internal mechanism if
# the engineSigner is zero.
nimbus.txPool = TxPoolRef.new(chainDB, conf.engineSigner)
# chainRef: some name to avoid module-name/filed/function misunderstandings
nimbus.chainRef = newChain(chainDB)
if conf.verifyFrom.isSome:
let verifyFrom = conf.verifyFrom.get()
nimbus.chainRef.extraValidation = 0 < verifyFrom
nimbus.chainRef.verifyFrom = verifyFrom
proc manageAccounts(nimbus: NimbusNode, conf: NimbusConf) =
if string(conf.keyStore).len > 0:
let res = nimbus.ctx.am.loadKeystores(string conf.keyStore)
@ -81,7 +95,7 @@ proc manageAccounts(nimbus: NimbusNode, conf: NimbusConf) =
quit(QuitFailure)
proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
chainDB: BaseChainDB, protocols: set[ProtocolFlag]) =
protocols: set[ProtocolFlag]) =
## Creating P2P Server
let kpres = nimbus.ctx.getNetKeys(conf.netKey, conf.dataDir.string)
if kpres.isErr:
@ -115,7 +129,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
let bootstrapNodes = conf.getBootNodes()
nimbus.ethNode = newEthereumNode(
keypair, address, conf.networkId, nil, conf.agentString,
keypair, address, conf.networkId, conf.agentString,
addAllCapabilities = false, minPeers = conf.maxPeers,
bootstrapNodes = bootstrapNodes,
bindUdpPort = conf.udpPort, bindTcpPort = conf.tcpPort,
@ -124,7 +138,8 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# Add protocol capabilities based on protocol flags
if ProtocolFlag.Eth in protocols:
nimbus.ethNode.addCapability protocol.eth
let ethWireHandler = EthWireRef.new(nimbus.chainRef, nimbus.txPool)
nimbus.ethNode.addCapability(protocol.eth, ethWireHandler)
case conf.syncMode:
of SyncMode.Snap:
nimbus.ethNode.addCapability protocol.snap
@ -134,22 +149,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
if ProtocolFlag.Les in protocols:
nimbus.ethNode.addCapability les
# chainRef: some name to avoid module-name/filed/function misunderstandings
nimbus.chainRef = newChain(chainDB)
nimbus.ethNode.chain = nimbus.chainRef
if conf.verifyFrom.isSome:
let verifyFrom = conf.verifyFrom.get()
nimbus.chainRef.extraValidation = 0 < verifyFrom
nimbus.chainRef.verifyFrom = verifyFrom
# Early-initialise "--snap-sync" before starting any network connections.
if ProtocolFlag.Eth in protocols:
let tickerOK =
conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE}
case conf.syncMode:
of SyncMode.Full:
FullSyncRef.init(nimbus.ethNode, nimbus.ctx.rng, conf.maxPeers,
tickerOK).start
FullSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng,
conf.maxPeers, tickerOK).start
of SyncMode.Snap:
SnapSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng,
conf.maxPeers, nimbus.dbBackend, tickerOK).start
@ -181,12 +188,6 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
proc localServices(nimbus: NimbusNode, conf: NimbusConf,
chainDB: BaseChainDB, protocols: set[ProtocolFlag]) =
# app wide TxPool singleton
# TODO: disable some of txPool internal mechanism if
# the engineSigner is zero.
nimbus.txPool = TxPoolRef.new(chainDB, conf.engineSigner)
# metrics logging
if conf.logMetricsEnabled:
# https://github.com/nim-lang/Nim/issues/17369
@ -389,14 +390,15 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
of NimbusCmd.`import`:
importBlocks(conf, chainDB)
else:
basicServices(nimbus, conf, chainDB)
manageAccounts(nimbus, conf)
setupP2P(nimbus, conf, chainDB, protocols)
setupP2P(nimbus, conf, protocols)
localServices(nimbus, conf, chainDB, protocols)
if ProtocolFlag.Eth in protocols and conf.maxPeers > 0:
case conf.syncMode:
of SyncMode.Default:
FastSyncCtx.new(nimbus.ethNode).start
FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef).start
of SyncMode.Full, SyncMode.Snap:
discard

View File

@ -18,8 +18,7 @@ import
../clique,
../validate,
chronicles,
eth/common/chaindb,
eth/[common, trie/db],
eth/[common],
stew/endians2,
stint
@ -43,7 +42,7 @@ type
ArrowGlacier,
MergeFork
Chain* = ref object of AbstractChainDB
Chain* = ref object of RootRef
db: BaseChainDB
forkIds: array[ChainFork, ForkID]
@ -204,27 +203,6 @@ proc newChain*(db: BaseChainDB): Chain
new result
result.initChain(db, db.newClique, db.config.poaEngine)
# ------------------------------------------------------------------------------
# Public `AbstractChainDB` getter overload methods
# ------------------------------------------------------------------------------
method genesisHash*(c: Chain): KeccakHash {.gcsafe.} =
## Getter: `AbstractChainDB` overload method
c.blockZeroHash
method genesisStateRoot*(c: Chain): KeccakHash {.gcsafe, base.} =
## Getter: `AbstractChainDB` overloadable base method
c.blockZeroStateRoot
method getBestBlockHeader*(c: Chain): BlockHeader
{.gcsafe, raises: [Defect,CatchableError].} =
## Getter: `AbstractChainDB` overload method
c.db.getCanonicalHead()
method getTrieDB*(c: Chain): TrieDatabaseRef {.gcsafe.} =
## Getter: `AbstractChainDB` overload method
c.db.db
# ------------------------------------------------------------------------------
# Public `Chain` getters
# ------------------------------------------------------------------------------
@ -264,6 +242,10 @@ proc currentBlock*(c: Chain): BlockHeader
## but now it's enough to retrieve it from database
c.db.getCanonicalHead()
func genesisHash*(c: Chain): Hash256 =
## Getter
c.blockZeroHash
# ------------------------------------------------------------------------------
# Public `Chain` setters
# ------------------------------------------------------------------------------

View File

@ -40,48 +40,11 @@ func toChainFork(c: ChainConfig, number: BlockNumber): ChainFork =
elif number >= c.homesteadBlock: Homestead
else: Frontier
# ------------------------------------------------------------------------------
# Public `AbstractChainDB` overload methods
# ------------------------------------------------------------------------------
method getBlockHeader*(c: Chain, b: HashOrNum, output: var BlockHeader): bool
{.gcsafe, raises: [Defect,RlpError].} =
case b.isHash
of true:
c.db.getBlockHeader(b.hash, output)
else:
c.db.getBlockHeader(b.number, output)
method getSuccessorHeader*(c: Chain, h: BlockHeader, output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber <= (not 0.toBlockNumber) - offset:
result = c.db.getBlockHeader(h.blockNumber + offset, output)
method getAncestorHeader*(c: Chain, h: BlockHeader, output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber >= offset:
result = c.db.getBlockHeader(h.blockNumber - offset, output)
method getBlockBody*(c: Chain, blockHash: KeccakHash): BlockBodyRef {.gcsafe, raises: [Defect,RlpError].} =
result = BlockBodyRef()
if not c.db.getBlockBody(blockHash, result[]):
result = nil
method getForkId*(c: Chain, n: BlockNumber): ForkID {.gcsafe.} =
func getForkId*(c: Chain, n: BlockNumber): ForkID {.gcsafe.} =
## EIP 2364/2124
let fork = c.db.config.toChainFork(n)
c.forkIds[fork]
method getTotalDifficulty*(c: Chain): DifficultyInt {.gcsafe, raises: [Defect,RlpError].} =
c.db.headTotalDifficulty()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -172,11 +172,7 @@ proc setCanonical*(c: Chain, blockHash: Hash256): ValidationResult
setCanonical(c, header)
# ------------------------------------------------------------------------------
# Public `AbstractChainDB` overload method
# ------------------------------------------------------------------------------
method persistBlocks*(c: Chain; headers: openArray[BlockHeader];
proc persistBlocks*(c: Chain; headers: openArray[BlockHeader];
bodies: openArray[BlockBody]): ValidationResult
{.gcsafe, raises: [Defect,CatchableError].} =
# Run the VM here

View File

@ -235,6 +235,7 @@ proc jwtSharedSecret*(rndSecret: JwtGenSecret; config: NimbusConf):
# client using authentication. This keeps it lower-risk initially.
warn "Could not write JWT secret to data directory",
jwtSecretPath
discard e
return ok(newSecret)
try:
@ -245,7 +246,7 @@ proc jwtSharedSecret*(rndSecret: JwtGenSecret; config: NimbusConf):
let rc = key.fromHex(lines[0])
if rc.isErr:
return err(rc.error)
return ok(key.JwtSharedKey)
return ok(key)
except IOError:
return err(jwtKeyFileCannotOpen)
except ValueError:

View File

@ -12,10 +12,11 @@ import
std/[sets, options, random, hashes, sequtils],
chronicles,
chronos,
eth/[common/eth_types, p2p],
eth/[common, p2p],
eth/p2p/[private/p2p_types, peer_pool],
stew/byteutils,
"."/[protocol, types]
"."/[protocol, types],
../p2p/chain
{.push raises:[Defect].}
@ -52,7 +53,7 @@ type
workQueue: seq[WantedBlocks]
endBlockNumber: BlockNumber
finalizedBlock: BlockNumber # Block which was downloaded and verified
chain: AbstractChainDB
chain: Chain
peerPool: PeerPool
trustedPeers: HashSet[Peer]
hasOutOfOrderBlocks: bool
@ -240,6 +241,7 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg
# no need to exit here, because the context might still have blocks to fetch
# from this peer
discard e
while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and
peer.connectionState notin {Disconnecting, Disconnected}):
@ -433,16 +435,16 @@ proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) =
trace "peer disconnected ", peer = p
ctx.trustedPeers.excl(p)
proc new*(T: type FastSyncCtx; ethNode: EthereumNode): T
proc new*(T: type FastSyncCtx; ethNode: EthereumNode; chain: Chain): T
{.gcsafe, raises:[Defect,CatchableError].} =
FastSyncCtx(
# workQueue: n/a
# endBlockNumber: n/a
# hasOutOfOrderBlocks: n/a
chain: ethNode.chain,
chain: chain,
peerPool: ethNode.peerPool,
trustedPeers: initHashSet[Peer](),
finalizedBlock: ethNode.chain.getBestBlockHeader.blockNumber)
finalizedBlock: chain.db.getCanonicalHead().blockNumber)
proc start*(ctx: FastSyncCtx) =
## Code for the fast blockchain sync procedure:

View File

@ -55,11 +55,12 @@ proc runMulti(buddy: FullBuddyRef) {.async.} =
proc init*(
T: type FullSyncRef;
ethNode: EthereumNode;
chain: Chain;
rng: ref HmacDrbgContext;
maxPeers: int;
enableTicker = false): T =
new result
result.initSync(ethNode, maxPeers, enableTicker)
result.initSync(ethNode, chain, maxPeers, enableTicker)
result.ctx.data.rng = rng
proc start*(ctx: FullSyncRef) =

View File

@ -64,7 +64,7 @@ proc topUsedNumber(
top = 0.toBlockNumber
try:
let
bestNumber = ctx.chain.getBestBlockHeader.blockNumber
bestNumber = ctx.chain.db.getCanonicalHead().blockNumber
nBackBlocks = backBlocks.toBlockNumber
# Initialise before best block number
if nBackBlocks < bestNumber:
@ -95,7 +95,8 @@ proc processStaged(buddy: FullBuddyRef): bool =
let
ctx = buddy.ctx
peer = buddy.peer
chainDb = buddy.ctx.chain
chainDb = buddy.ctx.chain.db
chain = buddy.ctx.chain
bq = buddy.data.bQueue
# Get a work item, a list of headers + bodies
@ -109,7 +110,7 @@ proc processStaged(buddy: FullBuddyRef): bool =
# Store in persistent database
try:
if chainDb.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK:
if chain.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK:
bq.blockQueueAccept(wi)
return true
except CatchableError as e:
@ -128,11 +129,11 @@ proc processStaged(buddy: FullBuddyRef): bool =
# Something went wrong. Recycle work item (needs to be re-fetched, anyway)
let
parentHoN = HashOrNum(isHash: true, hash: wi.headers[0].parentHash)
parentHash = wi.headers[0].parentHash
try:
# Check whether hash of the first block is consistent
var parent: BlockHeader
if chainDb.getBlockHeader(parentHoN, parent):
if chainDb.getBlockHeader(parentHash, parent):
# First block parent is ok, so there might be other problems. Re-fetch
# the blocks from another peer.
trace "Storing persistent blocks failed", peer, range=($wi.blocks)

134
nimbus/sync/handlers.nim Normal file
View File

@ -0,0 +1,134 @@
import
chronicles,
eth/[common, p2p, trie/db],
./types,
./protocol/eth/eth_types,
../db/db_chain,
../p2p/chain,
../utils/tx_pool
type
EthWireRef* = ref object of EthWireBase
db: BaseChainDB
chain: Chain
txPool: TxPoolRef
proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef): EthWireRef =
EthWireRef(
db: chain.db,
chain: chain,
txPool: txPool
)
proc notImplemented(name: string) =
debug "Wire handler method not implemented", meth = name
method getStatus*(ctx: EthWireRef): EthState {.gcsafe.} =
let
db = ctx.db
chain = ctx.chain
bestBlock = db.getCanonicalHead()
forkId = chain.getForkId(bestBlock.blockNumber)
EthState(
totalDifficulty: db.headTotalDifficulty,
genesisHash: chain.genesisHash,
bestBlockHash: bestBlock.blockHash,
forkId: ChainForkId(
forkHash: forkId.crc.toBytesBE,
forkNext: forkId.nextFork.toBlockNumber
)
)
method getReceipts*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[seq[Receipt]] {.gcsafe.} =
let db = ctx.db
var header: BlockHeader
for blockHash in hashes:
if db.getBlockHeader(blockHash, header):
result.add db.getReceipts(header.receiptRoot)
else:
result.add @[]
trace "handlers.getReceipts: blockHeader not found", blockHash
method getPooledTxs*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[Transaction] {.gcsafe.} =
let txPool = ctx.txPool
for txHash in hashes:
let res = txPool.getItem(txHash)
if res.isOk:
result.add res.value.tx
else:
result.add Transaction()
trace "handlers.getPooledTxs: tx not found", txHash
method getBlockBodies*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[BlockBody] {.gcsafe.} =
let db = ctx.db
var body: BlockBody
for blockHash in hashes:
if db.getBlockBody(blockHash, body):
result.add body
else:
result.add BlockBody()
trace "handlers.getBlockBodies: blockBody not found", blockHash
proc successorHeader(db: BaseChainDB,
h: BlockHeader,
output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber <= (not 0.toBlockNumber) - offset:
result = db.getBlockHeader(h.blockNumber + offset, output)
proc ancestorHeader*(db: BaseChainDB,
h: BlockHeader,
output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber >= offset:
result = db.getBlockHeader(h.blockNumber - offset, output)
proc blockHeader(db: BaseChainDB,
b: HashOrNum,
output: var BlockHeader): bool
{.gcsafe, raises: [Defect,RlpError].} =
if b.isHash:
db.getBlockHeader(b.hash, output)
else:
db.getBlockHeader(b.number, output)
method getBlockHeaders*(ctx: EthWireRef, req: BlocksRequest): seq[BlockHeader] {.gcsafe.} =
let db = ctx.db
var foundBlock: BlockHeader
result = newSeqOfCap[BlockHeader](req.maxResults)
if db.blockHeader(req.startBlock, foundBlock):
result.add foundBlock
while uint64(result.len) < req.maxResults:
if not req.reverse:
if not db.successorHeader(foundBlock, foundBlock, req.skip):
break
else:
if not db.ancestorHeader(foundBlock, foundBlock, req.skip):
break
result.add foundBlock
method handleAnnouncedTxs*(ctx: EthWireRef, peer: Peer, txs: openArray[Transaction]) {.gcsafe.} =
ctx.txPool.jobAddTxs(txs)
method handleAnnouncedTxsHashes*(ctx: EthWireRef, peer: Peer, txHashes: openArray[Hash256]) {.gcsafe.} =
notImplemented("handleAnnouncedTxsHashes")
method handleNewBlock*(ctx: EthWireRef, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.gcsafe.} =
notImplemented("handleNewBlock")
method handleNewBlockHashes*(ctx: EthWireRef, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.gcsafe.} =
notImplemented("handleNewBlockHashes")
when defined(legacy_eth66_enabled):
method getStorageNodes*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[Blob] {.gcsafe.} =
let db = ctx.db.db
for hash in hashes:
result.add db.get(hash.data)
method handleNodeData*(ctx: EthWireRef, peer: Peer, data: openArray[Blob]) {.gcsafe.} =
notImplemented("handleNodeData")

View File

@ -19,7 +19,7 @@ import
chronos,
eth/[common/eth_types, p2p],
stew/byteutils,
".."/[protocol, sync_desc]
".."/[protocol, sync_desc, types]
{.push raises:[Defect].}

View File

@ -63,7 +63,7 @@ import
eth/[common/eth_types, p2p],
stew/[byteutils, interval_set, sorted_set],
"../.."/[db/db_chain, utils],
".."/[protocol, sync_desc]
".."/[protocol, sync_desc, types]
{.push raises:[Defect].}

View File

@ -16,7 +16,6 @@ else:
type eth* = eth67
import
#./protocol/eth67 as proto_eth
./protocol/snap1 as proto_snap
export
@ -24,7 +23,6 @@ export
proto_snap
type
#eth* = eth67
snap* = snap1
SnapAccountRange* = accountRangeObj

View File

@ -0,0 +1,69 @@
import
chronicles,
eth/[common, p2p, p2p/private/p2p_types],
../../types
type
NewBlockHashesAnnounce* = object
hash*: Hash256
number*: BlockNumber
ChainForkId* = object
forkHash*: array[4, byte] # The RLP encoding must be exactly 4 bytes.
forkNext*: BlockNumber # The RLP encoding must be variable-length
EthWireBase* = ref object of RootRef
EthState* = object
totalDifficulty*: DifficultyInt
genesisHash*: Hash256
bestBlockHash*: Hash256
forkId*: ChainForkId
PeerState* = ref object of RootRef
initialized*: bool
bestBlockHash*: Hash256
bestDifficulty*: DifficultyInt
const
maxStateFetch* = 384
maxBodiesFetch* = 128
maxReceiptsFetch* = 256
maxHeadersFetch* = 192
proc notImplemented(name: string) =
debug "Method not implemented", meth = name
method getStatus*(ctx: EthWireBase): EthState {.base.} =
notImplemented("getStatus")
method getReceipts*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[seq[Receipt]] {.base.} =
notImplemented("getReceipts")
method getPooledTxs*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[Transaction] {.base.} =
notImplemented("getPooledTxs")
method getBlockBodies*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[BlockBody] {.base.} =
notImplemented("getBlockBodies")
method getBlockHeaders*(ctx: EthWireBase, req: BlocksRequest): seq[BlockHeader] {.base.} =
notImplemented("getBlockHeaders")
method handleNewBlock*(ctx: EthWireBase, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.base.} =
notImplemented("handleNewBlock")
method handleAnnouncedTxs*(ctx: EthWireBase, peer: Peer, txs: openArray[Transaction]) {.base.} =
notImplemented("handleAnnouncedTxs")
method handleAnnouncedTxsHashes*(ctx: EthWireBase, peer: Peer, txHashes: openArray[Hash256]) {.base.} =
notImplemented("handleAnnouncedTxsHashes")
method handleNewBlockHashes*(ctx: EthWireBase, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.base.} =
notImplemented("handleNewBlockHashes")
when defined(legacy_eth66_enabled):
method getStorageNodes*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[Blob] {.base.} =
notImplemented("getStorageNodes")
method handleNodeData*(ctx: EthWireBase, peer: Peer, data: openArray[Blob]) {.base.} =
notImplemented("handleNodeData")

View File

@ -1,4 +1,4 @@
# Nimbus - Ethereum Wire Protocol, version eth/65
# Nimbus - Ethereum Wire Protocol
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
@ -9,69 +9,28 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module implements `eth/66`, the `Ethereum Wire Protocol version 66
## <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_
##
## Optional peply processor function hooks
## ---------------------------------------
##
## The `onGetNodeData` and `onNodeData` hooks allow new sync code to register
## for providing reply data or consume incoming events without a circular
## import dependency involving the `p2pProtocol`.
##
## Without the hooks, the protocol file needs to import functions that consume
## incoming network messages. So the `p2pProtocol` can call them, and the
## functions that produce outgoing network messages need to import the protocol
## file.
##
## But related producer/consumer function pairs are typically located in the
## very same file because they are closely related. For an example see the
## producer of `GetNodeData` and the consumer of `NodeData`.
##
## In this specific case, we need to split the `requestResponse` relationship
## between `GetNodeData` and `NodeData` messages when pipelining.
##
## Among others, this way is the most practical to acomplish the split
## implementation. It allows different protocol-using modules to coexist
## easily. When the hooks aren't set, default behaviour applies.
## This module implements Ethereum Wire Protocol version 66, `eth/66`.
## Specification:
## `eth/66 <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_
import
stint,
chronicles,
chronos,
eth/[common/eth_types_rlp, p2p, p2p/private/p2p_types, p2p/blockchain_utils],
eth/[common, p2p, p2p/private/p2p_types],
stew/byteutils,
./trace_config
./trace_config,
./eth/eth_types,
../types,
../../utils
export
eth_types
logScope:
topics = "datax"
type
NewBlockHashesAnnounce* = object
hash: Hash256
number: BlockNumber
NewBlockAnnounce* = EthBlock
ForkId* = object
forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes.
forkNext: BlockNumber # The RLP encoding must be variable-length
PeerState* = ref object
initialized*: bool
bestBlockHash*: Hash256
bestDifficulty*: DifficultyInt
onGetNodeData*:
proc (peer: Peer, hashes: openArray[Hash256],
data: var seq[Blob]) {.gcsafe.}
onNodeData*:
proc (peer: Peer, data: openArray[Blob]) {.gcsafe.}
topics = "eth66"
const
maxStateFetch* = 384
maxBodiesFetch* = 128
maxReceiptsFetch* = 256
maxHeadersFetch* = 192
ethVersion* = 66
prettyEthProtoName* = "[eth/" & $ethVersion & "]"
@ -82,10 +41,6 @@ const
trEthRecvReceived & "BlockHeaders (0x04)"
trEthRecvReceivedBlockBodies* =
trEthRecvReceived & "BlockBodies (0x06)"
trEthRecvReceivedGetNodeData* =
trEthRecvReceived & "GetNodeData (0x0d)"
trEthRecvReceivedNodeData* =
trEthRecvReceived & "NodeData (0x0e)"
trEthRecvProtocolViolation* =
"<< " & prettyEthProtoName & " Protocol violation, "
@ -105,8 +60,6 @@ const
trEthSendReplying* =
">> " & prettyEthProtoName & " Replying "
trEthSendReplyingNodeData* =
trEthSendReplying & "NodeData (0x0e)"
trEthSendDelaying* =
">> " & prettyEthProtoName & " Delaying "
@ -115,50 +68,41 @@ func toHex(hash: Hash256): string =
## Shortcut for `byteutils.toHex(hash.data)`
hash.data.toHex
func traceStep(request: BlocksRequest): string =
var str = if request.reverse: "-" else: "+"
if request.skip < high(typeof(request.skip)):
return str & $(request.skip + 1)
return static($(high(typeof(request.skip)).u256 + 1))
p2pProtocol eth66(version = ethVersion,
rlpxName = "eth",
peerState = PeerState,
networkState = EthWireBase,
useRequestIds = true):
onPeerConnected do (peer: Peer):
let
network = peer.network
chain = network.chain
bestBlock = chain.getBestBlockHeader
totalDifficulty = chain.getTotalDifficulty
chainForkId = chain.getForkId(bestBlock.blockNumber)
forkId = ForkId(
forkHash: chainForkId.crc.toBytesBE,
forkNext: chainForkId.nextFork.toBlockNumber)
network = peer.network
ctx = peer.networkState
status = ctx.getStatus()
trace trEthSendSending & "Status (0x00)", peer,
td=totalDifficulty,
bestHash=bestBlock.blockHash.toHex,
networkId=network.networkId,
genesis=chain.genesisHash.toHex,
forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext
td = status.totalDifficulty,
bestHash = status.bestBlockHash,
networkId = network.networkId,
genesis = status.genesisHash,
forkHash = status.forkId.forkHash.toHex,
forkNext = status.forkId.forkNext
let m = await peer.status(ethVersion,
network.networkId,
totalDifficulty,
bestBlock.blockHash,
chain.genesisHash,
forkId,
status.totalDifficulty,
status.bestBlockHash,
status.genesisHash,
status.forkId,
timeout = chronos.seconds(10))
when trEthTraceHandshakesOk:
trace "Handshake: Local and remote networkId",
local=network.networkId, remote=m.networkId
trace "Handshake: Local and remote genesisHash",
local=chain.genesisHash.toHex, remote=m.genesisHash.toHex
local=status.genesisHash, remote=m.genesisHash
trace "Handshake: Local and remote forkId",
local=(forkId.forkHash.toHex & "/" & $forkId.forkNext),
local=(status.forkId.forkHash.toHex & "/" & $status.forkId.forkNext),
remote=(m.forkId.forkHash.toHex & "/" & $m.forkId.forkNext)
if m.networkId != network.networkId:
@ -167,9 +111,9 @@ p2pProtocol eth66(version = ethVersion,
raise newException(
UselessPeerError, "Eth handshake for different network")
if m.genesisHash != chain.genesisHash:
if m.genesisHash != status.genesisHash:
trace "Peer for a different network (genesisHash)", peer,
expectGenesis=chain.genesisHash.toHex, gotGenesis=m.genesisHash.toHex
expectGenesis=status.genesisHash, gotGenesis=m.genesisHash
raise newException(
UselessPeerError, "Eth handshake for different network")
@ -186,7 +130,7 @@ p2pProtocol eth66(version = ethVersion,
totalDifficulty: DifficultyInt,
bestHash: Hash256,
genesisHash: Hash256,
forkId: ForkId) =
forkId: ChainForkId) =
trace trEthRecvReceived & "Status (0x00)", peer,
networkId, totalDifficulty, bestHash, genesisHash,
forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext
@ -194,30 +138,27 @@ p2pProtocol eth66(version = ethVersion,
# User message 0x01: NewBlockHashes.
proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "NewBlockHashes (0x01)", peer,
trace trEthRecvReceived & "NewBlockHashes (0x01)", peer,
hashes=hashes.len
discard
let ctx = peer.networkState()
ctx.handleNewBlockHashes(peer, hashes)
# User message 0x02: Transactions.
proc transactions(peer: Peer, transactions: openArray[Transaction]) =
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "Transactions (0x02)", peer,
trace trEthRecvReceived & "Transactions (0x02)", peer,
transactions=transactions.len
discard
let ctx = peer.networkState()
ctx.handleAnnouncedTxs(peer, transactions)
requestResponse:
# User message 0x03: GetBlockHeaders.
proc getBlockHeaders(peer: Peer, request: BlocksRequest) =
when trEthTracePacketsOk:
let
startBlock =
if request.startBlock.isHash: request.startBlock.hash.toHex
else: '#' & $request.startBlock.number
step =
if request.maxResults == 1: "n/a"
else: $request.traceStep
trace trEthRecvReceived & "GetBlockHeaders (0x03)", peer,
startBlock, count=request.maxResults, step
count=request.maxResults
if request.maxResults > uint64(maxHeadersFetch):
debug "GetBlockHeaders (0x03) requested too many headers",
@ -225,7 +166,8 @@ p2pProtocol eth66(version = ethVersion,
await peer.disconnect(BreachOfProtocol)
return
let headers = peer.network.chain.getBlockHeaders(request)
let ctx = peer.networkState()
let headers = ctx.getBlockHeaders(request)
if headers.len > 0:
trace trEthSendReplying & "with BlockHeaders (0x04)", peer,
sent=headers.len, requested=request.maxResults
@ -249,7 +191,8 @@ p2pProtocol eth66(version = ethVersion,
await peer.disconnect(BreachOfProtocol)
return
let bodies = peer.network.chain.getBlockBodies(hashes)
let ctx = peer.networkState()
let bodies = ctx.getBlockBodies(hashes)
if bodies.len > 0:
trace trEthSendReplying & "with BlockBodies (0x06)", peer,
sent=bodies.len, requested=hashes.len
@ -263,22 +206,26 @@ p2pProtocol eth66(version = ethVersion,
proc blockBodies(peer: Peer, blocks: openArray[BlockBody])
# User message 0x07: NewBlock.
proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) =
proc newBlock(peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) =
# (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce`
# because either `p2pProtocol` or RLPx doesn't work with an alias.)
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "NewBlock (0x07)", peer,
trace trEthRecvReceived & "NewBlock (0x07)", peer,
totalDifficulty,
blockNumber = bh.header.blockNumber,
blockDifficulty = bh.header.difficulty
discard
blockNumber = blk.header.blockNumber,
blockDifficulty = blk.header.difficulty
let ctx = peer.networkState()
ctx.handleNewBlock(peer, blk, totalDifficulty)
# User message 0x08: NewPooledTransactionHashes.
proc newPooledTransactionHashes(peer: Peer, txHashes: openArray[Hash256]) =
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "NewPooledTransactionHashes (0x08)", peer,
trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer,
hashes=txHashes.len
discard
let ctx = peer.networkState()
ctx.handleAnnouncedTxsHashes(peer, txHashes)
requestResponse:
# User message 0x09: GetPooledTransactions.
@ -286,9 +233,16 @@ p2pProtocol eth66(version = ethVersion,
trace trEthRecvReceived & "GetPooledTransactions (0x09)", peer,
hashes=txHashes.len
trace trEthSendReplying & "EMPTY PooledTransactions (0x10)", peer,
sent=0, requested=txHashes.len
await response.send([])
let ctx = peer.networkState()
let txs = ctx.getPooledTxs(txHashes)
if txs.len > 0:
trace trEthSendReplying & "with PooledTransactions (0x0a)", peer,
sent=txs.len, requested=txHashes.len
else:
trace trEthSendReplying & "EMPTY PooledTransactions (0x0a)", peer,
sent=0, requested=txHashes.len
await response.send(txs)
# User message 0x0a: PooledTransactions.
proc pooledTransactions(peer: Peer, transactions: openArray[Transaction])
@ -297,29 +251,23 @@ p2pProtocol eth66(version = ethVersion,
# User message 0x0d: GetNodeData.
proc getNodeData(peer: Peer, nodeHashes: openArray[Hash256]) =
trace trEthRecvReceivedGetNodeData, peer,
trace trEthRecvReceived & "GetNodeData (0x0d)", peer,
hashes=nodeHashes.len
var data: seq[Blob]
if not peer.state.onGetNodeData.isNil:
peer.state.onGetNodeData(peer, nodeHashes, data)
else:
data = peer.network.chain.getStorageNodes(nodeHashes)
let ctx = peer.networkState()
let data = ctx.getStorageNodes(nodeHashes)
trace trEthSendReplyingNodeData, peer,
trace trEthSendReplying & "NodeData (0x0e)", peer,
sent=data.len, requested=nodeHashes.len
await peer.nodeData(data)
# User message 0x0e: NodeData.
proc nodeData(peer: Peer, data: openArray[Blob]) =
if not peer.state.onNodeData.isNil:
# The `onNodeData` should do its own `tracePacket`, because we don't
# know if this is a valid reply ("Got reply") or something else.
peer.state.onNodeData(peer, data)
else:
trace trEthRecvDiscarding & "NodeData (0x0e)", peer,
trace trEthRecvReceived & "NodeData (0x0e)", peer,
bytes=data.len
let ctx = peer.networkState()
ctx.handleNodeData(peer, data)
requestResponse:
# User message 0x0f: GetReceipts.
@ -327,11 +275,16 @@ p2pProtocol eth66(version = ethVersion,
trace trEthRecvReceived & "GetReceipts (0x0f)", peer,
hashes=hashes.len
trace trEthSendReplying & "EMPTY Receipts (0x10)", peer,
sent=0, requested=hashes.len
await response.send([])
# TODO: implement `getReceipts` and reactivate this code
# await response.send(peer.network.chain.getReceipts(hashes))
let ctx = peer.networkState()
let rec = ctx.getReceipts(hashes)
if rec.len > 0:
trace trEthSendReplying & "with Receipts (0x10)", peer,
sent=rec.len, requested=hashes.len
else:
trace trEthSendReplying & "EMPTY Receipts (0x10)", peer,
sent=0, requested=hashes.len
await response.send(rec)
# User message 0x10: Receipts.
proc receipts(peer: Peer, receipts: openArray[Receipt])
proc receipts(peer: Peer, receipts: openArray[seq[Receipt]])

View File

@ -14,36 +14,23 @@
## `eth/67 <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_
import
stint,
chronicles,
chronos,
eth/[common/eth_types, p2p, p2p/private/p2p_types, p2p/blockchain_utils],
eth/[common, p2p, p2p/private/p2p_types],
stew/byteutils,
./trace_config
./trace_config,
./eth/eth_types,
../types,
../../utils
export
eth_types
logScope:
topics = "datax"
type
NewBlockHashesAnnounce* = object
hash: Hash256
number: BlockNumber
NewBlockAnnounce* = EthBlock
ForkId* = object
forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes.
forkNext: BlockNumber # The RLP encoding must be variable-length
PeerState = ref object
initialized*: bool
bestBlockHash*: Hash256
bestDifficulty*: DifficultyInt
topics = "eth67"
const
maxStateFetch* = 384
maxBodiesFetch* = 128
maxReceiptsFetch* = 256
maxHeadersFetch* = 192
ethVersion* = 67
prettyEthProtoName* = "[eth/" & $ethVersion & "]"
@ -84,41 +71,38 @@ func toHex(hash: Hash256): string =
p2pProtocol eth67(version = ethVersion,
rlpxName = "eth",
peerState = PeerState,
networkState = EthWireBase,
useRequestIds = true):
onPeerConnected do (peer: Peer):
let
network = peer.network
chain = network.chain
bestBlock = chain.getBestBlockHeader
totalDifficulty = chain.getTotalDifficulty
chainForkId = chain.getForkId(bestBlock.blockNumber)
forkId = ForkId(
forkHash: chainForkId.crc.toBytesBE,
forkNext: chainForkId.nextFork.toBlockNumber)
network = peer.network
ctx = peer.networkState
status = ctx.getStatus()
trace trEthSendSending & "Status (0x00)", peer,
td=totalDifficulty,
bestHash=bestBlock.blockHash.toHex,
networkId=network.networkId,
genesis=chain.genesisHash.toHex,
forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext
td = status.totalDifficulty,
bestHash = status.bestBlockHash,
networkId = network.networkId,
genesis = status.genesisHash,
forkHash = status.forkId.forkHash.toHex,
forkNext = status.forkId.forkNext
let m = await peer.status(ethVersion,
network.networkId,
totalDifficulty,
bestBlock.blockHash,
chain.genesisHash,
forkId,
status.totalDifficulty,
status.bestBlockHash,
status.genesisHash,
status.forkId,
timeout = chronos.seconds(10))
when trEthTraceHandshakesOk:
trace "Handshake: Local and remote networkId",
local=network.networkId, remote=m.networkId
trace "Handshake: Local and remote genesisHash",
local=chain.genesisHash.toHex, remote=m.genesisHash.toHex
local=status.genesisHash, remote=m.genesisHash
trace "Handshake: Local and remote forkId",
local=(forkId.forkHash.toHex & "/" & $forkId.forkNext),
local=(status.forkId.forkHash.toHex & "/" & $status.forkId.forkNext),
remote=(m.forkId.forkHash.toHex & "/" & $m.forkId.forkNext)
if m.networkId != network.networkId:
@ -127,9 +111,9 @@ p2pProtocol eth67(version = ethVersion,
raise newException(
UselessPeerError, "Eth handshake for different network")
if m.genesisHash != chain.genesisHash:
if m.genesisHash != status.genesisHash:
trace "Peer for a different network (genesisHash)", peer,
expectGenesis=chain.genesisHash.toHex, gotGenesis=m.genesisHash.toHex
expectGenesis=status.genesisHash, gotGenesis=m.genesisHash
raise newException(
UselessPeerError, "Eth handshake for different network")
@ -146,7 +130,7 @@ p2pProtocol eth67(version = ethVersion,
totalDifficulty: DifficultyInt,
bestHash: Hash256,
genesisHash: Hash256,
forkId: ForkId) =
forkId: ChainForkId) =
trace trEthRecvReceived & "Status (0x00)", peer,
networkId, totalDifficulty, bestHash, genesisHash,
forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext
@ -154,16 +138,20 @@ p2pProtocol eth67(version = ethVersion,
# User message 0x01: NewBlockHashes.
proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "NewBlockHashes (0x01)", peer,
trace trEthRecvReceived & "NewBlockHashes (0x01)", peer,
hashes=hashes.len
discard
let ctx = peer.networkState()
ctx.handleNewBlockHashes(peer, hashes)
# User message 0x02: Transactions.
proc transactions(peer: Peer, transactions: openArray[Transaction]) =
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "Transactions (0x02)", peer,
trace trEthRecvReceived & "Transactions (0x02)", peer,
transactions=transactions.len
discard
let ctx = peer.networkState()
ctx.handleAnnouncedTxs(peer, transactions)
requestResponse:
# User message 0x03: GetBlockHeaders.
@ -178,7 +166,8 @@ p2pProtocol eth67(version = ethVersion,
await peer.disconnect(BreachOfProtocol)
return
let headers = peer.network.chain.getBlockHeaders(request)
let ctx = peer.networkState()
let headers = ctx.getBlockHeaders(request)
if headers.len > 0:
trace trEthSendReplying & "with BlockHeaders (0x04)", peer,
sent=headers.len, requested=request.maxResults
@ -202,7 +191,8 @@ p2pProtocol eth67(version = ethVersion,
await peer.disconnect(BreachOfProtocol)
return
let bodies = peer.network.chain.getBlockBodies(hashes)
let ctx = peer.networkState()
let bodies = ctx.getBlockBodies(hashes)
if bodies.len > 0:
trace trEthSendReplying & "with BlockBodies (0x06)", peer,
sent=bodies.len, requested=hashes.len
@ -216,22 +206,26 @@ p2pProtocol eth67(version = ethVersion,
proc blockBodies(peer: Peer, blocks: openArray[BlockBody])
# User message 0x07: NewBlock.
proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) =
proc newBlock(peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) =
# (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce`
# because either `p2pProtocol` or RLPx doesn't work with an alias.)
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "NewBlock (0x07)", peer,
trace trEthRecvReceived & "NewBlock (0x07)", peer,
totalDifficulty,
blockNumber = bh.header.blockNumber,
blockDifficulty = bh.header.difficulty
discard
blockNumber = blk.header.blockNumber,
blockDifficulty = blk.header.difficulty
let ctx = peer.networkState()
ctx.handleNewBlock(peer, blk, totalDifficulty)
# User message 0x08: NewPooledTransactionHashes.
proc newPooledTransactionHashes(peer: Peer, txHashes: openArray[Hash256]) =
when trEthTraceGossipOk:
trace trEthRecvDiscarding & "NewPooledTransactionHashes (0x08)", peer,
trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer,
hashes=txHashes.len
discard
let ctx = peer.networkState()
ctx.handleAnnouncedTxsHashes(peer, txHashes)
requestResponse:
# User message 0x09: GetPooledTransactions.
@ -239,9 +233,16 @@ p2pProtocol eth67(version = ethVersion,
trace trEthRecvReceived & "GetPooledTransactions (0x09)", peer,
hashes=txHashes.len
trace trEthSendReplying & "EMPTY PooledTransactions (0x10)", peer,
sent=0, requested=txHashes.len
await response.send([])
let ctx = peer.networkState()
let txs = ctx.getPooledTxs(txHashes)
if txs.len > 0:
trace trEthSendReplying & "with PooledTransactions (0x0a)", peer,
sent=txs.len, requested=txHashes.len
else:
trace trEthSendReplying & "EMPTY PooledTransactions (0x0a)", peer,
sent=0, requested=txHashes.len
await response.send(txs)
# User message 0x0a: PooledTransactions.
proc pooledTransactions(peer: Peer, transactions: openArray[Transaction])
@ -257,11 +258,16 @@ p2pProtocol eth67(version = ethVersion,
trace trEthRecvReceived & "GetReceipts (0x0f)", peer,
hashes=hashes.len
trace trEthSendReplying & "EMPTY Receipts (0x10)", peer,
sent=0, requested=hashes.len
await response.send([])
# TODO: implement `getReceipts` and reactivate this code
# await response.send(peer.network.chain.getReceipts(hashes))
let ctx = peer.networkState()
let rec = ctx.getReceipts(hashes)
if rec.len > 0:
trace trEthSendReplying & "with Receipts (0x10)", peer,
sent=rec.len, requested=hashes.len
else:
trace trEthSendReplying & "EMPTY Receipts (0x10)", peer,
sent=0, requested=hashes.len
await response.send(rec)
# User message 0x10: Receipts.
proc receipts(peer: Peer, receipts: openArray[Receipt])
proc receipts(peer: Peer, receipts: openArray[seq[Receipt]])

View File

@ -0,0 +1,499 @@
import
std/[tables, sets],
chronicles, chronos,
eth/[rlp, common],
eth/p2p/[rlpx, private/p2p_types],
./private/les_types
const
maxSamples = 100000
rechargingScale = 1000000
lesStatsKey = "les.flow_control.stats"
lesStatsVer = 0
logScope:
topics = "les flow_control"
# TODO: move this somewhere
proc pop[A, B](t: var Table[A, B], key: A): B =
result = t[key]
t.del(key)
when LesTime is SomeInteger:
template `/`(lhs, rhs: LesTime): LesTime =
lhs div rhs
when defined(testing):
var lesTime* = LesTime(0)
template now(): LesTime = lesTime
template advanceTime(t) = lesTime += LesTime(t)
else:
import times
let startTime = epochTime()
proc now(): LesTime =
return LesTime((times.epochTime() - startTime) * 1000.0)
proc addSample(ra: var StatsRunningAverage; x, y: float64) =
if ra.count >= maxSamples:
let decay = float64(ra.count + 1 - maxSamples) / maxSamples
template applyDecay(x) = x -= x * decay
applyDecay ra.sumX
applyDecay ra.sumY
applyDecay ra.sumXX
applyDecay ra.sumXY
ra.count = maxSamples - 1
inc ra.count
ra.sumX += x
ra.sumY += y
ra.sumXX += x * x
ra.sumXY += x * y
proc calc(ra: StatsRunningAverage): tuple[m, b: float] =
if ra.count == 0:
return
let count = float64(ra.count)
let d = count * ra.sumXX - ra.sumX * ra.sumX
if d < 0.001:
return (m: ra.sumY / count, b: 0.0)
result.m = (count * ra.sumXY - ra.sumX * ra.sumY) / d
result.b = (ra.sumY / count) - (result.m * ra.sumX / count)
proc currentRequestsCosts*(network: LesNetwork,
les: ProtocolInfo): seq[ReqCostInfo] =
# Make sure the message costs are already initialized
doAssert network.messageStats.len > les.messages[^1].id,
"Have you called `initFlowControl`"
for msg in les.messages:
var (m, b) = network.messageStats[msg.id].calc()
if m < 0:
b += m
m = 0
if b < 0:
b = 0
result.add ReqCostInfo(msgId: msg.id,
baseCost: ReqCostInt(b * 2),
reqCost: ReqCostInt(m * 2))
proc persistMessageStats*(network: LesNetwork) =
# XXX: Because of the package_visible_types template magic, Nim complains
# when we pass the messageStats expression directly to `encodeList`
let stats = network.messageStats
network.setSetting(lesStatsKey, rlp.encodeList(lesStatsVer, stats))
proc loadMessageStats*(network: LesNetwork,
les: ProtocolInfo): bool =
block readFromDB:
var stats = network.getSetting(lesStatsKey)
if stats.len == 0:
notice "LES stats not present in the database"
break readFromDB
try:
var statsRlp = rlpFromBytes(stats)
if not statsRlp.enterList:
notice "Found a corrupted LES stats record"
break readFromDB
let version = statsRlp.read(int)
if version != lesStatsVer:
notice "Found an outdated LES stats record"
break readFromDB
statsRlp >> network.messageStats
if network.messageStats.len <= les.messages[^1].id:
notice "Found an incomplete LES stats record"
break readFromDB
return true
except RlpError as e:
error "Error while loading LES message stats", err = e.msg
newSeq(network.messageStats, les.messages[^1].id + 1)
return false
proc update(s: var FlowControlState, t: LesTime) =
let dt = max(t - s.lastUpdate, LesTime(0))
s.bufValue = min(
s.bufValue + s.minRecharge * dt,
s.bufLimit)
s.lastUpdate = t
proc init(s: var FlowControlState,
bufLimit: BufValueInt, minRecharge: int, t: LesTime) =
s.bufValue = bufLimit
s.bufLimit = bufLimit
s.minRecharge = minRecharge
s.lastUpdate = t
func canMakeRequest(s: FlowControlState,
maxCost: ReqCostInt): (LesTime, float64) =
## Returns the required waiting time before sending a request and
## the estimated buffer level afterwards (as a fraction of the limit)
const safetyMargin = 50
var maxCost = min(
maxCost + safetyMargin * s.minRecharge,
s.bufLimit)
if s.bufValue >= maxCost:
result[1] = float64(s.bufValue - maxCost) / float64(s.bufLimit)
else:
result[0] = (maxCost - s.bufValue) / s.minRecharge
func canServeRequest(srv: LesNetwork): bool =
result = srv.reqCount < srv.maxReqCount and
srv.reqCostSum < srv.maxReqCostSum
proc rechargeReqCost(peer: LesPeer, t: LesTime) =
let dt = t - peer.lastRechargeTime
peer.reqCostVal += peer.reqCostGradient * dt / rechargingScale
peer.lastRechargeTime = t
if peer.isRecharging and t >= peer.rechargingEndsAt:
peer.isRecharging = false
peer.reqCostGradient = 0
peer.reqCostVal = 0
proc updateRechargingParams(peer: LesPeer, network: LesNetwork) =
peer.reqCostGradient = 0
if peer.reqCount > 0:
peer.reqCostGradient = rechargingScale / network.reqCount
if peer.isRecharging:
peer.reqCostGradient = (network.rechargingRate * (peer.rechargingPower /
network.totalRechargingPower).int64).int
peer.rechargingEndsAt = peer.lastRechargeTime +
LesTime(peer.reqCostVal * rechargingScale /
-peer.reqCostGradient )
proc trackRequests(network: LesNetwork, peer: LesPeer, reqCountChange: int) =
peer.reqCount += reqCountChange
network.reqCount += reqCountChange
doAssert peer.reqCount >= 0 and network.reqCount >= 0
if peer.reqCount == 0:
# All requests have been finished. Start recharging.
peer.isRecharging = true
network.totalRechargingPower += peer.rechargingPower
elif peer.reqCount == reqCountChange and peer.isRecharging:
# `peer.reqCount` must have been 0 for the condition above to hold.
# This is a transition from recharging to serving state.
peer.isRecharging = false
network.totalRechargingPower -= peer.rechargingPower
peer.startReqCostVal = peer.reqCostVal
updateRechargingParams peer, network
proc updateFlowControl(network: LesNetwork, t: LesTime) =
while true:
var firstTime = t
for peer in network.peers:
# TODO: perhaps use a bin heap here
if peer.isRecharging and peer.rechargingEndsAt < firstTime:
firstTime = peer.rechargingEndsAt
let rechargingEndedForSomePeer = firstTime < t
network.reqCostSum = 0
for peer in network.peers:
peer.rechargeReqCost firstTime
network.reqCostSum += peer.reqCostVal
if rechargingEndedForSomePeer:
for peer in network.peers:
if peer.isRecharging:
updateRechargingParams peer, network
else:
network.lastUpdate = t
return
proc endPendingRequest*(network: LesNetwork, peer: LesPeer, t: LesTime) =
if peer.reqCount > 0:
network.updateFlowControl t
network.trackRequests peer, -1
network.updateFlowControl t
proc enlistInFlowControl*(network: LesNetwork,
peer: LesPeer,
peerRechargingPower = 100) =
let t = now()
doAssert peer.isServer or peer.isClient
# Each Peer must be potential communication partner for us.
# There will be useless peers on the network, but the logic
# should make sure to disconnect them earlier in `onPeerConnected`.
if peer.isServer:
peer.localFlowState.init network.bufferLimit, network.minRechargingRate, t
peer.pendingReqs = initTable[int, ReqCostInt]()
if peer.isClient:
peer.remoteFlowState.init network.bufferLimit, network.minRechargingRate, t
peer.lastRechargeTime = t
peer.rechargingEndsAt = t
peer.rechargingPower = peerRechargingPower
network.updateFlowControl t
proc delistFromFlowControl*(network: LesNetwork, peer: LesPeer) =
let t = now()
# XXX: perhaps this is not safe with our reqCount logic.
# The original code may depend on the binarity of the `serving` flag.
network.endPendingRequest peer, t
network.updateFlowControl t
proc initFlowControl*(network: LesNetwork, les: ProtocolInfo,
maxReqCount, maxReqCostSum, reqCostTarget: int) =
network.rechargingRate = rechargingScale * (rechargingScale /
(100 * rechargingScale / reqCostTarget - rechargingScale))
network.maxReqCount = maxReqCount
network.maxReqCostSum = maxReqCostSum
if not network.loadMessageStats(les):
warn "Failed to load persisted LES message stats. " &
"Flow control will be re-initilized."
proc canMakeRequest(peer: var LesPeer, maxCost: int): (LesTime, float64) =
peer.localFlowState.update now()
return peer.localFlowState.canMakeRequest(maxCost)
template getRequestCost(peer: LesPeer, localOrRemote: untyped,
msgId, costQuantity: int): ReqCostInt =
let
baseCost = peer.`localOrRemote ReqCosts`[msgId].baseCost
reqCost = peer.`localOrRemote ReqCosts`[msgId].reqCost
min(baseCost + reqCost * costQuantity,
peer.`localOrRemote FlowState`.bufLimit)
proc trackOutgoingRequest*(network: LesNetwork, peer: LesPeer,
msgId, reqId, costQuantity: int) =
let maxCost = peer.getRequestCost(local, msgId, costQuantity)
peer.localFlowState.bufValue -= maxCost
peer.pendingReqsCost += maxCost
peer.pendingReqs[reqId] = peer.pendingReqsCost
proc trackIncomingResponse*(peer: LesPeer, reqId: int, bv: BufValueInt) =
let bv = min(bv, peer.localFlowState.bufLimit)
if not peer.pendingReqs.hasKey(reqId):
return
let costsSumAtSending = peer.pendingReqs.pop(reqId)
let costsSumChange = peer.pendingReqsCost - costsSumAtSending
peer.localFlowState.bufValue = if bv > costsSumChange: bv - costsSumChange
else: 0
peer.localFlowState.lastUpdate = now()
proc acceptRequest*(network: LesNetwork, peer: LesPeer,
msgId, costQuantity: int): Future[bool] {.async.} =
let t = now()
let reqCost = peer.getRequestCost(remote, msgId, costQuantity)
peer.remoteFlowState.update t
network.updateFlowControl t
while not network.canServeRequest:
await sleepAsync(chronos.milliseconds(10))
if peer notin network.peers:
# The peer was disconnected or the network
# was shut down while we waited
return false
network.trackRequests peer, +1
network.updateFlowControl network.lastUpdate
if reqCost > peer.remoteFlowState.bufValue:
error "LES peer sent request too early",
recharge = (reqCost - peer.remoteFlowState.bufValue) * rechargingScale /
peer.remoteFlowState.minRecharge
return false
return true
proc bufValueAfterRequest*(network: LesNetwork, peer: LesPeer,
msgId: int, quantity: int): BufValueInt =
let t = now()
let costs = peer.remoteReqCosts[msgId]
var reqCost = costs.baseCost + quantity * costs.reqCost
peer.remoteFlowState.update t
peer.remoteFlowState.bufValue -= reqCost
network.endPendingRequest peer, t
let curReqCost = peer.reqCostVal
if curReqCost < peer.remoteFlowState.bufLimit:
let bv = peer.remoteFlowState.bufLimit - curReqCost
if bv > peer.remoteFlowState.bufValue:
peer.remoteFlowState.bufValue = bv
network.messageStats[msgId].addSample(float64(quantity),
float64(curReqCost - peer.startReqCostVal))
return peer.remoteFlowState.bufValue
when defined(testing):
import unittest2, random, ../../rlpx
proc isMax(s: FlowControlState): bool =
s.bufValue == s.bufLimit
p2pProtocol dummyLes(version = 1, rlpxName = "abc"):
proc a(p: Peer)
proc b(p: Peer)
proc c(p: Peer)
proc d(p: Peer)
proc e(p: Peer)
template fequals(lhs, rhs: float64, epsilon = 0.0001): bool =
abs(lhs-rhs) < epsilon
proc tests* =
randomize(3913631)
suite "les flow control":
suite "running averages":
test "consistent costs":
var s: StatsRunningAverage
for i in 0..100:
s.addSample(5.0, 100.0)
let (cost, base) = s.calc
check:
fequals(cost, 100.0)
fequals(base, 0.0)
test "randomized averages":
proc performTest(qBase, qRandom: int, cBase, cRandom: float64) =
var
s: StatsRunningAverage
expectedFinalCost = cBase + cRandom / 2
error = expectedFinalCost
for samples in [100, 1000, 10000]:
for i in 0..samples:
let q = float64(qBase + rand(10))
s.addSample(q, q * (cBase + rand(cRandom)))
let (newCost, newBase) = s.calc
# With more samples, our error should decrease, getting
# closer and closer to the average (unless we are already close enough)
let newError = abs(newCost - expectedFinalCost)
# This check fails with Nim-1.6:
# check newError < error
error = newError
# After enough samples we should be very close the the final result
check error < (expectedFinalCost * 0.02)
performTest(1, 10, 5.0, 100.0)
performTest(1, 4, 200.0, 1000.0)
suite "buffer value calculations":
type TestReq = object
peer: LesPeer
msgId, quantity: int
accepted: bool
setup:
var lesNetwork = new LesNetwork
lesNetwork.peers = initHashSet[LesPeer]()
lesNetwork.initFlowControl(dummyLes.protocolInfo,
reqCostTarget = 300,
maxReqCount = 5,
maxReqCostSum = 1000)
for i in 0 ..< lesNetwork.messageStats.len:
lesNetwork.messageStats[i].addSample(1.0, float(i) * 100.0)
var client = new LesPeer
client.isClient = true
var server = new LesPeer
server.isServer = true
var clientServer = new LesPeer
clientServer.isClient = true
clientServer.isServer = true
var client2 = new LesPeer
client2.isClient = true
var client3 = new LesPeer
client3.isClient = true
var bv: BufValueInt
template enlist(peer: LesPeer) {.dirty.} =
let reqCosts = currentRequestsCosts(lesNetwork, dummyLes.protocolInfo)
peer.remoteReqCosts = reqCosts
peer.localReqCosts = reqCosts
lesNetwork.peers.incl peer
lesNetwork.enlistInFlowControl peer
template startReq(p: LesPeer, msg, q: int): TestReq =
var req: TestReq
req.peer = p
req.msgId = msg
req.quantity = q
req.accepted = waitFor lesNetwork.acceptRequest(p, msg, q)
req
template endReq(req: TestReq): BufValueInt =
bufValueAfterRequest(lesNetwork, req.peer, req.msgId, req.quantity)
test "single peer recharging":
lesNetwork.bufferLimit = 1000
lesNetwork.minRechargingRate = 100
enlist client
check:
client.remoteFlowState.isMax
client.rechargingPower > 0
advanceTime 100
let r1 = client.startReq(0, 100)
check r1.accepted
check client.isRecharging == false
advanceTime 50
let r2 = client.startReq(1, 1)
check r2.accepted
check client.isRecharging == false
advanceTime 25
bv = endReq r2
check client.isRecharging == false
advanceTime 130
bv = endReq r1
check client.isRecharging == true
advanceTime 300
lesNetwork.updateFlowControl now()
check:
client.isRecharging == false
client.remoteFlowState.isMax

View File

@ -0,0 +1,117 @@
import
std/[hashes, tables, sets],
eth/common
type
AnnounceType* = enum
None,
Simple,
Signed,
Unspecified
ReqCostInfo* = object
msgId*: int
baseCost*, reqCost*: ReqCostInt
FlowControlState* = object
bufValue*, bufLimit*: int
minRecharge*: int
lastUpdate*: LesTime
StatsRunningAverage* = object
sumX*, sumY*, sumXX*, sumXY*: float64
count*: int
LesPeer* = ref object of RootRef
isServer*: bool
isClient*: bool
announceType*: AnnounceType
bestDifficulty*: DifficultyInt
bestBlockHash*: KeccakHash
bestBlockNumber*: BlockNumber
hasChainSince*: HashOrNum
hasStateSince*: HashOrNum
relaysTransactions*: bool
# The variables below are used to implement the flow control
# mechanisms of LES from our point of view as a server.
# They describe how much load has been generated by this
# particular peer.
reqCount*: int # How many outstanding requests are there?
#
rechargingPower*: int # Do we give this peer any extra priority
# (implemented as a faster recharning rate)
# 100 is the default. You can go higher and lower.
#
isRecharging*: bool # This is true while the peer is not making
# any requests
#
reqCostGradient*: int # Measures the speed of recharging or accumulating
# "requests cost" at any given moment.
#
reqCostVal*: int # The accumulated "requests cost"
#
rechargingEndsAt*: int # When will recharging end?
# (the buffer of the Peer will be fully restored)
#
lastRechargeTime*: LesTime # When did we last update the recharging parameters
#
startReqCostVal*: int # TODO
remoteFlowState*: FlowControlState
remoteReqCosts*: seq[ReqCostInfo]
# The next variables are used to limit ourselves as a client in order to
# not violate the control-flow requirements of the remote LES server.
pendingReqs*: Table[int, ReqCostInt]
pendingReqsCost*: int
localFlowState*: FlowControlState
localReqCosts*: seq[ReqCostInfo]
LesNetwork* = ref object of RootRef
peers*: HashSet[LesPeer]
messageStats*: seq[StatsRunningAverage]
ourAnnounceType*: AnnounceType
# The fields below are relevant when serving data.
bufferLimit*: int
minRechargingRate*: int
reqCostSum*, maxReqCostSum*: ReqCostInt
reqCount*, maxReqCount*: int
sumWeigth*: int
rechargingRate*: int64
totalRechargedUnits*: int
totalRechargingPower*: int
lastUpdate*: LesTime
KeyValuePair* = object
key*: string
value*: Blob
HandshakeError* = object of CatchableError
LesTime* = int # this is in milliseconds
BufValueInt* = int
ReqCostInt* = int
template hash*(peer: LesPeer): Hash = hash(cast[pointer](peer))
template areWeServingData*(network: LesNetwork): bool =
network.maxReqCount != 0
template areWeRequestingData*(network: LesNetwork): bool =
network.ourAnnounceType != AnnounceType.Unspecified
proc setSetting*(ctx: LesNetwork, key: string, val: openArray[byte]) =
discard
proc getSetting*(ctx: LesNetwork, key: string): seq[byte] =
discard

View File

@ -0,0 +1,522 @@
#
# Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
import
std/[times, tables, options, sets, hashes, strutils],
stew/shims/macros, chronicles, chronos, nimcrypto/[keccak, hash],
eth/[rlp, keys, common],
eth/p2p/[rlpx, kademlia, private/p2p_types],
./les/private/les_types, ./les/flow_control,
../types
export
les_types
type
ProofRequest* = object
blockHash*: KeccakHash
accountKey*: Blob
key*: Blob
fromLevel*: uint
ContractCodeRequest* = object
blockHash*: KeccakHash
key*: EthAddress
HelperTrieProofRequest* = object
subType*: uint
sectionIdx*: uint
key*: Blob
fromLevel*: uint
auxReq*: uint
LesStatus = object
difficulty : DifficultyInt
blockHash : Hash256
blockNumber: BlockNumber
genesisHash: Hash256
const
lesVersion = 2
maxHeadersFetch = 192
maxBodiesFetch = 32
maxReceiptsFetch = 128
maxCodeFetch = 64
maxProofsFetch = 64
maxHeaderProofsFetch = 64
maxTransactionsFetch = 64
# Handshake properties:
# https://github.com/zsfelfoldi/go-ethereum/wiki/Light-Ethereum-Subprotocol-(LES)
keyProtocolVersion = "protocolVersion"
## P: is 1 for the LPV1 protocol version.
keyNetworkId = "networkId"
## P: should be 0 for testnet, 1 for mainnet.
keyHeadTotalDifficulty = "headTd"
## P: Total Difficulty of the best chain.
## Integer, as found in block header.
keyHeadHash = "headHash"
## B_32: the hash of the best (i.e. highest TD) known block.
keyHeadNumber = "headNum"
## P: the number of the best (i.e. highest TD) known block.
keyGenesisHash = "genesisHash"
## B_32: the hash of the Genesis block.
keyServeHeaders = "serveHeaders"
## (optional, no value)
## present if the peer can serve header chain downloads.
keyServeChainSince = "serveChainSince"
## P (optional)
## present if the peer can serve Body/Receipts ODR requests
## starting from the given block number.
keyServeStateSince = "serveStateSince"
## P (optional):
## present if the peer can serve Proof/Code ODR requests
## starting from the given block number.
keyRelaysTransactions = "txRelay"
## (optional, no value)
## present if the peer can relay transactions to the ETH network.
keyFlowControlBL = "flowControl/BL"
keyFlowControlMRC = "flowControl/MRC"
keyFlowControlMRR = "flowControl/MRR"
## see Client Side Flow Control:
## https://github.com/zsfelfoldi/go-ethereum/wiki/Client-Side-Flow-Control-model-for-the-LES-protocol
keyAnnounceType = "announceType"
keyAnnounceSignature = "sign"
proc getStatus(ctx: LesNetwork): LesStatus =
discard
proc getBlockBodies(ctx: LesNetwork, hashes: openArray[Hash256]): seq[BlockBody] =
discard
proc getBlockHeaders(ctx: LesNetwork, req: BlocksRequest): seq[BlockHeader] =
discard
proc getReceipts(ctx: LesNetwork, hashes: openArray[Hash256]): seq[Receipt] =
discard
proc getProofs(ctx: LesNetwork, proofs: openArray[ProofRequest]): seq[Blob] =
discard
proc getContractCodes(ctx: LesNetwork, reqs: openArray[ContractCodeRequest]): seq[Blob] =
discard
proc getHeaderProofs(ctx: LesNetwork, reqs: openArray[ProofRequest]): seq[Blob] =
discard
proc getHelperTrieProofs(ctx: LesNetwork,
reqs: openArray[HelperTrieProofRequest],
outNodes: var seq[Blob], outAuxData: var seq[Blob]) =
discard
proc getTransactionStatus(ctx: LesNetwork, txHash: KeccakHash): TransactionStatusMsg =
discard
proc addTransactions(ctx: LesNetwork, transactions: openArray[Transaction]) =
discard
proc initProtocolState(network: LesNetwork, node: EthereumNode) {.gcsafe.} =
network.peers = initHashSet[LesPeer]()
proc addPeer(network: LesNetwork, peer: LesPeer) =
network.enlistInFlowControl peer
network.peers.incl peer
proc removePeer(network: LesNetwork, peer: LesPeer) =
network.delistFromFlowControl peer
network.peers.excl peer
template costQuantity(quantityExpr, max: untyped) {.pragma.}
proc getCostQuantity(fn: NimNode): tuple[quantityExpr, maxQuantity: NimNode] =
# XXX: `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let costQuantity = fn.pragma.findPragma(bindSym"costQuantity")
doAssert costQuantity != nil
result.quantityExpr = costQuantity[1]
result.maxQuantity= costQuantity[2]
if result.maxQuantity.kind == nnkExprEqExpr:
result.maxQuantity = result.maxQuantity[1]
macro outgoingRequestDecorator(n: untyped): untyped =
result = n
let (costQuantity, maxQuantity) = n.getCostQuantity
result.body.add quote do:
trackOutgoingRequest(peer.networkState(les),
peer.state(les),
perProtocolMsgId, reqId, `costQuantity`)
# echo result.repr
macro incomingResponseDecorator(n: untyped): untyped =
result = n
let trackingCall = quote do:
trackIncomingResponse(peer.state(les), reqId, msg.bufValue)
result.body.insert(n.body.len - 1, trackingCall)
# echo result.repr
macro incomingRequestDecorator(n: untyped): untyped =
result = n
let (costQuantity, maxQuantity) = n.getCostQuantity
template acceptStep(quantityExpr, maxQuantity) {.dirty.} =
let requestCostQuantity = quantityExpr
if requestCostQuantity > maxQuantity:
await peer.disconnect(BreachOfProtocol)
return
let lesPeer = peer.state
let lesNetwork = peer.networkState
if not await acceptRequest(lesNetwork, lesPeer,
perProtocolMsgId,
requestCostQuantity): return
result.body.insert(1, getAst(acceptStep(costQuantity, maxQuantity)))
# echo result.repr
template updateBV: BufValueInt =
bufValueAfterRequest(lesNetwork, lesPeer,
perProtocolMsgId, requestCostQuantity)
func getValue(values: openArray[KeyValuePair],
key: string, T: typedesc): Option[T] =
for v in values:
if v.key == key:
return some(rlp.decode(v.value, T))
func getRequiredValue(values: openArray[KeyValuePair],
key: string, T: typedesc): T =
for v in values:
if v.key == key:
return rlp.decode(v.value, T)
raise newException(HandshakeError,
"Required handshake field " & key & " missing")
p2pProtocol les(version = lesVersion,
peerState = LesPeer,
networkState = LesNetwork,
outgoingRequestDecorator = outgoingRequestDecorator,
incomingRequestDecorator = incomingRequestDecorator,
incomingResponseThunkDecorator = incomingResponseDecorator):
handshake:
proc status(p: Peer, values: openArray[KeyValuePair])
onPeerConnected do (peer: Peer):
let
network = peer.network
lesPeer = peer.state
lesNetwork = peer.networkState
status = lesNetwork.getStatus()
template `=>`(k, v: untyped): untyped =
KeyValuePair(key: k, value: rlp.encode(v))
var lesProperties = @[
keyProtocolVersion => lesVersion,
keyNetworkId => network.networkId,
keyHeadTotalDifficulty => status.difficulty,
keyHeadHash => status.blockHash,
keyHeadNumber => status.blockNumber,
keyGenesisHash => status.genesisHash
]
lesPeer.remoteReqCosts = currentRequestsCosts(lesNetwork, les.protocolInfo)
if lesNetwork.areWeServingData:
lesProperties.add [
# keyServeHeaders => nil,
keyServeChainSince => 0,
keyServeStateSince => 0,
# keyRelaysTransactions => nil,
keyFlowControlBL => lesNetwork.bufferLimit,
keyFlowControlMRR => lesNetwork.minRechargingRate,
keyFlowControlMRC => lesPeer.remoteReqCosts
]
if lesNetwork.areWeRequestingData:
lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType)
let
s = await peer.status(lesProperties, timeout = chronos.seconds(10))
peerNetworkId = s.values.getRequiredValue(keyNetworkId, NetworkId)
peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash)
peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint)
template requireCompatibility(peerVar, localVar, varName: untyped) =
if localVar != peerVar:
raise newException(HandshakeError,
"Incompatibility detected! $1 mismatch ($2 != $3)" %
[varName, $localVar, $peerVar])
requireCompatibility(peerLesVersion, uint(lesVersion), "les version")
requireCompatibility(peerNetworkId, network.networkId, "network id")
requireCompatibility(peerGenesisHash, status.genesisHash, "genesis hash")
template `:=`(lhs, key) =
lhs = s.values.getRequiredValue(key, type(lhs))
lesPeer.bestBlockHash := keyHeadHash
lesPeer.bestBlockNumber := keyHeadNumber
lesPeer.bestDifficulty := keyHeadTotalDifficulty
let peerAnnounceType = s.values.getValue(keyAnnounceType, AnnounceType)
if peerAnnounceType.isSome:
lesPeer.isClient = true
lesPeer.announceType = peerAnnounceType.get
else:
lesPeer.announceType = AnnounceType.Simple
lesPeer.hasChainSince := keyServeChainSince
lesPeer.hasStateSince := keyServeStateSince
lesPeer.relaysTransactions := keyRelaysTransactions
lesPeer.localFlowState.bufLimit := keyFlowControlBL
lesPeer.localFlowState.minRecharge := keyFlowControlMRR
lesPeer.localReqCosts := keyFlowControlMRC
lesNetwork.addPeer lesPeer
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.removePeer peer.state
## Header synchronisation
##
proc announce(
peer: Peer,
headHash: KeccakHash,
headNumber: BlockNumber,
headTotalDifficulty: DifficultyInt,
reorgDepth: BlockNumber,
values: openArray[KeyValuePair],
announceType: AnnounceType) =
if peer.state.announceType == AnnounceType.None:
error "unexpected announce message", peer
return
if announceType == AnnounceType.Signed:
let signature = values.getValue(keyAnnounceSignature, Blob)
if signature.isNone:
error "missing announce signature"
return
let sig = Signature.fromRaw(signature.get).tryGet()
let sigMsg = rlp.encodeList(headHash, headNumber, headTotalDifficulty)
let signerKey = recover(sig, sigMsg).tryGet()
if signerKey.toNodeId != peer.remote.id:
error "invalid announce signature"
# TODO: should we disconnect this peer?
return
# TODO: handle new block
requestResponse:
proc getBlockHeaders(
peer: Peer,
req: BlocksRequest) {.
costQuantity(req.maxResults.int, max = maxHeadersFetch).} =
let ctx = peer.networkState()
let headers = ctx.getBlockHeaders(req)
await response.send(updateBV(), headers)
proc blockHeaders(
peer: Peer,
bufValue: BufValueInt,
blocks: openArray[BlockHeader])
## On-damand data retrieval
##
requestResponse:
proc getBlockBodies(
peer: Peer,
blocks: openArray[KeccakHash]) {.
costQuantity(blocks.len, max = maxBodiesFetch), gcsafe.} =
let ctx = peer.networkState()
let blocks = ctx.getBlockBodies(blocks)
await response.send(updateBV(), blocks)
proc blockBodies(
peer: Peer,
bufValue: BufValueInt,
bodies: openArray[BlockBody])
requestResponse:
proc getReceipts(
peer: Peer,
hashes: openArray[KeccakHash])
{.costQuantity(hashes.len, max = maxReceiptsFetch).} =
let ctx = peer.networkState()
let receipts = ctx.getReceipts(hashes)
await response.send(updateBV(), receipts)
proc receipts(
peer: Peer,
bufValue: BufValueInt,
receipts: openArray[Receipt])
requestResponse:
proc getProofs(
peer: Peer,
proofs: openArray[ProofRequest]) {.
costQuantity(proofs.len, max = maxProofsFetch).} =
let ctx = peer.networkState()
let proofs = ctx.getProofs(proofs)
await response.send(updateBV(), proofs)
proc proofs(
peer: Peer,
bufValue: BufValueInt,
proofs: openArray[Blob])
requestResponse:
proc getContractCodes(
peer: Peer,
reqs: seq[ContractCodeRequest]) {.
costQuantity(reqs.len, max = maxCodeFetch).} =
let ctx = peer.networkState()
let results = ctx.getContractCodes(reqs)
await response.send(updateBV(), results)
proc contractCodes(
peer: Peer,
bufValue: BufValueInt,
results: seq[Blob])
nextID 15
requestResponse:
proc getHeaderProofs(
peer: Peer,
reqs: openArray[ProofRequest]) {.
costQuantity(reqs.len, max = maxHeaderProofsFetch).} =
let ctx = peer.networkState()
let proofs = ctx.getHeaderProofs(reqs)
await response.send(updateBV(), proofs)
proc headerProofs(
peer: Peer,
bufValue: BufValueInt,
proofs: openArray[Blob])
requestResponse:
proc getHelperTrieProofs(
peer: Peer,
reqs: openArray[HelperTrieProofRequest]) {.
costQuantity(reqs.len, max = maxProofsFetch).} =
let ctx = peer.networkState()
var nodes, auxData: seq[Blob]
ctx.getHelperTrieProofs(reqs, nodes, auxData)
await response.send(updateBV(), nodes, auxData)
proc helperTrieProofs(
peer: Peer,
bufValue: BufValueInt,
nodes: seq[Blob],
auxData: seq[Blob])
## Transaction relaying and status retrieval
##
requestResponse:
proc sendTxV2(
peer: Peer,
transactions: openArray[Transaction]) {.
costQuantity(transactions.len, max = maxTransactionsFetch).} =
let ctx = peer.networkState()
var results: seq[TransactionStatusMsg]
for t in transactions:
let hash = t.rlpHash # TODO: this is not optimal, we can compute
# the hash from the request bytes.
# The RLP module can offer a helper Hashed[T]
# to make this easy.
var s = ctx.getTransactionStatus(hash)
if s.status == TransactionStatus.Unknown:
ctx.addTransactions([t])
s = ctx.getTransactionStatus(hash)
results.add s
await response.send(updateBV(), results)
proc getTxStatus(
peer: Peer,
transactions: openArray[Transaction]) {.
costQuantity(transactions.len, max = maxTransactionsFetch).} =
let ctx = peer.networkState()
var results: seq[TransactionStatusMsg]
for t in transactions:
results.add ctx.getTransactionStatus(t.rlpHash)
await response.send(updateBV(), results)
proc txStatus(
peer: Peer,
bufValue: BufValueInt,
transactions: openArray[TransactionStatusMsg])
proc configureLes*(node: EthereumNode,
# Client options:
announceType = AnnounceType.Simple,
# Server options.
# The zero default values indicate that the
# LES server will be deactivated.
maxReqCount = 0,
maxReqCostSum = 0,
reqCostTarget = 0) =
doAssert announceType != AnnounceType.Unspecified or maxReqCount > 0
var lesNetwork = node.protocolState(les)
lesNetwork.ourAnnounceType = announceType
initFlowControl(lesNetwork, les.protocolInfo,
maxReqCount, maxReqCostSum, reqCostTarget)
proc configureLesServer*(node: EthereumNode,
# Client options:
announceType = AnnounceType.Unspecified,
# Server options.
# The zero default values indicate that the
# LES server will be deactivated.
maxReqCount = 0,
maxReqCostSum = 0,
reqCostTarget = 0) =
## This is similar to `configureLes`, but with default parameter
## values appropriate for a server.
node.configureLes(announceType, maxReqCount, maxReqCostSum, reqCostTarget)
proc persistLesMessageStats*(node: EthereumNode) =
persistMessageStats(node.protocolState(les))

View File

@ -137,7 +137,7 @@ import
std/options,
chronicles,
chronos,
eth/[common/eth_types, p2p, p2p/private/p2p_types],
eth/[common, p2p, p2p/private/p2p_types],
nimcrypto/hash,
stew/byteutils,
../../constants,

View File

@ -63,7 +63,7 @@ proc init*(
dbBackend: ChainDb,
enableTicker = false): T =
new result
result.initSync(ethNode, maxPeers, enableTicker)
result.initSync(ethNode, chain, maxPeers, enableTicker)
result.ctx.chain = chain # explicitely override
result.ctx.data.rng = rng
result.ctx.data.dbBackend = dbBackend

View File

@ -275,7 +275,7 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
## Global set up
ctx.data.coveredAccounts = NodeTagRangeSet.init()
ctx.data.snapDb =
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.getTrieDB)
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
else: SnapDbRef.init(ctx.data.dbBackend)
ctx.pivotSetup()
if tickerOK:

View File

@ -12,7 +12,7 @@ import
std/options,
chronos,
eth/[common/eth_types, p2p],
../../../protocol,
"../../.."/[protocol, types],
../../worker_desc,
./com_error

View File

@ -11,7 +11,7 @@
import
std/[algorithm, strutils, tables],
chronicles,
eth/[common/eth_types, trie/db],
eth/[common, trie/db],
../../../../db/[kvstore_rocksdb, storage_types],
../../../types,
../../range_desc,

View File

@ -11,7 +11,7 @@
import
std/[algorithm, sequtils, strutils, tables],
chronicles,
eth/[common/eth_types, p2p, rlp, trie/nibbles],
eth/[common, p2p, rlp, trie/nibbles, trie/db],
stew/byteutils,
../../range_desc,
"."/[bulk_storage, hexary_desc, hexary_error, hexary_import,

View File

@ -11,7 +11,7 @@
import
std/[sequtils, tables],
chronicles,
eth/[common/eth_types, p2p],
eth/[common/eth_types, p2p, trie/db],
../../../../db/select_backend,
../../range_desc,
"."/[bulk_storage, hexary_desc, hexary_error, hexary_import, hexary_paths,

View File

@ -14,7 +14,13 @@
## Public descriptors
import
eth/[common/eth_types, p2p]
eth/[common, p2p],
../p2p/chain,
../db/db_chain
export
chain,
db_chain
{.push raises: [Defect].}
@ -40,7 +46,7 @@ type
CtxRef*[S] = ref object
## Shared state among all syncing peer workers (aka buddies.)
buddiesMax*: int ## Max number of buddies
chain*: AbstractChainDB ## Block chain database (no need for `Peer`)
chain*: Chain ## Block chain database (no need for `Peer`)
poolMode*: bool ## Activate `runPool()` workers if set `true`
data*: S ## Shared context for all worker peers

View File

@ -258,6 +258,7 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
proc initSync*[S,W](
dsc: RunnerSyncRef[S,W];
node: EthereumNode;
chain: Chain,
slots: int;
noisy = false) =
## Constructor
@ -266,7 +267,7 @@ proc initSync*[S,W](
# rejected as long as its worker descriptor is registered.
dsc.ctx = CtxRef[S](
buddiesMax: max(1, slots + 1),
chain: node.chain)
chain: chain)
dsc.pool = node.peerPool
dsc.tickerOk = noisy
dsc.buddies.init(dsc.ctx.buddiesMax)

View File

@ -22,6 +22,11 @@ type
## Note that the `ethXX` protocol driver always uses the
## underlying `Hash256` type which needs to be converted to `BlockHash`.
BlocksRequest* = object
startBlock*: HashOrNum
maxResults*, skip*: uint
reverse*: bool
# ------------------------------------------------------------------------------
# Public constructors
# ------------------------------------------------------------------------------

View File

@ -287,7 +287,7 @@ proc setupEthNode*(
var node = newEthereumNode(
keypair, srvAddress,
conf.networkId,
nil, conf.agentString,
conf.agentString,
addAllCapabilities = false,
bindUdpPort = conf.udpPort, bindTcpPort = conf.tcpPort)

View File

@ -104,7 +104,7 @@ proc runTest(steps: Steps) =
sealingEngine.start()
rpcServer.start()
waitFor client.connect("localhost", Port(conf.rpcPort))
waitFor client.connect("localhost", conf.rpcPort)
suite "Engine API tests":
for i, step in steps.list:

View File

@ -147,7 +147,6 @@ proc rpcMain*() =
ks2: EthAddress = hexToByteArray[20]("0xa3b2222afa5c987da6ef773fde8d01b9f23d481f")
ks3: EthAddress = hexToByteArray[20]("0x597176e9a64aad0845d83afdaf698fbeff77703b")
ethNode.chain = newChain(chain)
let keyStore = "tests" / "keystore"
let res = ctx.am.loadKeystores(keyStore)
if res.isErr:

View File

@ -0,0 +1,41 @@
import
eth/p2p, eth/p2p/rlpx,
chronos, testutils/unittests,
../nimbus/sync/protocol
var nextPort = 30303
proc localAddress*(port: int): Address =
let port = Port(port)
result = Address(udpPort: port, tcpPort: port,
ip: parseIpAddress("127.0.0.1"))
proc setupTestNode*(
rng: ref HmacDrbgContext,
capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode {.gcsafe.} =
# Don't create new RNG every time in production code!
let keys1 = KeyPair.random(rng[])
var node = newEthereumNode(
keys1, localAddress(nextPort), NetworkId(1),
addAllCapabilities = false,
bindUdpPort = Port(nextPort), bindTcpPort = Port(nextPort),
rng = rng)
nextPort.inc
for capability in capabilities:
node.addCapability capability
node
suite "Testing protocol handlers":
asyncTest "Failing connection handler":
let rng = newRng()
var node1 = setupTestNode(rng, eth)
var node2 = setupTestNode(rng, eth)
node2.startListening()
let peer = await node1.rlpxConnect(newNode(node2.toENode()))
check:
peer.isNil == false
# To check if the disconnection handler did not run
#node1.protocolState(eth).count == 0