diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim index 4e3d3046c..0859e444e 100644 --- a/nimbus/nimbus.nim +++ b/nimbus/nimbus.nim @@ -17,7 +17,6 @@ import eth/[keys, net/nat, trie/db], eth/common as eth_common, eth/p2p as eth_p2p, - eth/p2p/[rlpx_protocols/les_protocol], json_rpc/rpcserver, metrics, metrics/[chronos_httpserver, chronicles_support], @@ -28,7 +27,7 @@ import ./graphql/ethapi, ./p2p/[chain, clique/clique_desc, clique/clique_sealer], ./rpc/[common, debug, engine_api, jwt_auth, p2p, cors], - ./sync/[fast, full, protocol, snap], + ./sync/[fast, full, protocol, snap, protocol/les_protocol, handlers], ./utils/tx_pool when defined(evmc_enabled): @@ -67,6 +66,21 @@ proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) = else: quit(QuitSuccess) +proc basicServices(nimbus: NimbusNode, + conf: NimbusConf, + chainDB: BaseChainDB) = + # app wide TxPool singleton + # TODO: disable some of txPool internal mechanism if + # the engineSigner is zero. + nimbus.txPool = TxPoolRef.new(chainDB, conf.engineSigner) + + # chainRef: some name to avoid module-name/filed/function misunderstandings + nimbus.chainRef = newChain(chainDB) + if conf.verifyFrom.isSome: + let verifyFrom = conf.verifyFrom.get() + nimbus.chainRef.extraValidation = 0 < verifyFrom + nimbus.chainRef.verifyFrom = verifyFrom + proc manageAccounts(nimbus: NimbusNode, conf: NimbusConf) = if string(conf.keyStore).len > 0: let res = nimbus.ctx.am.loadKeystores(string conf.keyStore) @@ -81,7 +95,7 @@ proc manageAccounts(nimbus: NimbusNode, conf: NimbusConf) = quit(QuitFailure) proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, - chainDB: BaseChainDB, protocols: set[ProtocolFlag]) = + protocols: set[ProtocolFlag]) = ## Creating P2P Server let kpres = nimbus.ctx.getNetKeys(conf.netKey, conf.dataDir.string) if kpres.isErr: @@ -115,7 +129,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, let bootstrapNodes = conf.getBootNodes() nimbus.ethNode = newEthereumNode( - keypair, address, conf.networkId, nil, conf.agentString, + keypair, address, conf.networkId, conf.agentString, addAllCapabilities = false, minPeers = conf.maxPeers, bootstrapNodes = bootstrapNodes, bindUdpPort = conf.udpPort, bindTcpPort = conf.tcpPort, @@ -124,7 +138,8 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, # Add protocol capabilities based on protocol flags if ProtocolFlag.Eth in protocols: - nimbus.ethNode.addCapability protocol.eth + let ethWireHandler = EthWireRef.new(nimbus.chainRef, nimbus.txPool) + nimbus.ethNode.addCapability(protocol.eth, ethWireHandler) case conf.syncMode: of SyncMode.Snap: nimbus.ethNode.addCapability protocol.snap @@ -134,22 +149,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, if ProtocolFlag.Les in protocols: nimbus.ethNode.addCapability les - # chainRef: some name to avoid module-name/filed/function misunderstandings - nimbus.chainRef = newChain(chainDB) - nimbus.ethNode.chain = nimbus.chainRef - if conf.verifyFrom.isSome: - let verifyFrom = conf.verifyFrom.get() - nimbus.chainRef.extraValidation = 0 < verifyFrom - nimbus.chainRef.verifyFrom = verifyFrom - # Early-initialise "--snap-sync" before starting any network connections. 
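# The nimbus.nim hunks above split node start-up into two phases: `basicServices`
# creates the app-wide TxPool and Chain singletons, and `setupP2P` hands them to
# the new eth wire handler when the capability is registered. A minimal sketch of
# that wiring, assuming an existing `EthereumNode` and `BaseChainDB`; the helper
# name `wireUp` and the relative import paths are illustrative only:
import
  eth/[common, p2p],
  ./db/db_chain,
  ./p2p/chain,
  ./sync/[protocol, handlers],
  ./utils/tx_pool

proc wireUp(node: EthereumNode, chainDB: BaseChainDB, signer: EthAddress) =
  let
    txPool   = TxPoolRef.new(chainDB, signer)    # app-wide TxPool singleton
    chainRef = newChain(chainDB)                 # replaces the old AbstractChainDB role
    ethWire  = EthWireRef.new(chainRef, txPool)  # eth wire-protocol callbacks
  # the handler object becomes the protocol's network state, so the generated
  # message handlers can later reach it via `peer.networkState()`
  node.addCapability(protocol.eth, ethWire)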
if ProtocolFlag.Eth in protocols: let tickerOK = conf.logLevel in {LogLevel.INFO, LogLevel.DEBUG, LogLevel.TRACE} case conf.syncMode: of SyncMode.Full: - FullSyncRef.init(nimbus.ethNode, nimbus.ctx.rng, conf.maxPeers, - tickerOK).start + FullSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, + conf.maxPeers, tickerOK).start of SyncMode.Snap: SnapSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers, nimbus.dbBackend, tickerOK).start @@ -181,12 +188,6 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, proc localServices(nimbus: NimbusNode, conf: NimbusConf, chainDB: BaseChainDB, protocols: set[ProtocolFlag]) = - - # app wide TxPool singleton - # TODO: disable some of txPool internal mechanism if - # the engineSigner is zero. - nimbus.txPool = TxPoolRef.new(chainDB, conf.engineSigner) - # metrics logging if conf.logMetricsEnabled: # https://github.com/nim-lang/Nim/issues/17369 @@ -389,14 +390,15 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) = of NimbusCmd.`import`: importBlocks(conf, chainDB) else: + basicServices(nimbus, conf, chainDB) manageAccounts(nimbus, conf) - setupP2P(nimbus, conf, chainDB, protocols) + setupP2P(nimbus, conf, protocols) localServices(nimbus, conf, chainDB, protocols) if ProtocolFlag.Eth in protocols and conf.maxPeers > 0: case conf.syncMode: of SyncMode.Default: - FastSyncCtx.new(nimbus.ethNode).start + FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef).start of SyncMode.Full, SyncMode.Snap: discard diff --git a/nimbus/p2p/chain/chain_desc.nim b/nimbus/p2p/chain/chain_desc.nim index 0fccdfd58..502d9d585 100644 --- a/nimbus/p2p/chain/chain_desc.nim +++ b/nimbus/p2p/chain/chain_desc.nim @@ -18,8 +18,7 @@ import ../clique, ../validate, chronicles, - eth/common/chaindb, - eth/[common, trie/db], + eth/[common], stew/endians2, stint @@ -43,7 +42,7 @@ type ArrowGlacier, MergeFork - Chain* = ref object of AbstractChainDB + Chain* = ref object of RootRef db: BaseChainDB forkIds: array[ChainFork, ForkID] @@ -204,27 +203,6 @@ proc newChain*(db: BaseChainDB): Chain new result result.initChain(db, db.newClique, db.config.poaEngine) -# ------------------------------------------------------------------------------ -# Public `AbstractChainDB` getter overload methods -# ------------------------------------------------------------------------------ - -method genesisHash*(c: Chain): KeccakHash {.gcsafe.} = - ## Getter: `AbstractChainDB` overload method - c.blockZeroHash - -method genesisStateRoot*(c: Chain): KeccakHash {.gcsafe, base.} = - ## Getter: `AbstractChainDB` overloadable base method - c.blockZeroStateRoot - -method getBestBlockHeader*(c: Chain): BlockHeader - {.gcsafe, raises: [Defect,CatchableError].} = - ## Getter: `AbstractChainDB` overload method - c.db.getCanonicalHead() - -method getTrieDB*(c: Chain): TrieDatabaseRef {.gcsafe.} = - ## Getter: `AbstractChainDB` overload method - c.db.db - # ------------------------------------------------------------------------------ # Public `Chain` getters # ------------------------------------------------------------------------------ @@ -264,6 +242,10 @@ proc currentBlock*(c: Chain): BlockHeader ## but now it's enough to retrieve it from database c.db.getCanonicalHead() +func genesisHash*(c: Chain): Hash256 = + ## Getter + c.blockZeroHash + # ------------------------------------------------------------------------------ # Public `Chain` setters # ------------------------------------------------------------------------------ diff --git a/nimbus/p2p/chain/chain_misc.nim 
b/nimbus/p2p/chain/chain_misc.nim index a23f2f73f..292b6ea36 100644 --- a/nimbus/p2p/chain/chain_misc.nim +++ b/nimbus/p2p/chain/chain_misc.nim @@ -40,48 +40,11 @@ func toChainFork(c: ChainConfig, number: BlockNumber): ChainFork = elif number >= c.homesteadBlock: Homestead else: Frontier -# ------------------------------------------------------------------------------ -# Public `AbstractChainDB` overload methods -# ------------------------------------------------------------------------------ - -method getBlockHeader*(c: Chain, b: HashOrNum, output: var BlockHeader): bool - {.gcsafe, raises: [Defect,RlpError].} = - case b.isHash - of true: - c.db.getBlockHeader(b.hash, output) - else: - c.db.getBlockHeader(b.number, output) - - -method getSuccessorHeader*(c: Chain, h: BlockHeader, output: var BlockHeader, - skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = - let offset = 1 + skip.toBlockNumber - if h.blockNumber <= (not 0.toBlockNumber) - offset: - result = c.db.getBlockHeader(h.blockNumber + offset, output) - - -method getAncestorHeader*(c: Chain, h: BlockHeader, output: var BlockHeader, - skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = - let offset = 1 + skip.toBlockNumber - if h.blockNumber >= offset: - result = c.db.getBlockHeader(h.blockNumber - offset, output) - - -method getBlockBody*(c: Chain, blockHash: KeccakHash): BlockBodyRef {.gcsafe, raises: [Defect,RlpError].} = - result = BlockBodyRef() - if not c.db.getBlockBody(blockHash, result[]): - result = nil - - -method getForkId*(c: Chain, n: BlockNumber): ForkID {.gcsafe.} = +func getForkId*(c: Chain, n: BlockNumber): ForkID {.gcsafe.} = ## EIP 2364/2124 let fork = c.db.config.toChainFork(n) c.forkIds[fork] - -method getTotalDifficulty*(c: Chain): DifficultyInt {.gcsafe, raises: [Defect,RlpError].} = - c.db.headTotalDifficulty() - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/p2p/chain/persist_blocks.nim b/nimbus/p2p/chain/persist_blocks.nim index 47f628d19..931d9d82f 100644 --- a/nimbus/p2p/chain/persist_blocks.nim +++ b/nimbus/p2p/chain/persist_blocks.nim @@ -172,11 +172,7 @@ proc setCanonical*(c: Chain, blockHash: Hash256): ValidationResult setCanonical(c, header) -# ------------------------------------------------------------------------------ -# Public `AbstractChainDB` overload method -# ------------------------------------------------------------------------------ - -method persistBlocks*(c: Chain; headers: openArray[BlockHeader]; +proc persistBlocks*(c: Chain; headers: openArray[BlockHeader]; bodies: openArray[BlockBody]): ValidationResult {.gcsafe, raises: [Defect,CatchableError].} = # Run the VM here diff --git a/nimbus/rpc/jwt_auth.nim b/nimbus/rpc/jwt_auth.nim index 4f4c6e35d..c0c32f88a 100644 --- a/nimbus/rpc/jwt_auth.nim +++ b/nimbus/rpc/jwt_auth.nim @@ -235,6 +235,7 @@ proc jwtSharedSecret*(rndSecret: JwtGenSecret; config: NimbusConf): # client using authentication. This keeps it lower-risk initially. 
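# With `Chain` now a plain `ref object of RootRef` (chain_desc.nim above), the
# former `AbstractChainDB` method overloads become ordinary procs/funcs and
# callers reach the database through the chain object directly. A minimal
# sketch, assuming an existing `chainDB: BaseChainDB`; `headForkId` and the
# import paths are illustrative:
import
  eth/common,
  ./db/db_chain,
  ./p2p/chain

proc headForkId(chainDB: BaseChainDB): ForkID =
  let
    c = newChain(chainDB)
    head = c.db.getCanonicalHead()   # plain proc call, no method dispatch
  # EIP-2124/2364 fork identifier for the current head, as consumed by the
  # eth Status handshake
  c.getForkId(head.blockNumber)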
warn "Could not write JWT secret to data directory", jwtSecretPath + discard e return ok(newSecret) try: @@ -245,7 +246,7 @@ proc jwtSharedSecret*(rndSecret: JwtGenSecret; config: NimbusConf): let rc = key.fromHex(lines[0]) if rc.isErr: return err(rc.error) - return ok(key.JwtSharedKey) + return ok(key) except IOError: return err(jwtKeyFileCannotOpen) except ValueError: diff --git a/nimbus/sync/fast.nim b/nimbus/sync/fast.nim index 3f91bde9a..b6ac4883c 100644 --- a/nimbus/sync/fast.nim +++ b/nimbus/sync/fast.nim @@ -12,10 +12,11 @@ import std/[sets, options, random, hashes, sequtils], chronicles, chronos, - eth/[common/eth_types, p2p], + eth/[common, p2p], eth/p2p/[private/p2p_types, peer_pool], stew/byteutils, - "."/[protocol, types] + "."/[protocol, types], + ../p2p/chain {.push raises:[Defect].} @@ -52,7 +53,7 @@ type workQueue: seq[WantedBlocks] endBlockNumber: BlockNumber finalizedBlock: BlockNumber # Block which was downloaded and verified - chain: AbstractChainDB + chain: Chain peerPool: PeerPool trustedPeers: HashSet[Peer] hasOutOfOrderBlocks: bool @@ -240,6 +241,7 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} = debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg # no need to exit here, because the context might still have blocks to fetch # from this peer + discard e while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and peer.connectionState notin {Disconnecting, Disconnected}): @@ -433,16 +435,16 @@ proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) = trace "peer disconnected ", peer = p ctx.trustedPeers.excl(p) -proc new*(T: type FastSyncCtx; ethNode: EthereumNode): T +proc new*(T: type FastSyncCtx; ethNode: EthereumNode; chain: Chain): T {.gcsafe, raises:[Defect,CatchableError].} = FastSyncCtx( # workQueue: n/a # endBlockNumber: n/a # hasOutOfOrderBlocks: n/a - chain: ethNode.chain, + chain: chain, peerPool: ethNode.peerPool, trustedPeers: initHashSet[Peer](), - finalizedBlock: ethNode.chain.getBestBlockHeader.blockNumber) + finalizedBlock: chain.db.getCanonicalHead().blockNumber) proc start*(ctx: FastSyncCtx) = ## Code for the fast blockchain sync procedure: diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim index 0c08868f1..54164f526 100644 --- a/nimbus/sync/full.nim +++ b/nimbus/sync/full.nim @@ -55,11 +55,12 @@ proc runMulti(buddy: FullBuddyRef) {.async.} = proc init*( T: type FullSyncRef; ethNode: EthereumNode; + chain: Chain; rng: ref HmacDrbgContext; maxPeers: int; enableTicker = false): T = new result - result.initSync(ethNode, maxPeers, enableTicker) + result.initSync(ethNode, chain, maxPeers, enableTicker) result.ctx.data.rng = rng proc start*(ctx: FullSyncRef) = diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index 607465e2d..43398ea98 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -64,7 +64,7 @@ proc topUsedNumber( top = 0.toBlockNumber try: let - bestNumber = ctx.chain.getBestBlockHeader.blockNumber + bestNumber = ctx.chain.db.getCanonicalHead().blockNumber nBackBlocks = backBlocks.toBlockNumber # Initialise before best block number if nBackBlocks < bestNumber: @@ -95,7 +95,8 @@ proc processStaged(buddy: FullBuddyRef): bool = let ctx = buddy.ctx peer = buddy.peer - chainDb = buddy.ctx.chain + chainDb = buddy.ctx.chain.db + chain = buddy.ctx.chain bq = buddy.data.bQueue # Get a work item, a list of headers + bodies @@ -109,7 +110,7 @@ proc processStaged(buddy: FullBuddyRef): bool = # Store in persistent database try: - if 
chainDb.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK: + if chain.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK: bq.blockQueueAccept(wi) return true except CatchableError as e: @@ -128,11 +129,11 @@ proc processStaged(buddy: FullBuddyRef): bool = # Something went wrong. Recycle work item (needs to be re-fetched, anyway) let - parentHoN = HashOrNum(isHash: true, hash: wi.headers[0].parentHash) + parentHash = wi.headers[0].parentHash try: # Check whether hash of the first block is consistent var parent: BlockHeader - if chainDb.getBlockHeader(parentHoN, parent): + if chainDb.getBlockHeader(parentHash, parent): # First block parent is ok, so there might be other problems. Re-fetch # the blocks from another peer. trace "Storing persistent blocks failed", peer, range=($wi.blocks) diff --git a/nimbus/sync/handlers.nim b/nimbus/sync/handlers.nim new file mode 100644 index 000000000..c87b47694 --- /dev/null +++ b/nimbus/sync/handlers.nim @@ -0,0 +1,134 @@ +import + chronicles, + eth/[common, p2p, trie/db], + ./types, + ./protocol/eth/eth_types, + ../db/db_chain, + ../p2p/chain, + ../utils/tx_pool + +type + EthWireRef* = ref object of EthWireBase + db: BaseChainDB + chain: Chain + txPool: TxPoolRef + +proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef): EthWireRef = + EthWireRef( + db: chain.db, + chain: chain, + txPool: txPool + ) + +proc notImplemented(name: string) = + debug "Wire handler method not implemented", meth = name + +method getStatus*(ctx: EthWireRef): EthState {.gcsafe.} = + let + db = ctx.db + chain = ctx.chain + bestBlock = db.getCanonicalHead() + forkId = chain.getForkId(bestBlock.blockNumber) + + EthState( + totalDifficulty: db.headTotalDifficulty, + genesisHash: chain.genesisHash, + bestBlockHash: bestBlock.blockHash, + forkId: ChainForkId( + forkHash: forkId.crc.toBytesBE, + forkNext: forkId.nextFork.toBlockNumber + ) + ) + +method getReceipts*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[seq[Receipt]] {.gcsafe.} = + let db = ctx.db + var header: BlockHeader + for blockHash in hashes: + if db.getBlockHeader(blockHash, header): + result.add db.getReceipts(header.receiptRoot) + else: + result.add @[] + trace "handlers.getReceipts: blockHeader not found", blockHash + +method getPooledTxs*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[Transaction] {.gcsafe.} = + let txPool = ctx.txPool + for txHash in hashes: + let res = txPool.getItem(txHash) + if res.isOk: + result.add res.value.tx + else: + result.add Transaction() + trace "handlers.getPooledTxs: tx not found", txHash + +method getBlockBodies*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[BlockBody] {.gcsafe.} = + let db = ctx.db + var body: BlockBody + for blockHash in hashes: + if db.getBlockBody(blockHash, body): + result.add body + else: + result.add BlockBody() + trace "handlers.getBlockBodies: blockBody not found", blockHash + +proc successorHeader(db: BaseChainDB, + h: BlockHeader, + output: var BlockHeader, + skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = + let offset = 1 + skip.toBlockNumber + if h.blockNumber <= (not 0.toBlockNumber) - offset: + result = db.getBlockHeader(h.blockNumber + offset, output) + +proc ancestorHeader*(db: BaseChainDB, + h: BlockHeader, + output: var BlockHeader, + skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = + let offset = 1 + skip.toBlockNumber + if h.blockNumber >= offset: + result = db.getBlockHeader(h.blockNumber - offset, output) + +proc blockHeader(db: BaseChainDB, + b: HashOrNum, + output: var BlockHeader): bool 
+ {.gcsafe, raises: [Defect,RlpError].} = + if b.isHash: + db.getBlockHeader(b.hash, output) + else: + db.getBlockHeader(b.number, output) + +method getBlockHeaders*(ctx: EthWireRef, req: BlocksRequest): seq[BlockHeader] {.gcsafe.} = + let db = ctx.db + var foundBlock: BlockHeader + result = newSeqOfCap[BlockHeader](req.maxResults) + + if db.blockHeader(req.startBlock, foundBlock): + result.add foundBlock + + while uint64(result.len) < req.maxResults: + if not req.reverse: + if not db.successorHeader(foundBlock, foundBlock, req.skip): + break + else: + if not db.ancestorHeader(foundBlock, foundBlock, req.skip): + break + result.add foundBlock + +method handleAnnouncedTxs*(ctx: EthWireRef, peer: Peer, txs: openArray[Transaction]) {.gcsafe.} = + ctx.txPool.jobAddTxs(txs) + +method handleAnnouncedTxsHashes*(ctx: EthWireRef, peer: Peer, txHashes: openArray[Hash256]) {.gcsafe.} = + notImplemented("handleAnnouncedTxsHashes") + +method handleNewBlock*(ctx: EthWireRef, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.gcsafe.} = + notImplemented("handleNewBlock") + +method handleNewBlockHashes*(ctx: EthWireRef, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.gcsafe.} = + notImplemented("handleNewBlockHashes") + +when defined(legacy_eth66_enabled): + method getStorageNodes*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[Blob] {.gcsafe.} = + let db = ctx.db.db + for hash in hashes: + result.add db.get(hash.data) + + method handleNodeData*(ctx: EthWireRef, peer: Peer, data: openArray[Blob]) {.gcsafe.} = + notImplemented("handleNodeData") diff --git a/nimbus/sync/misc/best_pivot.nim b/nimbus/sync/misc/best_pivot.nim index 7421a260d..2511c5d4f 100644 --- a/nimbus/sync/misc/best_pivot.nim +++ b/nimbus/sync/misc/best_pivot.nim @@ -19,7 +19,7 @@ import chronos, eth/[common/eth_types, p2p], stew/byteutils, - ".."/[protocol, sync_desc] + ".."/[protocol, sync_desc, types] {.push raises:[Defect].} diff --git a/nimbus/sync/misc/block_queue.nim b/nimbus/sync/misc/block_queue.nim index 12092faa1..59debe627 100644 --- a/nimbus/sync/misc/block_queue.nim +++ b/nimbus/sync/misc/block_queue.nim @@ -63,7 +63,7 @@ import eth/[common/eth_types, p2p], stew/[byteutils, interval_set, sorted_set], "../.."/[db/db_chain, utils], - ".."/[protocol, sync_desc] + ".."/[protocol, sync_desc, types] {.push raises:[Defect].} diff --git a/nimbus/sync/protocol.nim b/nimbus/sync/protocol.nim index b8306f3bf..a8b6ad493 100644 --- a/nimbus/sync/protocol.nim +++ b/nimbus/sync/protocol.nim @@ -16,7 +16,6 @@ else: type eth* = eth67 import - #./protocol/eth67 as proto_eth ./protocol/snap1 as proto_snap export @@ -24,7 +23,6 @@ export proto_snap type - #eth* = eth67 snap* = snap1 SnapAccountRange* = accountRangeObj diff --git a/nimbus/sync/protocol/eth/eth_types.nim b/nimbus/sync/protocol/eth/eth_types.nim new file mode 100644 index 000000000..d18c41ce3 --- /dev/null +++ b/nimbus/sync/protocol/eth/eth_types.nim @@ -0,0 +1,69 @@ +import + chronicles, + eth/[common, p2p, p2p/private/p2p_types], + ../../types + +type + NewBlockHashesAnnounce* = object + hash*: Hash256 + number*: BlockNumber + + ChainForkId* = object + forkHash*: array[4, byte] # The RLP encoding must be exactly 4 bytes. 
+ forkNext*: BlockNumber # The RLP encoding must be variable-length + + EthWireBase* = ref object of RootRef + + EthState* = object + totalDifficulty*: DifficultyInt + genesisHash*: Hash256 + bestBlockHash*: Hash256 + forkId*: ChainForkId + + PeerState* = ref object of RootRef + initialized*: bool + bestBlockHash*: Hash256 + bestDifficulty*: DifficultyInt + +const + maxStateFetch* = 384 + maxBodiesFetch* = 128 + maxReceiptsFetch* = 256 + maxHeadersFetch* = 192 + +proc notImplemented(name: string) = + debug "Method not implemented", meth = name + +method getStatus*(ctx: EthWireBase): EthState {.base.} = + notImplemented("getStatus") + +method getReceipts*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[seq[Receipt]] {.base.} = + notImplemented("getReceipts") + +method getPooledTxs*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[Transaction] {.base.} = + notImplemented("getPooledTxs") + +method getBlockBodies*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[BlockBody] {.base.} = + notImplemented("getBlockBodies") + +method getBlockHeaders*(ctx: EthWireBase, req: BlocksRequest): seq[BlockHeader] {.base.} = + notImplemented("getBlockHeaders") + +method handleNewBlock*(ctx: EthWireBase, peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) {.base.} = + notImplemented("handleNewBlock") + +method handleAnnouncedTxs*(ctx: EthWireBase, peer: Peer, txs: openArray[Transaction]) {.base.} = + notImplemented("handleAnnouncedTxs") + +method handleAnnouncedTxsHashes*(ctx: EthWireBase, peer: Peer, txHashes: openArray[Hash256]) {.base.} = + notImplemented("handleAnnouncedTxsHashes") + +method handleNewBlockHashes*(ctx: EthWireBase, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.base.} = + notImplemented("handleNewBlockHashes") + +when defined(legacy_eth66_enabled): + method getStorageNodes*(ctx: EthWireBase, hashes: openArray[Hash256]): seq[Blob] {.base.} = + notImplemented("getStorageNodes") + + method handleNodeData*(ctx: EthWireBase, peer: Peer, data: openArray[Blob]) {.base.} = + notImplemented("handleNodeData") diff --git a/nimbus/sync/protocol/eth66.nim b/nimbus/sync/protocol/eth66.nim index 0ab5c805d..a555df388 100644 --- a/nimbus/sync/protocol/eth66.nim +++ b/nimbus/sync/protocol/eth66.nim @@ -1,4 +1,4 @@ -# Nimbus - Ethereum Wire Protocol, version eth/65 +# Nimbus - Ethereum Wire Protocol # # Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed under either of @@ -9,69 +9,28 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. -## This module implements `eth/66`, the `Ethereum Wire Protocol version 66 -## `_ -## -## Optional peply processor function hooks -## --------------------------------------- -## -## The `onGetNodeData` and `onNodeData` hooks allow new sync code to register -## for providing reply data or consume incoming events without a circular -## import dependency involving the `p2pProtocol`. -## -## Without the hooks, the protocol file needs to import functions that consume -## incoming network messages. So the `p2pProtocol` can call them, and the -## functions that produce outgoing network messages need to import the protocol -## file. -## -## But related producer/consumer function pairs are typically located in the -## very same file because they are closely related. For an example see the -## producer of `GetNodeData` and the consumer of `NodeData`. 
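# `EthWireBase` above is the dispatch point for all eth wire-protocol callbacks:
# the defaults merely log "not implemented", and handlers.nim overrides them for
# the full node. A minimal sketch of a custom handler that only counts announced
# transactions; `TxCounterRef` is a made-up name and the eth_types import path
# depends on where the sketch lives:
import
  eth/[common, p2p],
  ./sync/protocol/eth/eth_types

type
  TxCounterRef = ref object of EthWireBase
    seen: int

method handleAnnouncedTxs*(ctx: TxCounterRef, peer: Peer,
                           txs: openArray[Transaction]) =
  # invoked by the p2pProtocol when a Transactions (0x02) message arrives
  ctx.seen += txs.len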
-## -## In this specific case, we need to split the `requestResponse` relationship -## between `GetNodeData` and `NodeData` messages when pipelining. -## -## Among others, this way is the most practical to acomplish the split -## implementation. It allows different protocol-using modules to coexist -## easily. When the hooks aren't set, default behaviour applies. +## This module implements Ethereum Wire Protocol version 66, `eth/66`. +## Specification: +## `eth/66 `_ import + stint, chronicles, chronos, - eth/[common/eth_types_rlp, p2p, p2p/private/p2p_types, p2p/blockchain_utils], + eth/[common, p2p, p2p/private/p2p_types], stew/byteutils, - ./trace_config + ./trace_config, + ./eth/eth_types, + ../types, + ../../utils + +export + eth_types logScope: - topics = "datax" - -type - NewBlockHashesAnnounce* = object - hash: Hash256 - number: BlockNumber - - NewBlockAnnounce* = EthBlock - - ForkId* = object - forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes. - forkNext: BlockNumber # The RLP encoding must be variable-length - - PeerState* = ref object - initialized*: bool - bestBlockHash*: Hash256 - bestDifficulty*: DifficultyInt - - onGetNodeData*: - proc (peer: Peer, hashes: openArray[Hash256], - data: var seq[Blob]) {.gcsafe.} - onNodeData*: - proc (peer: Peer, data: openArray[Blob]) {.gcsafe.} + topics = "eth66" const - maxStateFetch* = 384 - maxBodiesFetch* = 128 - maxReceiptsFetch* = 256 - maxHeadersFetch* = 192 ethVersion* = 66 prettyEthProtoName* = "[eth/" & $ethVersion & "]" @@ -82,10 +41,6 @@ const trEthRecvReceived & "BlockHeaders (0x04)" trEthRecvReceivedBlockBodies* = trEthRecvReceived & "BlockBodies (0x06)" - trEthRecvReceivedGetNodeData* = - trEthRecvReceived & "GetNodeData (0x0d)" - trEthRecvReceivedNodeData* = - trEthRecvReceived & "NodeData (0x0e)" trEthRecvProtocolViolation* = "<< " & prettyEthProtoName & " Protocol violation, " @@ -105,8 +60,6 @@ const trEthSendReplying* = ">> " & prettyEthProtoName & " Replying " - trEthSendReplyingNodeData* = - trEthSendReplying & "NodeData (0x0e)" trEthSendDelaying* = ">> " & prettyEthProtoName & " Delaying " @@ -115,50 +68,41 @@ func toHex(hash: Hash256): string = ## Shortcut for `byteutils.toHex(hash.data)` hash.data.toHex -func traceStep(request: BlocksRequest): string = - var str = if request.reverse: "-" else: "+" - if request.skip < high(typeof(request.skip)): - return str & $(request.skip + 1) - return static($(high(typeof(request.skip)).u256 + 1)) - p2pProtocol eth66(version = ethVersion, rlpxName = "eth", peerState = PeerState, + networkState = EthWireBase, useRequestIds = true): onPeerConnected do (peer: Peer): let - network = peer.network - chain = network.chain - bestBlock = chain.getBestBlockHeader - totalDifficulty = chain.getTotalDifficulty - chainForkId = chain.getForkId(bestBlock.blockNumber) - forkId = ForkId( - forkHash: chainForkId.crc.toBytesBE, - forkNext: chainForkId.nextFork.toBlockNumber) + network = peer.network + ctx = peer.networkState + status = ctx.getStatus() trace trEthSendSending & "Status (0x00)", peer, - td=totalDifficulty, - bestHash=bestBlock.blockHash.toHex, - networkId=network.networkId, - genesis=chain.genesisHash.toHex, - forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext + td = status.totalDifficulty, + bestHash = status.bestBlockHash, + networkId = network.networkId, + genesis = status.genesisHash, + forkHash = status.forkId.forkHash.toHex, + forkNext = status.forkId.forkNext let m = await peer.status(ethVersion, network.networkId, - totalDifficulty, - bestBlock.blockHash, 
- chain.genesisHash, - forkId, + status.totalDifficulty, + status.bestBlockHash, + status.genesisHash, + status.forkId, timeout = chronos.seconds(10)) when trEthTraceHandshakesOk: trace "Handshake: Local and remote networkId", local=network.networkId, remote=m.networkId trace "Handshake: Local and remote genesisHash", - local=chain.genesisHash.toHex, remote=m.genesisHash.toHex + local=status.genesisHash, remote=m.genesisHash trace "Handshake: Local and remote forkId", - local=(forkId.forkHash.toHex & "/" & $forkId.forkNext), + local=(status.forkId.forkHash.toHex & "/" & $status.forkId.forkNext), remote=(m.forkId.forkHash.toHex & "/" & $m.forkId.forkNext) if m.networkId != network.networkId: @@ -167,9 +111,9 @@ p2pProtocol eth66(version = ethVersion, raise newException( UselessPeerError, "Eth handshake for different network") - if m.genesisHash != chain.genesisHash: + if m.genesisHash != status.genesisHash: trace "Peer for a different network (genesisHash)", peer, - expectGenesis=chain.genesisHash.toHex, gotGenesis=m.genesisHash.toHex + expectGenesis=status.genesisHash, gotGenesis=m.genesisHash raise newException( UselessPeerError, "Eth handshake for different network") @@ -186,7 +130,7 @@ p2pProtocol eth66(version = ethVersion, totalDifficulty: DifficultyInt, bestHash: Hash256, genesisHash: Hash256, - forkId: ForkId) = + forkId: ChainForkId) = trace trEthRecvReceived & "Status (0x00)", peer, networkId, totalDifficulty, bestHash, genesisHash, forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext @@ -194,30 +138,27 @@ p2pProtocol eth66(version = ethVersion, # User message 0x01: NewBlockHashes. proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) = when trEthTraceGossipOk: - trace trEthRecvDiscarding & "NewBlockHashes (0x01)", peer, + trace trEthRecvReceived & "NewBlockHashes (0x01)", peer, hashes=hashes.len - discard + + let ctx = peer.networkState() + ctx.handleNewBlockHashes(peer, hashes) # User message 0x02: Transactions. proc transactions(peer: Peer, transactions: openArray[Transaction]) = when trEthTraceGossipOk: - trace trEthRecvDiscarding & "Transactions (0x02)", peer, + trace trEthRecvReceived & "Transactions (0x02)", peer, transactions=transactions.len - discard + + let ctx = peer.networkState() + ctx.handleAnnouncedTxs(peer, transactions) requestResponse: # User message 0x03: GetBlockHeaders. 
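# Both eth66 and eth67 now take the Status (0x00) payload from `ctx.getStatus()`.
# The only non-trivial field is the fork identifier, which the handler derives
# from `Chain.getForkId`. A sketch of that mapping, assuming the `ForkID` tuple
# fields `crc`/`nextFork` used above and an illustrative helper name and import
# paths:
import
  stew/endians2,
  eth/common,
  ./sync/protocol/eth/eth_types

proc toChainForkId(fid: ForkID): ChainForkId =
  ChainForkId(
    forkHash: fid.crc.toBytesBE,            # exactly 4 bytes on the wire
    forkNext: fid.nextFork.toBlockNumber)   # 0 when no further fork is scheduled
# getStatus() wraps this together with totalDifficulty, genesisHash and
# bestBlockHash into an EthState, which the handshake forwards verbatim.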
proc getBlockHeaders(peer: Peer, request: BlocksRequest) = when trEthTracePacketsOk: - let - startBlock = - if request.startBlock.isHash: request.startBlock.hash.toHex - else: '#' & $request.startBlock.number - step = - if request.maxResults == 1: "n/a" - else: $request.traceStep trace trEthRecvReceived & "GetBlockHeaders (0x03)", peer, - startBlock, count=request.maxResults, step + count=request.maxResults if request.maxResults > uint64(maxHeadersFetch): debug "GetBlockHeaders (0x03) requested too many headers", @@ -225,7 +166,8 @@ p2pProtocol eth66(version = ethVersion, await peer.disconnect(BreachOfProtocol) return - let headers = peer.network.chain.getBlockHeaders(request) + let ctx = peer.networkState() + let headers = ctx.getBlockHeaders(request) if headers.len > 0: trace trEthSendReplying & "with BlockHeaders (0x04)", peer, sent=headers.len, requested=request.maxResults @@ -249,7 +191,8 @@ p2pProtocol eth66(version = ethVersion, await peer.disconnect(BreachOfProtocol) return - let bodies = peer.network.chain.getBlockBodies(hashes) + let ctx = peer.networkState() + let bodies = ctx.getBlockBodies(hashes) if bodies.len > 0: trace trEthSendReplying & "with BlockBodies (0x06)", peer, sent=bodies.len, requested=hashes.len @@ -263,22 +206,26 @@ p2pProtocol eth66(version = ethVersion, proc blockBodies(peer: Peer, blocks: openArray[BlockBody]) # User message 0x07: NewBlock. - proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) = + proc newBlock(peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) = # (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce` # because either `p2pProtocol` or RLPx doesn't work with an alias.) when trEthTraceGossipOk: - trace trEthRecvDiscarding & "NewBlock (0x07)", peer, + trace trEthRecvReceived & "NewBlock (0x07)", peer, totalDifficulty, - blockNumber = bh.header.blockNumber, - blockDifficulty = bh.header.difficulty - discard + blockNumber = blk.header.blockNumber, + blockDifficulty = blk.header.difficulty + + let ctx = peer.networkState() + ctx.handleNewBlock(peer, blk, totalDifficulty) # User message 0x08: NewPooledTransactionHashes. proc newPooledTransactionHashes(peer: Peer, txHashes: openArray[Hash256]) = when trEthTraceGossipOk: - trace trEthRecvDiscarding & "NewPooledTransactionHashes (0x08)", peer, + trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer, hashes=txHashes.len - discard + + let ctx = peer.networkState() + ctx.handleAnnouncedTxsHashes(peer, txHashes) requestResponse: # User message 0x09: GetPooledTransactions. @@ -286,9 +233,16 @@ p2pProtocol eth66(version = ethVersion, trace trEthRecvReceived & "GetPooledTransactions (0x09)", peer, hashes=txHashes.len - trace trEthSendReplying & "EMPTY PooledTransactions (0x10)", peer, - sent=0, requested=txHashes.len - await response.send([]) + let ctx = peer.networkState() + let txs = ctx.getPooledTxs(txHashes) + if txs.len > 0: + trace trEthSendReplying & "with PooledTransactions (0x0a)", peer, + sent=txs.len, requested=txHashes.len + else: + trace trEthSendReplying & "EMPTY PooledTransactions (0x0a)", peer, + sent=0, requested=txHashes.len + + await response.send(txs) # User message 0x0a: PooledTransactions. proc pooledTransactions(peer: Peer, transactions: openArray[Transaction]) @@ -297,29 +251,23 @@ p2pProtocol eth66(version = ethVersion, # User message 0x0d: GetNodeData. 
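# The request/response messages above no longer answer with empty lists but
# delegate to the network-state handler; e.g. GetPooledTransactions is served
# from the TxPool, with a default Transaction keeping the reply index-aligned
# for hashes that are not in the pool. A sketch of a small test double for that
# path, mirroring the behaviour of handlers.nim; `StaticPoolRef` is a made-up
# name and the import path is illustrative:
import
  std/tables,
  eth/common,
  ./sync/protocol/eth/eth_types

type
  StaticPoolRef = ref object of EthWireBase
    txs: Table[Hash256, Transaction]

method getPooledTxs*(ctx: StaticPoolRef,
                     hashes: openArray[Hash256]): seq[Transaction] =
  for h in hashes:
    # getOrDefault yields an empty Transaction for unknown hashes, matching
    # the placeholder behaviour of the full-node handler
    result.add ctx.txs.getOrDefault(h)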
proc getNodeData(peer: Peer, nodeHashes: openArray[Hash256]) = - trace trEthRecvReceivedGetNodeData, peer, + trace trEthRecvReceived & "GetNodeData (0x0d)", peer, hashes=nodeHashes.len - var data: seq[Blob] - if not peer.state.onGetNodeData.isNil: - peer.state.onGetNodeData(peer, nodeHashes, data) - else: - data = peer.network.chain.getStorageNodes(nodeHashes) + let ctx = peer.networkState() + let data = ctx.getStorageNodes(nodeHashes) - trace trEthSendReplyingNodeData, peer, + trace trEthSendReplying & "NodeData (0x0e)", peer, sent=data.len, requested=nodeHashes.len await peer.nodeData(data) # User message 0x0e: NodeData. proc nodeData(peer: Peer, data: openArray[Blob]) = - if not peer.state.onNodeData.isNil: - # The `onNodeData` should do its own `tracePacket`, because we don't - # know if this is a valid reply ("Got reply") or something else. - peer.state.onNodeData(peer, data) - else: - trace trEthRecvDiscarding & "NodeData (0x0e)", peer, + trace trEthRecvReceived & "NodeData (0x0e)", peer, bytes=data.len + let ctx = peer.networkState() + ctx.handleNodeData(peer, data) requestResponse: # User message 0x0f: GetReceipts. @@ -327,11 +275,16 @@ p2pProtocol eth66(version = ethVersion, trace trEthRecvReceived & "GetReceipts (0x0f)", peer, hashes=hashes.len - trace trEthSendReplying & "EMPTY Receipts (0x10)", peer, - sent=0, requested=hashes.len - await response.send([]) - # TODO: implement `getReceipts` and reactivate this code - # await response.send(peer.network.chain.getReceipts(hashes)) + let ctx = peer.networkState() + let rec = ctx.getReceipts(hashes) + if rec.len > 0: + trace trEthSendReplying & "with Receipts (0x10)", peer, + sent=rec.len, requested=hashes.len + else: + trace trEthSendReplying & "EMPTY Receipts (0x10)", peer, + sent=0, requested=hashes.len + + await response.send(rec) # User message 0x10: Receipts. - proc receipts(peer: Peer, receipts: openArray[Receipt]) + proc receipts(peer: Peer, receipts: openArray[seq[Receipt]]) diff --git a/nimbus/sync/protocol/eth67.nim b/nimbus/sync/protocol/eth67.nim index 3889e5ed1..12abb3ae4 100644 --- a/nimbus/sync/protocol/eth67.nim +++ b/nimbus/sync/protocol/eth67.nim @@ -14,36 +14,23 @@ ## `eth/67 `_ import + stint, chronicles, chronos, - eth/[common/eth_types, p2p, p2p/private/p2p_types, p2p/blockchain_utils], + eth/[common, p2p, p2p/private/p2p_types], stew/byteutils, - ./trace_config + ./trace_config, + ./eth/eth_types, + ../types, + ../../utils + +export + eth_types logScope: - topics = "datax" - -type - NewBlockHashesAnnounce* = object - hash: Hash256 - number: BlockNumber - - NewBlockAnnounce* = EthBlock - - ForkId* = object - forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes. 
- forkNext: BlockNumber # The RLP encoding must be variable-length - - PeerState = ref object - initialized*: bool - bestBlockHash*: Hash256 - bestDifficulty*: DifficultyInt + topics = "eth67" const - maxStateFetch* = 384 - maxBodiesFetch* = 128 - maxReceiptsFetch* = 256 - maxHeadersFetch* = 192 ethVersion* = 67 prettyEthProtoName* = "[eth/" & $ethVersion & "]" @@ -84,41 +71,38 @@ func toHex(hash: Hash256): string = p2pProtocol eth67(version = ethVersion, rlpxName = "eth", peerState = PeerState, + networkState = EthWireBase, useRequestIds = true): onPeerConnected do (peer: Peer): let - network = peer.network - chain = network.chain - bestBlock = chain.getBestBlockHeader - totalDifficulty = chain.getTotalDifficulty - chainForkId = chain.getForkId(bestBlock.blockNumber) - forkId = ForkId( - forkHash: chainForkId.crc.toBytesBE, - forkNext: chainForkId.nextFork.toBlockNumber) + network = peer.network + ctx = peer.networkState + status = ctx.getStatus() trace trEthSendSending & "Status (0x00)", peer, - td=totalDifficulty, - bestHash=bestBlock.blockHash.toHex, - networkId=network.networkId, - genesis=chain.genesisHash.toHex, - forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext + td = status.totalDifficulty, + bestHash = status.bestBlockHash, + networkId = network.networkId, + genesis = status.genesisHash, + forkHash = status.forkId.forkHash.toHex, + forkNext = status.forkId.forkNext let m = await peer.status(ethVersion, network.networkId, - totalDifficulty, - bestBlock.blockHash, - chain.genesisHash, - forkId, + status.totalDifficulty, + status.bestBlockHash, + status.genesisHash, + status.forkId, timeout = chronos.seconds(10)) when trEthTraceHandshakesOk: trace "Handshake: Local and remote networkId", local=network.networkId, remote=m.networkId trace "Handshake: Local and remote genesisHash", - local=chain.genesisHash.toHex, remote=m.genesisHash.toHex + local=status.genesisHash, remote=m.genesisHash trace "Handshake: Local and remote forkId", - local=(forkId.forkHash.toHex & "/" & $forkId.forkNext), + local=(status.forkId.forkHash.toHex & "/" & $status.forkId.forkNext), remote=(m.forkId.forkHash.toHex & "/" & $m.forkId.forkNext) if m.networkId != network.networkId: @@ -127,9 +111,9 @@ p2pProtocol eth67(version = ethVersion, raise newException( UselessPeerError, "Eth handshake for different network") - if m.genesisHash != chain.genesisHash: + if m.genesisHash != status.genesisHash: trace "Peer for a different network (genesisHash)", peer, - expectGenesis=chain.genesisHash.toHex, gotGenesis=m.genesisHash.toHex + expectGenesis=status.genesisHash, gotGenesis=m.genesisHash raise newException( UselessPeerError, "Eth handshake for different network") @@ -146,7 +130,7 @@ p2pProtocol eth67(version = ethVersion, totalDifficulty: DifficultyInt, bestHash: Hash256, genesisHash: Hash256, - forkId: ForkId) = + forkId: ChainForkId) = trace trEthRecvReceived & "Status (0x00)", peer, networkId, totalDifficulty, bestHash, genesisHash, forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext @@ -154,16 +138,20 @@ p2pProtocol eth67(version = ethVersion, # User message 0x01: NewBlockHashes. proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) = when trEthTraceGossipOk: - trace trEthRecvDiscarding & "NewBlockHashes (0x01)", peer, + trace trEthRecvReceived & "NewBlockHashes (0x01)", peer, hashes=hashes.len - discard + + let ctx = peer.networkState() + ctx.handleNewBlockHashes(peer, hashes) # User message 0x02: Transactions. 
proc transactions(peer: Peer, transactions: openArray[Transaction]) = when trEthTraceGossipOk: - trace trEthRecvDiscarding & "Transactions (0x02)", peer, + trace trEthRecvReceived & "Transactions (0x02)", peer, transactions=transactions.len - discard + + let ctx = peer.networkState() + ctx.handleAnnouncedTxs(peer, transactions) requestResponse: # User message 0x03: GetBlockHeaders. @@ -178,7 +166,8 @@ p2pProtocol eth67(version = ethVersion, await peer.disconnect(BreachOfProtocol) return - let headers = peer.network.chain.getBlockHeaders(request) + let ctx = peer.networkState() + let headers = ctx.getBlockHeaders(request) if headers.len > 0: trace trEthSendReplying & "with BlockHeaders (0x04)", peer, sent=headers.len, requested=request.maxResults @@ -202,7 +191,8 @@ p2pProtocol eth67(version = ethVersion, await peer.disconnect(BreachOfProtocol) return - let bodies = peer.network.chain.getBlockBodies(hashes) + let ctx = peer.networkState() + let bodies = ctx.getBlockBodies(hashes) if bodies.len > 0: trace trEthSendReplying & "with BlockBodies (0x06)", peer, sent=bodies.len, requested=hashes.len @@ -216,22 +206,26 @@ p2pProtocol eth67(version = ethVersion, proc blockBodies(peer: Peer, blocks: openArray[BlockBody]) # User message 0x07: NewBlock. - proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) = + proc newBlock(peer: Peer, blk: EthBlock, totalDifficulty: DifficultyInt) = # (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce` # because either `p2pProtocol` or RLPx doesn't work with an alias.) when trEthTraceGossipOk: - trace trEthRecvDiscarding & "NewBlock (0x07)", peer, + trace trEthRecvReceived & "NewBlock (0x07)", peer, totalDifficulty, - blockNumber = bh.header.blockNumber, - blockDifficulty = bh.header.difficulty - discard + blockNumber = blk.header.blockNumber, + blockDifficulty = blk.header.difficulty + + let ctx = peer.networkState() + ctx.handleNewBlock(peer, blk, totalDifficulty) # User message 0x08: NewPooledTransactionHashes. proc newPooledTransactionHashes(peer: Peer, txHashes: openArray[Hash256]) = when trEthTraceGossipOk: - trace trEthRecvDiscarding & "NewPooledTransactionHashes (0x08)", peer, + trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer, hashes=txHashes.len - discard + + let ctx = peer.networkState() + ctx.handleAnnouncedTxsHashes(peer, txHashes) requestResponse: # User message 0x09: GetPooledTransactions. @@ -239,9 +233,16 @@ p2pProtocol eth67(version = ethVersion, trace trEthRecvReceived & "GetPooledTransactions (0x09)", peer, hashes=txHashes.len - trace trEthSendReplying & "EMPTY PooledTransactions (0x10)", peer, - sent=0, requested=txHashes.len - await response.send([]) + let ctx = peer.networkState() + let txs = ctx.getPooledTxs(txHashes) + if txs.len > 0: + trace trEthSendReplying & "with PooledTransactions (0x0a)", peer, + sent=txs.len, requested=txHashes.len + else: + trace trEthSendReplying & "EMPTY PooledTransactions (0x0a)", peer, + sent=0, requested=txHashes.len + + await response.send(txs) # User message 0x0a: PooledTransactions. 
proc pooledTransactions(peer: Peer, transactions: openArray[Transaction]) @@ -257,11 +258,16 @@ p2pProtocol eth67(version = ethVersion, trace trEthRecvReceived & "GetReceipts (0x0f)", peer, hashes=hashes.len - trace trEthSendReplying & "EMPTY Receipts (0x10)", peer, - sent=0, requested=hashes.len - await response.send([]) - # TODO: implement `getReceipts` and reactivate this code - # await response.send(peer.network.chain.getReceipts(hashes)) + let ctx = peer.networkState() + let rec = ctx.getReceipts(hashes) + if rec.len > 0: + trace trEthSendReplying & "with Receipts (0x10)", peer, + sent=rec.len, requested=hashes.len + else: + trace trEthSendReplying & "EMPTY Receipts (0x10)", peer, + sent=0, requested=hashes.len + + await response.send(rec) # User message 0x10: Receipts. - proc receipts(peer: Peer, receipts: openArray[Receipt]) + proc receipts(peer: Peer, receipts: openArray[seq[Receipt]]) diff --git a/nimbus/sync/protocol/les/flow_control.nim b/nimbus/sync/protocol/les/flow_control.nim new file mode 100644 index 000000000..03f4e85cb --- /dev/null +++ b/nimbus/sync/protocol/les/flow_control.nim @@ -0,0 +1,499 @@ +import + std/[tables, sets], + chronicles, chronos, + eth/[rlp, common], + eth/p2p/[rlpx, private/p2p_types], + ./private/les_types + +const + maxSamples = 100000 + rechargingScale = 1000000 + + lesStatsKey = "les.flow_control.stats" + lesStatsVer = 0 + +logScope: + topics = "les flow_control" + +# TODO: move this somewhere +proc pop[A, B](t: var Table[A, B], key: A): B = + result = t[key] + t.del(key) + +when LesTime is SomeInteger: + template `/`(lhs, rhs: LesTime): LesTime = + lhs div rhs + +when defined(testing): + var lesTime* = LesTime(0) + template now(): LesTime = lesTime + template advanceTime(t) = lesTime += LesTime(t) + +else: + import times + let startTime = epochTime() + + proc now(): LesTime = + return LesTime((times.epochTime() - startTime) * 1000.0) + +proc addSample(ra: var StatsRunningAverage; x, y: float64) = + if ra.count >= maxSamples: + let decay = float64(ra.count + 1 - maxSamples) / maxSamples + template applyDecay(x) = x -= x * decay + + applyDecay ra.sumX + applyDecay ra.sumY + applyDecay ra.sumXX + applyDecay ra.sumXY + ra.count = maxSamples - 1 + + inc ra.count + ra.sumX += x + ra.sumY += y + ra.sumXX += x * x + ra.sumXY += x * y + +proc calc(ra: StatsRunningAverage): tuple[m, b: float] = + if ra.count == 0: + return + + let count = float64(ra.count) + let d = count * ra.sumXX - ra.sumX * ra.sumX + if d < 0.001: + return (m: ra.sumY / count, b: 0.0) + + result.m = (count * ra.sumXY - ra.sumX * ra.sumY) / d + result.b = (ra.sumY / count) - (result.m * ra.sumX / count) + +proc currentRequestsCosts*(network: LesNetwork, + les: ProtocolInfo): seq[ReqCostInfo] = + # Make sure the message costs are already initialized + doAssert network.messageStats.len > les.messages[^1].id, + "Have you called `initFlowControl`" + + for msg in les.messages: + var (m, b) = network.messageStats[msg.id].calc() + if m < 0: + b += m + m = 0 + + if b < 0: + b = 0 + + result.add ReqCostInfo(msgId: msg.id, + baseCost: ReqCostInt(b * 2), + reqCost: ReqCostInt(m * 2)) + +proc persistMessageStats*(network: LesNetwork) = + # XXX: Because of the package_visible_types template magic, Nim complains + # when we pass the messageStats expression directly to `encodeList` + let stats = network.messageStats + network.setSetting(lesStatsKey, rlp.encodeList(lesStatsVer, stats)) + +proc loadMessageStats*(network: LesNetwork, + les: ProtocolInfo): bool = + block readFromDB: + var stats = 
network.getSetting(lesStatsKey) + if stats.len == 0: + notice "LES stats not present in the database" + break readFromDB + + try: + var statsRlp = rlpFromBytes(stats) + if not statsRlp.enterList: + notice "Found a corrupted LES stats record" + break readFromDB + + let version = statsRlp.read(int) + if version != lesStatsVer: + notice "Found an outdated LES stats record" + break readFromDB + + statsRlp >> network.messageStats + if network.messageStats.len <= les.messages[^1].id: + notice "Found an incomplete LES stats record" + break readFromDB + + return true + + except RlpError as e: + error "Error while loading LES message stats", err = e.msg + + newSeq(network.messageStats, les.messages[^1].id + 1) + return false + +proc update(s: var FlowControlState, t: LesTime) = + let dt = max(t - s.lastUpdate, LesTime(0)) + + s.bufValue = min( + s.bufValue + s.minRecharge * dt, + s.bufLimit) + + s.lastUpdate = t + +proc init(s: var FlowControlState, + bufLimit: BufValueInt, minRecharge: int, t: LesTime) = + s.bufValue = bufLimit + s.bufLimit = bufLimit + s.minRecharge = minRecharge + s.lastUpdate = t + +func canMakeRequest(s: FlowControlState, + maxCost: ReqCostInt): (LesTime, float64) = + ## Returns the required waiting time before sending a request and + ## the estimated buffer level afterwards (as a fraction of the limit) + const safetyMargin = 50 + + var maxCost = min( + maxCost + safetyMargin * s.minRecharge, + s.bufLimit) + + if s.bufValue >= maxCost: + result[1] = float64(s.bufValue - maxCost) / float64(s.bufLimit) + else: + result[0] = (maxCost - s.bufValue) / s.minRecharge + +func canServeRequest(srv: LesNetwork): bool = + result = srv.reqCount < srv.maxReqCount and + srv.reqCostSum < srv.maxReqCostSum + +proc rechargeReqCost(peer: LesPeer, t: LesTime) = + let dt = t - peer.lastRechargeTime + peer.reqCostVal += peer.reqCostGradient * dt / rechargingScale + peer.lastRechargeTime = t + if peer.isRecharging and t >= peer.rechargingEndsAt: + peer.isRecharging = false + peer.reqCostGradient = 0 + peer.reqCostVal = 0 + +proc updateRechargingParams(peer: LesPeer, network: LesNetwork) = + peer.reqCostGradient = 0 + if peer.reqCount > 0: + peer.reqCostGradient = rechargingScale / network.reqCount + + if peer.isRecharging: + peer.reqCostGradient = (network.rechargingRate * (peer.rechargingPower / + network.totalRechargingPower).int64).int + peer.rechargingEndsAt = peer.lastRechargeTime + + LesTime(peer.reqCostVal * rechargingScale / + -peer.reqCostGradient ) + +proc trackRequests(network: LesNetwork, peer: LesPeer, reqCountChange: int) = + peer.reqCount += reqCountChange + network.reqCount += reqCountChange + + doAssert peer.reqCount >= 0 and network.reqCount >= 0 + + if peer.reqCount == 0: + # All requests have been finished. Start recharging. + peer.isRecharging = true + network.totalRechargingPower += peer.rechargingPower + elif peer.reqCount == reqCountChange and peer.isRecharging: + # `peer.reqCount` must have been 0 for the condition above to hold. + # This is a transition from recharging to serving state. 
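# The client-side flow-control state above is a token bucket: the buffer refills
# at `minRecharge` units per millisecond up to `bufLimit`, and a request may only
# go out once its (safety-margin adjusted) worst-case cost fits into the current
# buffer value. A self-contained sketch of that arithmetic, mirroring `update`
# and `canMakeRequest` with plain integers; `waitBeforeRequest` is an
# illustrative name:
proc waitBeforeRequest(bufValue, bufLimit, minRecharge, maxCost: int): int =
  ## milliseconds to wait before a request of worst-case cost `maxCost`
  const safetyMargin = 50                        # same constant as above
  let cost = min(maxCost + safetyMargin * minRecharge, bufLimit)
  if bufValue >= cost: 0
  else: (cost - bufValue) div minRecharge

# Example: with bufLimit = 300_000, bufValue = 100_000, minRecharge = 50 and
# maxCost = 200_000 the adjusted cost is 202_500, so the client waits
# (202_500 - 100_000) div 50 = 2050 ms before sending.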
+ peer.isRecharging = false + network.totalRechargingPower -= peer.rechargingPower + peer.startReqCostVal = peer.reqCostVal + + updateRechargingParams peer, network + +proc updateFlowControl(network: LesNetwork, t: LesTime) = + while true: + var firstTime = t + for peer in network.peers: + # TODO: perhaps use a bin heap here + if peer.isRecharging and peer.rechargingEndsAt < firstTime: + firstTime = peer.rechargingEndsAt + + let rechargingEndedForSomePeer = firstTime < t + + network.reqCostSum = 0 + for peer in network.peers: + peer.rechargeReqCost firstTime + network.reqCostSum += peer.reqCostVal + + if rechargingEndedForSomePeer: + for peer in network.peers: + if peer.isRecharging: + updateRechargingParams peer, network + else: + network.lastUpdate = t + return + +proc endPendingRequest*(network: LesNetwork, peer: LesPeer, t: LesTime) = + if peer.reqCount > 0: + network.updateFlowControl t + network.trackRequests peer, -1 + network.updateFlowControl t + +proc enlistInFlowControl*(network: LesNetwork, + peer: LesPeer, + peerRechargingPower = 100) = + let t = now() + + doAssert peer.isServer or peer.isClient + # Each Peer must be potential communication partner for us. + # There will be useless peers on the network, but the logic + # should make sure to disconnect them earlier in `onPeerConnected`. + + if peer.isServer: + peer.localFlowState.init network.bufferLimit, network.minRechargingRate, t + peer.pendingReqs = initTable[int, ReqCostInt]() + + if peer.isClient: + peer.remoteFlowState.init network.bufferLimit, network.minRechargingRate, t + peer.lastRechargeTime = t + peer.rechargingEndsAt = t + peer.rechargingPower = peerRechargingPower + + network.updateFlowControl t + +proc delistFromFlowControl*(network: LesNetwork, peer: LesPeer) = + let t = now() + + # XXX: perhaps this is not safe with our reqCount logic. + # The original code may depend on the binarity of the `serving` flag. + network.endPendingRequest peer, t + network.updateFlowControl t + +proc initFlowControl*(network: LesNetwork, les: ProtocolInfo, + maxReqCount, maxReqCostSum, reqCostTarget: int) = + network.rechargingRate = rechargingScale * (rechargingScale / + (100 * rechargingScale / reqCostTarget - rechargingScale)) + network.maxReqCount = maxReqCount + network.maxReqCostSum = maxReqCostSum + + if not network.loadMessageStats(les): + warn "Failed to load persisted LES message stats. " & + "Flow control will be re-initilized." 
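# On the serving side every LES request is bracketed by the two exported calls
# above: `acceptRequest` (which may wait for capacity or reject a peer that
# overran its budget) and `bufValueAfterRequest` (whose result is the buffer
# value echoed back in the reply). A sketch of that lifecycle, assuming an
# already enlisted peer; `serveLesRequest`, the `serve` callback and the import
# paths are illustrative:
import
  std/options,
  chronos,
  ./sync/protocol/les/private/les_types,
  ./sync/protocol/les/flow_control

proc serveLesRequest(network: LesNetwork, peer: LesPeer,
                     msgId, quantity: int,
                     serve: proc () {.gcsafe.}):
                       Future[Option[BufValueInt]] {.async.} =
  let accepted = await network.acceptRequest(peer, msgId, quantity)
  if not accepted:
    return none(BufValueInt)   # peer exceeded its flow-control budget
  serve()                      # build and send the actual reply payload
  # charge the request and report the remaining buffer value to the client
  return some(network.bufValueAfterRequest(peer, msgId, quantity))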
+ +proc canMakeRequest(peer: var LesPeer, maxCost: int): (LesTime, float64) = + peer.localFlowState.update now() + return peer.localFlowState.canMakeRequest(maxCost) + +template getRequestCost(peer: LesPeer, localOrRemote: untyped, + msgId, costQuantity: int): ReqCostInt = + let + baseCost = peer.`localOrRemote ReqCosts`[msgId].baseCost + reqCost = peer.`localOrRemote ReqCosts`[msgId].reqCost + + min(baseCost + reqCost * costQuantity, + peer.`localOrRemote FlowState`.bufLimit) + +proc trackOutgoingRequest*(network: LesNetwork, peer: LesPeer, + msgId, reqId, costQuantity: int) = + let maxCost = peer.getRequestCost(local, msgId, costQuantity) + + peer.localFlowState.bufValue -= maxCost + peer.pendingReqsCost += maxCost + peer.pendingReqs[reqId] = peer.pendingReqsCost + +proc trackIncomingResponse*(peer: LesPeer, reqId: int, bv: BufValueInt) = + let bv = min(bv, peer.localFlowState.bufLimit) + if not peer.pendingReqs.hasKey(reqId): + return + + let costsSumAtSending = peer.pendingReqs.pop(reqId) + let costsSumChange = peer.pendingReqsCost - costsSumAtSending + + peer.localFlowState.bufValue = if bv > costsSumChange: bv - costsSumChange + else: 0 + peer.localFlowState.lastUpdate = now() + +proc acceptRequest*(network: LesNetwork, peer: LesPeer, + msgId, costQuantity: int): Future[bool] {.async.} = + let t = now() + let reqCost = peer.getRequestCost(remote, msgId, costQuantity) + + peer.remoteFlowState.update t + network.updateFlowControl t + + while not network.canServeRequest: + await sleepAsync(chronos.milliseconds(10)) + + if peer notin network.peers: + # The peer was disconnected or the network + # was shut down while we waited + return false + + network.trackRequests peer, +1 + network.updateFlowControl network.lastUpdate + + if reqCost > peer.remoteFlowState.bufValue: + error "LES peer sent request too early", + recharge = (reqCost - peer.remoteFlowState.bufValue) * rechargingScale / + peer.remoteFlowState.minRecharge + return false + + return true + +proc bufValueAfterRequest*(network: LesNetwork, peer: LesPeer, + msgId: int, quantity: int): BufValueInt = + let t = now() + let costs = peer.remoteReqCosts[msgId] + var reqCost = costs.baseCost + quantity * costs.reqCost + + peer.remoteFlowState.update t + peer.remoteFlowState.bufValue -= reqCost + + network.endPendingRequest peer, t + + let curReqCost = peer.reqCostVal + if curReqCost < peer.remoteFlowState.bufLimit: + let bv = peer.remoteFlowState.bufLimit - curReqCost + if bv > peer.remoteFlowState.bufValue: + peer.remoteFlowState.bufValue = bv + + network.messageStats[msgId].addSample(float64(quantity), + float64(curReqCost - peer.startReqCostVal)) + + return peer.remoteFlowState.bufValue + +when defined(testing): + import unittest2, random, ../../rlpx + + proc isMax(s: FlowControlState): bool = + s.bufValue == s.bufLimit + + p2pProtocol dummyLes(version = 1, rlpxName = "abc"): + proc a(p: Peer) + proc b(p: Peer) + proc c(p: Peer) + proc d(p: Peer) + proc e(p: Peer) + + template fequals(lhs, rhs: float64, epsilon = 0.0001): bool = + abs(lhs-rhs) < epsilon + + proc tests* = + randomize(3913631) + + suite "les flow control": + suite "running averages": + test "consistent costs": + var s: StatsRunningAverage + for i in 0..100: + s.addSample(5.0, 100.0) + + let (cost, base) = s.calc + + check: + fequals(cost, 100.0) + fequals(base, 0.0) + + test "randomized averages": + proc performTest(qBase, qRandom: int, cBase, cRandom: float64) = + var + s: StatsRunningAverage + expectedFinalCost = cBase + cRandom / 2 + error = expectedFinalCost + + 
for samples in [100, 1000, 10000]: + for i in 0..samples: + let q = float64(qBase + rand(10)) + s.addSample(q, q * (cBase + rand(cRandom))) + + let (newCost, newBase) = s.calc + # With more samples, our error should decrease, getting + # closer and closer to the average (unless we are already close enough) + let newError = abs(newCost - expectedFinalCost) + # This check fails with Nim-1.6: + # check newError < error + error = newError + + # After enough samples we should be very close the the final result + check error < (expectedFinalCost * 0.02) + + performTest(1, 10, 5.0, 100.0) + performTest(1, 4, 200.0, 1000.0) + + suite "buffer value calculations": + type TestReq = object + peer: LesPeer + msgId, quantity: int + accepted: bool + + setup: + var lesNetwork = new LesNetwork + lesNetwork.peers = initHashSet[LesPeer]() + lesNetwork.initFlowControl(dummyLes.protocolInfo, + reqCostTarget = 300, + maxReqCount = 5, + maxReqCostSum = 1000) + + for i in 0 ..< lesNetwork.messageStats.len: + lesNetwork.messageStats[i].addSample(1.0, float(i) * 100.0) + + var client = new LesPeer + client.isClient = true + + var server = new LesPeer + server.isServer = true + + var clientServer = new LesPeer + clientServer.isClient = true + clientServer.isServer = true + + var client2 = new LesPeer + client2.isClient = true + + var client3 = new LesPeer + client3.isClient = true + + var bv: BufValueInt + + template enlist(peer: LesPeer) {.dirty.} = + let reqCosts = currentRequestsCosts(lesNetwork, dummyLes.protocolInfo) + peer.remoteReqCosts = reqCosts + peer.localReqCosts = reqCosts + lesNetwork.peers.incl peer + lesNetwork.enlistInFlowControl peer + + template startReq(p: LesPeer, msg, q: int): TestReq = + var req: TestReq + req.peer = p + req.msgId = msg + req.quantity = q + req.accepted = waitFor lesNetwork.acceptRequest(p, msg, q) + req + + template endReq(req: TestReq): BufValueInt = + bufValueAfterRequest(lesNetwork, req.peer, req.msgId, req.quantity) + + test "single peer recharging": + lesNetwork.bufferLimit = 1000 + lesNetwork.minRechargingRate = 100 + + enlist client + + check: + client.remoteFlowState.isMax + client.rechargingPower > 0 + + advanceTime 100 + + let r1 = client.startReq(0, 100) + check r1.accepted + check client.isRecharging == false + + advanceTime 50 + + let r2 = client.startReq(1, 1) + check r2.accepted + check client.isRecharging == false + + advanceTime 25 + bv = endReq r2 + check client.isRecharging == false + + advanceTime 130 + bv = endReq r1 + check client.isRecharging == true + + advanceTime 300 + lesNetwork.updateFlowControl now() + + check: + client.isRecharging == false + client.remoteFlowState.isMax + diff --git a/nimbus/sync/protocol/les/private/les_types.nim b/nimbus/sync/protocol/les/private/les_types.nim new file mode 100644 index 000000000..b6eae8785 --- /dev/null +++ b/nimbus/sync/protocol/les/private/les_types.nim @@ -0,0 +1,117 @@ +import + std/[hashes, tables, sets], + eth/common + +type + AnnounceType* = enum + None, + Simple, + Signed, + Unspecified + + ReqCostInfo* = object + msgId*: int + baseCost*, reqCost*: ReqCostInt + + FlowControlState* = object + bufValue*, bufLimit*: int + minRecharge*: int + lastUpdate*: LesTime + + StatsRunningAverage* = object + sumX*, sumY*, sumXX*, sumXY*: float64 + count*: int + + LesPeer* = ref object of RootRef + isServer*: bool + isClient*: bool + announceType*: AnnounceType + + bestDifficulty*: DifficultyInt + bestBlockHash*: KeccakHash + bestBlockNumber*: BlockNumber + + hasChainSince*: HashOrNum + hasStateSince*: HashOrNum + 
+    relaysTransactions*: bool
+
+    # The variables below are used to implement the flow control
+    # mechanisms of LES from our point of view as a server.
+    # They describe how much load has been generated by this
+    # particular peer.
+    reqCount*: int              # How many outstanding requests are there?
+                                #
+    rechargingPower*: int       # Do we give this peer any extra priority
+                                # (implemented as a faster recharging rate)
+                                # 100 is the default. You can go higher and lower.
+                                #
+    isRecharging*: bool         # This is true while the peer is not making
+                                # any requests
+                                #
+    reqCostGradient*: int       # Measures the speed of recharging or accumulating
+                                # "requests cost" at any given moment.
+                                #
+    reqCostVal*: int            # The accumulated "requests cost"
+                                #
+    rechargingEndsAt*: int      # When will recharging end?
+                                # (the buffer of the Peer will be fully restored)
+                                #
+    lastRechargeTime*: LesTime  # When did we last update the recharging parameters
+                                #
+    startReqCostVal*: int       # TODO
+
+    remoteFlowState*: FlowControlState
+    remoteReqCosts*: seq[ReqCostInfo]
+
+    # The next variables are used to limit ourselves as a client in order to
+    # not violate the flow control requirements of the remote LES server.
+
+    pendingReqs*: Table[int, ReqCostInt]
+    pendingReqsCost*: int
+
+    localFlowState*: FlowControlState
+    localReqCosts*: seq[ReqCostInfo]
+
+  LesNetwork* = ref object of RootRef
+    peers*: HashSet[LesPeer]
+    messageStats*: seq[StatsRunningAverage]
+    ourAnnounceType*: AnnounceType
+
+    # The fields below are relevant when serving data.
+    bufferLimit*: int
+    minRechargingRate*: int
+
+    reqCostSum*, maxReqCostSum*: ReqCostInt
+    reqCount*, maxReqCount*: int
+    sumWeigth*: int
+
+    rechargingRate*: int64
+    totalRechargedUnits*: int
+    totalRechargingPower*: int
+
+    lastUpdate*: LesTime
+
+  KeyValuePair* = object
+    key*: string
+    value*: Blob
+
+  HandshakeError* = object of CatchableError
+
+  LesTime* = int  # this is in milliseconds
+  BufValueInt* = int
+  ReqCostInt* = int
+
+template hash*(peer: LesPeer): Hash = hash(cast[pointer](peer))
+
+template areWeServingData*(network: LesNetwork): bool =
+  network.maxReqCount != 0
+
+template areWeRequestingData*(network: LesNetwork): bool =
+  network.ourAnnounceType != AnnounceType.Unspecified
+
+proc setSetting*(ctx: LesNetwork, key: string, val: openArray[byte]) =
+  discard
+
+proc getSetting*(ctx: LesNetwork, key: string): seq[byte] =
+  discard
+
diff --git a/nimbus/sync/protocol/les_protocol.nim b/nimbus/sync/protocol/les_protocol.nim
new file mode 100644
index 000000000..6f4374562
--- /dev/null
+++ b/nimbus/sync/protocol/les_protocol.nim
@@ -0,0 +1,522 @@
+#
+# Ethereum P2P
+# (c) Copyright 2018
+# Status Research & Development GmbH
+#
+# Licensed under either of
+# Apache License, version 2.0, (LICENSE-APACHEv2)
+# MIT license (LICENSE-MIT)
+#
+
+import
+  std/[times, tables, options, sets, hashes, strutils],
+  stew/shims/macros, chronicles, chronos, nimcrypto/[keccak, hash],
+  eth/[rlp, keys, common],
+  eth/p2p/[rlpx, kademlia, private/p2p_types],
+  ./les/private/les_types, ./les/flow_control,
+  ../types
+
+export
+  les_types
+
+type
+  ProofRequest* = object
+    blockHash*: KeccakHash
+    accountKey*: Blob
+    key*: Blob
+    fromLevel*: uint
+
+  ContractCodeRequest* = object
+    blockHash*: KeccakHash
+    key*: EthAddress
+
+  HelperTrieProofRequest* = object
+    subType*: uint
+    sectionIdx*: uint
+    key*: Blob
+    fromLevel*: uint
+    auxReq*: uint
+
+  LesStatus = object
+    difficulty : DifficultyInt
+    blockHash  : Hash256
+    blockNumber: BlockNumber
+    genesisHash: Hash256
+
+const
+  lesVersion = 2
+  maxHeadersFetch =
192 + maxBodiesFetch = 32 + maxReceiptsFetch = 128 + maxCodeFetch = 64 + maxProofsFetch = 64 + maxHeaderProofsFetch = 64 + maxTransactionsFetch = 64 + + # Handshake properties: + # https://github.com/zsfelfoldi/go-ethereum/wiki/Light-Ethereum-Subprotocol-(LES) + keyProtocolVersion = "protocolVersion" + ## P: is 1 for the LPV1 protocol version. + + keyNetworkId = "networkId" + ## P: should be 0 for testnet, 1 for mainnet. + + keyHeadTotalDifficulty = "headTd" + ## P: Total Difficulty of the best chain. + ## Integer, as found in block header. + + keyHeadHash = "headHash" + ## B_32: the hash of the best (i.e. highest TD) known block. + + keyHeadNumber = "headNum" + ## P: the number of the best (i.e. highest TD) known block. + + keyGenesisHash = "genesisHash" + ## B_32: the hash of the Genesis block. + + keyServeHeaders = "serveHeaders" + ## (optional, no value) + ## present if the peer can serve header chain downloads. + + keyServeChainSince = "serveChainSince" + ## P (optional) + ## present if the peer can serve Body/Receipts ODR requests + ## starting from the given block number. + + keyServeStateSince = "serveStateSince" + ## P (optional): + ## present if the peer can serve Proof/Code ODR requests + ## starting from the given block number. + + keyRelaysTransactions = "txRelay" + ## (optional, no value) + ## present if the peer can relay transactions to the ETH network. + + keyFlowControlBL = "flowControl/BL" + keyFlowControlMRC = "flowControl/MRC" + keyFlowControlMRR = "flowControl/MRR" + ## see Client Side Flow Control: + ## https://github.com/zsfelfoldi/go-ethereum/wiki/Client-Side-Flow-Control-model-for-the-LES-protocol + + keyAnnounceType = "announceType" + keyAnnounceSignature = "sign" + +proc getStatus(ctx: LesNetwork): LesStatus = + discard + +proc getBlockBodies(ctx: LesNetwork, hashes: openArray[Hash256]): seq[BlockBody] = + discard + +proc getBlockHeaders(ctx: LesNetwork, req: BlocksRequest): seq[BlockHeader] = + discard + +proc getReceipts(ctx: LesNetwork, hashes: openArray[Hash256]): seq[Receipt] = + discard + +proc getProofs(ctx: LesNetwork, proofs: openArray[ProofRequest]): seq[Blob] = + discard + +proc getContractCodes(ctx: LesNetwork, reqs: openArray[ContractCodeRequest]): seq[Blob] = + discard + +proc getHeaderProofs(ctx: LesNetwork, reqs: openArray[ProofRequest]): seq[Blob] = + discard + +proc getHelperTrieProofs(ctx: LesNetwork, + reqs: openArray[HelperTrieProofRequest], + outNodes: var seq[Blob], outAuxData: var seq[Blob]) = + discard + +proc getTransactionStatus(ctx: LesNetwork, txHash: KeccakHash): TransactionStatusMsg = + discard + +proc addTransactions(ctx: LesNetwork, transactions: openArray[Transaction]) = + discard + +proc initProtocolState(network: LesNetwork, node: EthereumNode) {.gcsafe.} = + network.peers = initHashSet[LesPeer]() + +proc addPeer(network: LesNetwork, peer: LesPeer) = + network.enlistInFlowControl peer + network.peers.incl peer + +proc removePeer(network: LesNetwork, peer: LesPeer) = + network.delistFromFlowControl peer + network.peers.excl peer + +template costQuantity(quantityExpr, max: untyped) {.pragma.} + +proc getCostQuantity(fn: NimNode): tuple[quantityExpr, maxQuantity: NimNode] = + # XXX: `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes + # (TODO: file as an issue) + let costQuantity = fn.pragma.findPragma(bindSym"costQuantity") + doAssert costQuantity != nil + + result.quantityExpr = costQuantity[1] + result.maxQuantity= costQuantity[2] + + if result.maxQuantity.kind == nnkExprEqExpr: + result.maxQuantity = 
result.maxQuantity[1] + +macro outgoingRequestDecorator(n: untyped): untyped = + result = n + let (costQuantity, maxQuantity) = n.getCostQuantity + + result.body.add quote do: + trackOutgoingRequest(peer.networkState(les), + peer.state(les), + perProtocolMsgId, reqId, `costQuantity`) + # echo result.repr + +macro incomingResponseDecorator(n: untyped): untyped = + result = n + + let trackingCall = quote do: + trackIncomingResponse(peer.state(les), reqId, msg.bufValue) + + result.body.insert(n.body.len - 1, trackingCall) + # echo result.repr + +macro incomingRequestDecorator(n: untyped): untyped = + result = n + let (costQuantity, maxQuantity) = n.getCostQuantity + + template acceptStep(quantityExpr, maxQuantity) {.dirty.} = + let requestCostQuantity = quantityExpr + if requestCostQuantity > maxQuantity: + await peer.disconnect(BreachOfProtocol) + return + + let lesPeer = peer.state + let lesNetwork = peer.networkState + + if not await acceptRequest(lesNetwork, lesPeer, + perProtocolMsgId, + requestCostQuantity): return + + result.body.insert(1, getAst(acceptStep(costQuantity, maxQuantity))) + # echo result.repr + +template updateBV: BufValueInt = + bufValueAfterRequest(lesNetwork, lesPeer, + perProtocolMsgId, requestCostQuantity) + +func getValue(values: openArray[KeyValuePair], + key: string, T: typedesc): Option[T] = + for v in values: + if v.key == key: + return some(rlp.decode(v.value, T)) + +func getRequiredValue(values: openArray[KeyValuePair], + key: string, T: typedesc): T = + for v in values: + if v.key == key: + return rlp.decode(v.value, T) + + raise newException(HandshakeError, + "Required handshake field " & key & " missing") + +p2pProtocol les(version = lesVersion, + peerState = LesPeer, + networkState = LesNetwork, + outgoingRequestDecorator = outgoingRequestDecorator, + incomingRequestDecorator = incomingRequestDecorator, + incomingResponseThunkDecorator = incomingResponseDecorator): + handshake: + proc status(p: Peer, values: openArray[KeyValuePair]) + + onPeerConnected do (peer: Peer): + let + network = peer.network + lesPeer = peer.state + lesNetwork = peer.networkState + status = lesNetwork.getStatus() + + template `=>`(k, v: untyped): untyped = + KeyValuePair(key: k, value: rlp.encode(v)) + + var lesProperties = @[ + keyProtocolVersion => lesVersion, + keyNetworkId => network.networkId, + keyHeadTotalDifficulty => status.difficulty, + keyHeadHash => status.blockHash, + keyHeadNumber => status.blockNumber, + keyGenesisHash => status.genesisHash + ] + + lesPeer.remoteReqCosts = currentRequestsCosts(lesNetwork, les.protocolInfo) + + if lesNetwork.areWeServingData: + lesProperties.add [ + # keyServeHeaders => nil, + keyServeChainSince => 0, + keyServeStateSince => 0, + # keyRelaysTransactions => nil, + keyFlowControlBL => lesNetwork.bufferLimit, + keyFlowControlMRR => lesNetwork.minRechargingRate, + keyFlowControlMRC => lesPeer.remoteReqCosts + ] + + if lesNetwork.areWeRequestingData: + lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType) + + let + s = await peer.status(lesProperties, timeout = chronos.seconds(10)) + peerNetworkId = s.values.getRequiredValue(keyNetworkId, NetworkId) + peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash) + peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint) + + template requireCompatibility(peerVar, localVar, varName: untyped) = + if localVar != peerVar: + raise newException(HandshakeError, + "Incompatibility detected! 
$1 mismatch ($2 != $3)" %
+          [varName, $localVar, $peerVar])
+
+    requireCompatibility(peerLesVersion, uint(lesVersion), "les version")
+    requireCompatibility(peerNetworkId, network.networkId, "network id")
+    requireCompatibility(peerGenesisHash, status.genesisHash, "genesis hash")
+
+    template `:=`(lhs, key) =
+      lhs = s.values.getRequiredValue(key, type(lhs))
+
+    lesPeer.bestBlockHash := keyHeadHash
+    lesPeer.bestBlockNumber := keyHeadNumber
+    lesPeer.bestDifficulty := keyHeadTotalDifficulty
+
+    let peerAnnounceType = s.values.getValue(keyAnnounceType, AnnounceType)
+    if peerAnnounceType.isSome:
+      lesPeer.isClient = true
+      lesPeer.announceType = peerAnnounceType.get
+    else:
+      lesPeer.announceType = AnnounceType.Simple
+      lesPeer.hasChainSince := keyServeChainSince
+      lesPeer.hasStateSince := keyServeStateSince
+      lesPeer.relaysTransactions := keyRelaysTransactions
+      lesPeer.localFlowState.bufLimit := keyFlowControlBL
+      lesPeer.localFlowState.minRecharge := keyFlowControlMRR
+      lesPeer.localReqCosts := keyFlowControlMRC
+
+    lesNetwork.addPeer lesPeer
+
+  onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
+    peer.networkState.removePeer peer.state
+
+  ## Header synchronisation
+  ##
+
+  proc announce(
+      peer: Peer,
+      headHash: KeccakHash,
+      headNumber: BlockNumber,
+      headTotalDifficulty: DifficultyInt,
+      reorgDepth: BlockNumber,
+      values: openArray[KeyValuePair],
+      announceType: AnnounceType) =
+
+    if peer.state.announceType == AnnounceType.None:
+      error "unexpected announce message", peer
+      return
+
+    if announceType == AnnounceType.Signed:
+      let signature = values.getValue(keyAnnounceSignature, Blob)
+      if signature.isNone:
+        error "missing announce signature"
+        return
+      let sig = Signature.fromRaw(signature.get).tryGet()
+      let sigMsg = rlp.encodeList(headHash, headNumber, headTotalDifficulty)
+      let signerKey = recover(sig, sigMsg).tryGet()
+      if signerKey.toNodeId != peer.remote.id:
+        error "invalid announce signature"
+        # TODO: should we disconnect this peer?
+        return
+
+    # TODO: handle new block
+
+  requestResponse:
+    proc getBlockHeaders(
+        peer: Peer,
+        req: BlocksRequest) {.
+        costQuantity(req.maxResults.int, max = maxHeadersFetch).} =
+
+      let ctx = peer.networkState()
+      let headers = ctx.getBlockHeaders(req)
+      await response.send(updateBV(), headers)
+
+    proc blockHeaders(
+        peer: Peer,
+        bufValue: BufValueInt,
+        blocks: openArray[BlockHeader])
+
+  ## On-demand data retrieval
+  ##
+
+  requestResponse:
+    proc getBlockBodies(
+        peer: Peer,
+        blocks: openArray[KeccakHash]) {.
+        costQuantity(blocks.len, max = maxBodiesFetch), gcsafe.} =
+
+      let ctx = peer.networkState()
+      let blocks = ctx.getBlockBodies(blocks)
+      await response.send(updateBV(), blocks)
+
+    proc blockBodies(
+        peer: Peer,
+        bufValue: BufValueInt,
+        bodies: openArray[BlockBody])
+
+  requestResponse:
+    proc getReceipts(
+        peer: Peer,
+        hashes: openArray[KeccakHash])
+        {.costQuantity(hashes.len, max = maxReceiptsFetch).} =
+
+      let ctx = peer.networkState()
+      let receipts = ctx.getReceipts(hashes)
+      await response.send(updateBV(), receipts)
+
+    proc receipts(
+        peer: Peer,
+        bufValue: BufValueInt,
+        receipts: openArray[Receipt])
+
+  requestResponse:
+    proc getProofs(
+        peer: Peer,
+        proofs: openArray[ProofRequest]) {.
+ costQuantity(proofs.len, max = maxProofsFetch).} = + + let ctx = peer.networkState() + let proofs = ctx.getProofs(proofs) + await response.send(updateBV(), proofs) + + proc proofs( + peer: Peer, + bufValue: BufValueInt, + proofs: openArray[Blob]) + + requestResponse: + proc getContractCodes( + peer: Peer, + reqs: seq[ContractCodeRequest]) {. + costQuantity(reqs.len, max = maxCodeFetch).} = + + let ctx = peer.networkState() + let results = ctx.getContractCodes(reqs) + await response.send(updateBV(), results) + + proc contractCodes( + peer: Peer, + bufValue: BufValueInt, + results: seq[Blob]) + + nextID 15 + + requestResponse: + proc getHeaderProofs( + peer: Peer, + reqs: openArray[ProofRequest]) {. + costQuantity(reqs.len, max = maxHeaderProofsFetch).} = + + let ctx = peer.networkState() + let proofs = ctx.getHeaderProofs(reqs) + await response.send(updateBV(), proofs) + + proc headerProofs( + peer: Peer, + bufValue: BufValueInt, + proofs: openArray[Blob]) + + requestResponse: + proc getHelperTrieProofs( + peer: Peer, + reqs: openArray[HelperTrieProofRequest]) {. + costQuantity(reqs.len, max = maxProofsFetch).} = + + let ctx = peer.networkState() + var nodes, auxData: seq[Blob] + ctx.getHelperTrieProofs(reqs, nodes, auxData) + await response.send(updateBV(), nodes, auxData) + + proc helperTrieProofs( + peer: Peer, + bufValue: BufValueInt, + nodes: seq[Blob], + auxData: seq[Blob]) + + ## Transaction relaying and status retrieval + ## + + requestResponse: + proc sendTxV2( + peer: Peer, + transactions: openArray[Transaction]) {. + costQuantity(transactions.len, max = maxTransactionsFetch).} = + + let ctx = peer.networkState() + + var results: seq[TransactionStatusMsg] + for t in transactions: + let hash = t.rlpHash # TODO: this is not optimal, we can compute + # the hash from the request bytes. + # The RLP module can offer a helper Hashed[T] + # to make this easy. + var s = ctx.getTransactionStatus(hash) + if s.status == TransactionStatus.Unknown: + ctx.addTransactions([t]) + s = ctx.getTransactionStatus(hash) + + results.add s + + await response.send(updateBV(), results) + + proc getTxStatus( + peer: Peer, + transactions: openArray[Transaction]) {. + costQuantity(transactions.len, max = maxTransactionsFetch).} = + + let ctx = peer.networkState() + + var results: seq[TransactionStatusMsg] + for t in transactions: + results.add ctx.getTransactionStatus(t.rlpHash) + await response.send(updateBV(), results) + + proc txStatus( + peer: Peer, + bufValue: BufValueInt, + transactions: openArray[TransactionStatusMsg]) + +proc configureLes*(node: EthereumNode, + # Client options: + announceType = AnnounceType.Simple, + # Server options. + # The zero default values indicate that the + # LES server will be deactivated. + maxReqCount = 0, + maxReqCostSum = 0, + reqCostTarget = 0) = + + doAssert announceType != AnnounceType.Unspecified or maxReqCount > 0 + + var lesNetwork = node.protocolState(les) + lesNetwork.ourAnnounceType = announceType + initFlowControl(lesNetwork, les.protocolInfo, + maxReqCount, maxReqCostSum, reqCostTarget) + +proc configureLesServer*(node: EthereumNode, + # Client options: + announceType = AnnounceType.Unspecified, + # Server options. + # The zero default values indicate that the + # LES server will be deactivated. + maxReqCount = 0, + maxReqCostSum = 0, + reqCostTarget = 0) = + ## This is similar to `configureLes`, but with default parameter + ## values appropriate for a server. 
+ node.configureLes(announceType, maxReqCount, maxReqCostSum, reqCostTarget) + +proc persistLesMessageStats*(node: EthereumNode) = + persistMessageStats(node.protocolState(les)) + diff --git a/nimbus/sync/protocol/snap1.nim b/nimbus/sync/protocol/snap1.nim index 07357e3d1..5a21cf13a 100644 --- a/nimbus/sync/protocol/snap1.nim +++ b/nimbus/sync/protocol/snap1.nim @@ -137,7 +137,7 @@ import std/options, chronicles, chronos, - eth/[common/eth_types, p2p, p2p/private/p2p_types], + eth/[common, p2p, p2p/private/p2p_types], nimcrypto/hash, stew/byteutils, ../../constants, diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index 6c8c2e4a2..4b88e4695 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -63,7 +63,7 @@ proc init*( dbBackend: ChainDb, enableTicker = false): T = new result - result.initSync(ethNode, maxPeers, enableTicker) + result.initSync(ethNode, chain, maxPeers, enableTicker) result.ctx.chain = chain # explicitely override result.ctx.data.rng = rng result.ctx.data.dbBackend = dbBackend diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 5a878348a..97a6a3b78 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -275,7 +275,7 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up ctx.data.coveredAccounts = NodeTagRangeSet.init() ctx.data.snapDb = - if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.getTrieDB) + if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) else: SnapDbRef.init(ctx.data.dbBackend) ctx.pivotSetup() if tickerOK: diff --git a/nimbus/sync/snap/worker/com/get_block_header.nim b/nimbus/sync/snap/worker/com/get_block_header.nim index 817622d1d..c89be8449 100644 --- a/nimbus/sync/snap/worker/com/get_block_header.nim +++ b/nimbus/sync/snap/worker/com/get_block_header.nim @@ -12,7 +12,7 @@ import std/options, chronos, eth/[common/eth_types, p2p], - ../../../protocol, + "../../.."/[protocol, types], ../../worker_desc, ./com_error diff --git a/nimbus/sync/snap/worker/db/bulk_storage.nim b/nimbus/sync/snap/worker/db/bulk_storage.nim index 14e733d80..813def0a1 100644 --- a/nimbus/sync/snap/worker/db/bulk_storage.nim +++ b/nimbus/sync/snap/worker/db/bulk_storage.nim @@ -11,7 +11,7 @@ import std/[algorithm, strutils, tables], chronicles, - eth/[common/eth_types, trie/db], + eth/[common, trie/db], ../../../../db/[kvstore_rocksdb, storage_types], ../../../types, ../../range_desc, diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index b718461b2..764b324fc 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -11,7 +11,7 @@ import std/[algorithm, sequtils, strutils, tables], chronicles, - eth/[common/eth_types, p2p, rlp, trie/nibbles], + eth/[common, p2p, rlp, trie/nibbles, trie/db], stew/byteutils, ../../range_desc, "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim index 9e5a1f7b3..707f6406f 100644 --- a/nimbus/sync/snap/worker/db/snapdb_desc.nim +++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim @@ -11,7 +11,7 @@ import std/[sequtils, tables], chronicles, - eth/[common/eth_types, p2p], + eth/[common/eth_types, p2p, trie/db], ../../../../db/select_backend, ../../range_desc, "."/[bulk_storage, hexary_desc, hexary_error, hexary_import, hexary_paths, diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 142f25135..4c89bbc6f 100644 --- 
a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -14,7 +14,13 @@ ## Public descriptors import - eth/[common/eth_types, p2p] + eth/[common, p2p], + ../p2p/chain, + ../db/db_chain + +export + chain, + db_chain {.push raises: [Defect].} @@ -40,7 +46,7 @@ type CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) buddiesMax*: int ## Max number of buddies - chain*: AbstractChainDB ## Block chain database (no need for `Peer`) + chain*: Chain ## Block chain database (no need for `Peer`) poolMode*: bool ## Activate `runPool()` workers if set `true` data*: S ## Shared context for all worker peers diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index b9a49c54e..0637a5e9f 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -258,6 +258,7 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = proc initSync*[S,W]( dsc: RunnerSyncRef[S,W]; node: EthereumNode; + chain: Chain, slots: int; noisy = false) = ## Constructor @@ -266,7 +267,7 @@ proc initSync*[S,W]( # rejected as long as its worker descriptor is registered. dsc.ctx = CtxRef[S]( buddiesMax: max(1, slots + 1), - chain: node.chain) + chain: chain) dsc.pool = node.peerPool dsc.tickerOk = noisy dsc.buddies.init(dsc.ctx.buddiesMax) diff --git a/nimbus/sync/types.nim b/nimbus/sync/types.nim index 2d2c9263a..62fa8b5fa 100644 --- a/nimbus/sync/types.nim +++ b/nimbus/sync/types.nim @@ -22,6 +22,11 @@ type ## Note that the `ethXX` protocol driver always uses the ## underlying `Hash256` type which needs to be converted to `BlockHash`. + BlocksRequest* = object + startBlock*: HashOrNum + maxResults*, skip*: uint + reverse*: bool + # ------------------------------------------------------------------------------ # Public constructors # ------------------------------------------------------------------------------ diff --git a/tests/test_helpers.nim b/tests/test_helpers.nim index 504b4ceef..e2cbce9d3 100644 --- a/tests/test_helpers.nim +++ b/tests/test_helpers.nim @@ -287,7 +287,7 @@ proc setupEthNode*( var node = newEthereumNode( keypair, srvAddress, conf.networkId, - nil, conf.agentString, + conf.agentString, addAllCapabilities = false, bindUdpPort = conf.udpPort, bindTcpPort = conf.tcpPort) diff --git a/tests/test_merge.nim b/tests/test_merge.nim index 7b9ae5973..ad0ec4ffe 100644 --- a/tests/test_merge.nim +++ b/tests/test_merge.nim @@ -104,7 +104,7 @@ proc runTest(steps: Steps) = sealingEngine.start() rpcServer.start() - waitFor client.connect("localhost", Port(conf.rpcPort)) + waitFor client.connect("localhost", conf.rpcPort) suite "Engine API tests": for i, step in steps.list: diff --git a/tests/test_rpc.nim b/tests/test_rpc.nim index 47e0a2598..6fd5e5e88 100644 --- a/tests/test_rpc.nim +++ b/tests/test_rpc.nim @@ -147,7 +147,6 @@ proc rpcMain*() = ks2: EthAddress = hexToByteArray[20]("0xa3b2222afa5c987da6ef773fde8d01b9f23d481f") ks3: EthAddress = hexToByteArray[20]("0x597176e9a64aad0845d83afdaf698fbeff77703b") - ethNode.chain = newChain(chain) let keyStore = "tests" / "keystore" let res = ctx.am.loadKeystores(keyStore) if res.isErr: diff --git a/tests/test_wire_protocol.nim b/tests/test_wire_protocol.nim new file mode 100644 index 000000000..05eea2b46 --- /dev/null +++ b/tests/test_wire_protocol.nim @@ -0,0 +1,41 @@ +import + eth/p2p, eth/p2p/rlpx, + chronos, testutils/unittests, + ../nimbus/sync/protocol + +var nextPort = 30303 + +proc localAddress*(port: int): Address = + let port = Port(port) + result = Address(udpPort: port, tcpPort: port, + ip: 
parseIpAddress("127.0.0.1"))
+
+proc setupTestNode*(
+    rng: ref HmacDrbgContext,
+    capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode {.gcsafe.} =
+  # Don't create new RNG every time in production code!
+  let keys1 = KeyPair.random(rng[])
+  var node = newEthereumNode(
+    keys1, localAddress(nextPort), NetworkId(1),
+    addAllCapabilities = false,
+    bindUdpPort = Port(nextPort), bindTcpPort = Port(nextPort),
+    rng = rng)
+  nextPort.inc
+  for capability in capabilities:
+    node.addCapability capability
+
+  node
+
+
+suite "Testing protocol handlers":
+  asyncTest "Failing connection handler":
+    let rng = newRng()
+
+    var node1 = setupTestNode(rng, eth)
+    var node2 = setupTestNode(rng, eth)
+    node2.startListening()
+    let peer = await node1.rlpxConnect(newNode(node2.toENode()))
+    check:
+      peer.isNil == false
+      # To check if the disconnection handler did not run
+      #node1.protocolState(eth).count == 0
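
For reference, the charge computed by `getRequestCost` in the flow-control code above is `min(baseCost + reqCost * costQuantity, bufLimit)`, i.e. a base cost plus a per-item cost, capped at the peer's advertised buffer limit. A minimal standalone sketch of that arithmetic follows; the values and the `maxRequestCost` helper are illustrative only and are not part of this patch.

```nim
# Sketch of the LES flow-control cost bound used by getRequestCost:
#   cost = min(baseCost + reqCost * quantity, bufLimit)
# The ReqCostInfo shape mirrors les_types.nim; the numbers are made up.

type
  ReqCostInfo = object
    baseCost, reqCost: int

func maxRequestCost(costs: ReqCostInfo; quantity, bufLimit: int): int =
  ## Upper bound charged against the flow-control buffer for one request.
  min(costs.baseCost + costs.reqCost * quantity, bufLimit)

when isMainModule:
  let headerCosts = ReqCostInfo(baseCost: 100, reqCost: 10)
  # A small request is charged base cost plus per-item cost ...
  doAssert maxRequestCost(headerCosts, quantity = 5, bufLimit = 1000) == 150
  # ... while an oversized request is capped at the buffer limit.
  doAssert maxRequestCost(headerCosts, quantity = 500, bufLimit = 1000) == 1000
```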