move eth1 specific code to where it belongs

This commit is contained in:
jangko 2022-10-11 10:02:16 +07:00
parent bd8d9c65a3
commit ae616935c3
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
18 changed files with 59 additions and 1888 deletions

View File

@@ -1,100 +0,0 @@
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
chronicles,
./eth_types_rlp,
../rlp,
../trie/db
export eth_types_rlp, rlp, db
type
AbstractChainDB* = ref object of RootRef
proc notImplemented(name: string) =
debug "Method not implemented", meth = name
method genesisHash*(db: AbstractChainDB): KeccakHash
{.base, gcsafe, raises: [Defect].} =
notImplemented("genesisHash")
method getBlockHeader*(db: AbstractChainDB, b: HashOrNum,
output: var BlockHeader): bool {.base, gcsafe, raises: [RlpError, Defect].} =
notImplemented("getBlockHeader")
proc getBlockHeader*(db: AbstractChainDB, hash: KeccakHash): BlockHeaderRef {.gcsafe.} =
new result
if not db.getBlockHeader(HashOrNum(isHash: true, hash: hash), result[]):
return nil
proc getBlockHeader*(db: AbstractChainDB, b: BlockNumber): BlockHeaderRef {.gcsafe.} =
new result
if not db.getBlockHeader(HashOrNum(isHash: false, number: b), result[]):
return nil
# Need to add `RlpError` and sometimes `CatchableError` as the implementations
# of these methods in nimbus-eth1 will raise these. Using `CatchableError`
# because some can raise for errors not known to this repository, such as
# `CanonicalHeadNotFound`. It would probably be better to use Result.
method getBestBlockHeader*(self: AbstractChainDB): BlockHeader
{.base, gcsafe, raises: [RlpError, CatchableError, Defect].} =
notImplemented("getBestBlockHeader")
method getSuccessorHeader*(db: AbstractChainDB, h: BlockHeader,
output: var BlockHeader, skip = 0'u): bool
{.base, gcsafe, raises: [RlpError, Defect].} =
notImplemented("getSuccessorHeader")
method getAncestorHeader*(db: AbstractChainDB, h: BlockHeader,
output: var BlockHeader, skip = 0'u): bool
{.base, gcsafe, raises: [RlpError, Defect].} =
notImplemented("getAncestorHeader")
method getBlockBody*(db: AbstractChainDB, blockHash: KeccakHash): BlockBodyRef
{.base, gcsafe, raises: [RlpError, Defect].} =
notImplemented("getBlockBody")
method getReceipt*(db: AbstractChainDB, hash: KeccakHash): ReceiptRef {.base, gcsafe.} =
notImplemented("getReceipt")
method getTrieDB*(db: AbstractChainDB): TrieDatabaseRef
{.base, gcsafe, raises: [Defect].} =
notImplemented("getTrieDB")
method getCodeByHash*(db: AbstractChainDB, hash: KeccakHash): Blob {.base, gcsafe.} =
notImplemented("getCodeByHash")
method getSetting*(db: AbstractChainDB, key: string): seq[byte] {.base, gcsafe.} =
notImplemented("getSetting")
method setSetting*(db: AbstractChainDB, key: string, val: openArray[byte]) {.base, gcsafe.} =
notImplemented("setSetting")
method getHeaderProof*(db: AbstractChainDB, req: ProofRequest): Blob {.base, gcsafe.} =
notImplemented("getHeaderProof")
method getProof*(db: AbstractChainDB, req: ProofRequest): Blob {.base, gcsafe.} =
notImplemented("getProof")
method getHelperTrieProof*(db: AbstractChainDB, req: HelperTrieProofRequest): Blob {.base, gcsafe.} =
notImplemented("getHelperTrieProof")
method getTransactionStatus*(db: AbstractChainDB, txHash: KeccakHash): TransactionStatusMsg {.base, gcsafe.} =
notImplemented("getTransactionStatus")
method addTransactions*(db: AbstractChainDB, transactions: openArray[Transaction]) {.base, gcsafe.} =
notImplemented("addTransactions")
method persistBlocks*(db: AbstractChainDB, headers: openArray[BlockHeader], bodies: openArray[BlockBody]): ValidationResult {.base, gcsafe.} =
notImplemented("persistBlocks")
method getForkId*(db: AbstractChainDB, n: BlockNumber): ForkID {.base, gcsafe.} =
# EIP 2364/2124
notImplemented("getForkId")
method getTotalDifficulty*(db: AbstractChainDB): DifficultyInt {.base, gcsafe, raises: [RlpError, Defect].} =
notImplemented("getTotalDifficulty")

View File

@@ -143,17 +143,6 @@ type
txs*: seq[Transaction]
uncles*: seq[BlockHeader]
CollationHeader* = object
shard*: uint
expectedPeriod*: uint
periodStartPrevHash*: Hash256
parentHash*: Hash256
txRoot*: Hash256
coinbase*: EthAddress
stateRoot*: Hash256
receiptRoot*: Hash256
blockNumber*: BlockNumber
# TODO: Make BlockNumber a uint64 and deprecate either this or BlockHashOrNumber
HashOrNum* = object
case isHash*: bool
@@ -169,33 +158,6 @@ type
else:
number*: uint64
BlocksRequest* = object
startBlock*: HashOrNum
maxResults*, skip*: uint
reverse*: bool
ProofRequest* = object
blockHash*: KeccakHash
accountKey*: Blob
key*: Blob
fromLevel*: uint
HeaderProofRequest* = object
chtNumber*: uint
blockNumber*: uint
fromLevel*: uint
ContractCodeRequest* = object
blockHash*: KeccakHash
key*: EthAddress
HelperTrieProofRequest* = object
subType*: uint
sectionIdx*: uint
key*: Blob
fromLevel*: uint
auxReq*: uint
BlockHeaderRef* = ref BlockHeader
BlockBodyRef* = ref BlockBody
ReceiptRef* = ref Receipt

View File

@@ -1,27 +0,0 @@
import
../trie/[trie_defs, db, hexary],
../rlp,
./chaindb
export chaindb
proc getAccount*(db: TrieDatabaseRef,
rootHash: KeccakHash,
account: EthAddress): Account =
let trie = initSecureHexaryTrie(db, rootHash)
let data = trie.get(account)
if data.len > 0:
result = rlp.decode(data, Account)
else:
result = newAccount()
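# Note: initSecureHexaryTrie keys the trie by the keccak256 hash of the raw
# account address, matching how the Ethereum state trie stores accounts.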
proc getContractCode*(chain: AbstractChainDB, req: ContractCodeRequest): Blob {.gcsafe.} =
let b = chain.getBlockHeader(req.blockHash)
if b.hasData:
let acc = getAccount(chain.getTrieDB, b.stateRoot, req.key)
result = chain.getCodeByHash(acc.codeHash)
proc getStorageNode*(chain: AbstractChainDB, hash: KeccakHash): Blob
{.raises: [CatchableError, Defect].} =
let db = chain.getTrieDB
return db.get(hash.data)

View File

@@ -10,7 +10,7 @@
import
std/[tables, algorithm, random],
chronos, chronos/timer, chronicles,
./keys, ./common/chaindb, ./p2p/private/p2p_types,
./keys, ./p2p/private/p2p_types,
./p2p/[kademlia, discovery, enode, peer_pool, rlpx]
export
@@ -33,7 +33,6 @@ proc newEthereumNode*(
keys: KeyPair,
address: Address,
networkId: NetworkId,
chain: AbstractChainDB,
clientId = "nim-eth-p2p/0.2.0", # TODO: read this value from nimble somehow
addAllCapabilities = true,
useCompression: bool = false,

View File

@@ -1,412 +0,0 @@
# nim-eth
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[sets, options, random, hashes],
chronos, chronicles,
../common/eth_types, ../p2p,
./private/p2p_types, ./rlpx_protocols/eth_protocol, "."/[rlpx, peer_pool]
const
minPeersToStartSync* = 2 # Wait for consensus of at least this
# number of peers before syncing
type
SyncStatus* = enum
syncSuccess
syncNotEnoughPeers
syncTimeOut
WantedBlocksState = enum
Initial,
Requested,
Received,
Persisted
WantedBlocks = object
startIndex: BlockNumber
numBlocks: uint
state: WantedBlocksState
headers: seq[BlockHeader]
bodies: seq[BlockBody]
SyncContext = ref object
workQueue: seq[WantedBlocks]
endBlockNumber: BlockNumber
finalizedBlock: BlockNumber # Block which was downloaded and verified
chain: AbstractChainDB
peerPool: PeerPool
trustedPeers: HashSet[Peer]
hasOutOfOrderBlocks: bool
proc hash*(p: Peer): Hash = hash(cast[pointer](p))
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
result += (b.numBlocks - 1).toBlockNumber
proc availableWorkItem(ctx: SyncContext): int =
var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed
trace "queue len", length = ctx.workQueue.len
result = -1
for i in 0 .. ctx.workQueue.high:
case ctx.workQueue[i].state
of Initial:
# When there is a work item in the Initial state, immediately use it.
# This usually means a previous work item failed somewhere in the process
# and can thus be reused.
return i
of Persisted:
# In case of Persisted, we can reset this work item to a new one.
result = i
# No break here to give work items in Initial state priority and to
# calculate endBlock.
else:
discard
# Check all endBlocks of all workqueue items to decide on next range of
# blocks to collect & process.
let endBlock = ctx.workQueue[i].endIndex
if endBlock > maxPendingBlock:
maxPendingBlock = endBlock
let nextRequestedBlock = maxPendingBlock + 1
# If this next block doesn't exist yet according to any of our peers, don't
# return a work item (and sync will be stopped).
if nextRequestedBlock >= ctx.endBlockNumber:
return -1
# Increase queue when there are no free (Initial / Persisted) work items in
# the queue. At start, queue will be empty.
if result == -1:
result = ctx.workQueue.len
ctx.workQueue.setLen(result + 1)
# Create a new work item when the queue was increased; reset the selected
# work item when it is in the Persisted state.
var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).toInt
if numBlocks > maxHeadersFetch:
numBlocks = maxHeadersFetch
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
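# Example walk-through with illustrative numbers: given finalizedBlock = 0,
# endBlockNumber = 500 and maxHeadersFetch = 192, successive calls create
# items covering blocks 1..192, 193..384 and 385..499. A further call then
# computes nextRequestedBlock = 500 >= endBlockNumber and returns -1, so no
# more work is handed out until an item goes back to Initial or Persisted.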
proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult =
result = ctx.chain.persistBlocks(wi.headers, wi.bodies)
case result
of ValidationResult.OK:
ctx.finalizedBlock = wi.endIndex
wi.state = Persisted
of ValidationResult.Error:
wi.state = Initial
# successful or not, we're done with these blocks
wi.headers = @[]
wi.bodies = @[]
proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult) =
var nextStartIndex = ctx.finalizedBlock + 1
var keepRunning = true
var hasOutOfOrderBlocks = false
trace "Looking for out of order blocks"
while keepRunning:
keepRunning = false
hasOutOfOrderBlocks = false
# Go over the full work queue and check for every work item if it is in
# Received state and has the next blocks in line to be processed.
for i in 0 ..< ctx.workQueue.len:
let start = ctx.workQueue[i].startIndex
# There should be at least 1 like this, namely the just received work item
# that initiated this call.
if ctx.workQueue[i].state == Received:
if start == nextStartIndex:
trace "Processing pending work item", number = start
result = (i, ctx.persistWorkItem(ctx.workQueue[i]))
# TODO: We can stop here on failure, but have to set
# hasOutOfOrderBlocks. Is this always valid?
nextStartIndex = ctx.finalizedBlock + 1
keepRunning = true
break
else:
hasOutOfOrderBlocks = true
ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
let wi = addr ctx.workQueue[workItem]
let askedBlocks = wi.numBlocks.int
let receivedBlocks = wi.headers.len
let start = wi.startIndex
if askedBlocks == receivedBlocks:
trace "Work item complete",
start,
askedBlocks,
receivedBlocks
if wi.startIndex != ctx.finalizedBlock + 1:
trace "Blocks out of order", start, final = ctx.finalizedBlock
ctx.hasOutOfOrderBlocks = true
if ctx.hasOutOfOrderBlocks:
let (index, validation) = ctx.persistPendingWorkItems()
# Only report an error if it was this peer's work item that failed
if validation == ValidationResult.Error and index == workItem:
result = ValidationResult.Error
# TODO: What about failures on other peers' work items?
# In that case the peer will probably get disconnected on future erroneous
# work items, but before this occurs, several more blocks (that will fail)
# might get downloaded from this peer. This will delay the sync and this
# should be improved.
else:
trace "Processing work item", number = wi.startIndex
# The validation result needs to be returned so that the caller can decide
# whether to disconnect from this peer in case of an error.
result = ctx.persistWorkItem(wi[])
else:
trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.",
start,
askedBlocks,
receivedBlocks
return ValidationResult.Error
proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
new result
result.chain = chain
result.peerPool = peerPool
result.trustedPeers = initHashSet[Peer]()
result.finalizedBlock = chain.getBestBlockHeader().blockNumber
proc handleLostPeer(ctx: SyncContext) =
# TODO: ask the PeerPool for new connections and then call
# `obtainBlocksFromPeer`
discard
proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: p.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
let latestBlock = await p.getBlockHeaders(request)
if latestBlock.isSome and latestBlock.get.headers.len > 0:
result = latestBlock.get.headers[0].blockNumber
proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
# Update our best block number
try:
let bestBlockNumber = await peer.getBestBlockNumber()
if bestBlockNumber > syncCtx.endBlockNumber:
trace "New sync end block number", number = bestBlockNumber
syncCtx.endBlockNumber = bestBlockNumber
except TransportError:
debug "Transport got closed during obtainBlocksFromPeer"
except CatchableError as e:
debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg
# no need to exit here, because the context might still have blocks to fetch
# from this peer
while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and
peer.connectionState notin {Disconnecting, Disconnected}):
template workItem: auto = syncCtx.workQueue[workItemIdx]
workItem.state = Requested
trace "Requesting block headers", start = workItem.startIndex,
count = workItem.numBlocks, peer = peer.remote.node
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
maxResults: workItem.numBlocks,
skip: 0,
reverse: false)
var dataReceived = false
try:
let results = await peer.getBlockHeaders(request)
if results.isSome:
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeq[BlockBody]()
var hashes = newSeq[KeccakHash]()
var nextIndex = workItem.startIndex
for i in workItem.headers:
if i.blockNumber != nextIndex:
raise newException(CatchableError, "The block numbers are not in sequence. Not processing this workItem.")
else:
nextIndex = nextIndex + 1
hashes.add(blockHash(i))
if hashes.len == maxBodiesFetch:
let b = await peer.getBlockBodies(hashes)
if b.isNone:
raise newException(CatchableError, "Was not able to get the block bodies.")
hashes.setLen(0)
bodies.add(b.get.blocks)
if hashes.len != 0:
let b = await peer.getBlockBodies(hashes)
if b.isNone:
raise newException(CatchableError, "Was not able to get the block bodies.")
bodies.add(b.get.blocks)
if bodies.len == workItem.headers.len:
shallowCopy(workItem.bodies, bodies)
dataReceived = true
else:
warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
except TransportError:
debug "Transport got closed during obtainBlocksFromPeer"
except CatchableError as e:
# the success case sets `dataReceived`, so we can just fall back to the
# failure path below. If we signal time-outs with exceptions such
# failures will be easier to handle.
debug "Exception in obtainBlocksFromPeer()", exc = e.name, err = e.msg
var giveUpOnPeer = false
if dataReceived:
workItem.state = Received
if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK:
giveUpOnPeer = true
else:
giveUpOnPeer = true
if giveUpOnPeer:
workItem.state = Initial
try:
await peer.disconnect(SubprotocolReason)
except CatchableError:
discard
syncCtx.handleLostPeer()
break
trace "Finished obtaining blocks", peer
proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
# Returns true if one of the peers acknowledges existence of the best block
# of the other peer.
var
a = a
b = b
if a.state(eth).bestDifficulty < b.state(eth).bestDifficulty:
swap(a, b)
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
hash: b.state(eth).bestBlockHash),
maxResults: 1,
skip: 0,
reverse: true)
let latestBlock = await a.getBlockHeaders(request)
result = latestBlock.isSome and latestBlock.get.headers.len > 0
proc randomTrustedPeer(ctx: SyncContext): Peer =
var k = rand(ctx.trustedPeers.len - 1)
var i = 0
for p in ctx.trustedPeers:
result = p
if i == k: return
inc i
proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
if ctx.trustedPeers.len >= minPeersToStartSync:
# We have enough trusted peers. Validate new peer against trusted
if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
ctx.trustedPeers.incl(peer)
asyncCheck ctx.obtainBlocksFromPeer(peer)
elif ctx.trustedPeers.len == 0:
# Assume the peer is trusted, but don't start sync until we reevaluate
# it with more peers
trace "Assume trusted peer", peer
ctx.trustedPeers.incl(peer)
else:
# At this point we have some "trusted" candidates, but they are not
# "trusted" enough. We evaluate `peer` against all other candidates.
# If one of the candidates disagrees, we swap it for `peer`. If all
# candidates agree, we add `peer` to trusted set. The peers in the set
# will become "fully trusted" (and sync will start) when the set is big
# enough
var
agreeScore = 0
disagreedPeer: Peer
for tp in ctx.trustedPeers:
if await peersAgreeOnChain(peer, tp):
inc agreeScore
else:
disagreedPeer = tp
let disagreeScore = ctx.trustedPeers.len - agreeScore
if agreeScore == ctx.trustedPeers.len:
ctx.trustedPeers.incl(peer) # The best possible outcome
elif disagreeScore == 1:
trace "Peer is no longer trusted for sync", peer
ctx.trustedPeers.excl(disagreedPeer)
ctx.trustedPeers.incl(peer)
else:
trace "Peer not trusted for sync", peer
if ctx.trustedPeers.len == minPeersToStartSync:
for p in ctx.trustedPeers:
asyncCheck ctx.obtainBlocksFromPeer(p)
proc onPeerConnected(ctx: SyncContext, peer: Peer) =
trace "New candidate for sync", peer
try:
let f = ctx.startSyncWithPeer(peer)
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
if f.error of TransportError:
debug "Transport got closed during startSyncWithPeer"
else:
error "startSyncWithPeer failed", msg = f.readError.msg, peer
except TransportError:
debug "Transport got closed during startSyncWithPeer"
except CatchableError as e:
debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg
proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
trace "peer disconnected ", peer = p
ctx.trustedPeers.excl(p)
proc startSync(ctx: SyncContext) =
var po: PeerObserver
po.onPeerConnected = proc(p: Peer) {.gcsafe.} =
ctx.onPeerConnected(p)
po.onPeerDisconnected = proc(p: Peer) {.gcsafe.} =
ctx.onPeerDisconnected(p)
po.setProtocol eth
ctx.peerPool.addObserver(ctx, po)
proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
var
bestBlockDifficulty: DifficultyInt = 0.stuint(256)
bestPeer: Peer = nil
for peer in node.peers(eth):
let peerEthState = peer.state(eth)
if peerEthState.initialized:
if peerEthState.bestDifficulty > bestBlockDifficulty:
bestBlockDifficulty = peerEthState.bestDifficulty
bestPeer = peer
result = (bestPeer, bestBlockDifficulty)
proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} =
## Code for the fast blockchain sync procedure:
## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads
## https://github.com/ethereum/go-ethereum/pull/1889
# TODO: This needs a better interface. Consider removing this function and
# exposing SyncCtx
var syncCtx = newSyncContext(node.chain, node.peerPool)
syncCtx.startSync()
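# Usage sketch (hypothetical caller): a node constructed around a concrete
# AbstractChainDB implementation kicks the whole procedure off with:
#
#   let status = waitFor node.fastBlockchainSync()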

View File

@@ -1,52 +0,0 @@
# nim-eth
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
../common/[eth_types, state_accessors]
# TODO: Perhaps we can move this to eth-common
proc getBlockHeaders*(db: AbstractChainDB, req: BlocksRequest): seq[BlockHeader]
{.gcsafe, raises: [RlpError, Defect].} =
result = newSeqOfCap[BlockHeader](req.maxResults)
var foundBlock: BlockHeader
if db.getBlockHeader(req.startBlock, foundBlock):
result.add foundBlock
while uint64(result.len) < req.maxResults:
if not req.reverse:
if not db.getSuccessorHeader(foundBlock, foundBlock, req.skip):
break
else:
if not db.getAncestorHeader(foundBlock, foundBlock, req.skip):
break
result.add foundBlock
template fetcher*(fetcherName, fetchingFunc, InputType, ResultType: untyped) =
proc fetcherName*(db: AbstractChainDB,
lookups: openArray[InputType]): seq[ResultType] {.gcsafe.} =
for lookup in lookups:
let fetched = fetchingFunc(db, lookup)
if fetched.hasData:
# TODO: should there be an else clause here?
# Is the peer responsible for figuring out that
# some of the requested items were not found?
result.add deref(fetched)
fetcher getContractCodes, getContractCode, ContractCodeRequest, Blob
fetcher getBlockBodies, getBlockBody, KeccakHash, BlockBody
fetcher getStorageNodes, getStorageNode, KeccakHash, Blob
fetcher getReceipts, getReceipt, KeccakHash, Receipt
fetcher getProofs, getProof, ProofRequest, Blob
fetcher getHeaderProofs, getHeaderProof, ProofRequest, Blob
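# For reference, the getBlockBodies instantiation above expands to roughly
# this proc (hand-expanded sketch, not compiler output):
#
#   proc getBlockBodies*(db: AbstractChainDB,
#                        lookups: openArray[KeccakHash]): seq[BlockBody] {.gcsafe.} =
#     for lookup in lookups:
#       let fetched = getBlockBody(db, lookup)
#       if fetched.hasData:
#         result.add deref(fetched)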
proc getHelperTrieProofs*(db: AbstractChainDB,
reqs: openArray[HelperTrieProofRequest],
outNodes: var seq[Blob], outAuxData: var seq[Blob]) =
discard

View File

@@ -10,12 +10,9 @@ import
std/[deques, tables],
chronos,
stew/results,
../../common/chaindb,
".."/../[rlp, keys],
".."/[enode, kademlia, discovery, rlpxcrypt]
export chaindb
const
useSnappy* = defined(useSnappy)
@@ -24,7 +21,6 @@ type
EthereumNode* = ref object
networkId*: NetworkId
chain*: AbstractChainDB
clientId*: string
connectionState*: ConnectionState
keys*: KeyPair

View File

@@ -1,117 +0,0 @@
#
# Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
## This module implements the Ethereum Wire Protocol:
## https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
import
chronos, stint, chronicles,
../../rlp, ../../common/eth_types, ../../p2p,
../rlpx, ../private/p2p_types, ../blockchain_utils
type
NewBlockHashesAnnounce* = object
hash: KeccakHash
number: uint
NewBlockAnnounce* = object
header*: BlockHeader
txs*: seq[Transaction]
uncles*: seq[BlockHeader]
PeerState = ref object
initialized*: bool
bestBlockHash*: KeccakHash
bestDifficulty*: DifficultyInt
const
maxStateFetch* = 384
maxBodiesFetch* = 128
maxReceiptsFetch* = 256
maxHeadersFetch* = 192
protocolVersion* = 63
p2pProtocol eth(version = protocolVersion,
peerState = PeerState,
useRequestIds = false):
onPeerConnected do (peer: Peer):
let
network = peer.network
chain = network.chain
bestBlock = chain.getBestBlockHeader
let m = await peer.status(protocolVersion,
network.networkId,
bestBlock.difficulty,
bestBlock.blockHash,
chain.genesisHash,
timeout = chronos.seconds(10))
if m.networkId == network.networkId and m.genesisHash == chain.genesisHash:
trace "suitable peer", peer
else:
raise newException(UselessPeerError, "Eth handshake params mismatch")
peer.state.initialized = true
peer.state.bestDifficulty = m.totalDifficulty
peer.state.bestBlockHash = m.bestHash
handshake:
proc status(peer: Peer,
protocolVersion: uint,
networkId: NetworkId,
totalDifficulty: DifficultyInt,
bestHash: KeccakHash,
genesisHash: KeccakHash)
proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
discard
proc transactions(peer: Peer, transactions: openArray[Transaction]) =
discard
requestResponse:
proc getBlockHeaders(peer: Peer, request: BlocksRequest) {.gcsafe.} =
if request.maxResults > uint64(maxHeadersFetch):
await peer.disconnect(BreachOfProtocol)
return
await response.send(peer.network.chain.getBlockHeaders(request))
proc blockHeaders(p: Peer, headers: openArray[BlockHeader])
requestResponse:
proc getBlockBodies(peer: Peer, hashes: openArray[KeccakHash]) {.gcsafe.} =
if hashes.len > maxBodiesFetch:
await peer.disconnect(BreachOfProtocol)
return
await response.send(peer.network.chain.getBlockBodies(hashes))
proc blockBodies(peer: Peer, blocks: openArray[BlockBody])
proc newBlock(peer: Peer, bh: NewBlockAnnounce, totalDifficulty: DifficultyInt) =
discard
nextID 13
requestResponse:
proc getNodeData(peer: Peer, hashes: openArray[KeccakHash]) =
await response.send(peer.network.chain.getStorageNodes(hashes))
proc nodeData(peer: Peer, data: openArray[Blob])
requestResponse:
proc getReceipts(peer: Peer, hashes: openArray[KeccakHash]) = discard
# TODO: implement `getReceipts` and reactivate this code
# await response.send(peer.network.chain.getReceipts(hashes))
proc receipts(peer: Peer, receipts: openArray[Receipt])
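# Client-side usage sketch: each requestResponse pair generates an async proc
# on Peer whose reply is Option-wrapped, as the sync code above demonstrates:
#
#   let reply = await peer.getBlockHeaders(request)
#   if reply.isSome and reply.get.headers.len > 0:
#     echo reply.get.headers[0].blockNumber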

View File

@@ -1,506 +0,0 @@
import
std/[tables, sets],
chronicles, chronos,
../../../rlp, ../../..//common/eth_types,
../../rlpx, ../../private/p2p_types,
./private/les_types
const
maxSamples = 100000
rechargingScale = 1000000
lesStatsKey = "les.flow_control.stats"
lesStatsVer = 0
logScope:
topics = "les flow_control"
# TODO: move this somewhere
proc pop[A, B](t: var Table[A, B], key: A): B =
result = t[key]
t.del(key)
when LesTime is SomeInteger:
template `/`(lhs, rhs: LesTime): LesTime =
lhs div rhs
when defined(testing):
var lesTime* = LesTime(0)
template now(): LesTime = lesTime
template advanceTime(t) = lesTime += LesTime(t)
else:
import times
let startTime = epochTime()
proc now(): LesTime =
return LesTime((times.epochTime() - startTime) * 1000.0)
proc addSample(ra: var StatsRunningAverage; x, y: float64) =
if ra.count >= maxSamples:
let decay = float64(ra.count + 1 - maxSamples) / maxSamples
template applyDecay(x) = x -= x * decay
applyDecay ra.sumX
applyDecay ra.sumY
applyDecay ra.sumXX
applyDecay ra.sumXY
ra.count = maxSamples - 1
inc ra.count
ra.sumX += x
ra.sumY += y
ra.sumXX += x * x
ra.sumXY += x * y
proc calc(ra: StatsRunningAverage): tuple[m, b: float] =
if ra.count == 0:
return
let count = float64(ra.count)
let d = count * ra.sumXX - ra.sumX * ra.sumX
if d < 0.001:
return (m: ra.sumY / count, b: 0.0)
result.m = (count * ra.sumXY - ra.sumX * ra.sumY) / d
result.b = (ra.sumY / count) - (result.m * ra.sumX / count)
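# The proc above is a standard least-squares fit over the accumulated sums:
# for n = count samples, the slope m and intercept b minimising
# sum((y - (m*x + b))^2) are
#   m = (n*sumXY - sumX*sumY) / (n*sumXX - sumX^2)
#   b = mean(y) - m*mean(x)
# which is exactly what `result.m` and `result.b` compute.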
proc currentRequestsCosts*(network: LesNetwork,
les: ProtocolInfo): seq[ReqCostInfo] =
# Make sure the message costs are already initialized
doAssert network.messageStats.len > les.messages[^1].id,
"Have you called `initFlowControl`"
for msg in les.messages:
var (m, b) = network.messageStats[msg.id].calc()
if m < 0:
b += m
m = 0
if b < 0:
b = 0
result.add ReqCostInfo(msgId: msg.id,
baseCost: ReqCostInt(b * 2),
reqCost: ReqCostInt(m * 2))
proc persistMessageStats*(db: AbstractChainDB,
network: LesNetwork) =
doAssert db != nil
# XXX: Because of the package_visible_types template magic, Nim complains
# when we pass the messageStats expression directly to `encodeList`
let stats = network.messageStats
db.setSetting(lesStatsKey, rlp.encodeList(lesStatsVer, stats))
proc loadMessageStats*(network: LesNetwork,
les: ProtocolInfo,
db: AbstractChainDB): bool =
block readFromDB:
if db == nil:
break readFromDB
var stats = db.getSetting(lesStatsKey)
if stats.len == 0:
notice "LES stats not present in the database"
break readFromDB
try:
var statsRlp = rlpFromBytes(stats)
if not statsRlp.enterList:
notice "Found a corrupted LES stats record"
break readFromDB
let version = statsRlp.read(int)
if version != lesStatsVer:
notice "Found an outdated LES stats record"
break readFromDB
statsRlp >> network.messageStats
if network.messageStats.len <= les.messages[^1].id:
notice "Found an incomplete LES stats record"
break readFromDB
return true
except RlpError as e:
error "Error while loading LES message stats", err = e.msg
newSeq(network.messageStats, les.messages[^1].id + 1)
return false
proc update(s: var FlowControlState, t: LesTime) =
let dt = max(t - s.lastUpdate, LesTime(0))
s.bufValue = min(
s.bufValue + s.minRecharge * dt,
s.bufLimit)
s.lastUpdate = t
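# Worked example with illustrative values: bufLimit = 1000, minRecharge = 100
# units per millisecond of LesTime, buffer drained to 400 at lastUpdate = 0:
#   update(t = 5)  -> bufValue = min(400 + 100*5, 1000) = 900
#   update(t = 10) -> bufValue = min(900 + 100*5, 1000) = 1000 (clamped)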
proc init(s: var FlowControlState,
bufLimit: BufValueInt, minRecharge: int, t: LesTime) =
s.bufValue = bufLimit
s.bufLimit = bufLimit
s.minRecharge = minRecharge
s.lastUpdate = t
func canMakeRequest(s: FlowControlState,
maxCost: ReqCostInt): (LesTime, float64) =
## Returns the required waiting time before sending a request and
## the estimated buffer level afterwards (as a fraction of the limit)
const safetyMargin = 50
var maxCost = min(
maxCost + safetyMargin * s.minRecharge,
s.bufLimit)
if s.bufValue >= maxCost:
result[1] = float64(s.bufValue - maxCost) / float64(s.bufLimit)
else:
result[0] = (maxCost - s.bufValue) / s.minRecharge
func canServeRequest(srv: LesNetwork): bool =
result = srv.reqCount < srv.maxReqCount and
srv.reqCostSum < srv.maxReqCostSum
proc rechargeReqCost(peer: LesPeer, t: LesTime) =
let dt = t - peer.lastRechargeTime
peer.reqCostVal += peer.reqCostGradient * dt / rechargingScale
peer.lastRechargeTime = t
if peer.isRecharging and t >= peer.rechargingEndsAt:
peer.isRecharging = false
peer.reqCostGradient = 0
peer.reqCostVal = 0
proc updateRechargingParams(peer: LesPeer, network: LesNetwork) =
peer.reqCostGradient = 0
if peer.reqCount > 0:
peer.reqCostGradient = rechargingScale / network.reqCount
if peer.isRecharging:
peer.reqCostGradient = (network.rechargingRate * (peer.rechargingPower /
network.totalRechargingPower).int64).int
peer.rechargingEndsAt = peer.lastRechargeTime +
LesTime(peer.reqCostVal * rechargingScale /
-peer.reqCostGradient)
proc trackRequests(network: LesNetwork, peer: LesPeer, reqCountChange: int) =
peer.reqCount += reqCountChange
network.reqCount += reqCountChange
doAssert peer.reqCount >= 0 and network.reqCount >= 0
if peer.reqCount == 0:
# All requests have been finished. Start recharging.
peer.isRecharging = true
network.totalRechargingPower += peer.rechargingPower
elif peer.reqCount == reqCountChange and peer.isRecharging:
# `peer.reqCount` must have been 0 for the condition above to hold.
# This is a transition from recharging to serving state.
peer.isRecharging = false
network.totalRechargingPower -= peer.rechargingPower
peer.startReqCostVal = peer.reqCostVal
updateRechargingParams peer, network
proc updateFlowControl(network: LesNetwork, t: LesTime) =
while true:
var firstTime = t
for peer in network.peers:
# TODO: perhaps use a bin heap here
if peer.isRecharging and peer.rechargingEndsAt < firstTime:
firstTime = peer.rechargingEndsAt
let rechargingEndedForSomePeer = firstTime < t
network.reqCostSum = 0
for peer in network.peers:
peer.rechargeReqCost firstTime
network.reqCostSum += peer.reqCostVal
if rechargingEndedForSomePeer:
for peer in network.peers:
if peer.isRecharging:
updateRechargingParams peer, network
else:
network.lastUpdate = t
return
proc endPendingRequest*(network: LesNetwork, peer: LesPeer, t: LesTime) =
if peer.reqCount > 0:
network.updateFlowControl t
network.trackRequests peer, -1
network.updateFlowControl t
proc enlistInFlowControl*(network: LesNetwork,
peer: LesPeer,
peerRechargingPower = 100) =
let t = now()
doAssert peer.isServer or peer.isClient
# Each peer must be a potential communication partner for us.
# There will be useless peers on the network, but the logic
# should make sure to disconnect them earlier in `onPeerConnected`.
if peer.isServer:
peer.localFlowState.init network.bufferLimit, network.minRechargingRate, t
peer.pendingReqs = initTable[int, ReqCostInt]()
if peer.isClient:
peer.remoteFlowState.init network.bufferLimit, network.minRechargingRate, t
peer.lastRechargeTime = t
peer.rechargingEndsAt = t
peer.rechargingPower = peerRechargingPower
network.updateFlowControl t
proc delistFromFlowControl*(network: LesNetwork, peer: LesPeer) =
let t = now()
# XXX: perhaps this is not safe with our reqCount logic.
# The original code may depend on the binarity of the `serving` flag.
network.endPendingRequest peer, t
network.updateFlowControl t
proc initFlowControl*(network: LesNetwork, les: ProtocolInfo,
maxReqCount, maxReqCostSum, reqCostTarget: int,
db: AbstractChainDB = nil) =
network.rechargingRate = rechargingScale * (rechargingScale /
(100 * rechargingScale / reqCostTarget - rechargingScale))
network.maxReqCount = maxReqCount
network.maxReqCostSum = maxReqCostSum
if not network.loadMessageStats(les, db):
warn "Failed to load persisted LES message stats. " &
"Flow control will be re-initilized."
proc canMakeRequest(peer: var LesPeer, maxCost: int): (LesTime, float64) =
peer.localFlowState.update now()
return peer.localFlowState.canMakeRequest(maxCost)
template getRequestCost(peer: LesPeer, localOrRemote: untyped,
msgId, costQuantity: int): ReqCostInt =
let
baseCost = peer.`localOrRemote ReqCosts`[msgId].baseCost
reqCost = peer.`localOrRemote ReqCosts`[msgId].reqCost
min(baseCost + reqCost * costQuantity,
peer.`localOrRemote FlowState`.bufLimit)
proc trackOutgoingRequest*(network: LesNetwork, peer: LesPeer,
msgId, reqId, costQuantity: int) =
let maxCost = peer.getRequestCost(local, msgId, costQuantity)
peer.localFlowState.bufValue -= maxCost
peer.pendingReqsCost += maxCost
peer.pendingReqs[reqId] = peer.pendingReqsCost
proc trackIncomingResponse*(peer: LesPeer, reqId: int, bv: BufValueInt) =
let bv = min(bv, peer.localFlowState.bufLimit)
if not peer.pendingReqs.hasKey(reqId):
return
let costsSumAtSending = peer.pendingReqs.pop(reqId)
let costsSumChange = peer.pendingReqsCost - costsSumAtSending
peer.localFlowState.bufValue = if bv > costsSumChange: bv - costsSumChange
else: 0
peer.localFlowState.lastUpdate = now()
proc acceptRequest*(network: LesNetwork, peer: LesPeer,
msgId, costQuantity: int): Future[bool] {.async.} =
let t = now()
let reqCost = peer.getRequestCost(remote, msgId, costQuantity)
peer.remoteFlowState.update t
network.updateFlowControl t
while not network.canServeRequest:
await sleepAsync(chronos.milliseconds(10))
if peer notin network.peers:
# The peer was disconnected or the network
# was shut down while we waited
return false
network.trackRequests peer, +1
network.updateFlowControl network.lastUpdate
if reqCost > peer.remoteFlowState.bufValue:
error "LES peer sent request too early",
recharge = (reqCost - peer.remoteFlowState.bufValue) * rechargingScale /
peer.remoteFlowState.minRecharge
return false
return true
proc bufValueAfterRequest*(network: LesNetwork, peer: LesPeer,
msgId: int, quantity: int): BufValueInt =
let t = now()
let costs = peer.remoteReqCosts[msgId]
var reqCost = costs.baseCost + quantity * costs.reqCost
peer.remoteFlowState.update t
peer.remoteFlowState.bufValue -= reqCost
network.endPendingRequest peer, t
let curReqCost = peer.reqCostVal
if curReqCost < peer.remoteFlowState.bufLimit:
let bv = peer.remoteFlowState.bufLimit - curReqCost
if bv > peer.remoteFlowState.bufValue:
peer.remoteFlowState.bufValue = bv
network.messageStats[msgId].addSample(float64(quantity),
float64(curReqCost - peer.startReqCostVal))
return peer.remoteFlowState.bufValue
when defined(testing):
import unittest2, random, ../../rlpx
proc isMax(s: FlowControlState): bool =
s.bufValue == s.bufLimit
p2pProtocol dummyLes(version = 1, rlpxName = "abc"):
proc a(p: Peer)
proc b(p: Peer)
proc c(p: Peer)
proc d(p: Peer)
proc e(p: Peer)
template fequals(lhs, rhs: float64, epsilon = 0.0001): bool =
abs(lhs-rhs) < epsilon
proc tests* =
randomize(3913631)
suite "les flow control":
suite "running averages":
test "consistent costs":
var s: StatsRunningAverage
for i in 0..100:
s.addSample(5.0, 100.0)
let (cost, base) = s.calc
check:
fequals(cost, 100.0)
fequals(base, 0.0)
test "randomized averages":
proc performTest(qBase, qRandom: int, cBase, cRandom: float64) =
var
s: StatsRunningAverage
expectedFinalCost = cBase + cRandom / 2
error = expectedFinalCost
for samples in [100, 1000, 10000]:
for i in 0..samples:
let q = float64(qBase + rand(10))
s.addSample(q, q * (cBase + rand(cRandom)))
let (newCost, newBase) = s.calc
# With more samples, our error should decrease, getting
# closer and closer to the average (unless we are already close enough)
let newError = abs(newCost - expectedFinalCost)
# This check fails with Nim-1.6:
# check newError < error
error = newError
# After enough samples we should be very close to the final result
check error < (expectedFinalCost * 0.02)
performTest(1, 10, 5.0, 100.0)
performTest(1, 4, 200.0, 1000.0)
suite "buffer value calculations":
type TestReq = object
peer: LesPeer
msgId, quantity: int
accepted: bool
setup:
var lesNetwork = new LesNetwork
lesNetwork.peers = initHashSet[LesPeer]()
lesNetwork.initFlowControl(dummyLes.protocolInfo,
reqCostTarget = 300,
maxReqCount = 5,
maxReqCostSum = 1000)
for i in 0 ..< lesNetwork.messageStats.len:
lesNetwork.messageStats[i].addSample(1.0, float(i) * 100.0)
var client = new LesPeer
client.isClient = true
var server = new LesPeer
server.isServer = true
var clientServer = new LesPeer
clientServer.isClient = true
clientServer.isServer = true
var client2 = new LesPeer
client2.isClient = true
var client3 = new LesPeer
client3.isClient = true
var bv: BufValueInt
template enlist(peer: LesPeer) {.dirty.} =
let reqCosts = currentRequestsCosts(lesNetwork, dummyLes.protocolInfo)
peer.remoteReqCosts = reqCosts
peer.localReqCosts = reqCosts
lesNetwork.peers.incl peer
lesNetwork.enlistInFlowControl peer
template startReq(p: LesPeer, msg, q: int): TestReq =
var req: TestReq
req.peer = p
req.msgId = msg
req.quantity = q
req.accepted = waitFor lesNetwork.acceptRequest(p, msg, q)
req
template endReq(req: TestReq): BufValueInt =
bufValueAfterRequest(lesNetwork, req.peer, req.msgId, req.quantity)
test "single peer recharging":
lesNetwork.bufferLimit = 1000
lesNetwork.minRechargingRate = 100
enlist client
check:
client.remoteFlowState.isMax
client.rechargingPower > 0
advanceTime 100
let r1 = client.startReq(0, 100)
check r1.accepted
check client.isRecharging == false
advanceTime 50
let r2 = client.startReq(1, 1)
check r2.accepted
check client.isRecharging == false
advanceTime 25
bv = endReq r2
check client.isRecharging == false
advanceTime 130
bv = endReq r1
check client.isRecharging == true
advanceTime 300
lesNetwork.updateFlowControl now()
check:
client.isRecharging == false
client.remoteFlowState.isMax

View File

@@ -1,111 +0,0 @@
import
std/[hashes, tables, sets],
../../../../common/eth_types
type
AnnounceType* = enum
None,
Simple,
Signed,
Unspecified
ReqCostInfo* = object
msgId*: int
baseCost*, reqCost*: ReqCostInt
FlowControlState* = object
bufValue*, bufLimit*: int
minRecharge*: int
lastUpdate*: LesTime
StatsRunningAverage* = object
sumX*, sumY*, sumXX*, sumXY*: float64
count*: int
LesPeer* = ref object
isServer*: bool
isClient*: bool
announceType*: AnnounceType
bestDifficulty*: DifficultyInt
bestBlockHash*: KeccakHash
bestBlockNumber*: BlockNumber
hasChainSince*: HashOrNum
hasStateSince*: HashOrNum
relaysTransactions*: bool
# The variables below are used to implement the flow control
# mechanisms of LES from our point of view as a server.
# They describe how much load has been generated by this
# particular peer.
reqCount*: int # How many outstanding requests are there?
#
rechargingPower*: int # Do we give this peer any extra priority
# (implemented as a faster recharging rate)
# 100 is the default. You can go higher and lower.
#
isRecharging*: bool # This is true while the peer is not making
# any requests
#
reqCostGradient*: int # Measures the speed of recharging or accumulating
# "requests cost" at any given moment.
#
reqCostVal*: int # The accumulated "requests cost"
#
rechargingEndsAt*: int # When will recharging end?
# (the buffer of the Peer will be fully restored)
#
lastRechargeTime*: LesTime # When did we last update the recharging parameters
#
startReqCostVal*: int # TODO
remoteFlowState*: FlowControlState
remoteReqCosts*: seq[ReqCostInfo]
# The next variables are used to limit ourselves as a client in order not
# to violate the flow-control requirements of the remote LES server.
pendingReqs*: Table[int, ReqCostInt]
pendingReqsCost*: int
localFlowState*: FlowControlState
localReqCosts*: seq[ReqCostInfo]
LesNetwork* = ref object
peers*: HashSet[LesPeer]
messageStats*: seq[StatsRunningAverage]
ourAnnounceType*: AnnounceType
# The fields below are relevant when serving data.
bufferLimit*: int
minRechargingRate*: int
reqCostSum*, maxReqCostSum*: ReqCostInt
reqCount*, maxReqCount*: int
sumWeight*: int
rechargingRate*: int64
totalRechargedUnits*: int
totalRechargingPower*: int
lastUpdate*: LesTime
KeyValuePair* = object
key*: string
value*: Blob
HandshakeError* = object of CatchableError
LesTime* = int # this is in milliseconds
BufValueInt* = int
ReqCostInt* = int
template hash*(peer: LesPeer): Hash = hash(cast[pointer](peer))
template areWeServingData*(network: LesNetwork): bool =
network.maxReqCount != 0
template areWeRequestingData*(network: LesNetwork): bool =
network.ourAnnounceType != AnnounceType.Unspecified

View File

@@ -1,460 +0,0 @@
#
# Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
import
std/[times, tables, options, sets, hashes, strutils],
stew/shims/macros, chronicles, chronos, nimcrypto/[keccak, hash],
".."/../[rlp, keys], ../../common/eth_types,
".."/[rlpx, kademlia, blockchain_utils], ../private/p2p_types,
./les/private/les_types, ./les/flow_control
export
les_types
const
lesVersion = 2
maxHeadersFetch = 192
maxBodiesFetch = 32
maxReceiptsFetch = 128
maxCodeFetch = 64
maxProofsFetch = 64
maxHeaderProofsFetch = 64
maxTransactionsFetch = 64
# Handshake properties:
# https://github.com/zsfelfoldi/go-ethereum/wiki/Light-Ethereum-Subprotocol-(LES)
keyProtocolVersion = "protocolVersion"
## P: is 1 for the LPV1 protocol version.
keyNetworkId = "networkId"
## P: should be 0 for testnet, 1 for mainnet.
keyHeadTotalDifficulty = "headTd"
## P: Total Difficulty of the best chain.
## Integer, as found in block header.
keyHeadHash = "headHash"
## B_32: the hash of the best (i.e. highest TD) known block.
keyHeadNumber = "headNum"
## P: the number of the best (i.e. highest TD) known block.
keyGenesisHash = "genesisHash"
## B_32: the hash of the Genesis block.
keyServeHeaders = "serveHeaders"
## (optional, no value)
## present if the peer can serve header chain downloads.
keyServeChainSince = "serveChainSince"
## P (optional)
## present if the peer can serve Body/Receipts ODR requests
## starting from the given block number.
keyServeStateSince = "serveStateSince"
## P (optional):
## present if the peer can serve Proof/Code ODR requests
## starting from the given block number.
keyRelaysTransactions = "txRelay"
## (optional, no value)
## present if the peer can relay transactions to the ETH network.
keyFlowControlBL = "flowControl/BL"
keyFlowControlMRC = "flowControl/MRC"
keyFlowControlMRR = "flowControl/MRR"
## see Client Side Flow Control:
## https://github.com/zsfelfoldi/go-ethereum/wiki/Client-Side-Flow-Control-model-for-the-LES-protocol
keyAnnounceType = "announceType"
keyAnnounceSignature = "sign"
proc initProtocolState(network: LesNetwork, node: EthereumNode) {.gcsafe.} =
network.peers = initHashSet[LesPeer]()
proc addPeer(network: LesNetwork, peer: LesPeer) =
network.enlistInFlowControl peer
network.peers.incl peer
proc removePeer(network: LesNetwork, peer: LesPeer) =
network.delistFromFlowControl peer
network.peers.excl peer
template costQuantity(quantityExpr, max: untyped) {.pragma.}
proc getCostQuantity(fn: NimNode): tuple[quantityExpr, maxQuantity: NimNode] =
# XXX: `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let costQuantity = fn.pragma.findPragma(bindSym"costQuantity")
doAssert costQuantity != nil
result.quantityExpr = costQuantity[1]
result.maxQuantity = costQuantity[2]
if result.maxQuantity.kind == nnkExprEqExpr:
result.maxQuantity = result.maxQuantity[1]
macro outgoingRequestDecorator(n: untyped): untyped =
result = n
let (costQuantity, maxQuantity) = n.getCostQuantity
result.body.add quote do:
trackOutgoingRequest(peer.networkState(les),
peer.state(les),
perProtocolMsgId, reqId, `costQuantity`)
# echo result.repr
macro incomingResponseDecorator(n: untyped): untyped =
result = n
let trackingCall = quote do:
trackIncomingResponse(peer.state(les), reqId, msg.bufValue)
result.body.insert(n.body.len - 1, trackingCall)
# echo result.repr
macro incomingRequestDecorator(n: untyped): untyped =
result = n
let (costQuantity, maxQuantity) = n.getCostQuantity
template acceptStep(quantityExpr, maxQuantity) {.dirty.} =
let requestCostQuantity = quantityExpr
if requestCostQuantity > maxQuantity:
await peer.disconnect(BreachOfProtocol)
return
let lesPeer = peer.state
let lesNetwork = peer.networkState
if not await acceptRequest(lesNetwork, lesPeer,
perProtocolMsgId,
requestCostQuantity): return
result.body.insert(1, getAst(acceptStep(costQuantity, maxQuantity)))
# echo result.repr
template updateBV: BufValueInt =
bufValueAfterRequest(lesNetwork, lesPeer,
perProtocolMsgId, requestCostQuantity)
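# Net effect of the three decorators above, e.g. for getBlockHeaders: the
# incoming request thunk first runs `acceptStep` (cost-quantity bound check
# plus acceptRequest), the outgoing request proc records the estimated cost
# via trackOutgoingRequest, and the incoming response thunk credits the
# reported buffer value back via trackIncomingResponse.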
func getValue(values: openArray[KeyValuePair],
key: string, T: typedesc): Option[T] =
for v in values:
if v.key == key:
return some(rlp.decode(v.value, T))
func getRequiredValue(values: openArray[KeyValuePair],
key: string, T: typedesc): T =
for v in values:
if v.key == key:
return rlp.decode(v.value, T)
raise newException(HandshakeError,
"Required handshake field " & key & " missing")
p2pProtocol les(version = lesVersion,
peerState = LesPeer,
networkState = LesNetwork,
outgoingRequestDecorator = outgoingRequestDecorator,
incomingRequestDecorator = incomingRequestDecorator,
incomingResponseThunkDecorator = incomingResponseDecorator):
handshake:
proc status(p: Peer, values: openArray[KeyValuePair])
onPeerConnected do (peer: Peer):
let
network = peer.network
chain = network.chain
bestBlock = chain.getBestBlockHeader
lesPeer = peer.state
lesNetwork = peer.networkState
template `=>`(k, v: untyped): untyped =
KeyValuePair(key: k, value: rlp.encode(v))
var lesProperties = @[
keyProtocolVersion => lesVersion,
keyNetworkId => network.networkId,
keyHeadTotalDifficulty => bestBlock.difficulty,
keyHeadHash => bestBlock.blockHash,
keyHeadNumber => bestBlock.blockNumber,
keyGenesisHash => chain.genesisHash
]
lesPeer.remoteReqCosts = currentRequestsCosts(lesNetwork, les.protocolInfo)
if lesNetwork.areWeServingData:
lesProperties.add [
# keyServeHeaders => nil,
keyServeChainSince => 0,
keyServeStateSince => 0,
# keyRelaysTransactions => nil,
keyFlowControlBL => lesNetwork.bufferLimit,
keyFlowControlMRR => lesNetwork.minRechargingRate,
keyFlowControlMRC => lesPeer.remoteReqCosts
]
if lesNetwork.areWeRequestingData:
lesProperties.add(keyAnnounceType => lesNetwork.ourAnnounceType)
let
s = await peer.status(lesProperties, timeout = chronos.seconds(10))
peerNetworkId = s.values.getRequiredValue(keyNetworkId, NetworkId)
peerGenesisHash = s.values.getRequiredValue(keyGenesisHash, KeccakHash)
peerLesVersion = s.values.getRequiredValue(keyProtocolVersion, uint)
template requireCompatibility(peerVar, localVar, varName: untyped) =
if localVar != peerVar:
raise newException(HandshakeError,
"Incompatibility detected! $1 mismatch ($2 != $3)" %
[varName, $localVar, $peerVar])
requireCompatibility(peerLesVersion, uint(lesVersion), "les version")
requireCompatibility(peerNetworkId, network.networkId, "network id")
requireCompatibility(peerGenesisHash, chain.genesisHash, "genesis hash")
template `:=`(lhs, key) =
lhs = s.values.getRequiredValue(key, type(lhs))
lesPeer.bestBlockHash := keyHeadHash
lesPeer.bestBlockNumber := keyHeadNumber
lesPeer.bestDifficulty := keyHeadTotalDifficulty
let peerAnnounceType = s.values.getValue(keyAnnounceType, AnnounceType)
if peerAnnounceType.isSome:
lesPeer.isClient = true
lesPeer.announceType = peerAnnounceType.get
else:
lesPeer.announceType = AnnounceType.Simple
lesPeer.hasChainSince := keyServeChainSince
lesPeer.hasStateSince := keyServeStateSince
lesPeer.relaysTransactions := keyRelaysTransactions
lesPeer.localFlowState.bufLimit := keyFlowControlBL
lesPeer.localFlowState.minRecharge := keyFlowControlMRR
lesPeer.localReqCosts := keyFlowControlMRC
lesNetwork.addPeer lesPeer
onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
peer.networkState.removePeer peer.state
## Header synchronisation
##
proc announce(
peer: Peer,
headHash: KeccakHash,
headNumber: BlockNumber,
headTotalDifficulty: DifficultyInt,
reorgDepth: BlockNumber,
values: openArray[KeyValuePair],
announceType: AnnounceType) =
if peer.state.announceType == AnnounceType.None:
error "unexpected announce message", peer
return
if announceType == AnnounceType.Signed:
let signature = values.getValue(keyAnnounceSignature, Blob)
if signature.isNone:
error "missing announce signature"
return
let sig = Signature.fromRaw(signature.get).tryGet()
let sigMsg = rlp.encodeList(headHash, headNumber, headTotalDifficulty)
let signerKey = recover(sig, sigMsg).tryGet()
if signerKey.toNodeId != peer.remote.id:
error "invalid announce signature"
# TODO: should we disconnect this peer?
return
# TODO: handle new block
requestResponse:
proc getBlockHeaders(
peer: Peer,
req: BlocksRequest) {.
costQuantity(req.maxResults.int, max = maxHeadersFetch).} =
let headers = peer.network.chain.getBlockHeaders(req)
await response.send(updateBV(), headers)
proc blockHeaders(
peer: Peer,
bufValue: BufValueInt,
blocks: openArray[BlockHeader])
## On-demand data retrieval
##
requestResponse:
proc getBlockBodies(
peer: Peer,
blocks: openArray[KeccakHash]) {.
costQuantity(blocks.len, max = maxBodiesFetch), gcsafe.} =
let blocks = peer.network.chain.getBlockBodies(blocks)
await response.send(updateBV(), blocks)
proc blockBodies(
peer: Peer,
bufValue: BufValueInt,
bodies: openArray[BlockBody])
requestResponse:
proc getReceipts(
peer: Peer,
hashes: openArray[KeccakHash])
{.costQuantity(hashes.len, max = maxReceiptsFetch).} =
let receipts = peer.network.chain.getReceipts(hashes)
await response.send(updateBV(), receipts)
proc receipts(
peer: Peer,
bufValue: BufValueInt,
receipts: openArray[Receipt])
requestResponse:
proc getProofs(
peer: Peer,
proofs: openArray[ProofRequest]) {.
costQuantity(proofs.len, max = maxProofsFetch).} =
let proofs = peer.network.chain.getProofs(proofs)
await response.send(updateBV(), proofs)
proc proofs(
peer: Peer,
bufValue: BufValueInt,
proofs: openArray[Blob])
requestResponse:
proc getContractCodes(
peer: Peer,
reqs: seq[ContractCodeRequest]) {.
costQuantity(reqs.len, max = maxCodeFetch).} =
let results = peer.network.chain.getContractCodes(reqs)
await response.send(updateBV(), results)
proc contractCodes(
peer: Peer,
bufValue: BufValueInt,
results: seq[Blob])
nextID 15
requestResponse:
proc getHeaderProofs(
peer: Peer,
reqs: openArray[ProofRequest]) {.
costQuantity(reqs.len, max = maxHeaderProofsFetch).} =
let proofs = peer.network.chain.getHeaderProofs(reqs)
await response.send(updateBV(), proofs)
proc headerProofs(
peer: Peer,
bufValue: BufValueInt,
proofs: openArray[Blob])
requestResponse:
proc getHelperTrieProofs(
peer: Peer,
reqs: openArray[HelperTrieProofRequest]) {.
costQuantity(reqs.len, max = maxProofsFetch).} =
var nodes, auxData: seq[Blob]
peer.network.chain.getHelperTrieProofs(reqs, nodes, auxData)
await response.send(updateBV(), nodes, auxData)
proc helperTrieProofs(
peer: Peer,
bufValue: BufValueInt,
nodes: seq[Blob],
auxData: seq[Blob])
## Transaction relaying and status retrieval
##
requestResponse:
proc sendTxV2(
peer: Peer,
transactions: openArray[Transaction]) {.
costQuantity(transactions.len, max = maxTransactionsFetch).} =
let chain = peer.network.chain
var results: seq[TransactionStatusMsg]
for t in transactions:
let hash = t.rlpHash # TODO: this is not optimal, we can compute
# the hash from the request bytes.
# The RLP module can offer a helper Hashed[T]
# to make this easy.
var s = chain.getTransactionStatus(hash)
if s.status == TransactionStatus.Unknown:
chain.addTransactions([t])
s = chain.getTransactionStatus(hash)
results.add s
await response.send(updateBV(), results)
proc getTxStatus(
peer: Peer,
transactions: openArray[Transaction]) {.
costQuantity(transactions.len, max = maxTransactionsFetch).} =
let chain = peer.network.chain
var results: seq[TransactionStatusMsg]
for t in transactions:
results.add chain.getTransactionStatus(t.rlpHash)
await response.send(updateBV(), results)
proc txStatus(
peer: Peer,
bufValue: BufValueInt,
transactions: openArray[TransactionStatusMsg])
proc configureLes*(node: EthereumNode,
# Client options:
announceType = AnnounceType.Simple,
# Server options.
# The zero default values indicate that the
# LES server will be deactivated.
maxReqCount = 0,
maxReqCostSum = 0,
reqCostTarget = 0) =
doAssert announceType != AnnounceType.Unspecified or maxReqCount > 0
var lesNetwork = node.protocolState(les)
lesNetwork.ourAnnounceType = announceType
initFlowControl(lesNetwork, les.protocolInfo,
maxReqCount, maxReqCostSum, reqCostTarget,
node.chain)
proc configureLesServer*(node: EthereumNode,
# Client options:
announceType = AnnounceType.Unspecified,
# Server options.
# The zero default values indicate that the
# LES server will be deactivated.
maxReqCount = 0,
maxReqCostSum = 0,
reqCostTarget = 0) =
## This is similar to `configureLes`, but with default parameter
## values appropriate for a server.
node.configureLes(announceType, maxReqCount, maxReqCostSum, reqCostTarget)
proc persistLesMessageStats*(node: EthereumNode) =
persistMessageStats(node.chain, node.protocolState(les))
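# Usage sketch (hypothetical node): enabling the server side with explicit
# flow-control limits; the values mirror the flow_control test setup, and
# the zero defaults would leave the LES server deactivated:
#
#   node.configureLesServer(maxReqCount = 5,
#                           maxReqCostSum = 1000,
#                           reqCostTarget = 300)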

View File

@@ -1,45 +0,0 @@
import
std/times,
chronicles, chronos
type
FullNodeSyncer* = ref object
chaindb: ChainDB
FastChainSyncer = ref object
RegularChainSyncer = ref object
# How old (in seconds) must our local head be to cause us to start with a fast-sync before we
# switch to regular-sync.
const FAST_SYNC_CUTOFF = 60 * 60 * 24
# NOTE: this module was an unfinished port from py-evm; FastChainSyncer and
# RegularChainSyncer are placeholders with no fields or `run` procs yet.
proc run(s: FullNodeSyncer) {.async.} =
var head = await s.chaindb.getCanonicalHead()
# We're still too slow at block processing, so if our local head is older than
# FAST_SYNC_CUTOFF we first do a fast-sync run to catch up with the rest of the network.
# See https://github.com/ethereum/py-evm/issues/654 for more details
if head.timestamp < epochTime() - FAST_SYNC_CUTOFF:
# Fast-sync chain data.
info "Starting fast-sync", currentHead = head.blockNumber
let chainSyncer = FastChainSyncer()
await chainSyncer.run()
# Ensure we have the state for our current head. The py-evm original runs a
# StateDownloader here whenever the state root of the new head is missing
# from the database; that part has not been ported yet.
head = await s.chaindb.getCanonicalHead()
# Now, loop forever, fetching missing blocks and applying them. The py-evm
# original first re-creates the Chain instance from the base DB, because the
# fast-sync path does not import blocks through the existing one.
info "Starting regular sync", currentHead = head.blockNumber
let regularSyncer = RegularChainSyncer()
await regularSyncer.run()

View File

@@ -1,7 +1,7 @@
import
testutils/fuzzing, chronos,
../../../eth/p2p, ../../../eth/p2p/rlpx, ../../../eth/p2p/private/p2p_types,
../../../eth/p2p/rlpx_protocols/eth_protocol,
../../p2p/eth_protocol,
../../p2p/p2p_test_helper
var

View File

@@ -6,5 +6,4 @@ import
./test_ecies,
./test_enode,
./test_rlpx_thunk,
./test_protocol_handlers,
./les/test_flow_control
./test_protocol_handlers

View File

@@ -0,0 +1,51 @@
import
chronos,
../../eth/[p2p, common]
# For testing purposes only; the real eth protocol
# implementation now lives in the nimbus-eth1 repo.
type
PeerState = ref object
initialized*: bool
p2pProtocol eth(version = 63,
peerState = PeerState,
useRequestIds = false):
onPeerConnected do (peer: Peer):
let
network = peer.network
let m = await peer.status(63,
network.networkId,
0.u256,
Hash256(),
Hash256(),
timeout = chronos.seconds(10))
handshake:
proc status(peer: Peer,
protocolVersion: uint,
networkId: NetworkId,
totalDifficulty: DifficultyInt,
bestHash: KeccakHash,
genesisHash: KeccakHash)
requestResponse:
proc getBlockHeaders(peer: Peer, request: openArray[KeccakHash]) {.gcsafe.} = discard
proc blockHeaders(p: Peer, headers: openArray[BlockHeader])
requestResponse:
proc getBlockBodies(peer: Peer, hashes: openArray[KeccakHash]) {.gcsafe.} = discard
proc blockBodies(peer: Peer, blocks: openArray[BlockBody])
nextID 13
requestResponse:
proc getNodeData(peer: Peer, hashes: openArray[KeccakHash]) = discard
proc nodeData(peer: Peer, data: openArray[Blob])
requestResponse:
proc getReceipts(peer: Peer, hashes: openArray[KeccakHash]) = discard
proc receipts(peer: Peer, receipts: openArray[Receipt])

View File

@@ -1,7 +0,0 @@
{.used.}
import
../../../eth/p2p/rlpx_protocols/les/flow_control
flow_control.tests()

View File

@@ -16,7 +16,7 @@ proc setupTestNode*(
# Don't create new RNG every time in production code!
let keys1 = KeyPair.random(rng[])
var node = newEthereumNode(
keys1, localAddress(nextPort), NetworkId(1), nil,
keys1, localAddress(nextPort), NetworkId(1),
addAllCapabilities = false,
bindUdpPort = Port(nextPort), bindTcpPort = Port(nextPort),
rng = rng)

View File

@@ -2,10 +2,11 @@
import
std/[json, os],
unittest2,
unittest2, stint,
chronos, stew/byteutils,
../../eth/p2p, ../../eth/p2p/rlpx_protocols/eth_protocol,
./p2p_test_helper
../../eth/[p2p, common],
./p2p_test_helper,
./eth_protocol
let rng = newRng()