From ae616935c3e1162e68f6c7662ae6a8667ff7e2db Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 11 Oct 2022 10:02:16 +0700 Subject: [PATCH] move eth1 specific code to where it belongs --- eth/common/chaindb.nim | 100 ---- eth/common/eth_types.nim | 38 -- eth/common/state_accessors.nim | 27 - eth/p2p.nim | 3 +- eth/p2p/blockchain_sync.nim | 412 -------------- eth/p2p/blockchain_utils.nim | 52 -- eth/p2p/private/p2p_types.nim | 4 - eth/p2p/rlpx_protocols/eth_protocol.nim | 117 ---- eth/p2p/rlpx_protocols/les/flow_control.nim | 506 ------------------ .../rlpx_protocols/les/private/les_types.nim | 111 ---- eth/p2p/rlpx_protocols/les_protocol.nim | 460 ---------------- eth/p2p/sync.nim | 45 -- tests/fuzzing/rlpx/thunk.nim | 2 +- tests/p2p/all_tests.nim | 3 +- tests/p2p/eth_protocol.nim | 51 ++ tests/p2p/les/test_flow_control.nim | 7 - tests/p2p/p2p_test_helper.nim | 2 +- tests/p2p/test_rlpx_thunk.nim | 7 +- 18 files changed, 59 insertions(+), 1888 deletions(-) delete mode 100644 eth/common/chaindb.nim delete mode 100644 eth/common/state_accessors.nim delete mode 100644 eth/p2p/blockchain_sync.nim delete mode 100644 eth/p2p/blockchain_utils.nim delete mode 100644 eth/p2p/rlpx_protocols/eth_protocol.nim delete mode 100644 eth/p2p/rlpx_protocols/les/flow_control.nim delete mode 100644 eth/p2p/rlpx_protocols/les/private/les_types.nim delete mode 100644 eth/p2p/rlpx_protocols/les_protocol.nim delete mode 100644 eth/p2p/sync.nim create mode 100644 tests/p2p/eth_protocol.nim delete mode 100644 tests/p2p/les/test_flow_control.nim diff --git a/eth/common/chaindb.nim b/eth/common/chaindb.nim deleted file mode 100644 index 9475077..0000000 --- a/eth/common/chaindb.nim +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright (c) 2022 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -import - chronicles, - ./eth_types_rlp, - ../rlp, - ../trie/db - -export eth_types_rlp, rlp, db - -type - AbstractChainDB* = ref object of RootRef - -proc notImplemented(name: string) = - debug "Method not implemented", meth = name - -method genesisHash*(db: AbstractChainDB): KeccakHash - {.base, gcsafe, raises: [Defect].} = - notImplemented("genesisHash") - -method getBlockHeader*(db: AbstractChainDB, b: HashOrNum, - output: var BlockHeader): bool {.base, gcsafe, raises: [RlpError, Defect].} = - notImplemented("getBlockHeader") - -proc getBlockHeader*(db: AbstractChainDB, hash: KeccakHash): BlockHeaderRef {.gcsafe.} = - new result - if not db.getBlockHeader(HashOrNum(isHash: true, hash: hash), result[]): - return nil - -proc getBlockHeader*(db: AbstractChainDB, b: BlockNumber): BlockHeaderRef {.gcsafe.} = - new result - if not db.getBlockHeader(HashOrNum(isHash: false, number: b), result[]): - return nil - -# Need to add `RlpError` and sometimes `CatchableError` as the implementations -# of these methods in nimbus-eth1 will raise these. Using `CatchableError` -# because some can raise for errors not know to this repository such as -# `CanonicalHeadNotFound`. It would probably be better to use Result. -method getBestBlockHeader*(self: AbstractChainDB): BlockHeader - {.base, gcsafe, raises: [RlpError, CatchableError, Defect].} = - notImplemented("getBestBlockHeader") - -method getSuccessorHeader*(db: AbstractChainDB, h: BlockHeader, - output: var BlockHeader, skip = 0'u): bool - {.base, gcsafe, raises: [RlpError, Defect].} = - notImplemented("getSuccessorHeader") - -method getAncestorHeader*(db: AbstractChainDB, h: BlockHeader, - output: var BlockHeader, skip = 0'u): bool - {.base, gcsafe, raises: [RlpError, Defect].} = - notImplemented("getAncestorHeader") - -method getBlockBody*(db: AbstractChainDB, blockHash: KeccakHash): BlockBodyRef - {.base, gcsafe, raises: [RlpError, Defect].} = - notImplemented("getBlockBody") - -method getReceipt*(db: AbstractChainDB, hash: KeccakHash): ReceiptRef {.base, gcsafe.} = - notImplemented("getReceipt") - -method getTrieDB*(db: AbstractChainDB): TrieDatabaseRef - {.base, gcsafe, raises: [Defect].} = - notImplemented("getTrieDB") - -method getCodeByHash*(db: AbstractChainDB, hash: KeccakHash): Blob {.base, gcsafe.} = - notImplemented("getCodeByHash") - -method getSetting*(db: AbstractChainDB, key: string): seq[byte] {.base, gcsafe.} = - notImplemented("getSetting") - -method setSetting*(db: AbstractChainDB, key: string, val: openArray[byte]) {.base, gcsafe.} = - notImplemented("setSetting") - -method getHeaderProof*(db: AbstractChainDB, req: ProofRequest): Blob {.base, gcsafe.} = - notImplemented("getHeaderProof") - -method getProof*(db: AbstractChainDB, req: ProofRequest): Blob {.base, gcsafe.} = - notImplemented("getProof") - -method getHelperTrieProof*(db: AbstractChainDB, req: HelperTrieProofRequest): Blob {.base, gcsafe.} = - notImplemented("getHelperTrieProof") - -method getTransactionStatus*(db: AbstractChainDB, txHash: KeccakHash): TransactionStatusMsg {.base, gcsafe.} = - notImplemented("getTransactionStatus") - -method addTransactions*(db: AbstractChainDB, transactions: openArray[Transaction]) {.base, gcsafe.} = - notImplemented("addTransactions") - -method persistBlocks*(db: AbstractChainDB, headers: openArray[BlockHeader], bodies: openArray[BlockBody]): ValidationResult {.base, gcsafe.} = - notImplemented("persistBlocks") - -method getForkId*(db: AbstractChainDB, n: BlockNumber): ForkID {.base, gcsafe.} = - # EIP 2364/2124 - notImplemented("getForkId") - -method getTotalDifficulty*(db: AbstractChainDB): DifficultyInt {.base, gcsafe, raises: [RlpError, Defect].} = - notImplemented("getTotalDifficulty") diff --git a/eth/common/eth_types.nim b/eth/common/eth_types.nim index f85854e..8e8c5b3 100644 --- a/eth/common/eth_types.nim +++ b/eth/common/eth_types.nim @@ -143,17 +143,6 @@ type txs*: seq[Transaction] uncles*: seq[BlockHeader] - CollationHeader* = object - shard*: uint - expectedPeriod*: uint - periodStartPrevHash*: Hash256 - parentHash*: Hash256 - txRoot*: Hash256 - coinbase*: EthAddress - stateRoot*: Hash256 - receiptRoot*: Hash256 - blockNumber*: BlockNumber - # TODO: Make BlockNumber a uint64 and deprecate either this or BlockHashOrNumber HashOrNum* = object case isHash*: bool @@ -169,33 +158,6 @@ type else: number*: uint64 - BlocksRequest* = object - startBlock*: HashOrNum - maxResults*, skip*: uint - reverse*: bool - - ProofRequest* = object - blockHash*: KeccakHash - accountKey*: Blob - key*: Blob - fromLevel*: uint - - HeaderProofRequest* = object - chtNumber*: uint - blockNumber*: uint - fromLevel*: uint - - ContractCodeRequest* = object - blockHash*: KeccakHash - key*: EthAddress - - HelperTrieProofRequest* = object - subType*: uint - sectionIdx*: uint - key*: Blob - fromLevel*: uint - auxReq*: uint - BlockHeaderRef* = ref BlockHeader BlockBodyRef* = ref BlockBody ReceiptRef* = ref Receipt diff --git a/eth/common/state_accessors.nim b/eth/common/state_accessors.nim deleted file mode 100644 index f3f8ad0..0000000 --- a/eth/common/state_accessors.nim +++ /dev/null @@ -1,27 +0,0 @@ -import - ../trie/[trie_defs, db, hexary], - ../rlp, - ./chaindb - -export chaindb - -proc getAccount*(db: TrieDatabaseRef, - rootHash: KeccakHash, - account: EthAddress): Account = - let trie = initSecureHexaryTrie(db, rootHash) - let data = trie.get(account) - if data.len > 0: - result = rlp.decode(data, Account) - else: - result = newAccount() - -proc getContractCode*(chain: AbstractChainDB, req: ContractCodeRequest): Blob {.gcsafe.} = - let b = chain.getBlockHeader(req.blockHash) - if b.hasData: - let acc = getAccount(chain.getTrieDB, b.stateRoot, req.key) - result = chain.getCodeByHash(acc.codeHash) - -proc getStorageNode*(chain: AbstractChainDB, hash: KeccakHash): Blob - {.raises: [CatchableError, Defect].} = - let db = chain.getTrieDB - return db.get(hash.data) diff --git a/eth/p2p.nim b/eth/p2p.nim index f5982e7..fe990e8 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -10,7 +10,7 @@ import std/[tables, algorithm, random], chronos, chronos/timer, chronicles, - ./keys, ./common/chaindb, ./p2p/private/p2p_types, + ./keys, ./p2p/private/p2p_types, ./p2p/[kademlia, discovery, enode, peer_pool, rlpx] export @@ -33,7 +33,6 @@ proc newEthereumNode*( keys: KeyPair, address: Address, networkId: NetworkId, - chain: AbstractChainDB, clientId = "nim-eth-p2p/0.2.0", # TODO: read this value from nimble somehow addAllCapabilities = true, useCompression: bool = false, diff --git a/eth/p2p/blockchain_sync.nim b/eth/p2p/blockchain_sync.nim deleted file mode 100644 index eeb54b4..0000000 --- a/eth/p2p/blockchain_sync.nim +++ /dev/null @@ -1,412 +0,0 @@ -# nim-eth -# Copyright (c) 2018-2021 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -import - std/[sets, options, random, hashes], - chronos, chronicles, - ../common/eth_types, ../p2p, - ./private/p2p_types, ./rlpx_protocols/eth_protocol, "."/[rlpx, peer_pool] - -const - minPeersToStartSync* = 2 # Wait for consensus of at least this - # number of peers before syncing - -type - SyncStatus* = enum - syncSuccess - syncNotEnoughPeers - syncTimeOut - - WantedBlocksState = enum - Initial, - Requested, - Received, - Persisted - - WantedBlocks = object - startIndex: BlockNumber - numBlocks: uint - state: WantedBlocksState - headers: seq[BlockHeader] - bodies: seq[BlockBody] - - SyncContext = ref object - workQueue: seq[WantedBlocks] - endBlockNumber: BlockNumber - finalizedBlock: BlockNumber # Block which was downloaded and verified - chain: AbstractChainDB - peerPool: PeerPool - trustedPeers: HashSet[Peer] - hasOutOfOrderBlocks: bool - -proc hash*(p: Peer): Hash = hash(cast[pointer](p)) - -proc endIndex(b: WantedBlocks): BlockNumber = - result = b.startIndex - result += (b.numBlocks - 1).toBlockNumber - -proc availableWorkItem(ctx: SyncContext): int = - var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed - trace "queue len", length = ctx.workQueue.len - result = -1 - for i in 0 .. ctx.workQueue.high: - case ctx.workQueue[i].state - of Initial: - # When there is a work item at Initial state, immediatly use this one. - # This usually means a previous work item that failed somewhere in the - # process, and thus can be reused to work on. - return i - of Persisted: - # In case of Persisted, we can reset this work item to a new one. - result = i - # No break here to give work items in Initial state priority and to - # calculate endBlock. - else: - discard - - # Check all endBlocks of all workqueue items to decide on next range of - # blocks to collect & process. - let endBlock = ctx.workQueue[i].endIndex - if endBlock > maxPendingBlock: - maxPendingBlock = endBlock - - let nextRequestedBlock = maxPendingBlock + 1 - # If this next block doesn't exist yet according to any of our peers, don't - # return a work item (and sync will be stopped). - if nextRequestedBlock >= ctx.endBlockNumber: - return -1 - - # Increase queue when there are no free (Initial / Persisted) work items in - # the queue. At start, queue will be empty. - if result == -1: - result = ctx.workQueue.len - ctx.workQueue.setLen(result + 1) - - # Create new work item when queue was increased, reset when selected work item - # is at Persisted state. - var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).toInt - if numBlocks > maxHeadersFetch: - numBlocks = maxHeadersFetch - ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial) - -proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult = - result = ctx.chain.persistBlocks(wi.headers, wi.bodies) - case result - of ValidationResult.OK: - ctx.finalizedBlock = wi.endIndex - wi.state = Persisted - of ValidationResult.Error: - wi.state = Initial - # successful or not, we're done with these blocks - wi.headers = @[] - wi.bodies = @[] - -proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult) = - var nextStartIndex = ctx.finalizedBlock + 1 - var keepRunning = true - var hasOutOfOrderBlocks = false - trace "Looking for out of order blocks" - while keepRunning: - keepRunning = false - hasOutOfOrderBlocks = false - # Go over the full work queue and check for every work item if it is in - # Received state and has the next blocks in line to be processed. - for i in 0 ..< ctx.workQueue.len: - let start = ctx.workQueue[i].startIndex - # There should be at least 1 like this, namely the just received work item - # that initiated this call. - if ctx.workQueue[i].state == Received: - if start == nextStartIndex: - trace "Processing pending work item", number = start - result = (i, ctx.persistWorkItem(ctx.workQueue[i])) - # TODO: We can stop here on failure, but have to set - # hasOutofORderBlocks. Is this always valid? - nextStartIndex = ctx.finalizedBlock + 1 - keepRunning = true - break - else: - hasOutOfOrderBlocks = true - - ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks - -proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult = - let wi = addr ctx.workQueue[workItem] - let askedBlocks = wi.numBlocks.int - let receivedBlocks = wi.headers.len - let start = wi.startIndex - - if askedBlocks == receivedBlocks: - trace "Work item complete", - start, - askedBlocks, - receivedBlocks - - if wi.startIndex != ctx.finalizedBlock + 1: - trace "Blocks out of order", start, final = ctx.finalizedBlock - ctx.hasOutOfOrderBlocks = true - - if ctx.hasOutOfOrderBlocks: - let (index, validation) = ctx.persistPendingWorkItems() - # Only report an error if it was this peer's work item that failed - if validation == ValidationResult.Error and index == workitem: - result = ValidationResult.Error - # TODO: What about failures on other peers' work items? - # In that case the peer will probably get disconnected on future erroneous - # work items, but before this occurs, several more blocks (that will fail) - # might get downloaded from this peer. This will delay the sync and this - # should be improved. - else: - trace "Processing work item", number = wi.startIndex - # Validation result needs to be returned so that higher up can be decided - # to disconnect from this peer in case of error. - result = ctx.persistWorkItem(wi[]) - else: - trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.", - start, - askedBlocks, - receivedBlocks - return ValidationResult.Error - -proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext = - new result - result.chain = chain - result.peerPool = peerPool - result.trustedPeers = initHashSet[Peer]() - result.finalizedBlock = chain.getBestBlockHeader().blockNumber - -proc handleLostPeer(ctx: SyncContext) = - # TODO: ask the PeerPool for new connections and then call - # `obtainBlocksFromPeer` - discard - -proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} = - let request = BlocksRequest( - startBlock: HashOrNum(isHash: true, - hash: p.state(eth).bestBlockHash), - maxResults: 1, - skip: 0, - reverse: true) - - let latestBlock = await p.getBlockHeaders(request) - - if latestBlock.isSome and latestBlock.get.headers.len > 0: - result = latestBlock.get.headers[0].blockNumber - -proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = - # Update our best block number - try: - let bestBlockNumber = await peer.getBestBlockNumber() - if bestBlockNumber > syncCtx.endBlockNumber: - trace "New sync end block number", number = bestBlockNumber - syncCtx.endBlockNumber = bestBlockNumber - except TransportError: - debug "Transport got closed during obtainBlocksFromPeer" - except CatchableError as e: - debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg - # no need to exit here, because the context might still have blocks to fetch - # from this peer - - while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and - peer.connectionState notin {Disconnecting, Disconnected}): - template workItem: auto = syncCtx.workQueue[workItemIdx] - workItem.state = Requested - trace "Requesting block headers", start = workItem.startIndex, - count = workItem.numBlocks, peer = peer.remote.node - let request = BlocksRequest( - startBlock: HashOrNum(isHash: false, number: workItem.startIndex), - maxResults: workItem.numBlocks, - skip: 0, - reverse: false) - - var dataReceived = false - try: - let results = await peer.getBlockHeaders(request) - if results.isSome: - shallowCopy(workItem.headers, results.get.headers) - - var bodies = newSeq[BlockBody]() - var hashes = newSeq[KeccakHash]() - var nextIndex = workItem.startIndex - for i in workItem.headers: - if i.blockNumber != nextIndex: - raise newException(CatchableError, "The block numbers are not in sequence. Not processing this workItem.") - else: - nextIndex = nextIndex + 1 - hashes.add(blockHash(i)) - if hashes.len == maxBodiesFetch: - let b = await peer.getBlockBodies(hashes) - if b.isNone: - raise newException(CatchableError, "Was not able to get the block bodies.") - hashes.setLen(0) - bodies.add(b.get.blocks) - - if hashes.len != 0: - let b = await peer.getBlockBodies(hashes) - if b.isNone: - raise newException(CatchableError, "Was not able to get the block bodies.") - bodies.add(b.get.blocks) - - if bodies.len == workItem.headers.len: - shallowCopy(workItem.bodies, bodies) - dataReceived = true - else: - warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len - except TransportError: - debug "Transport got closed during obtainBlocksFromPeer" - except CatchableError as e: - # the success case sets `dataReceived`, so we can just fall back to the - # failure path below. If we signal time-outs with exceptions such - # failures will be easier to handle. - debug "Exception in obtainBlocksFromPeer()", exc = e.name, err = e.msg - - var giveUpOnPeer = false - - if dataReceived: - workItem.state = Received - if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK: - giveUpOnPeer = true - else: - giveUpOnPeer = true - - if giveUpOnPeer: - workItem.state = Initial - try: - await peer.disconnect(SubprotocolReason) - except CatchableError: - discard - syncCtx.handleLostPeer() - break - - trace "Finished obtaining blocks", peer - -proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} = - # Returns true if one of the peers acknowledges existence of the best block - # of another peer. - var - a = a - b = b - - if a.state(eth).bestDifficulty < b.state(eth).bestDifficulty: - swap(a, b) - - let request = BlocksRequest( - startBlock: HashOrNum(isHash: true, - hash: b.state(eth).bestBlockHash), - maxResults: 1, - skip: 0, - reverse: true) - - let latestBlock = await a.getBlockHeaders(request) - result = latestBlock.isSome and latestBlock.get.headers.len > 0 - -proc randomTrustedPeer(ctx: SyncContext): Peer = - var k = rand(ctx.trustedPeers.len - 1) - var i = 0 - for p in ctx.trustedPeers: - result = p - if i == k: return - inc i - -proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} = - trace "start sync", peer, trustedPeers = ctx.trustedPeers.len - if ctx.trustedPeers.len >= minPeersToStartSync: - # We have enough trusted peers. Validate new peer against trusted - if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()): - ctx.trustedPeers.incl(peer) - asyncCheck ctx.obtainBlocksFromPeer(peer) - elif ctx.trustedPeers.len == 0: - # Assume the peer is trusted, but don't start sync until we reevaluate - # it with more peers - trace "Assume trusted peer", peer - ctx.trustedPeers.incl(peer) - else: - # At this point we have some "trusted" candidates, but they are not - # "trusted" enough. We evaluate `peer` against all other candidates. - # If one of the candidates disagrees, we swap it for `peer`. If all - # candidates agree, we add `peer` to trusted set. The peers in the set - # will become "fully trusted" (and sync will start) when the set is big - # enough - var - agreeScore = 0 - disagreedPeer: Peer - - for tp in ctx.trustedPeers: - if await peersAgreeOnChain(peer, tp): - inc agreeScore - else: - disagreedPeer = tp - - let disagreeScore = ctx.trustedPeers.len - agreeScore - - if agreeScore == ctx.trustedPeers.len: - ctx.trustedPeers.incl(peer) # The best possible outcome - elif disagreeScore == 1: - trace "Peer is no longer trusted for sync", peer - ctx.trustedPeers.excl(disagreedPeer) - ctx.trustedPeers.incl(peer) - else: - trace "Peer not trusted for sync", peer - - if ctx.trustedPeers.len == minPeersToStartSync: - for p in ctx.trustedPeers: - asyncCheck ctx.obtainBlocksFromPeer(p) - - -proc onPeerConnected(ctx: SyncContext, peer: Peer) = - trace "New candidate for sync", peer - try: - let f = ctx.startSyncWithPeer(peer) - f.callback = proc(data: pointer) {.gcsafe.} = - if f.failed: - if f.error of TransportError: - debug "Transport got closed during startSyncWithPeer" - else: - error "startSyncWithPeer failed", msg = f.readError.msg, peer - except TransportError: - debug "Transport got closed during startSyncWithPeer" - except CatchableError as e: - debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg - - -proc onPeerDisconnected(ctx: SyncContext, p: Peer) = - trace "peer disconnected ", peer = p - ctx.trustedPeers.excl(p) - -proc startSync(ctx: SyncContext) = - var po: PeerObserver - po.onPeerConnected = proc(p: Peer) {.gcsafe.} = - ctx.onPeerConnected(p) - - po.onPeerDisconnected = proc(p: Peer) {.gcsafe.} = - ctx.onPeerDisconnected(p) - - po.setProtocol eth - ctx.peerPool.addObserver(ctx, po) - -proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) = - var - bestBlockDifficulty: DifficultyInt = 0.stuint(256) - bestPeer: Peer = nil - - for peer in node.peers(eth): - let peerEthState = peer.state(eth) - if peerEthState.initialized: - if peerEthState.bestDifficulty > bestBlockDifficulty: - bestBlockDifficulty = peerEthState.bestDifficulty - bestPeer = peer - - result = (bestPeer, bestBlockDifficulty) - -proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} = - ## Code for the fast blockchain sync procedure: - ## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads - ## https://github.com/ethereum/go-ethereum/pull/1889 - # TODO: This needs a better interface. Consider removing this function and - # exposing SyncCtx - var syncCtx = newSyncContext(node.chain, node.peerPool) - syncCtx.startSync() - diff --git a/eth/p2p/blockchain_utils.nim b/eth/p2p/blockchain_utils.nim deleted file mode 100644 index b2ad84c..0000000 --- a/eth/p2p/blockchain_utils.nim +++ /dev/null @@ -1,52 +0,0 @@ -# nim-eth -# Copyright (c) 2018-2021 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -import - ../common/[eth_types, state_accessors] - -# TODO: Perhaps we can move this to eth-common - -proc getBlockHeaders*(db: AbstractChainDB, req: BlocksRequest): seq[BlockHeader] - {.gcsafe, raises: [RlpError, Defect].} = - result = newSeqOfCap[BlockHeader](req.maxResults) - - var foundBlock: BlockHeader - if db.getBlockHeader(req.startBlock, foundBlock): - result.add foundBlock - - while uint64(result.len) < req.maxResults: - if not req.reverse: - if not db.getSuccessorHeader(foundBlock, foundBlock, req.skip): - break - else: - if not db.getAncestorHeader(foundBlock, foundBlock, req.skip): - break - result.add foundBlock - - -template fetcher*(fetcherName, fetchingFunc, InputType, ResultType: untyped) = - proc fetcherName*(db: AbstractChainDB, - lookups: openArray[InputType]): seq[ResultType] {.gcsafe.} = - for lookup in lookups: - let fetched = fetchingFunc(db, lookup) - if fetched.hasData: - # TODO: should there be an else clause here. - # Is the peer responsible of figuring out that - # some of the requested items were not found? - result.add deref(fetched) - -fetcher getContractCodes, getContractCode, ContractCodeRequest, Blob -fetcher getBlockBodies, getBlockBody, KeccakHash, BlockBody -fetcher getStorageNodes, getStorageNode, KeccakHash, Blob -fetcher getReceipts, getReceipt, KeccakHash, Receipt -fetcher getProofs, getProof, ProofRequest, Blob -fetcher getHeaderProofs, getHeaderProof, ProofRequest, Blob - -proc getHelperTrieProofs*(db: AbstractChainDB, - reqs: openArray[HelperTrieProofRequest], - outNodes: var seq[Blob], outAuxData: var seq[Blob]) = - discard diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index d0ea286..56e31c5 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -10,12 +10,9 @@ import std/[deques, tables], chronos, stew/results, - ../../common/chaindb, ".."/../[rlp, keys], ".."/[enode, kademlia, discovery, rlpxcrypt] -export chaindb - const useSnappy* = defined(useSnappy) @@ -24,7 +21,6 @@ type EthereumNode* = ref object networkId*: NetworkId - chain*: AbstractChainDB clientId*: string connectionState*: ConnectionState keys*: KeyPair diff --git a/eth/p2p/rlpx_protocols/eth_protocol.nim b/eth/p2p/rlpx_protocols/eth_protocol.nim deleted file mode 100644 index 40c5834..0000000 --- a/eth/p2p/rlpx_protocols/eth_protocol.nim +++ /dev/null @@ -1,117 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) -# - -## This module implements the Ethereum Wire Protocol: -## https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol - -import - chronos, stint, chronicles, - ../../rlp, ../../common/eth_types, ../../p2p, - ../rlpx, ../private/p2p_types, ../blockchain_utils - -type - NewBlockHashesAnnounce* = object - hash: KeccakHash - number: uint - - NewBlockAnnounce* = object - header*: BlockHeader - txs*: seq[Transaction] - uncles*: seq[BlockHeader] - - PeerState = ref object - initialized*: bool - bestBlockHash*: KeccakHash - bestDifficulty*: DifficultyInt - -const - maxStateFetch* = 384 - maxBodiesFetch* = 128 - maxReceiptsFetch* = 256 - maxHeadersFetch* = 192 - protocolVersion* = 63 - -p2pProtocol eth(version = protocolVersion, - peerState = PeerState, - useRequestIds = false): - - onPeerConnected do (peer: Peer): - let - network = peer.network - chain = network.chain - bestBlock = chain.getBestBlockHeader - - let m = await peer.status(protocolVersion, - network.networkId, - bestBlock.difficulty, - bestBlock.blockHash, - chain.genesisHash, - timeout = chronos.seconds(10)) - - if m.networkId == network.networkId and m.genesisHash == chain.genesisHash: - trace "suitable peer", peer - else: - raise newException(UselessPeerError, "Eth handshake params mismatch") - peer.state.initialized = true - peer.state.bestDifficulty = m.totalDifficulty - peer.state.bestBlockHash = m.bestHash - - handshake: - proc status(peer: Peer, - protocolVersion: uint, - networkId: NetworkId, - totalDifficulty: DifficultyInt, - bestHash: KeccakHash, - genesisHash: KeccakHash) - - proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) = - discard - - proc transactions(peer: Peer, transactions: openArray[Transaction]) = - discard - - requestResponse: - proc getBlockHeaders(peer: Peer, request: BlocksRequest) {.gcsafe.} = - if request.maxResults > uint64(maxHeadersFetch): - await peer.disconnect(BreachOfProtocol) - return - - await response.send(peer.network.chain.getBlockHeaders(request)) - - proc blockHeaders(p: Peer, headers: openArray[BlockHeader]) - - requestResponse: - proc getBlockBodies(peer: Peer, hashes: openArray[KeccakHash]) {.gcsafe.} = - if hashes.len > maxBodiesFetch: - await peer.disconnect(BreachOfProtocol) - return - - await response.send(peer.network.chain.getBlockBodies(hashes)) - - proc blockBodies(peer: Peer, blocks: openArray[BlockBody]) - - proc newBlock(peer: Peer, bh: NewBlockAnnounce, totalDifficulty: DifficultyInt) = - discard - - nextID 13 - - requestResponse: - proc getNodeData(peer: Peer, hashes: openArray[KeccakHash]) = - await response.send(peer.network.chain.getStorageNodes(hashes)) - - proc nodeData(peer: Peer, data: openArray[Blob]) - - requestResponse: - proc getReceipts(peer: Peer, hashes: openArray[KeccakHash]) = discard - # TODO: implement `getReceipts` and reactivate this code - # await response.send(peer.network.chain.getReceipts(hashes)) - - proc receipts(peer: Peer, receipts: openArray[Receipt]) - diff --git a/eth/p2p/rlpx_protocols/les/flow_control.nim b/eth/p2p/rlpx_protocols/les/flow_control.nim deleted file mode 100644 index 6ef8c06..0000000 --- a/eth/p2p/rlpx_protocols/les/flow_control.nim +++ /dev/null @@ -1,506 +0,0 @@ -import - std/[tables, sets], - chronicles, chronos, - ../../../rlp, ../../..//common/eth_types, - ../../rlpx, ../../private/p2p_types, - ./private/les_types - -const - maxSamples = 100000 - rechargingScale = 1000000 - - lesStatsKey = "les.flow_control.stats" - lesStatsVer = 0 - -logScope: - topics = "les flow_control" - -# TODO: move this somewhere -proc pop[A, B](t: var Table[A, B], key: A): B = - result = t[key] - t.del(key) - -when LesTime is SomeInteger: - template `/`(lhs, rhs: LesTime): LesTime = - lhs div rhs - -when defined(testing): - var lesTime* = LesTime(0) - template now(): LesTime = lesTime - template advanceTime(t) = lesTime += LesTime(t) - -else: - import times - let startTime = epochTime() - - proc now(): LesTime = - return LesTime((times.epochTime() - startTime) * 1000.0) - -proc addSample(ra: var StatsRunningAverage; x, y: float64) = - if ra.count >= maxSamples: - let decay = float64(ra.count + 1 - maxSamples) / maxSamples - template applyDecay(x) = x -= x * decay - - applyDecay ra.sumX - applyDecay ra.sumY - applyDecay ra.sumXX - applyDecay ra.sumXY - ra.count = maxSamples - 1 - - inc ra.count - ra.sumX += x - ra.sumY += y - ra.sumXX += x * x - ra.sumXY += x * y - -proc calc(ra: StatsRunningAverage): tuple[m, b: float] = - if ra.count == 0: - return - - let count = float64(ra.count) - let d = count * ra.sumXX - ra.sumX * ra.sumX - if d < 0.001: - return (m: ra.sumY / count, b: 0.0) - - result.m = (count * ra.sumXY - ra.sumX * ra.sumY) / d - result.b = (ra.sumY / count) - (result.m * ra.sumX / count) - -proc currentRequestsCosts*(network: LesNetwork, - les: ProtocolInfo): seq[ReqCostInfo] = - # Make sure the message costs are already initialized - doAssert network.messageStats.len > les.messages[^1].id, - "Have you called `initFlowControl`" - - for msg in les.messages: - var (m, b) = network.messageStats[msg.id].calc() - if m < 0: - b += m - m = 0 - - if b < 0: - b = 0 - - result.add ReqCostInfo(msgId: msg.id, - baseCost: ReqCostInt(b * 2), - reqCost: ReqCostInt(m * 2)) - -proc persistMessageStats*(db: AbstractChainDB, - network: LesNetwork) = - doAssert db != nil - # XXX: Because of the package_visible_types template magic, Nim complains - # when we pass the messageStats expression directly to `encodeList` - let stats = network.messageStats - db.setSetting(lesStatsKey, rlp.encodeList(lesStatsVer, stats)) - -proc loadMessageStats*(network: LesNetwork, - les: ProtocolInfo, - db: AbstractChainDB): bool = - block readFromDB: - if db == nil: - break readFromDB - - var stats = db.getSetting(lesStatsKey) - if stats.len == 0: - notice "LES stats not present in the database" - break readFromDB - - try: - var statsRlp = rlpFromBytes(stats) - if not statsRlp.enterList: - notice "Found a corrupted LES stats record" - break readFromDB - - let version = statsRlp.read(int) - if version != lesStatsVer: - notice "Found an outdated LES stats record" - break readFromDB - - statsRlp >> network.messageStats - if network.messageStats.len <= les.messages[^1].id: - notice "Found an incomplete LES stats record" - break readFromDB - - return true - - except RlpError as e: - error "Error while loading LES message stats", err = e.msg - - newSeq(network.messageStats, les.messages[^1].id + 1) - return false - -proc update(s: var FlowControlState, t: LesTime) = - let dt = max(t - s.lastUpdate, LesTime(0)) - - s.bufValue = min( - s.bufValue + s.minRecharge * dt, - s.bufLimit) - - s.lastUpdate = t - -proc init(s: var FlowControlState, - bufLimit: BufValueInt, minRecharge: int, t: LesTime) = - s.bufValue = bufLimit - s.bufLimit = bufLimit - s.minRecharge = minRecharge - s.lastUpdate = t - -func canMakeRequest(s: FlowControlState, - maxCost: ReqCostInt): (LesTime, float64) = - ## Returns the required waiting time before sending a request and - ## the estimated buffer level afterwards (as a fraction of the limit) - const safetyMargin = 50 - - var maxCost = min( - maxCost + safetyMargin * s.minRecharge, - s.bufLimit) - - if s.bufValue >= maxCost: - result[1] = float64(s.bufValue - maxCost) / float64(s.bufLimit) - else: - result[0] = (maxCost - s.bufValue) / s.minRecharge - -func canServeRequest(srv: LesNetwork): bool = - result = srv.reqCount < srv.maxReqCount and - srv.reqCostSum < srv.maxReqCostSum - -proc rechargeReqCost(peer: LesPeer, t: LesTime) = - let dt = t - peer.lastRechargeTime - peer.reqCostVal += peer.reqCostGradient * dt / rechargingScale - peer.lastRechargeTime = t - if peer.isRecharging and t >= peer.rechargingEndsAt: - peer.isRecharging = false - peer.reqCostGradient = 0 - peer.reqCostVal = 0 - -proc updateRechargingParams(peer: LesPeer, network: LesNetwork) = - peer.reqCostGradient = 0 - if peer.reqCount > 0: - peer.reqCostGradient = rechargingScale / network.reqCount - - if peer.isRecharging: - peer.reqCostGradient = (network.rechargingRate * (peer.rechargingPower / - network.totalRechargingPower).int64).int - peer.rechargingEndsAt = peer.lastRechargeTime + - LesTime(peer.reqCostVal * rechargingScale / - -peer.reqCostGradient ) - -proc trackRequests(network: LesNetwork, peer: LesPeer, reqCountChange: int) = - peer.reqCount += reqCountChange - network.reqCount += reqCountChange - - doAssert peer.reqCount >= 0 and network.reqCount >= 0 - - if peer.reqCount == 0: - # All requests have been finished. Start recharging. - peer.isRecharging = true - network.totalRechargingPower += peer.rechargingPower - elif peer.reqCount == reqCountChange and peer.isRecharging: - # `peer.reqCount` must have been 0 for the condition above to hold. - # This is a transition from recharging to serving state. - peer.isRecharging = false - network.totalRechargingPower -= peer.rechargingPower - peer.startReqCostVal = peer.reqCostVal - - updateRechargingParams peer, network - -proc updateFlowControl(network: LesNetwork, t: LesTime) = - while true: - var firstTime = t - for peer in network.peers: - # TODO: perhaps use a bin heap here - if peer.isRecharging and peer.rechargingEndsAt < firstTime: - firstTime = peer.rechargingEndsAt - - let rechargingEndedForSomePeer = firstTime < t - - network.reqCostSum = 0 - for peer in network.peers: - peer.rechargeReqCost firstTime - network.reqCostSum += peer.reqCostVal - - if rechargingEndedForSomePeer: - for peer in network.peers: - if peer.isRecharging: - updateRechargingParams peer, network - else: - network.lastUpdate = t - return - -proc endPendingRequest*(network: LesNetwork, peer: LesPeer, t: LesTime) = - if peer.reqCount > 0: - network.updateFlowControl t - network.trackRequests peer, -1 - network.updateFlowControl t - -proc enlistInFlowControl*(network: LesNetwork, - peer: LesPeer, - peerRechargingPower = 100) = - let t = now() - - doAssert peer.isServer or peer.isClient - # Each Peer must be potential communication partner for us. - # There will be useless peers on the network, but the logic - # should make sure to disconnect them earlier in `onPeerConnected`. - - if peer.isServer: - peer.localFlowState.init network.bufferLimit, network.minRechargingRate, t - peer.pendingReqs = initTable[int, ReqCostInt]() - - if peer.isClient: - peer.remoteFlowState.init network.bufferLimit, network.minRechargingRate, t - peer.lastRechargeTime = t - peer.rechargingEndsAt = t - peer.rechargingPower = peerRechargingPower - - network.updateFlowControl t - -proc delistFromFlowControl*(network: LesNetwork, peer: LesPeer) = - let t = now() - - # XXX: perhaps this is not safe with our reqCount logic. - # The original code may depend on the binarity of the `serving` flag. - network.endPendingRequest peer, t - network.updateFlowControl t - -proc initFlowControl*(network: LesNetwork, les: ProtocolInfo, - maxReqCount, maxReqCostSum, reqCostTarget: int, - db: AbstractChainDB = nil) = - network.rechargingRate = rechargingScale * (rechargingScale / - (100 * rechargingScale / reqCostTarget - rechargingScale)) - network.maxReqCount = maxReqCount - network.maxReqCostSum = maxReqCostSum - - if not network.loadMessageStats(les, db): - warn "Failed to load persisted LES message stats. " & - "Flow control will be re-initilized." - -proc canMakeRequest(peer: var LesPeer, maxCost: int): (LesTime, float64) = - peer.localFlowState.update now() - return peer.localFlowState.canMakeRequest(maxCost) - -template getRequestCost(peer: LesPeer, localOrRemote: untyped, - msgId, costQuantity: int): ReqCostInt = - let - baseCost = peer.`localOrRemote ReqCosts`[msgId].baseCost - reqCost = peer.`localOrRemote ReqCosts`[msgId].reqCost - - min(baseCost + reqCost * costQuantity, - peer.`localOrRemote FlowState`.bufLimit) - -proc trackOutgoingRequest*(network: LesNetwork, peer: LesPeer, - msgId, reqId, costQuantity: int) = - let maxCost = peer.getRequestCost(local, msgId, costQuantity) - - peer.localFlowState.bufValue -= maxCost - peer.pendingReqsCost += maxCost - peer.pendingReqs[reqId] = peer.pendingReqsCost - -proc trackIncomingResponse*(peer: LesPeer, reqId: int, bv: BufValueInt) = - let bv = min(bv, peer.localFlowState.bufLimit) - if not peer.pendingReqs.hasKey(reqId): - return - - let costsSumAtSending = peer.pendingReqs.pop(reqId) - let costsSumChange = peer.pendingReqsCost - costsSumAtSending - - peer.localFlowState.bufValue = if bv > costsSumChange: bv - costsSumChange - else: 0 - peer.localFlowState.lastUpdate = now() - -proc acceptRequest*(network: LesNetwork, peer: LesPeer, - msgId, costQuantity: int): Future[bool] {.async.} = - let t = now() - let reqCost = peer.getRequestCost(remote, msgId, costQuantity) - - peer.remoteFlowState.update t - network.updateFlowControl t - - while not network.canServeRequest: - await sleepAsync(chronos.milliseconds(10)) - - if peer notin network.peers: - # The peer was disconnected or the network - # was shut down while we waited - return false - - network.trackRequests peer, +1 - network.updateFlowControl network.lastUpdate - - if reqCost > peer.remoteFlowState.bufValue: - error "LES peer sent request too early", - recharge = (reqCost - peer.remoteFlowState.bufValue) * rechargingScale / - peer.remoteFlowState.minRecharge - return false - - return true - -proc bufValueAfterRequest*(network: LesNetwork, peer: LesPeer, - msgId: int, quantity: int): BufValueInt = - let t = now() - let costs = peer.remoteReqCosts[msgId] - var reqCost = costs.baseCost + quantity * costs.reqCost - - peer.remoteFlowState.update t - peer.remoteFlowState.bufValue -= reqCost - - network.endPendingRequest peer, t - - let curReqCost = peer.reqCostVal - if curReqCost < peer.remoteFlowState.bufLimit: - let bv = peer.remoteFlowState.bufLimit - curReqCost - if bv > peer.remoteFlowState.bufValue: - peer.remoteFlowState.bufValue = bv - - network.messageStats[msgId].addSample(float64(quantity), - float64(curReqCost - peer.startReqCostVal)) - - return peer.remoteFlowState.bufValue - -when defined(testing): - import unittest2, random, ../../rlpx - - proc isMax(s: FlowControlState): bool = - s.bufValue == s.bufLimit - - p2pProtocol dummyLes(version = 1, rlpxName = "abc"): - proc a(p: Peer) - proc b(p: Peer) - proc c(p: Peer) - proc d(p: Peer) - proc e(p: Peer) - - template fequals(lhs, rhs: float64, epsilon = 0.0001): bool = - abs(lhs-rhs) < epsilon - - proc tests* = - randomize(3913631) - - suite "les flow control": - suite "running averages": - test "consistent costs": - var s: StatsRunningAverage - for i in 0..100: - s.addSample(5.0, 100.0) - - let (cost, base) = s.calc - - check: - fequals(cost, 100.0) - fequals(base, 0.0) - - test "randomized averages": - proc performTest(qBase, qRandom: int, cBase, cRandom: float64) = - var - s: StatsRunningAverage - expectedFinalCost = cBase + cRandom / 2 - error = expectedFinalCost - - for samples in [100, 1000, 10000]: - for i in 0..samples: - let q = float64(qBase + rand(10)) - s.addSample(q, q * (cBase + rand(cRandom))) - - let (newCost, newBase) = s.calc - # With more samples, our error should decrease, getting - # closer and closer to the average (unless we are already close enough) - let newError = abs(newCost - expectedFinalCost) - # This check fails with Nim-1.6: - # check newError < error - error = newError - - # After enough samples we should be very close the the final result - check error < (expectedFinalCost * 0.02) - - performTest(1, 10, 5.0, 100.0) - performTest(1, 4, 200.0, 1000.0) - - suite "buffer value calculations": - type TestReq = object - peer: LesPeer - msgId, quantity: int - accepted: bool - - setup: - var lesNetwork = new LesNetwork - lesNetwork.peers = initHashSet[LesPeer]() - lesNetwork.initFlowControl(dummyLes.protocolInfo, - reqCostTarget = 300, - maxReqCount = 5, - maxReqCostSum = 1000) - - for i in 0 ..< lesNetwork.messageStats.len: - lesNetwork.messageStats[i].addSample(1.0, float(i) * 100.0) - - var client = new LesPeer - client.isClient = true - - var server = new LesPeer - server.isServer = true - - var clientServer = new LesPeer - clientServer.isClient = true - clientServer.isServer = true - - var client2 = new LesPeer - client2.isClient = true - - var client3 = new LesPeer - client3.isClient = true - - var bv: BufValueInt - - template enlist(peer: LesPeer) {.dirty.} = - let reqCosts = currentRequestsCosts(lesNetwork, dummyLes.protocolInfo) - peer.remoteReqCosts = reqCosts - peer.localReqCosts = reqCosts - lesNetwork.peers.incl peer - lesNetwork.enlistInFlowControl peer - - template startReq(p: LesPeer, msg, q: int): TestReq = - var req: TestReq - req.peer = p - req.msgId = msg - req.quantity = q - req.accepted = waitFor lesNetwork.acceptRequest(p, msg, q) - req - - template endReq(req: TestReq): BufValueInt = - bufValueAfterRequest(lesNetwork, req.peer, req.msgId, req.quantity) - - test "single peer recharging": - lesNetwork.bufferLimit = 1000 - lesNetwork.minRechargingRate = 100 - - enlist client - - check: - client.remoteFlowState.isMax - client.rechargingPower > 0 - - advanceTime 100 - - let r1 = client.startReq(0, 100) - check r1.accepted - check client.isRecharging == false - - advanceTime 50 - - let r2 = client.startReq(1, 1) - check r2.accepted - check client.isRecharging == false - - advanceTime 25 - bv = endReq r2 - check client.isRecharging == false - - advanceTime 130 - bv = endReq r1 - check client.isRecharging == true - - advanceTime 300 - lesNetwork.updateFlowControl now() - - check: - client.isRecharging == false - client.remoteFlowState.isMax - diff --git a/eth/p2p/rlpx_protocols/les/private/les_types.nim b/eth/p2p/rlpx_protocols/les/private/les_types.nim deleted file mode 100644 index 16aab15..0000000 --- a/eth/p2p/rlpx_protocols/les/private/les_types.nim +++ /dev/null @@ -1,111 +0,0 @@ -import - std/[hashes, tables, sets], - ../../../../common/eth_types - -type - AnnounceType* = enum - None, - Simple, - Signed, - Unspecified - - ReqCostInfo* = object - msgId*: int - baseCost*, reqCost*: ReqCostInt - - FlowControlState* = object - bufValue*, bufLimit*: int - minRecharge*: int - lastUpdate*: LesTime - - StatsRunningAverage* = object - sumX*, sumY*, sumXX*, sumXY*: float64 - count*: int - - LesPeer* = ref object - isServer*: bool - isClient*: bool - announceType*: AnnounceType - - bestDifficulty*: DifficultyInt - bestBlockHash*: KeccakHash - bestBlockNumber*: BlockNumber - - hasChainSince*: HashOrNum - hasStateSince*: HashOrNum - relaysTransactions*: bool - - # The variables below are used to implement the flow control - # mechanisms of LES from our point of view as a server. - # They describe how much load has been generated by this - # particular peer. - reqCount*: int # How many outstanding requests are there? - # - rechargingPower*: int # Do we give this peer any extra priority - # (implemented as a faster recharning rate) - # 100 is the default. You can go higher and lower. - # - isRecharging*: bool # This is true while the peer is not making - # any requests - # - reqCostGradient*: int # Measures the speed of recharging or accumulating - # "requests cost" at any given moment. - # - reqCostVal*: int # The accumulated "requests cost" - # - rechargingEndsAt*: int # When will recharging end? - # (the buffer of the Peer will be fully restored) - # - lastRechargeTime*: LesTime # When did we last update the recharging parameters - # - startReqCostVal*: int # TODO - - remoteFlowState*: FlowControlState - remoteReqCosts*: seq[ReqCostInfo] - - # The next variables are used to limit ourselves as a client in order to - # not violate the control-flow requirements of the remote LES server. - - pendingReqs*: Table[int, ReqCostInt] - pendingReqsCost*: int - - localFlowState*: FlowControlState - localReqCosts*: seq[ReqCostInfo] - - LesNetwork* = ref object - peers*: HashSet[LesPeer] - messageStats*: seq[StatsRunningAverage] - ourAnnounceType*: AnnounceType - - # The fields below are relevant when serving data. - bufferLimit*: int - minRechargingRate*: int - - reqCostSum*, maxReqCostSum*: ReqCostInt - reqCount*, maxReqCount*: int - sumWeigth*: int - - rechargingRate*: int64 - totalRechargedUnits*: int - totalRechargingPower*: int - - lastUpdate*: LesTime - - KeyValuePair* = object - key*: string - value*: Blob - - HandshakeError* = object of CatchableError - - LesTime* = int # this is in milliseconds - BufValueInt* = int - ReqCostInt* = int - -template hash*(peer: LesPeer): Hash = hash(cast[pointer](peer)) - -template areWeServingData*(network: LesNetwork): bool = - network.maxReqCount != 0 - -template areWeRequestingData*(network: LesNetwork): bool = - network.ourAnnounceType != AnnounceType.Unspecified - diff --git a/eth/p2p/rlpx_protocols/les_protocol.nim b/eth/p2p/rlpx_protocols/les_protocol.nim deleted file mode 100644 index 520b11e..0000000 --- a/eth/p2p/rlpx_protocols/les_protocol.nim +++ /dev/null @@ -1,460 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) -# - -import - std/[times, tables, options, sets, hashes, strutils], - stew/shims/macros, chronicles, chronos, nimcrypto/[keccak, hash], - ".."/../[rlp, keys], ../../common/eth_types, - ".."/[rlpx, kademlia, blockchain_utils], ../private/p2p_types, - ./les/private/les_types, ./les/flow_control - -export - les_types - -const - lesVersion = 2 - maxHeadersFetch = 192 - maxBodiesFetch = 32 - maxReceiptsFetch = 128 - maxCodeFetch = 64 - maxProofsFetch = 64 - maxHeaderProofsFetch = 64 - maxTransactionsFetch = 64 - - # Handshake properties: - # https://github.com/zsfelfoldi/go-ethereum/wiki/Light-Ethereum-Subprotocol-(LES) - keyProtocolVersion = "protocolVersion" - ## P: is 1 for the LPV1 protocol version. - - keyNetworkId = "networkId" - ## P: should be 0 for testnet, 1 for mainnet. - - keyHeadTotalDifficulty = "headTd" - ## P: Total Difficulty of the best chain. - ## Integer, as found in block header. - - keyHeadHash = "headHash" - ## B_32: the hash of the best (i.e. highest TD) known block. - - keyHeadNumber = "headNum" - ## P: the number of the best (i.e. highest TD) known block. - - keyGenesisHash = "genesisHash" - ## B_32: the hash of the Genesis block. - - keyServeHeaders = "serveHeaders" - ## (optional, no value) - ## present if the peer can serve header chain downloads. - - keyServeChainSince = "serveChainSince" - ## P (optional) - ## present if the peer can serve Body/Receipts ODR requests - ## starting from the given block number. - - keyServeStateSince = "serveStateSince" - ## P (optional): - ## present if the peer can serve Proof/Code ODR requests - ## starting from the given block number. - - keyRelaysTransactions = "txRelay" - ## (optional, no value) - ## present if the peer can relay transactions to the ETH network. - - keyFlowControlBL = "flowControl/BL" - keyFlowControlMRC = "flowControl/MRC" - keyFlowControlMRR = "flowControl/MRR" - ## see Client Side Flow Control: - ## https://github.com/zsfelfoldi/go-ethereum/wiki/Client-Side-Flow-Control-model-for-the-LES-protocol - - keyAnnounceType = "announceType" - keyAnnounceSignature = "sign" - -proc initProtocolState(network: LesNetwork, node: EthereumNode) {.gcsafe.} = - network.peers = initHashSet[LesPeer]() - -proc addPeer(network: LesNetwork, peer: LesPeer) = - network.enlistInFlowControl peer - network.peers.incl peer - -proc removePeer(network: LesNetwork, peer: LesPeer) = - network.delistFromFlowControl peer - network.peers.excl peer - -template costQuantity(quantityExpr, max: untyped) {.pragma.} - -proc getCostQuantity(fn: NimNode): tuple[quantityExpr, maxQuantity: NimNode] = - # XXX: `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes - # (TODO: file as an issue) - let costQuantity = fn.pragma.findPragma(bindSym"costQuantity") - doAssert costQuantity != nil - - result.quantityExpr = costQuantity[1] - result.maxQuantity= costQuantity[2] - - if result.maxQuantity.kind == nnkExprEqExpr: - result.maxQuantity = result.maxQuantity[1] - -macro outgoingRequestDecorator(n: untyped): untyped = - result = n - let (costQuantity, maxQuantity) = n.getCostQuantity - - result.body.add quote do: - trackOutgoingRequest(peer.networkState(les), - peer.state(les), - perProtocolMsgId, reqId, `costQuantity`) - # echo result.repr - -macro incomingResponseDecorator(n: untyped): untyped = - result = n - - let trackingCall = quote do: - trackIncomingResponse(peer.state(les), reqId, msg.bufValue) - - result.body.insert(n.body.len - 1, trackingCall) - # echo result.repr - -macro incomingRequestDecorator(n: untyped): untyped = - result = n - let (costQuantity, maxQuantity) = n.getCostQuantity - - template acceptStep(quantityExpr, maxQuantity) {.dirty.} = - let requestCostQuantity = quantityExpr - if requestCostQuantity > maxQuantity: - await peer.disconnect(BreachOfProtocol) - return - - let lesPeer = peer.state - let lesNetwork = peer.networkState - - if not await acceptRequest(lesNetwork, lesPeer, - perProtocolMsgId, - requestCostQuantity): return - - result.body.insert(1, getAst(acceptStep(costQuantity, maxQuantity))) - # echo result.repr - -template updateBV: BufValueInt = - bufValueAfterRequest(lesNetwork, lesPeer, - perProtocolMsgId, requestCostQuantity) - -func getValue(values: openArray[KeyValuePair], - key: string, T: typedesc): Option[T] = - for v in values: - if v.key == key: - return some(rlp.decode(v.value, T)) - -func getRequiredValue(values: openArray[KeyValuePair], - key: string, T: typedesc): T = - for v in values: - if v.key == key: - return rlp.decode(v.value, T) - - raise newException(HandshakeError, - "Required handshake field " & key & " missing") - -p2pProtocol les(version = lesVersion, - peerState = LesPeer, - networkState = LesNetwork, - outgoingRequestDecorator = outgoingRequestDecorator, - incomingRequestDecorator = incomingRequestDecorator, - incomingResponseThunkDecorator = incomingResponseDecorator): - handshake: - proc status(p: Peer, values: openArray[KeyValuePair]) - - onPeerConnected do (peer: Peer): - let - network = peer.network - chain = network.chain - bestBlock = chain.getBestBlockHeader - lesPeer = peer.state - lesNetwork = peer.networkState - - template `=>`(k, v: untyped): untyped = - KeyValuePair(key: k, value: rlp.encode(v)) - - var lesProperties = @[ - keyProtocolVersion => lesVersion, - keyNetworkId => network.networkId, - keyHeadTotalDifficulty => bestBlock.difficulty, - keyHeadHash => bestBlock.blockHash, - keyHeadNumber => bestBlock.blockNumber, - keyGenesisHash => chain.genesisHash - ] - - lesPeer.remoteReqCosts = currentRequestsCosts(lesNetwork, les.protocolInfo) - - if lesNetwork.areWeServingData: - lesProperties.add [ - # keyServeHeaders => nil, - keyServeChainSince => 0, - keyServeStateSince => 0, - # keyRelaysTransactions => nil, - keyFlowControlBL => lesNetwork.bufferLimit, - keyFlowControlMRR => lesNetwork.minRechargingRate, - keyFlowControlMRC => lesPeer.remoteReqCosts - ] - - if lesNetwork.areWeRequestingData: - lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType) - - let - s = await peer.status(lesProperties, timeout = chronos.seconds(10)) - peerNetworkId = s.values.getRequiredValue(keyNetworkId, NetworkId) - peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash) - peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint) - - template requireCompatibility(peerVar, localVar, varName: untyped) = - if localVar != peerVar: - raise newException(HandshakeError, - "Incompatibility detected! $1 mismatch ($2 != $3)" % - [varName, $localVar, $peerVar]) - - requireCompatibility(peerLesVersion, uint(lesVersion), "les version") - requireCompatibility(peerNetworkId, network.networkId, "network id") - requireCompatibility(peerGenesisHash, chain.genesisHash, "genesis hash") - - template `:=`(lhs, key) = - lhs = s.values.getRequiredValue(key, type(lhs)) - - lesPeer.bestBlockHash := keyHeadHash - lesPeer.bestBlockNumber := keyHeadNumber - lesPeer.bestDifficulty := keyHeadTotalDifficulty - - let peerAnnounceType = s.values.getValue(keyAnnounceType, AnnounceType) - if peerAnnounceType.isSome: - lesPeer.isClient = true - lesPeer.announceType = peerAnnounceType.get - else: - lesPeer.announceType = AnnounceType.Simple - lesPeer.hasChainSince := keyServeChainSince - lesPeer.hasStateSince := keyServeStateSince - lesPeer.relaysTransactions := keyRelaysTransactions - lesPeer.localFlowState.bufLimit := keyFlowControlBL - lesPeer.localFlowState.minRecharge := keyFlowControlMRR - lesPeer.localReqCosts := keyFlowControlMRC - - lesNetwork.addPeer lesPeer - - onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}: - peer.networkState.removePeer peer.state - - ## Header synchronisation - ## - - proc announce( - peer: Peer, - headHash: KeccakHash, - headNumber: BlockNumber, - headTotalDifficulty: DifficultyInt, - reorgDepth: BlockNumber, - values: openArray[KeyValuePair], - announceType: AnnounceType) = - - if peer.state.announceType == AnnounceType.None: - error "unexpected announce message", peer - return - - if announceType == AnnounceType.Signed: - let signature = values.getValue(keyAnnounceSignature, Blob) - if signature.isNone: - error "missing announce signature" - return - let sig = Signature.fromRaw(signature.get).tryGet() - let sigMsg = rlp.encodeList(headHash, headNumber, headTotalDifficulty) - let signerKey = recover(sig, sigMsg).tryGet() - if signerKey.toNodeId != peer.remote.id: - error "invalid announce signature" - # TODO: should we disconnect this peer? - return - - # TODO: handle new block - - requestResponse: - proc getBlockHeaders( - peer: Peer, - req: BlocksRequest) {. - costQuantity(req.maxResults.int, max = maxHeadersFetch).} = - - let headers = peer.network.chain.getBlockHeaders(req) - await response.send(updateBV(), headers) - - proc blockHeaders( - peer: Peer, - bufValue: BufValueInt, - blocks: openArray[BlockHeader]) - - ## On-damand data retrieval - ## - - requestResponse: - proc getBlockBodies( - peer: Peer, - blocks: openArray[KeccakHash]) {. - costQuantity(blocks.len, max = maxBodiesFetch), gcsafe.} = - - let blocks = peer.network.chain.getBlockBodies(blocks) - await response.send(updateBV(), blocks) - - proc blockBodies( - peer: Peer, - bufValue: BufValueInt, - bodies: openArray[BlockBody]) - - requestResponse: - proc getReceipts( - peer: Peer, - hashes: openArray[KeccakHash]) - {.costQuantity(hashes.len, max = maxReceiptsFetch).} = - - let receipts = peer.network.chain.getReceipts(hashes) - await response.send(updateBV(), receipts) - - proc receipts( - peer: Peer, - bufValue: BufValueInt, - receipts: openArray[Receipt]) - - requestResponse: - proc getProofs( - peer: Peer, - proofs: openArray[ProofRequest]) {. - costQuantity(proofs.len, max = maxProofsFetch).} = - - let proofs = peer.network.chain.getProofs(proofs) - await response.send(updateBV(), proofs) - - proc proofs( - peer: Peer, - bufValue: BufValueInt, - proofs: openArray[Blob]) - - requestResponse: - proc getContractCodes( - peer: Peer, - reqs: seq[ContractCodeRequest]) {. - costQuantity(reqs.len, max = maxCodeFetch).} = - - let results = peer.network.chain.getContractCodes(reqs) - await response.send(updateBV(), results) - - proc contractCodes( - peer: Peer, - bufValue: BufValueInt, - results: seq[Blob]) - - nextID 15 - - requestResponse: - proc getHeaderProofs( - peer: Peer, - reqs: openArray[ProofRequest]) {. - costQuantity(reqs.len, max = maxHeaderProofsFetch).} = - - let proofs = peer.network.chain.getHeaderProofs(reqs) - await response.send(updateBV(), proofs) - - proc headerProofs( - peer: Peer, - bufValue: BufValueInt, - proofs: openArray[Blob]) - - requestResponse: - proc getHelperTrieProofs( - peer: Peer, - reqs: openArray[HelperTrieProofRequest]) {. - costQuantity(reqs.len, max = maxProofsFetch).} = - - var nodes, auxData: seq[Blob] - peer.network.chain.getHelperTrieProofs(reqs, nodes, auxData) - await response.send(updateBV(), nodes, auxData) - - proc helperTrieProofs( - peer: Peer, - bufValue: BufValueInt, - nodes: seq[Blob], - auxData: seq[Blob]) - - ## Transaction relaying and status retrieval - ## - - requestResponse: - proc sendTxV2( - peer: Peer, - transactions: openArray[Transaction]) {. - costQuantity(transactions.len, max = maxTransactionsFetch).} = - - let chain = peer.network.chain - - var results: seq[TransactionStatusMsg] - for t in transactions: - let hash = t.rlpHash # TODO: this is not optimal, we can compute - # the hash from the request bytes. - # The RLP module can offer a helper Hashed[T] - # to make this easy. - var s = chain.getTransactionStatus(hash) - if s.status == TransactionStatus.Unknown: - chain.addTransactions([t]) - s = chain.getTransactionStatus(hash) - - results.add s - - await response.send(updateBV(), results) - - proc getTxStatus( - peer: Peer, - transactions: openArray[Transaction]) {. - costQuantity(transactions.len, max = maxTransactionsFetch).} = - - let chain = peer.network.chain - - var results: seq[TransactionStatusMsg] - for t in transactions: - results.add chain.getTransactionStatus(t.rlpHash) - await response.send(updateBV(), results) - - proc txStatus( - peer: Peer, - bufValue: BufValueInt, - transactions: openArray[TransactionStatusMsg]) - -proc configureLes*(node: EthereumNode, - # Client options: - announceType = AnnounceType.Simple, - # Server options. - # The zero default values indicate that the - # LES server will be deactivated. - maxReqCount = 0, - maxReqCostSum = 0, - reqCostTarget = 0) = - - doAssert announceType != AnnounceType.Unspecified or maxReqCount > 0 - - var lesNetwork = node.protocolState(les) - lesNetwork.ourAnnounceType = announceType - initFlowControl(lesNetwork, les.protocolInfo, - maxReqCount, maxReqCostSum, reqCostTarget, - node.chain) - -proc configureLesServer*(node: EthereumNode, - # Client options: - announceType = AnnounceType.Unspecified, - # Server options. - # The zero default values indicate that the - # LES server will be deactivated. - maxReqCount = 0, - maxReqCostSum = 0, - reqCostTarget = 0) = - ## This is similar to `configureLes`, but with default parameter - ## values appropriate for a server. - node.configureLes(announceType, maxReqCount, maxReqCostSum, reqCostTarget) - -proc persistLesMessageStats*(node: EthereumNode) = - persistMessageStats(node.chain, node.protocolState(les)) - diff --git a/eth/p2p/sync.nim b/eth/p2p/sync.nim deleted file mode 100644 index 6e3e053..0000000 --- a/eth/p2p/sync.nim +++ /dev/null @@ -1,45 +0,0 @@ -import - std/times, - chronos - -type - FullNodeSyncer* = ref object - chaindb: ChainDB - FastChainSyncer = ref object - RegularChainSyncer = ref object - -# How old (in seconds) must our local head be to cause us to start with a fast-sync before we -# switch to regular-sync. -const FAST_SYNC_CUTOFF = 60 * 60 * 24 - - -proc run(s: FullNodeSyncer) {.async.} = - let head = await s.chaindb.getCanonicalHead() - - # We're still too slow at block processing, so if our local head is older than - # FAST_SYNC_CUTOFF we first do a fast-sync run to catch up with the rest of the network. - # See https://github.com/ethereum/py-evm/issues/654 for more details - if head.timestamp < epochTime() - FAST_SYNC_CUTOFF: - # Fast-sync chain data. - self.logger.info("Starting fast-sync; current head: #%d", head.block_number) - chain_syncer = FastChainSyncer(self.chaindb, self.peer_pool, self.cancel_token) - await chain_syncer.run() - - # Ensure we have the state for our current head. - head = await self.wait(self.chaindb.coro_get_canonical_head()) - if head.state_root != EMPTY_ROOT_HASH and head.state_root not in self.base_db: - self.logger.info( - "Missing state for current head (#%d), downloading it", head.block_number) - downloader = StateDownloader( - self.base_db, head.state_root, self.peer_pool, self.cancel_token) - await downloader.run() - - # Now, loop forever, fetching missing blocks and applying them. - self.logger.info("Starting regular sync; current head: #%d", head.block_number) - # This is a bit of a hack, but self.chain is stuck in the past as during the fast-sync we - # did not use it to import the blocks, so we need this to get a Chain instance with our - # latest head so that we can start importing blocks. - new_chain = type(self.chain)(self.base_db) - chain_syncer = RegularChainSyncer( - new_chain, self.chaindb, self.peer_pool, self.cancel_token) - await chain_syncer.run() diff --git a/tests/fuzzing/rlpx/thunk.nim b/tests/fuzzing/rlpx/thunk.nim index 508cb64..adcf980 100644 --- a/tests/fuzzing/rlpx/thunk.nim +++ b/tests/fuzzing/rlpx/thunk.nim @@ -1,7 +1,7 @@ import testutils/fuzzing, chronos, ../../../eth/p2p, ../../../eth/p2p/rlpx, ../../../eth/p2p/private/p2p_types, - ../../../eth/p2p/rlpx_protocols/eth_protocol, + ../../p2p/eth_protocol, ../../p2p/p2p_test_helper var diff --git a/tests/p2p/all_tests.nim b/tests/p2p/all_tests.nim index f80189d..66b2193 100644 --- a/tests/p2p/all_tests.nim +++ b/tests/p2p/all_tests.nim @@ -6,5 +6,4 @@ import ./test_ecies, ./test_enode, ./test_rlpx_thunk, - ./test_protocol_handlers, - ./les/test_flow_control + ./test_protocol_handlers \ No newline at end of file diff --git a/tests/p2p/eth_protocol.nim b/tests/p2p/eth_protocol.nim new file mode 100644 index 0000000..795f3db --- /dev/null +++ b/tests/p2p/eth_protocol.nim @@ -0,0 +1,51 @@ +import + chronos, + ../../eth/[p2p, common] + +# for testing purpose +# real eth protocol implementation is in nimbus-eth1 repo + +type + PeerState = ref object + initialized*: bool + +p2pProtocol eth(version = 63, + peerState = PeerState, + useRequestIds = false): + + onPeerConnected do (peer: Peer): + let + network = peer.network + + let m = await peer.status(63, + network.networkId, + 0.u256, + Hash256(), + Hash256(), + timeout = chronos.seconds(10)) + + handshake: + proc status(peer: Peer, + protocolVersion: uint, + networkId: NetworkId, + totalDifficulty: DifficultyInt, + bestHash: KeccakHash, + genesisHash: KeccakHash) + + requestResponse: + proc getBlockHeaders(peer: Peer, request: openArray[KeccakHash]) {.gcsafe.} = discard + proc blockHeaders(p: Peer, headers: openArray[BlockHeader]) + + requestResponse: + proc getBlockBodies(peer: Peer, hashes: openArray[KeccakHash]) {.gcsafe.} = discard + proc blockBodies(peer: Peer, blocks: openArray[BlockBody]) + + nextID 13 + + requestResponse: + proc getNodeData(peer: Peer, hashes: openArray[KeccakHash]) = discard + proc nodeData(peer: Peer, data: openArray[Blob]) + + requestResponse: + proc getReceipts(peer: Peer, hashes: openArray[KeccakHash]) = discard + proc receipts(peer: Peer, receipts: openArray[Receipt]) diff --git a/tests/p2p/les/test_flow_control.nim b/tests/p2p/les/test_flow_control.nim deleted file mode 100644 index baf142b..0000000 --- a/tests/p2p/les/test_flow_control.nim +++ /dev/null @@ -1,7 +0,0 @@ -{.used.} - -import - ../../../eth/p2p/rlpx_protocols/les/flow_control - -flow_control.tests() - diff --git a/tests/p2p/p2p_test_helper.nim b/tests/p2p/p2p_test_helper.nim index b6c04ef..3950dae 100644 --- a/tests/p2p/p2p_test_helper.nim +++ b/tests/p2p/p2p_test_helper.nim @@ -16,7 +16,7 @@ proc setupTestNode*( # Don't create new RNG every time in production code! let keys1 = KeyPair.random(rng[]) var node = newEthereumNode( - keys1, localAddress(nextPort), NetworkId(1), nil, + keys1, localAddress(nextPort), NetworkId(1), addAllCapabilities = false, bindUdpPort = Port(nextPort), bindTcpPort = Port(nextPort), rng = rng) diff --git a/tests/p2p/test_rlpx_thunk.nim b/tests/p2p/test_rlpx_thunk.nim index 4747e67..b39f22c 100644 --- a/tests/p2p/test_rlpx_thunk.nim +++ b/tests/p2p/test_rlpx_thunk.nim @@ -2,10 +2,11 @@ import std/[json, os], - unittest2, + unittest2, stint, chronos, stew/byteutils, - ../../eth/p2p, ../../eth/p2p/rlpx_protocols/eth_protocol, - ./p2p_test_helper + ../../eth/[p2p, common], + ./p2p_test_helper, + ./eth_protocol let rng = newRng()