From 41e92ca8df039da2e8e8770bb60aa41bb483108a Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Mon, 23 Jul 2018 00:32:45 +0300 Subject: [PATCH] The ETH and LES protocols now compile as part of nimbus --- eth_p2p.nim | 67 +++++++++++++++++++++++----------- eth_p2p/rlpx_protocols/eth.nim | 67 ++++++++++++++++++++-------------- eth_p2p/rlpx_protocols/les.nim | 20 +++++++--- 3 files changed, 98 insertions(+), 56 deletions(-) diff --git a/eth_p2p.nim b/eth_p2p.nim index 4db9453..5d6a674 100644 --- a/eth_p2p.nim +++ b/eth_p2p.nim @@ -16,28 +16,28 @@ import eth_p2p/[kademlia, discovery, auth, rlpxcrypt, enode] export - enode, kademlia + enode, kademlia, options type EthereumNode* = ref object - networkId*: int + networkId*: uint chain*: AbstractChainDB clientId*: string - connectionState: ConnectionState - keys: KeyPair - address: Address + connectionState*: ConnectionState + keys*: KeyPair + address*: Address rlpxCapabilities: seq[Capability] rlpxProtocols: seq[ProtocolInfo] listeningServer: StreamServer protocolStates: seq[RootRef] discovery: DiscoveryProtocol - peerPool: PeerPool + peerPool*: PeerPool Peer* = ref object transp: StreamTransport dispatcher: Dispatcher nextReqId: int - network: EthereumNode + network*: EthereumNode secretsState: SecretState connectionState: ConnectionState remote*: Node @@ -53,7 +53,7 @@ type PeerPool* = ref object network: EthereumNode keyPair: KeyPair - networkId: int + networkId: uint minPeers: int clientId: string discovery: DiscoveryProtocol @@ -265,7 +265,10 @@ proc cmp*(lhs, rhs: ProtocolInfo): int {.inline.} = return 0 proc messagePrinter[MsgType](msg: pointer): string = - result = $(cast[ptr MsgType](msg)[]) + result = "" + # TODO: uncommenting the line below increases the compile-time + # tremendously (for reasons not yet known) + # result = $(cast[ptr MsgType](msg)[]) proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) = var reader = msgData @@ -343,7 +346,7 @@ proc sendMsg(p: Peer, data: BytesRange) {.async.} = var cipherText = encryptMsg(data, p.secretsState) discard await p.transp.write(cipherText) -proc registerRequest(peer: Peer, +proc registerRequest*(peer: Peer, timeout: int, responseFuture: FutureBase, responseMsgId: int): int = @@ -615,6 +618,7 @@ macro rlpxProtocol*(protoIdentifier: untyped, sendMsg = bindSym "sendMsg" startList = bindSym "startList" writeMsgId = bindSym "writeMsgId" + getState = bindSym "getState" # By convention, all Ethereum protocol names must be abbreviated to 3 letters assert protoName.len == 3 @@ -624,12 +628,18 @@ macro rlpxProtocol*(protoIdentifier: untyped, ## the helpers for accessing the peer and network protocol states. userHandlerProc.addPragma newIdentNode"async" + # We allow the user handler to use `openarray` params, but we turn + # those into sequences to make the `async` pragma happy. + for i in 1 ..< userHandlerProc.params.len: + var param = userHandlerProc.params[i] + param[^2] = chooseFieldType(param[^2]) + # Define local accessors for the peer and the network protocol states # inside each user message handler proc (e.g. peer.state.foo = bar) if stateType != nil: var localStateAccessor = quote: template state(p: `Peer`): ref `stateType` = - cast[ref `stateType`](p.getState(`protocol`)) + cast[ref `stateType`](`getState`(p, `protocol`)) userHandlerProc.body.insert 0, localStateAccessor @@ -929,6 +939,9 @@ macro rlpxProtocol*(protoIdentifier: untyped, discard addMsgHandler(nextId, n) inc nextId + of nnkCommentStmt: + discard + else: macros.error("illegal syntax in a RLPx protocol definition", n) @@ -971,7 +984,7 @@ rlpxProtocol p2p, 0: proc pong(peer: Peer) = discard -proc disconnect(peer: Peer, reason: DisconnectionReason) {.async.} = +proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} = await peer.sendDisconnectMsg(reason) # TODO: Any other clean up required? @@ -1138,7 +1151,7 @@ const connectLoopSleepMs = 2000 proc newPeerPool*(network: EthereumNode, - chainDb: AbstractChainDB, networkId: int, keyPair: KeyPair, + chainDb: AbstractChainDB, networkId: uint, keyPair: KeyPair, discovery: DiscoveryProtocol, clientId: string, listenPort = Port(30303), minPeers = 10): PeerPool = new result @@ -1308,7 +1321,7 @@ template addCapability*(n: var EthereumNode, Protocol: type) = proc newEthereumNode*(keys: KeyPair, address: Address, - networkId: int, + networkId: uint, chain: AbstractChainDB, clientId = clientId, addAllCapabilities = true): EthereumNode = @@ -1338,17 +1351,17 @@ proc processIncoming(server: StreamServer, $remote.remoteAddress() remote.close() -proc startListening*(node: var EthereumNode) = +proc startListening*(node: EthereumNode) = let ta = initTAddress(node.address.ip, node.address.tcpPort) if node.listeningServer == nil: node.listeningServer = createStreamServer(ta, processIncoming, {ReuseAddr}, - udata = addr(node)) + udata = unsafeAddr(node)) node.listeningServer.start() -proc connectToNetwork*(node: var EthereumNode, - bootstrapNodes: openarray[ENode], - startListening = true) = +proc connectToNetwork*(node: EthereumNode, + bootstrapNodes: seq[ENode], + startListening = true) {.async.} = assert node.connectionState == ConnectionState.None node.connectionState = Connecting @@ -1361,7 +1374,7 @@ proc connectToNetwork*(node: var EthereumNode, node.clientId, node.address.tcpPort) if startListening: - node.startListening() + eth_p2p.startListening(node) node.protocolStates.newSeq(rlpxProtocols.len) for p in node.rlpxProtocols: @@ -1371,8 +1384,18 @@ proc connectToNetwork*(node: var EthereumNode, if startListening: node.listeningServer.start() -proc stopListening*(s: EthereumNode) = - s.listeningServer.stop() + node.discovery.open() + await node.discovery.bootstrap() + await node.peerPool.maybeConnectToMorePeers() + + node.peerPool.start() + +proc stopListening*(node: EthereumNode) = + node.listeningServer.stop() + +iterator peers*(node: EthereumNode): Peer = + for remote, peer in node.peerPool.connectedNodes: + yield peer when isMainModule: import rlp, strformat diff --git a/eth_p2p/rlpx_protocols/eth.nim b/eth_p2p/rlpx_protocols/eth.nim index 62f8a10..5ba2857 100644 --- a/eth_p2p/rlpx_protocols/eth.nim +++ b/eth_p2p/rlpx_protocols/eth.nim @@ -9,7 +9,8 @@ # import - rlp/types, stint, rlpx, eth_common + asyncdispatch2, rlp, stint, eth_common, + ../../eth_p2p type NewBlockHashesAnnounce* = object @@ -36,90 +37,100 @@ const rlpxProtocol eth, 63: useRequestIds = false + type State = PeerState + proc status(peer: Peer, protocolVersion, networkId: uint, totalDifficulty: Difficulty, bestHash, genesisHash: KeccakHash) = # verify that the peer is on the same chain: - if peer.network.id != networkId or + if peer.network.networkId != networkId or peer.network.chain.genesisHash != genesisHash: - peer.disconnect() + # TODO: Is there a more specific reason here? + await peer.disconnect(SubprotocolReason) return - p.state.reportedTotalDifficulty = totalDifficulty + peer.state.reportedTotalDifficulty = totalDifficulty proc newBlockHashes(peer: Peer, hashes: openarray[NewBlockHashesAnnounce]) = discard - proc transactions(p: Peer, transactions: openarray[Transaction]) = + proc transactions(peer: Peer, transactions: openarray[Transaction]) = discard requestResponse: proc getBlockHeaders(peer: Peer, request: BlocksRequest) = - if request.maxResults > maxHeadersFetch: - peer.disconnect() + if request.maxResults > uint64(maxHeadersFetch): + await peer.disconnect(BreachOfProtocol) return - var foundBlock = peer.network.chain.locateBlock(startBlock) + var chain = peer.network.chain + + var foundBlock = chain.getBlockHeader(request.startBlock) if not foundBlock.isNil: var headers = newSeqOfCap[BlockHeader](request.maxResults) - while headers.len < request.maxResults: - headers.add peer.network.chain.getBlockHeader(foundBlock) - foundBlock = foundBlock.nextBlock() + while uint64(headers.len) < request.maxResults: + headers.add deref(foundBlock) + foundBlock = chain.getSuccessorHeader deref(foundBlock) if foundBlock.isNil: break - discard await peer.blockHeaders(headers) + await peer.blockHeaders(headers) proc blockHeaders(p: Peer, headers: openarray[BlockHeader]) requestResponse: - proc getBlockBodies(p: Peer, hashes: openarray[KeccakHash]) = + proc getBlockBodies(peer: Peer, hashes: openarray[KeccakHash]) = if hashes.len > maxBodiesFetch: - peer.disconnect() + await peer.disconnect(BreachOfProtocol) return + var chain = peer.network.chain + var blockBodies = newSeqOfCap[BlockBody](hashes.len) for hash in hashes: - let blockBody = peer.network.chain.getBlockBody(hash) + let blockBody = chain.getBlockBody(hash) if not blockBody.isNil: + # TODO: should there be an else clause here. + # Is the peer responsible of figuring out that + # some blocks were not found? blockBodies.add deref(blockBody) - discard await peer.blockBodies(blockBodies) + await peer.blockBodies(blockBodies) - proc blockBodies(p: Peer, blocks: openarray[BlockBody]) + proc blockBodies(peer: Peer, blocks: openarray[BlockBody]) - proc newBlock(p: Peer, bh: NewBlockAnnounce, totalDifficulty: Difficulty) = + proc newBlock(peer: Peer, bh: NewBlockAnnounce, totalDifficulty: Difficulty) = discard nextID 13 requestResponse: - proc getNodeData(p: Peer, hashes: openarray[KeccakHash]) = + proc getNodeData(peer: Peer, hashes: openarray[KeccakHash]) = discard - proc nodeData(p: Peer, data: openarray[Blob]) = + proc nodeData(peer: Peer, data: openarray[Blob]) = discard requestResponse: - proc getReceipts(p: Peer, hashes: openarray[KeccakHash]) = + proc getReceipts(peer: Peer, hashes: openarray[KeccakHash]) = discard - proc receipts(p: Peer, receipts: openarray[Receipt]) = + proc receipts(peer: Peer, receipts: openarray[Receipt]) = discard -proc fastBlockchainSync*(network: EthereumNode) {.async.} = +proc fastBlockchainSync*(node: EthereumNode) {.async.} = # 1. obtain last N block headers from all peers var latestBlocksRequest: BlocksRequest - var requests = newSeqOfCap[Future[eth.blockHeaders]](32) - for peer in network.peerPool: + var requests = newSeqOfCap[Future[Option[eth.blockHeaders]]](32) + for peer in node.peers: if peer.supports(eth): requests.add peer.getBlockHeaders(latestBlocksRequest) - await all(requests) + discard await all(requests) # 2. find out what is the block with best total difficulty - var bestBlockDifficulty: Difficulty = 0 + var bestBlockDifficulty: Difficulty = 0.stuint(256) for req in requests: if req.read.isNone: continue for header in req.read.get.headers: @@ -134,7 +145,7 @@ proc fastBlockchainSync*(network: EthereumNode) {.async.} = # missing (by requesting blocks from peers while honoring maxHeadersFetch). # Make sure the blocks hashes add up. Don't count on everyone replying, ask # a different peer in case of time-out. Handle invalid or incomplete replies - # properly. The peer may response with fewer headers than requested (or with + # properly. The peer may respond with fewer headers than requested (or with # different ones if the peer is not behaving properly). # 5. Store the obtained headers in the blockchain DB diff --git a/eth_p2p/rlpx_protocols/les.nim b/eth_p2p/rlpx_protocols/les.nim index 4b26744..c7663fd 100644 --- a/eth_p2p/rlpx_protocols/les.nim +++ b/eth_p2p/rlpx_protocols/les.nim @@ -9,14 +9,21 @@ # import - rlp/types, rlpx, eth_common, times + times, + asyncdispatch2, rlp, eth_common/eth_types, + ../../eth_p2p type ProofRequest* = object blockHash*: KeccakHash accountKey*: Blob key*: Blob - fromLevel*: UInt256 + fromLevel*: uint + + HeaderProofRequest* = object + chtNumber*: uint + blockNumber*: uint + fromLevel*: uint ContractCodeRequest* = object blockHash*: KeccakHash @@ -47,7 +54,7 @@ type KeyValuePair = object key: string - value: Rlp + value: Blob const maxHeadersFetch = 192 @@ -122,7 +129,8 @@ rlpxProtocol les, 2: ## Header synchronisation ## - proc announce(p: Peer, headHash: KeccakHash, + proc announce(p: Peer, + headHash: KeccakHash, headNumber: BlockNumber, headTotalDifficulty: Difficulty, reorgDepth: BlockNumber, @@ -134,7 +142,7 @@ rlpxProtocol les, 2: proc getBlockHeaders(p: Peer, BV: uint, req: BlocksRequest) = discard - proc blockHeaders(p: Peer, BV: uint, blocks: openarray[BlockHeaders]) = + proc blockHeaders(p: Peer, BV: uint, blocks: openarray[BlockHeader]) = discard ## On-damand data retrieval @@ -171,7 +179,7 @@ rlpxProtocol les, 2: nextID 15 requestResponse: - proc getHeaderProofs(p: Peer, requests: openarray[HeaderProofRequest]) = + proc getHeaderProofs(p: Peer, requests: openarray[ProofRequest]) = discard proc headerProof(p: Peer, BV: uint, proofs: openarray[Blob]) =