The ETH and LES protocols now compile as part of nimbus

This commit is contained in:
Zahary Karadjov 2018-07-23 00:32:45 +03:00
parent 4335052e9f
commit 41e92ca8df
3 changed files with 98 additions and 56 deletions

View File

@ -16,28 +16,28 @@ import
eth_p2p/[kademlia, discovery, auth, rlpxcrypt, enode] eth_p2p/[kademlia, discovery, auth, rlpxcrypt, enode]
export export
enode, kademlia enode, kademlia, options
type type
EthereumNode* = ref object EthereumNode* = ref object
networkId*: int networkId*: uint
chain*: AbstractChainDB chain*: AbstractChainDB
clientId*: string clientId*: string
connectionState: ConnectionState connectionState*: ConnectionState
keys: KeyPair keys*: KeyPair
address: Address address*: Address
rlpxCapabilities: seq[Capability] rlpxCapabilities: seq[Capability]
rlpxProtocols: seq[ProtocolInfo] rlpxProtocols: seq[ProtocolInfo]
listeningServer: StreamServer listeningServer: StreamServer
protocolStates: seq[RootRef] protocolStates: seq[RootRef]
discovery: DiscoveryProtocol discovery: DiscoveryProtocol
peerPool: PeerPool peerPool*: PeerPool
Peer* = ref object Peer* = ref object
transp: StreamTransport transp: StreamTransport
dispatcher: Dispatcher dispatcher: Dispatcher
nextReqId: int nextReqId: int
network: EthereumNode network*: EthereumNode
secretsState: SecretState secretsState: SecretState
connectionState: ConnectionState connectionState: ConnectionState
remote*: Node remote*: Node
@ -53,7 +53,7 @@ type
PeerPool* = ref object PeerPool* = ref object
network: EthereumNode network: EthereumNode
keyPair: KeyPair keyPair: KeyPair
networkId: int networkId: uint
minPeers: int minPeers: int
clientId: string clientId: string
discovery: DiscoveryProtocol discovery: DiscoveryProtocol
@ -265,7 +265,10 @@ proc cmp*(lhs, rhs: ProtocolInfo): int {.inline.} =
return 0 return 0
proc messagePrinter[MsgType](msg: pointer): string = proc messagePrinter[MsgType](msg: pointer): string =
result = $(cast[ptr MsgType](msg)[]) result = ""
# TODO: uncommenting the line below increases the compile-time
# tremendously (for reasons not yet known)
# result = $(cast[ptr MsgType](msg)[])
proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) = proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) =
var reader = msgData var reader = msgData
@ -343,7 +346,7 @@ proc sendMsg(p: Peer, data: BytesRange) {.async.} =
var cipherText = encryptMsg(data, p.secretsState) var cipherText = encryptMsg(data, p.secretsState)
discard await p.transp.write(cipherText) discard await p.transp.write(cipherText)
proc registerRequest(peer: Peer, proc registerRequest*(peer: Peer,
timeout: int, timeout: int,
responseFuture: FutureBase, responseFuture: FutureBase,
responseMsgId: int): int = responseMsgId: int): int =
@ -615,6 +618,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
sendMsg = bindSym "sendMsg" sendMsg = bindSym "sendMsg"
startList = bindSym "startList" startList = bindSym "startList"
writeMsgId = bindSym "writeMsgId" writeMsgId = bindSym "writeMsgId"
getState = bindSym "getState"
# By convention, all Ethereum protocol names must be abbreviated to 3 letters # By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert protoName.len == 3 assert protoName.len == 3
@ -624,12 +628,18 @@ macro rlpxProtocol*(protoIdentifier: untyped,
## the helpers for accessing the peer and network protocol states. ## the helpers for accessing the peer and network protocol states.
userHandlerProc.addPragma newIdentNode"async" userHandlerProc.addPragma newIdentNode"async"
# We allow the user handler to use `openarray` params, but we turn
# those into sequences to make the `async` pragma happy.
for i in 1 ..< userHandlerProc.params.len:
var param = userHandlerProc.params[i]
param[^2] = chooseFieldType(param[^2])
# Define local accessors for the peer and the network protocol states # Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar) # inside each user message handler proc (e.g. peer.state.foo = bar)
if stateType != nil: if stateType != nil:
var localStateAccessor = quote: var localStateAccessor = quote:
template state(p: `Peer`): ref `stateType` = template state(p: `Peer`): ref `stateType` =
cast[ref `stateType`](p.getState(`protocol`)) cast[ref `stateType`](`getState`(p, `protocol`))
userHandlerProc.body.insert 0, localStateAccessor userHandlerProc.body.insert 0, localStateAccessor
@ -929,6 +939,9 @@ macro rlpxProtocol*(protoIdentifier: untyped,
discard addMsgHandler(nextId, n) discard addMsgHandler(nextId, n)
inc nextId inc nextId
of nnkCommentStmt:
discard
else: else:
macros.error("illegal syntax in a RLPx protocol definition", n) macros.error("illegal syntax in a RLPx protocol definition", n)
@ -971,7 +984,7 @@ rlpxProtocol p2p, 0:
proc pong(peer: Peer) = proc pong(peer: Peer) =
discard discard
proc disconnect(peer: Peer, reason: DisconnectionReason) {.async.} = proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} =
await peer.sendDisconnectMsg(reason) await peer.sendDisconnectMsg(reason)
# TODO: Any other clean up required? # TODO: Any other clean up required?
@ -1138,7 +1151,7 @@ const
connectLoopSleepMs = 2000 connectLoopSleepMs = 2000
proc newPeerPool*(network: EthereumNode, proc newPeerPool*(network: EthereumNode,
chainDb: AbstractChainDB, networkId: int, keyPair: KeyPair, chainDb: AbstractChainDB, networkId: uint, keyPair: KeyPair,
discovery: DiscoveryProtocol, clientId: string, discovery: DiscoveryProtocol, clientId: string,
listenPort = Port(30303), minPeers = 10): PeerPool = listenPort = Port(30303), minPeers = 10): PeerPool =
new result new result
@ -1308,7 +1321,7 @@ template addCapability*(n: var EthereumNode, Protocol: type) =
proc newEthereumNode*(keys: KeyPair, proc newEthereumNode*(keys: KeyPair,
address: Address, address: Address,
networkId: int, networkId: uint,
chain: AbstractChainDB, chain: AbstractChainDB,
clientId = clientId, clientId = clientId,
addAllCapabilities = true): EthereumNode = addAllCapabilities = true): EthereumNode =
@ -1338,17 +1351,17 @@ proc processIncoming(server: StreamServer,
$remote.remoteAddress() $remote.remoteAddress()
remote.close() remote.close()
proc startListening*(node: var EthereumNode) = proc startListening*(node: EthereumNode) =
let ta = initTAddress(node.address.ip, node.address.tcpPort) let ta = initTAddress(node.address.ip, node.address.tcpPort)
if node.listeningServer == nil: if node.listeningServer == nil:
node.listeningServer = createStreamServer(ta, processIncoming, node.listeningServer = createStreamServer(ta, processIncoming,
{ReuseAddr}, {ReuseAddr},
udata = addr(node)) udata = unsafeAddr(node))
node.listeningServer.start() node.listeningServer.start()
proc connectToNetwork*(node: var EthereumNode, proc connectToNetwork*(node: EthereumNode,
bootstrapNodes: openarray[ENode], bootstrapNodes: seq[ENode],
startListening = true) = startListening = true) {.async.} =
assert node.connectionState == ConnectionState.None assert node.connectionState == ConnectionState.None
node.connectionState = Connecting node.connectionState = Connecting
@ -1361,7 +1374,7 @@ proc connectToNetwork*(node: var EthereumNode,
node.clientId, node.address.tcpPort) node.clientId, node.address.tcpPort)
if startListening: if startListening:
node.startListening() eth_p2p.startListening(node)
node.protocolStates.newSeq(rlpxProtocols.len) node.protocolStates.newSeq(rlpxProtocols.len)
for p in node.rlpxProtocols: for p in node.rlpxProtocols:
@ -1371,8 +1384,18 @@ proc connectToNetwork*(node: var EthereumNode,
if startListening: if startListening:
node.listeningServer.start() node.listeningServer.start()
proc stopListening*(s: EthereumNode) = node.discovery.open()
s.listeningServer.stop() await node.discovery.bootstrap()
await node.peerPool.maybeConnectToMorePeers()
node.peerPool.start()
proc stopListening*(node: EthereumNode) =
node.listeningServer.stop()
iterator peers*(node: EthereumNode): Peer =
for remote, peer in node.peerPool.connectedNodes:
yield peer
when isMainModule: when isMainModule:
import rlp, strformat import rlp, strformat

View File

@ -9,7 +9,8 @@
# #
import import
rlp/types, stint, rlpx, eth_common asyncdispatch2, rlp, stint, eth_common,
../../eth_p2p
type type
NewBlockHashesAnnounce* = object NewBlockHashesAnnounce* = object
@ -36,90 +37,100 @@ const
rlpxProtocol eth, 63: rlpxProtocol eth, 63:
useRequestIds = false useRequestIds = false
type State = PeerState
proc status(peer: Peer, proc status(peer: Peer,
protocolVersion, networkId: uint, protocolVersion, networkId: uint,
totalDifficulty: Difficulty, totalDifficulty: Difficulty,
bestHash, genesisHash: KeccakHash) = bestHash, genesisHash: KeccakHash) =
# verify that the peer is on the same chain: # verify that the peer is on the same chain:
if peer.network.id != networkId or if peer.network.networkId != networkId or
peer.network.chain.genesisHash != genesisHash: peer.network.chain.genesisHash != genesisHash:
peer.disconnect() # TODO: Is there a more specific reason here?
await peer.disconnect(SubprotocolReason)
return return
p.state.reportedTotalDifficulty = totalDifficulty peer.state.reportedTotalDifficulty = totalDifficulty
proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) = proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) =
discard discard
proc transactions(p: Peer, transactions: openarray[Transaction]) = proc transactions(peer: Peer, transactions: openarray[Transaction]) =
discard discard
requestResponse: requestResponse:
proc getBlockHeaders(peer: Peer, request: BlocksRequest) = proc getBlockHeaders(peer: Peer, request: BlocksRequest) =
if request.maxResults > maxHeadersFetch: if request.maxResults > uint64(maxHeadersFetch):
peer.disconnect() await peer.disconnect(BreachOfProtocol)
return return
var foundBlock = peer.network.chain.locateBlock(startBlock) var chain = peer.network.chain
var foundBlock = chain.getBlockHeader(request.startBlock)
if not foundBlock.isNil: if not foundBlock.isNil:
var headers = newSeqOfCap[BlockHeader](request.maxResults) var headers = newSeqOfCap[BlockHeader](request.maxResults)
while headers.len < request.maxResults: while uint64(headers.len) < request.maxResults:
headers.add peer.network.chain.getBlockHeader(foundBlock) headers.add deref(foundBlock)
foundBlock = foundBlock.nextBlock() foundBlock = chain.getSuccessorHeader deref(foundBlock)
if foundBlock.isNil: break if foundBlock.isNil: break
discard await peer.blockHeaders(headers) await peer.blockHeaders(headers)
proc blockHeaders(p: Peer, headers: openarray[BlockHeader]) proc blockHeaders(p: Peer, headers: openarray[BlockHeader])
requestResponse: requestResponse:
proc getBlockBodies(p: Peer, hashes: openarray[KeccakHash]) = proc getBlockBodies(peer: Peer, hashes: openarray[KeccakHash]) =
if hashes.len > maxBodiesFetch: if hashes.len > maxBodiesFetch:
peer.disconnect() await peer.disconnect(BreachOfProtocol)
return return
var chain = peer.network.chain
var blockBodies = newSeqOfCap[BlockBody](hashes.len) var blockBodies = newSeqOfCap[BlockBody](hashes.len)
for hash in hashes: for hash in hashes:
let blockBody = peer.network.chain.getBlockBody(hash) let blockBody = chain.getBlockBody(hash)
if not blockBody.isNil: if not blockBody.isNil:
# TODO: should there be an else clause here.
# Is the peer responsible of figuring out that
# some blocks were not found?
blockBodies.add deref(blockBody) blockBodies.add deref(blockBody)
discard await peer.blockBodies(blockBodies) await peer.blockBodies(blockBodies)
proc blockBodies(p: Peer, blocks: openarray[BlockBody]) proc blockBodies(peer: Peer, blocks: openarray[BlockBody])
proc newBlock(p: Peer, bh: NewBlockAnnounce, totalDifficulty: Difficulty) = proc newBlock(peer: Peer, bh: NewBlockAnnounce, totalDifficulty: Difficulty) =
discard discard
nextID 13 nextID 13
requestResponse: requestResponse:
proc getNodeData(p: Peer, hashes: openarray[KeccakHash]) = proc getNodeData(peer: Peer, hashes: openarray[KeccakHash]) =
discard discard
proc nodeData(p: Peer, data: openarray[Blob]) = proc nodeData(peer: Peer, data: openarray[Blob]) =
discard discard
requestResponse: requestResponse:
proc getReceipts(p: Peer, hashes: openarray[KeccakHash]) = proc getReceipts(peer: Peer, hashes: openarray[KeccakHash]) =
discard discard
proc receipts(p: Peer, receipts: openarray[Receipt]) = proc receipts(peer: Peer, receipts: openarray[Receipt]) =
discard discard
proc fastBlockchainSync*(network: EthereumNode) {.async.} = proc fastBlockchainSync*(node: EthereumNode) {.async.} =
# 1. obtain last N block headers from all peers # 1. obtain last N block headers from all peers
var latestBlocksRequest: BlocksRequest var latestBlocksRequest: BlocksRequest
var requests = newSeqOfCap[Future[eth.blockHeaders]](32) var requests = newSeqOfCap[Future[Option[eth.blockHeaders]]](32)
for peer in network.peerPool: for peer in node.peers:
if peer.supports(eth): if peer.supports(eth):
requests.add peer.getBlockHeaders(latestBlocksRequest) requests.add peer.getBlockHeaders(latestBlocksRequest)
await all(requests) discard await all(requests)
# 2. find out what is the block with best total difficulty # 2. find out what is the block with best total difficulty
var bestBlockDifficulty: Difficulty = 0 var bestBlockDifficulty: Difficulty = 0.stuint(256)
for req in requests: for req in requests:
if req.read.isNone: continue if req.read.isNone: continue
for header in req.read.get.headers: for header in req.read.get.headers:
@ -134,7 +145,7 @@ proc fastBlockchainSync*(network: EthereumNode) {.async.} =
# missing (by requesting blocks from peers while honoring maxHeadersFetch). # missing (by requesting blocks from peers while honoring maxHeadersFetch).
# Make sure the blocks hashes add up. Don't count on everyone replying, ask # Make sure the blocks hashes add up. Don't count on everyone replying, ask
# a different peer in case of time-out. Handle invalid or incomplete replies # a different peer in case of time-out. Handle invalid or incomplete replies
# properly. The peer may response with fewer headers than requested (or with # properly. The peer may respond with fewer headers than requested (or with
# different ones if the peer is not behaving properly). # different ones if the peer is not behaving properly).
# 5. Store the obtained headers in the blockchain DB # 5. Store the obtained headers in the blockchain DB

