From 72016046fbb35c82df9776a176564e70103e2c33 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Fri, 6 Jul 2018 15:25:21 +0300 Subject: [PATCH] Merge peer_pool and server into rlpx This was done because a cycle was formed between the structures of the three modules: - A Peer holds a reference to its Network - The Network holds a reference to its PeerPool - The PeerPool keeps a table of connected Peers I could have resolved the issue by introducing a new types module, but it would have required all of the currently private fields to become public (due to lack of package-level visibility in Nim). Instead I decided to merge the modules because they were relatively small anyway. Please note that the former `P2PServer` type is now called `NetworkConnection`. There are slight changes in the APIs that will be carried out in Nimbus when merging this. --- eth_p2p/ethereum_types.nim | 104 -------------- eth_p2p/peer_pool.nim | 191 ------------------------- eth_p2p/rlpx.nim | 251 ++++++++++++++++++++++++++++++--- eth_p2p/rlpx_protocols/eth.nim | 2 +- eth_p2p/rlpx_protocols/les.nim | 2 +- eth_p2p/server.nim | 60 -------- tests/tdiscovery.nim | 2 +- tests/tserver.nim | 5 +- 8 files changed, 239 insertions(+), 378 deletions(-) delete mode 100644 eth_p2p/ethereum_types.nim delete mode 100644 eth_p2p/peer_pool.nim delete mode 100644 eth_p2p/server.nim diff --git a/eth_p2p/ethereum_types.nim b/eth_p2p/ethereum_types.nim deleted file mode 100644 index 1b5b234..0000000 --- a/eth_p2p/ethereum_types.nim +++ /dev/null @@ -1,104 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) -# - -import rlp/types, nimcrypto/hash, stint - -export - MDigest - -type - # XXX: Some of the UInt256 fields may be unnecessarily large - - P* = UInt256 - - KeccakHash* = MDigest[256] - KeyValuePair* = (BytesRange, BytesRange) - - BlockNonce* = UInt256 - Blob* = seq[byte] - - BloomFilter* = distinct KeccakHash - EthAddress* = distinct MDigest[160] - - Transaction* = object - accountNonce*: uint64 - gasPrice*: UInt256 - gasLimit*: uint64 - to*: EthAddress - value*: UInt256 - payload*: Blob - V*, R*, S*: UInt256 - - AccessList* = object - # XXX: Specify the structure of this - - BlockHeader* = object - parentHash*: KeccakHash - uncleHash*: KeccakHash - coinbase*: EthAddress - stateRoot*: KeccakHash - txRoot*: KeccakHash - receiptRoot*: KeccakHash - bloom*: BloomFilter - difficulty*: UInt256 - blockNumber*: uint - gasLimit*: uint64 - gasUsed*: uint64 - timestamp*: uint64 - extraData*: Blob - mixDigest*: KeccakHash - nonce*: BlockNonce - - BlockBody* = object - transactions*: seq[Transaction] - uncles*: seq[BlockHeader] - - Log* = object - address*: EthAddress - topics*: seq[int32] - data*: Blob - - Receipt* = object - stateRoot*: Blob - gasUsed*: uint64 - bloom*: BloomFilter - logs*: seq[Log] - - ShardTransaction* = object - chain*: uint - shard*: uint - to*: EthAddress - data*: Blob - gas*: uint64 - acceesList*: AccessList - code*: Blob - salt*: KeccakHash - - CollationHeader* = object - shard*: uint - expectedPeriod*: uint - periodStartPrevHash*: KeccakHash - parentHash*: KeccakHash - txRoot*: KeccakHash - coinbase*: EthAddress - stateRoot*: KeccakHash - receiptRoot*: KeccakHash - blockNumber*: uint - - HashOrNum* = object - case isHash*: bool - of true: - hash*: KeccakHash - else: - number*: uint - - BlocksRequest* = object - startBlock*: HashOrNum - maxResults*, skip*, reverse*: uint diff --git a/eth_p2p/peer_pool.nim b/eth_p2p/peer_pool.nim deleted file mode 100644 index 7b6bbdf..0000000 --- a/eth_p2p/peer_pool.nim +++ /dev/null @@ -1,191 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) -# - -import logging, tables, times, random -import eth_keys, asyncdispatch2 -import discovery, rlpx, kademlia - -type - PeerPool* = ref object - keyPair: KeyPair - networkId: int - minPeers: int - clientId: string - discovery: DiscoveryProtocol - lastLookupTime: float - connectedNodes: Table[Node, Peer] - running: bool - listenPort*: Port - - AsyncChainDb* = ref object # TODO: This should be defined elsewhere - -# class PeerPool: -# PeerPool attempts to keep connections to at least min_peers on the given network. - -const - lookupInterval = 5 - connectLoopSleepMs = 2000 - -proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair, - discovery: DiscoveryProtocol, clientId: string, - listenPort = Port(30303), minPeers = 10): PeerPool = - result.new() - result.keyPair = keyPair - result.minPeers = minPeers - result.networkId = networkId - result.discovery = discovery - result.connectedNodes = initTable[Node, Peer]() - result.listenPort = listenPort - -template ensureFuture(f: untyped) = asyncCheck f - -proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} = - p.discovery.randomNodes(p.minPeers) - -# def subscribe(self, subscriber: PeerPoolSubscriber) -> None: -# self._subscribers.append(subscriber) -# for peer in self.connected_nodes.values(): -# subscriber.register_peer(peer) - -# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None: -# if subscriber in self._subscribers: -# self._subscribers.remove(subscriber) - -proc stopAllPeers(p: PeerPool) {.async.} = - info "Stopping all peers ..." - # TODO: ... - # await asyncio.gather( - # *[peer.stop() for peer in self.connected_nodes.values()]) - -# async def stop(self) -> None: -# self.cancel_token.trigger() -# await self.stop_all_peers() - -proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} = - ## Connect to the given remote and return a Peer instance when successful. - ## Returns nil if the remote is unreachable, times out or is useless. - if remote in p.connectedNodes: - debug "Skipping ", remote, "; already connected to it" - return nil - - result = await remote.rlpxConnect(p.keyPair, p.listenPort, p.clientId) - - # expected_exceptions = ( - # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) - # try: - # self.logger.debug("Connecting to %s...", remote) - # peer = await wait_with_token( - # handshake(remote, self.privkey, self.peer_class, self.chaindb, self.network_id), - # token=self.cancel_token, - # timeout=HANDSHAKE_TIMEOUT) - # return peer - # except OperationCancelled: - # # Pass it on to instruct our main loop to stop. - # raise - # except expected_exceptions as e: - # self.logger.debug("Could not complete handshake with %s: %s", remote, repr(e)) - # except Exception: - # self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote) - # return None - -proc lookupRandomNode(p: PeerPool) {.async.} = - # This method runs in the background, so we must catch OperationCancelled - # ere otherwise asyncio will warn that its exception was never retrieved. - try: - discard await p.discovery.lookupRandom() - except: # OperationCancelled - discard - p.lastLookupTime = epochTime() - -proc getRandomBootnode(p: PeerPool): seq[Node] = - @[p.discovery.bootstrapNodes.rand()] - -proc peerFinished(p: PeerPool, peer: Peer) = - ## Remove the given peer from our list of connected nodes. - ## This is passed as a callback to be called when a peer finishes. - p.connectedNodes.del(peer.remote) - -proc run(p: Peer, completionHandler: proc() = nil) {.async.} = - # TODO: This is a stub that should be implemented in rlpx.nim - await sleepAsync(20000) # sleep 20 sec - if not completionHandler.isNil: completionHandler() - -proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = - for node in nodes: - # TODO: Consider changing connect() to raise an exception instead of - # returning None, as discussed in - # https://github.com/ethereum/py-evm/pull/139#discussion_r152067425 - let peer = await p.connect(node) - if not peer.isNil: - info "Successfully connected to ", peer - ensureFuture peer.run() do(): - p.peerFinished(peer) - - p.connectedNodes[peer.remote] = peer - # for subscriber in self._subscribers: - # subscriber.register_peer(peer) - if p.connectedNodes.len >= p.minPeers: - return - -proc maybeConnectToMorePeers(p: PeerPool) {.async.} = - ## Connect to more peers if we're not yet connected to at least self.minPeers. - if p.connectedNodes.len >= p.minPeers: - debug "Already connected to enough peers: ", p.connectedNodes, "; sleeping" - return - - if p.lastLookupTime + lookupInterval < epochTime(): - ensureFuture p.lookupRandomNode() - - await p.connectToNodes(p.nodesToConnect()) - - # In some cases (e.g ROPSTEN or private testnets), the discovery table might - # be full of bad peers, so if we can't connect to any peers we try a random - # bootstrap node as well. - if p.connectedNodes.len == 0: - await p.connectToNodes(p.getRandomBootnode()) - -proc run(p: PeerPool) {.async.} = - info "Running PeerPool..." - p.running = true - while p.running: - var dropConnections = false - try: - await p.maybeConnectToMorePeers() - except: - # Most unexpected errors should be transient, so we log and restart from - # scratch. - error "Unexpected error, restarting" - dropConnections = true - - if dropConnections: - await p.stopAllPeers() - - await sleepAsync(connectLoopSleepMs) - -proc start*(p: PeerPool) = - if not p.running: - asyncCheck p.run() - - -# @property -# def peers(self) -> List[BasePeer]: -# peers = list(self.connected_nodes.values()) -# # Shuffle the list of peers so that dumb callsites are less likely to send -# # all requests to -# # a single peer even if they always pick the first one from the list. -# random.shuffle(peers) -# return peers - -# async def get_random_peer(self) -> BasePeer: -# while not self.peers: -# self.logger.debug("No connected peers, sleeping a bit") -# await asyncio.sleep(0.5) -# return random.choice(self.peers) - diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim index 81fe428..1963ce8 100644 --- a/eth_p2p/rlpx.nim +++ b/eth_p2p/rlpx.nim @@ -8,33 +8,52 @@ # MIT license (LICENSE-MIT) # -import macros, sets, algorithm, logging, hashes -import rlp, ranges/[stackarrays, ptr_arith], eth_keys, ethereum_types, - nimcrypto, asyncdispatch2 -import kademlia, discovery, auth, rlpxcrypt, enode +import + tables, macros, sets, algorithm, logging, hashes, times, random, + rlp, ranges/[stackarrays, ptr_arith], nimcrypto, asyncdispatch2, + eth_keys, eth_common, + kademlia, discovery, auth, rlpxcrypt, enode type - ConnectionState = enum + ConnectionState* = enum None, Connected, Disconnecting, Disconnected - Network* = ref object + NetworkConnection* = ref object id: int + listeningServer: StreamServer protocolStates: seq[RootRef] + chainDb: AbstractChainDB + keyPair: KeyPair + address: Address + clientId: string + discovery: DiscoveryProtocol + peerPool: PeerPool Peer* = ref object transp: StreamTransport dispatcher: Dispatcher networkId: int nextRequestId: int - network: Network + network: NetworkConnection secretsState: SecretState connectionState: ConnectionState protocolStates: seq[RootRef] remote*: Node + PeerPool* = ref object + keyPair: KeyPair + networkId: int + minPeers: int + clientId: string + discovery: DiscoveryProtocol + lastLookupTime: float + connectedNodes: Table[Node, Peer] + running: bool + listenPort*: Port + MessageHandler* = proc(x: Peer, data: Rlp): Future[void] MessageInfo* = object @@ -71,7 +90,7 @@ type protocolOffsets: seq[int] thunks: seq[MessageHandler] - RlpxMessageKind = enum + RlpxMessageKind* = enum rlpxNotification, rlpxRequest, rlpxResponse @@ -193,15 +212,6 @@ proc registerProtocol(protocol: ProtocolInfo) = else: devp2p = protocol -# RLP serialization -# - -proc append*(rlpWriter: var RlpWriter, hash: KeccakHash) = - rlpWriter.append(hash.data) - -proc read*(rlp: var Rlp, T: typedesc[KeccakHash]): T = - result.data = rlp.read(type(result.data)) - # Message composition and encryption # @@ -796,6 +806,213 @@ proc rlpxAccept*(transp: StreamTransport, myKeys: KeyPair, except: transp.close() +# PeerPool attempts to keep connections to at least min_peers +# on the given network. + +const + lookupInterval = 5 + connectLoopSleepMs = 2000 + +proc newPeerPool*(chainDb: AbstractChainDB, networkId: int, keyPair: KeyPair, + discovery: DiscoveryProtocol, clientId: string, + listenPort = Port(30303), minPeers = 10): PeerPool = + result.new() + result.keyPair = keyPair + result.minPeers = minPeers + result.networkId = networkId + result.discovery = discovery + result.connectedNodes = initTable[Node, Peer]() + result.listenPort = listenPort + +template ensureFuture(f: untyped) = asyncCheck f + +proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} = + p.discovery.randomNodes(p.minPeers) + +# def subscribe(self, subscriber: PeerPoolSubscriber) -> None: +# self._subscribers.append(subscriber) +# for peer in self.connected_nodes.values(): +# subscriber.register_peer(peer) + +# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None: +# if subscriber in self._subscribers: +# self._subscribers.remove(subscriber) + +proc stopAllPeers(p: PeerPool) {.async.} = + info "Stopping all peers ..." + # TODO: ... + # await asyncio.gather( + # *[peer.stop() for peer in self.connected_nodes.values()]) + +# async def stop(self) -> None: +# self.cancel_token.trigger() +# await self.stop_all_peers() + +proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} = + ## Connect to the given remote and return a Peer instance when successful. + ## Returns nil if the remote is unreachable, times out or is useless. + if remote in p.connectedNodes: + debug "Skipping ", remote, "; already connected to it" + return nil + + result = await remote.rlpxConnect(p.keyPair, p.listenPort, p.clientId) + + # expected_exceptions = ( + # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) + # try: + # self.logger.debug("Connecting to %s...", remote) + # peer = await wait_with_token( + # handshake(remote, self.privkey, self.peer_class, self.chaindb, self.network_id), + # token=self.cancel_token, + # timeout=HANDSHAKE_TIMEOUT) + # return peer + # except OperationCancelled: + # # Pass it on to instruct our main loop to stop. + # raise + # except expected_exceptions as e: + # self.logger.debug("Could not complete handshake with %s: %s", remote, repr(e)) + # except Exception: + # self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote) + # return None + +proc lookupRandomNode(p: PeerPool) {.async.} = + # This method runs in the background, so we must catch OperationCancelled + # ere otherwise asyncio will warn that its exception was never retrieved. + try: + discard await p.discovery.lookupRandom() + except: # OperationCancelled + discard + p.lastLookupTime = epochTime() + +proc getRandomBootnode(p: PeerPool): seq[Node] = + @[p.discovery.bootstrapNodes.rand()] + +proc peerFinished(p: PeerPool, peer: Peer) = + ## Remove the given peer from our list of connected nodes. + ## This is passed as a callback to be called when a peer finishes. + p.connectedNodes.del(peer.remote) + +proc run(p: Peer, completionHandler: proc() = nil) {.async.} = + # TODO: This is a stub that should be implemented in rlpx.nim + await sleepAsync(20000) # sleep 20 sec + if not completionHandler.isNil: completionHandler() + +proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = + for node in nodes: + # TODO: Consider changing connect() to raise an exception instead of + # returning None, as discussed in + # https://github.com/ethereum/py-evm/pull/139#discussion_r152067425 + let peer = await p.connect(node) + if not peer.isNil: + info "Successfully connected to ", peer + ensureFuture peer.run() do(): + p.peerFinished(peer) + + p.connectedNodes[peer.remote] = peer + # for subscriber in self._subscribers: + # subscriber.register_peer(peer) + if p.connectedNodes.len >= p.minPeers: + return + +proc maybeConnectToMorePeers(p: PeerPool) {.async.} = + ## Connect to more peers if we're not yet connected to at least self.minPeers. + if p.connectedNodes.len >= p.minPeers: + debug "Already connected to enough peers: ", p.connectedNodes, "; sleeping" + return + + if p.lastLookupTime + lookupInterval < epochTime(): + ensureFuture p.lookupRandomNode() + + await p.connectToNodes(p.nodesToConnect()) + + # In some cases (e.g ROPSTEN or private testnets), the discovery table might + # be full of bad peers, so if we can't connect to any peers we try a random + # bootstrap node as well. + if p.connectedNodes.len == 0: + await p.connectToNodes(p.getRandomBootnode()) + +proc run(p: PeerPool) {.async.} = + info "Running PeerPool..." + p.running = true + while p.running: + var dropConnections = false + try: + await p.maybeConnectToMorePeers() + except: + # Most unexpected errors should be transient, so we log and restart from + # scratch. + error "Unexpected error, restarting" + dropConnections = true + + if dropConnections: + await p.stopAllPeers() + + await sleepAsync(connectLoopSleepMs) + +proc start*(p: PeerPool) = + if not p.running: + asyncCheck p.run() + +# @property +# def peers(self) -> List[BasePeer]: +# peers = list(self.connected_nodes.values()) +# # Shuffle the list of peers so that dumb callsites are less likely to send +# # all requests to +# # a single peer even if they always pick the first one from the list. +# random.shuffle(peers) +# return peers + +# async def get_random_peer(self) -> BasePeer: +# while not self.peers: +# self.logger.debug("No connected peers, sleeping a bit") +# await asyncio.sleep(0.5) +# return random.choice(self.peers) + +proc processIncoming(server: StreamServer, + remote: StreamTransport): Future[void] {.async, gcsafe.} = + var p2p = getUserData[NetworkConnection](server) + let peerfut = remote.rlpxAccept(p2p.keyPair, p2p.clientId) + yield peerfut + if not peerfut.failed: + let peer = peerfut.read() + echo "TODO: Add peer to the pool..." + else: + echo "Could not establish connection with incoming peer ", + $remote.remoteAddress() + remote.close() + +proc connectToNetwork*(keyPair: KeyPair, + address: Address, + chainDb: AbstractChainDB, + bootstrapNodes: openarray[ENode], + clientId: string, + networkId: int, + startListening = true): NetworkConnection = + new result + result.id = networkId + result.chainDb = chainDb + result.keyPair = keyPair + result.address = address + result.clientId = clientId + result.discovery = newDiscoveryProtocol(keyPair.seckey, address, + bootstrapNodes) + result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery, + clientId, address.tcpPort) + + let ta = initTAddress(address.ip, address.tcpPort) + result.listeningServer = createStreamServer(ta, processIncoming, + {ReuseAddr}, + udata = result) + + if startListening: + result.listeningServer.start() + +proc startListening*(s: NetworkConnection) = + s.listeningServer.start() + +proc stopListening*(s: NetworkConnection) = + s.listeningServer.stop() + when isMainModule: import rlp diff --git a/eth_p2p/rlpx_protocols/eth.nim b/eth_p2p/rlpx_protocols/eth.nim index 67451c0..401ef56 100644 --- a/eth_p2p/rlpx_protocols/eth.nim +++ b/eth_p2p/rlpx_protocols/eth.nim @@ -9,7 +9,7 @@ # import - rlp/types, stint, rlpx, ethereum_types + rlp/types, stint, rlpx, eth_common type P = UInt256 diff --git a/eth_p2p/rlpx_protocols/les.nim b/eth_p2p/rlpx_protocols/les.nim index 9ad398b..2466ecf 100644 --- a/eth_p2p/rlpx_protocols/les.nim +++ b/eth_p2p/rlpx_protocols/les.nim @@ -9,7 +9,7 @@ # import - rlp/types, rlpx, ethereum_types + rlp/types, rlpx, eth_common type ProofRequest* = object diff --git a/eth_p2p/server.nim b/eth_p2p/server.nim deleted file mode 100644 index 01e37f6..0000000 --- a/eth_p2p/server.nim +++ /dev/null @@ -1,60 +0,0 @@ -# -# Ethereum P2P -# (c) Copyright 2018 -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) -# - -import asyncdispatch2, eth_keys -import peer_pool, discovery, enode, auth, rlpx - -type - P2PServer* = ref object - server: StreamServer - chainDb: AsyncChainDb - keyPair: KeyPair - address: Address - networkId: int - clientId: string - discovery: DiscoveryProtocol - peerPool: PeerPool - -proc processIncoming(server: StreamServer, - remote: StreamTransport): Future[void] {.async, gcsafe.} = - var p2p = getUserData[P2PServer](server) - let peerfut = remote.rlpxAccept(p2p.keyPair, p2p.clientId) - yield peerfut - if not peerfut.failed: - let peer = peerfut.read() - echo "TODO: Add peer to the pool..." - else: - echo "Could not establish connection with incoming peer ", - $remote.remoteAddress() - remote.close() - -proc newP2PServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB, - bootstrapNodes: openarray[ENode], clientId: string, - networkId: int): P2PServer = - result.new() - result.chainDb = chainDb - result.keyPair = keyPair - result.address = address - result.clientId = clientId - result.networkId = networkId - result.discovery = newDiscoveryProtocol(keyPair.seckey, address, - bootstrapNodes) - result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery, - clientId, address.tcpPort) - - let ta = initTAddress(address.ip, address.tcpPort) - result.server = createStreamServer(ta, processIncoming, {ReuseAddr}, - udata = result) - -proc start*(s: P2PServer) = - s.server.start() - -proc stop*(s: P2PServer) = - s.server.stop() diff --git a/tests/tdiscovery.nim b/tests/tdiscovery.nim index d2bff96..0a8f233 100644 --- a/tests/tdiscovery.nim +++ b/tests/tdiscovery.nim @@ -9,7 +9,7 @@ import sequtils, logging import eth_keys, asyncdispatch2, byteutils -import eth_p2p/[discovery, kademlia, peer_pool, enode] +import eth_p2p/[discovery, kademlia, rlpx, enode] const clientId = "nim-eth-p2p/0.0.1" diff --git a/tests/tserver.nim b/tests/tserver.nim index e61aedc..e825a91 100644 --- a/tests/tserver.nim +++ b/tests/tserver.nim @@ -9,7 +9,7 @@ import sequtils import eth_keys, asyncdispatch2 -import eth_p2p/[discovery, kademlia, peer_pool, enode, server, rlpx] +import eth_p2p/[discovery, kademlia, enode, rlpx] const clientId = "nim-eth-p2p/0.0.1" @@ -21,8 +21,7 @@ proc test() {.async.} = let kp = newKeyPair() let address = localAddress(20301) - let s = newP2PServer(kp, address, nil, [], clientId, 1) - s.start() + let s = connectToNetwork(kp, address, nil, [], clientId, 1) let n = newNode(initENode(kp.pubKey, address)) let peer = await rlpxConnect(n, newKeyPair(), Port(1234), clientId)