View File

@ -9,14 +9,21 @@
# #
import import
rlp/types, rlpx, eth_common, times times,
asyncdispatch2, rlp, eth_common/eth_types,
../../eth_p2p
type type
ProofRequest* = object ProofRequest* = object
blockHash*: KeccakHash blockHash*: KeccakHash
accountKey*: Blob accountKey*: Blob
key*: Blob key*: Blob
fromLevel*: UInt256 fromLevel*: uint
HeaderProofRequest* = object
chtNumber*: uint
blockNumber*: uint
fromLevel*: uint
ContractCodeRequest* = object ContractCodeRequest* = object
blockHash*: KeccakHash blockHash*: KeccakHash
@ -47,7 +54,7 @@ type
KeyValuePair = object KeyValuePair = object
key: string key: string
value: Rlp value: Blob
const const
maxHeadersFetch = 192 maxHeadersFetch = 192
@ -122,7 +129,8 @@ rlpxProtocol les, 2:
## Header synchronisation ## Header synchronisation
## ##
proc announce(p: Peer, headHash: KeccakHash, proc announce(p: Peer,
headHash: KeccakHash,
headNumber: BlockNumber, headNumber: BlockNumber,
headTotalDifficulty: Difficulty, headTotalDifficulty: Difficulty,
reorgDepth: BlockNumber, reorgDepth: BlockNumber,
@ -134,7 +142,7 @@ rlpxProtocol les, 2:
proc getBlockHeaders(p: Peer, BV: uint, req: BlocksRequest) = proc getBlockHeaders(p: Peer, BV: uint, req: BlocksRequest) =
discard discard
proc blockHeaders(p: Peer, BV: uint, blocks: openarray[BlockHeaders]) = proc blockHeaders(p: Peer, BV: uint, blocks: openarray[BlockHeader]) =
discard discard
## On-damand data retrieval ## On-damand data retrieval
@ -171,7 +179,7 @@ rlpxProtocol les, 2:
nextID 15 nextID 15
requestResponse: requestResponse:
proc getHeaderProofs(p: Peer, requests: openarray[HeaderProofRequest]) = proc getHeaderProofs(p: Peer, requests: openarray[ProofRequest]) =
discard discard
proc headerProof(p: Peer, BV: uint, proofs: openarray[Blob]) = proc headerProof(p: Peer, BV: uint, proofs: openarray[Blob]) =