mirror of
https://github.com/status-im/nim-eth-p2p.git
synced 2025-01-27 23:25:05 +00:00
Merge peer_pool and server into rlpx
This was done because a cycle was formed between the structures of the three modules: - A Peer holds a reference to its Network - The Network holds a reference to its PeerPool - The PeerPool keeps a table of connected Peers I could have resolved the issue by introducing a new types module, but it would have required all of the currently private fields to become public (due to lack of package-level visibility in Nim). Instead I decided to merge the modules because they were relatively small anyway. Please note that the former `P2PServer` type is now called `NetworkConnection`. There are slight changes in the APIs that will be carried out in Nimbus when merging this.
This commit is contained in:
parent
fac8bbd917
commit
72016046fb
@ -1,104 +0,0 @@
|
||||
#
|
||||
# Ethereum P2P
|
||||
# (c) Copyright 2018
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
#
|
||||
|
||||
import rlp/types, nimcrypto/hash, stint
|
||||
|
||||
export
|
||||
MDigest
|
||||
|
||||
type
|
||||
# XXX: Some of the UInt256 fields may be unnecessarily large
|
||||
|
||||
P* = UInt256
|
||||
|
||||
KeccakHash* = MDigest[256]
|
||||
KeyValuePair* = (BytesRange, BytesRange)
|
||||
|
||||
BlockNonce* = UInt256
|
||||
Blob* = seq[byte]
|
||||
|
||||
BloomFilter* = distinct KeccakHash
|
||||
EthAddress* = distinct MDigest[160]
|
||||
|
||||
Transaction* = object
|
||||
accountNonce*: uint64
|
||||
gasPrice*: UInt256
|
||||
gasLimit*: uint64
|
||||
to*: EthAddress
|
||||
value*: UInt256
|
||||
payload*: Blob
|
||||
V*, R*, S*: UInt256
|
||||
|
||||
AccessList* = object
|
||||
# XXX: Specify the structure of this
|
||||
|
||||
BlockHeader* = object
|
||||
parentHash*: KeccakHash
|
||||
uncleHash*: KeccakHash
|
||||
coinbase*: EthAddress
|
||||
stateRoot*: KeccakHash
|
||||
txRoot*: KeccakHash
|
||||
receiptRoot*: KeccakHash
|
||||
bloom*: BloomFilter
|
||||
difficulty*: UInt256
|
||||
blockNumber*: uint
|
||||
gasLimit*: uint64
|
||||
gasUsed*: uint64
|
||||
timestamp*: uint64
|
||||
extraData*: Blob
|
||||
mixDigest*: KeccakHash
|
||||
nonce*: BlockNonce
|
||||
|
||||
BlockBody* = object
|
||||
transactions*: seq[Transaction]
|
||||
uncles*: seq[BlockHeader]
|
||||
|
||||
Log* = object
|
||||
address*: EthAddress
|
||||
topics*: seq[int32]
|
||||
data*: Blob
|
||||
|
||||
Receipt* = object
|
||||
stateRoot*: Blob
|
||||
gasUsed*: uint64
|
||||
bloom*: BloomFilter
|
||||
logs*: seq[Log]
|
||||
|
||||
ShardTransaction* = object
|
||||
chain*: uint
|
||||
shard*: uint
|
||||
to*: EthAddress
|
||||
data*: Blob
|
||||
gas*: uint64
|
||||
acceesList*: AccessList
|
||||
code*: Blob
|
||||
salt*: KeccakHash
|
||||
|
||||
CollationHeader* = object
|
||||
shard*: uint
|
||||
expectedPeriod*: uint
|
||||
periodStartPrevHash*: KeccakHash
|
||||
parentHash*: KeccakHash
|
||||
txRoot*: KeccakHash
|
||||
coinbase*: EthAddress
|
||||
stateRoot*: KeccakHash
|
||||
receiptRoot*: KeccakHash
|
||||
blockNumber*: uint
|
||||
|
||||
HashOrNum* = object
|
||||
case isHash*: bool
|
||||
of true:
|
||||
hash*: KeccakHash
|
||||
else:
|
||||
number*: uint
|
||||
|
||||
BlocksRequest* = object
|
||||
startBlock*: HashOrNum
|
||||
maxResults*, skip*, reverse*: uint
|
@ -1,191 +0,0 @@
|
||||
#
|
||||
# Ethereum P2P
|
||||
# (c) Copyright 2018
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
#
|
||||
|
||||
import logging, tables, times, random
|
||||
import eth_keys, asyncdispatch2
|
||||
import discovery, rlpx, kademlia
|
||||
|
||||
type
|
||||
PeerPool* = ref object
|
||||
keyPair: KeyPair
|
||||
networkId: int
|
||||
minPeers: int
|
||||
clientId: string
|
||||
discovery: DiscoveryProtocol
|
||||
lastLookupTime: float
|
||||
connectedNodes: Table[Node, Peer]
|
||||
running: bool
|
||||
listenPort*: Port
|
||||
|
||||
AsyncChainDb* = ref object # TODO: This should be defined elsewhere
|
||||
|
||||
# class PeerPool:
|
||||
# PeerPool attempts to keep connections to at least min_peers on the given network.
|
||||
|
||||
const
|
||||
lookupInterval = 5
|
||||
connectLoopSleepMs = 2000
|
||||
|
||||
proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair,
|
||||
discovery: DiscoveryProtocol, clientId: string,
|
||||
listenPort = Port(30303), minPeers = 10): PeerPool =
|
||||
result.new()
|
||||
result.keyPair = keyPair
|
||||
result.minPeers = minPeers
|
||||
result.networkId = networkId
|
||||
result.discovery = discovery
|
||||
result.connectedNodes = initTable[Node, Peer]()
|
||||
result.listenPort = listenPort
|
||||
|
||||
template ensureFuture(f: untyped) = asyncCheck f
|
||||
|
||||
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
|
||||
p.discovery.randomNodes(p.minPeers)
|
||||
|
||||
# def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
|
||||
# self._subscribers.append(subscriber)
|
||||
# for peer in self.connected_nodes.values():
|
||||
# subscriber.register_peer(peer)
|
||||
|
||||
# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
|
||||
# if subscriber in self._subscribers:
|
||||
# self._subscribers.remove(subscriber)
|
||||
|
||||
proc stopAllPeers(p: PeerPool) {.async.} =
|
||||
info "Stopping all peers ..."
|
||||
# TODO: ...
|
||||
# await asyncio.gather(
|
||||
# *[peer.stop() for peer in self.connected_nodes.values()])
|
||||
|
||||
# async def stop(self) -> None:
|
||||
# self.cancel_token.trigger()
|
||||
# await self.stop_all_peers()
|
||||
|
||||
proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
|
||||
## Connect to the given remote and return a Peer instance when successful.
|
||||
## Returns nil if the remote is unreachable, times out or is useless.
|
||||
if remote in p.connectedNodes:
|
||||
debug "Skipping ", remote, "; already connected to it"
|
||||
return nil
|
||||
|
||||
result = await remote.rlpxConnect(p.keyPair, p.listenPort, p.clientId)
|
||||
|
||||
# expected_exceptions = (
|
||||
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
|
||||
# try:
|
||||
# self.logger.debug("Connecting to %s...", remote)
|
||||
# peer = await wait_with_token(
|
||||
# handshake(remote, self.privkey, self.peer_class, self.chaindb, self.network_id),
|
||||
# token=self.cancel_token,
|
||||
# timeout=HANDSHAKE_TIMEOUT)
|
||||
# return peer
|
||||
# except OperationCancelled:
|
||||
# # Pass it on to instruct our main loop to stop.
|
||||
# raise
|
||||
# except expected_exceptions as e:
|
||||
# self.logger.debug("Could not complete handshake with %s: %s", remote, repr(e))
|
||||
# except Exception:
|
||||
# self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote)
|
||||
# return None
|
||||
|
||||
proc lookupRandomNode(p: PeerPool) {.async.} =
|
||||
# This method runs in the background, so we must catch OperationCancelled
|
||||
# ere otherwise asyncio will warn that its exception was never retrieved.
|
||||
try:
|
||||
discard await p.discovery.lookupRandom()
|
||||
except: # OperationCancelled
|
||||
discard
|
||||
p.lastLookupTime = epochTime()
|
||||
|
||||
proc getRandomBootnode(p: PeerPool): seq[Node] =
|
||||
@[p.discovery.bootstrapNodes.rand()]
|
||||
|
||||
proc peerFinished(p: PeerPool, peer: Peer) =
|
||||
## Remove the given peer from our list of connected nodes.
|
||||
## This is passed as a callback to be called when a peer finishes.
|
||||
p.connectedNodes.del(peer.remote)
|
||||
|
||||
proc run(p: Peer, completionHandler: proc() = nil) {.async.} =
|
||||
# TODO: This is a stub that should be implemented in rlpx.nim
|
||||
await sleepAsync(20000) # sleep 20 sec
|
||||
if not completionHandler.isNil: completionHandler()
|
||||
|
||||
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
|
||||
for node in nodes:
|
||||
# TODO: Consider changing connect() to raise an exception instead of
|
||||
# returning None, as discussed in
|
||||
# https://github.com/ethereum/py-evm/pull/139#discussion_r152067425
|
||||
let peer = await p.connect(node)
|
||||
if not peer.isNil:
|
||||
info "Successfully connected to ", peer
|
||||
ensureFuture peer.run() do():
|
||||
p.peerFinished(peer)
|
||||
|
||||
p.connectedNodes[peer.remote] = peer
|
||||
# for subscriber in self._subscribers:
|
||||
# subscriber.register_peer(peer)
|
||||
if p.connectedNodes.len >= p.minPeers:
|
||||
return
|
||||
|
||||
proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
|
||||
## Connect to more peers if we're not yet connected to at least self.minPeers.
|
||||
if p.connectedNodes.len >= p.minPeers:
|
||||
debug "Already connected to enough peers: ", p.connectedNodes, "; sleeping"
|
||||
return
|
||||
|
||||
if p.lastLookupTime + lookupInterval < epochTime():
|
||||
ensureFuture p.lookupRandomNode()
|
||||
|
||||
await p.connectToNodes(p.nodesToConnect())
|
||||
|
||||
# In some cases (e.g ROPSTEN or private testnets), the discovery table might
|
||||
# be full of bad peers, so if we can't connect to any peers we try a random
|
||||
# bootstrap node as well.
|
||||
if p.connectedNodes.len == 0:
|
||||
await p.connectToNodes(p.getRandomBootnode())
|
||||
|
||||
proc run(p: PeerPool) {.async.} =
|
||||
info "Running PeerPool..."
|
||||
p.running = true
|
||||
while p.running:
|
||||
var dropConnections = false
|
||||
try:
|
||||
await p.maybeConnectToMorePeers()
|
||||
except:
|
||||
# Most unexpected errors should be transient, so we log and restart from
|
||||
# scratch.
|
||||
error "Unexpected error, restarting"
|
||||
dropConnections = true
|
||||
|
||||
if dropConnections:
|
||||
await p.stopAllPeers()
|
||||
|
||||
await sleepAsync(connectLoopSleepMs)
|
||||
|
||||
proc start*(p: PeerPool) =
|
||||
if not p.running:
|
||||
asyncCheck p.run()
|
||||
|
||||
|
||||
# @property
|
||||
# def peers(self) -> List[BasePeer]:
|
||||
# peers = list(self.connected_nodes.values())
|
||||
# # Shuffle the list of peers so that dumb callsites are less likely to send
|
||||
# # all requests to
|
||||
# # a single peer even if they always pick the first one from the list.
|
||||
# random.shuffle(peers)
|
||||
# return peers
|
||||
|
||||
# async def get_random_peer(self) -> BasePeer:
|
||||
# while not self.peers:
|
||||
# self.logger.debug("No connected peers, sleeping a bit")
|
||||
# await asyncio.sleep(0.5)
|
||||
# return random.choice(self.peers)
|
||||
|
251
eth_p2p/rlpx.nim
251
eth_p2p/rlpx.nim
@ -8,33 +8,52 @@
|
||||
# MIT license (LICENSE-MIT)
|
||||
#
|
||||
|
||||
import macros, sets, algorithm, logging, hashes
|
||||
import rlp, ranges/[stackarrays, ptr_arith], eth_keys, ethereum_types,
|
||||
nimcrypto, asyncdispatch2
|
||||
import kademlia, discovery, auth, rlpxcrypt, enode
|
||||
import
|
||||
tables, macros, sets, algorithm, logging, hashes, times, random,
|
||||
rlp, ranges/[stackarrays, ptr_arith], nimcrypto, asyncdispatch2,
|
||||
eth_keys, eth_common,
|
||||
kademlia, discovery, auth, rlpxcrypt, enode
|
||||
|
||||
type
|
||||
ConnectionState = enum
|
||||
ConnectionState* = enum
|
||||
None,
|
||||
Connected,
|
||||
Disconnecting,
|
||||
Disconnected
|
||||
|
||||
Network* = ref object
|
||||
NetworkConnection* = ref object
|
||||
id: int
|
||||
listeningServer: StreamServer
|
||||
protocolStates: seq[RootRef]
|
||||
chainDb: AbstractChainDB
|
||||
keyPair: KeyPair
|
||||
address: Address
|
||||
clientId: string
|
||||
discovery: DiscoveryProtocol
|
||||
peerPool: PeerPool
|
||||
|
||||
Peer* = ref object
|
||||
transp: StreamTransport
|
||||
dispatcher: Dispatcher
|
||||
networkId: int
|
||||
nextRequestId: int
|
||||
network: Network
|
||||
network: NetworkConnection
|
||||
secretsState: SecretState
|
||||
connectionState: ConnectionState
|
||||
protocolStates: seq[RootRef]
|
||||
remote*: Node
|
||||
|
||||
PeerPool* = ref object
|
||||
keyPair: KeyPair
|
||||
networkId: int
|
||||
minPeers: int
|
||||
clientId: string
|
||||
discovery: DiscoveryProtocol
|
||||
lastLookupTime: float
|
||||
connectedNodes: Table[Node, Peer]
|
||||
running: bool
|
||||
listenPort*: Port
|
||||
|
||||
MessageHandler* = proc(x: Peer, data: Rlp): Future[void]
|
||||
|
||||
MessageInfo* = object
|
||||
@ -71,7 +90,7 @@ type
|
||||
protocolOffsets: seq[int]
|
||||
thunks: seq[MessageHandler]
|
||||
|
||||
RlpxMessageKind = enum
|
||||
RlpxMessageKind* = enum
|
||||
rlpxNotification,
|
||||
rlpxRequest,
|
||||
rlpxResponse
|
||||
@ -193,15 +212,6 @@ proc registerProtocol(protocol: ProtocolInfo) =
|
||||
else:
|
||||
devp2p = protocol
|
||||
|
||||
# RLP serialization
|
||||
#
|
||||
|
||||
proc append*(rlpWriter: var RlpWriter, hash: KeccakHash) =
|
||||
rlpWriter.append(hash.data)
|
||||
|
||||
proc read*(rlp: var Rlp, T: typedesc[KeccakHash]): T =
|
||||
result.data = rlp.read(type(result.data))
|
||||
|
||||
# Message composition and encryption
|
||||
#
|
||||
|
||||
@ -796,6 +806,213 @@ proc rlpxAccept*(transp: StreamTransport, myKeys: KeyPair,
|
||||
except:
|
||||
transp.close()
|
||||
|
||||
# PeerPool attempts to keep connections to at least min_peers
|
||||
# on the given network.
|
||||
|
||||
const
|
||||
lookupInterval = 5
|
||||
connectLoopSleepMs = 2000
|
||||
|
||||
proc newPeerPool*(chainDb: AbstractChainDB, networkId: int, keyPair: KeyPair,
|
||||
discovery: DiscoveryProtocol, clientId: string,
|
||||
listenPort = Port(30303), minPeers = 10): PeerPool =
|
||||
result.new()
|
||||
result.keyPair = keyPair
|
||||
result.minPeers = minPeers
|
||||
result.networkId = networkId
|
||||
result.discovery = discovery
|
||||
result.connectedNodes = initTable[Node, Peer]()
|
||||
result.listenPort = listenPort
|
||||
|
||||
template ensureFuture(f: untyped) = asyncCheck f
|
||||
|
||||
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
|
||||
p.discovery.randomNodes(p.minPeers)
|
||||
|
||||
# def subscribe(self, subscriber: PeerPoolSubscriber) -> None:
|
||||
# self._subscribers.append(subscriber)
|
||||
# for peer in self.connected_nodes.values():
|
||||
# subscriber.register_peer(peer)
|
||||
|
||||
# def unsubscribe(self, subscriber: PeerPoolSubscriber) -> None:
|
||||
# if subscriber in self._subscribers:
|
||||
# self._subscribers.remove(subscriber)
|
||||
|
||||
proc stopAllPeers(p: PeerPool) {.async.} =
|
||||
info "Stopping all peers ..."
|
||||
# TODO: ...
|
||||
# await asyncio.gather(
|
||||
# *[peer.stop() for peer in self.connected_nodes.values()])
|
||||
|
||||
# async def stop(self) -> None:
|
||||
# self.cancel_token.trigger()
|
||||
# await self.stop_all_peers()
|
||||
|
||||
proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
|
||||
## Connect to the given remote and return a Peer instance when successful.
|
||||
## Returns nil if the remote is unreachable, times out or is useless.
|
||||
if remote in p.connectedNodes:
|
||||
debug "Skipping ", remote, "; already connected to it"
|
||||
return nil
|
||||
|
||||
result = await remote.rlpxConnect(p.keyPair, p.listenPort, p.clientId)
|
||||
|
||||
# expected_exceptions = (
|
||||
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
|
||||
# try:
|
||||
# self.logger.debug("Connecting to %s...", remote)
|
||||
# peer = await wait_with_token(
|
||||
# handshake(remote, self.privkey, self.peer_class, self.chaindb, self.network_id),
|
||||
# token=self.cancel_token,
|
||||
# timeout=HANDSHAKE_TIMEOUT)
|
||||
# return peer
|
||||
# except OperationCancelled:
|
||||
# # Pass it on to instruct our main loop to stop.
|
||||
# raise
|
||||
# except expected_exceptions as e:
|
||||
# self.logger.debug("Could not complete handshake with %s: %s", remote, repr(e))
|
||||
# except Exception:
|
||||
# self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote)
|
||||
# return None
|
||||
|
||||
proc lookupRandomNode(p: PeerPool) {.async.} =
|
||||
# This method runs in the background, so we must catch OperationCancelled
|
||||
# ere otherwise asyncio will warn that its exception was never retrieved.
|
||||
try:
|
||||
discard await p.discovery.lookupRandom()
|
||||
except: # OperationCancelled
|
||||
discard
|
||||
p.lastLookupTime = epochTime()
|
||||
|
||||
proc getRandomBootnode(p: PeerPool): seq[Node] =
|
||||
@[p.discovery.bootstrapNodes.rand()]
|
||||
|
||||
proc peerFinished(p: PeerPool, peer: Peer) =
|
||||
## Remove the given peer from our list of connected nodes.
|
||||
## This is passed as a callback to be called when a peer finishes.
|
||||
p.connectedNodes.del(peer.remote)
|
||||
|
||||
proc run(p: Peer, completionHandler: proc() = nil) {.async.} =
|
||||
# TODO: This is a stub that should be implemented in rlpx.nim
|
||||
await sleepAsync(20000) # sleep 20 sec
|
||||
if not completionHandler.isNil: completionHandler()
|
||||
|
||||
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
|
||||
for node in nodes:
|
||||
# TODO: Consider changing connect() to raise an exception instead of
|
||||
# returning None, as discussed in
|
||||
# https://github.com/ethereum/py-evm/pull/139#discussion_r152067425
|
||||
let peer = await p.connect(node)
|
||||
if not peer.isNil:
|
||||
info "Successfully connected to ", peer
|
||||
ensureFuture peer.run() do():
|
||||
p.peerFinished(peer)
|
||||
|
||||
p.connectedNodes[peer.remote] = peer
|
||||
# for subscriber in self._subscribers:
|
||||
# subscriber.register_peer(peer)
|
||||
if p.connectedNodes.len >= p.minPeers:
|
||||
return
|
||||
|
||||
proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
|
||||
## Connect to more peers if we're not yet connected to at least self.minPeers.
|
||||
if p.connectedNodes.len >= p.minPeers:
|
||||
debug "Already connected to enough peers: ", p.connectedNodes, "; sleeping"
|
||||
return
|
||||
|
||||
if p.lastLookupTime + lookupInterval < epochTime():
|
||||
ensureFuture p.lookupRandomNode()
|
||||
|
||||
await p.connectToNodes(p.nodesToConnect())
|
||||
|
||||
# In some cases (e.g ROPSTEN or private testnets), the discovery table might
|
||||
# be full of bad peers, so if we can't connect to any peers we try a random
|
||||
# bootstrap node as well.
|
||||
if p.connectedNodes.len == 0:
|
||||
await p.connectToNodes(p.getRandomBootnode())
|
||||
|
||||
proc run(p: PeerPool) {.async.} =
|
||||
info "Running PeerPool..."
|
||||
p.running = true
|
||||
while p.running:
|
||||
var dropConnections = false
|
||||
try:
|
||||
await p.maybeConnectToMorePeers()
|
||||
except:
|
||||
# Most unexpected errors should be transient, so we log and restart from
|
||||
# scratch.
|
||||
error "Unexpected error, restarting"
|
||||
dropConnections = true
|
||||
|
||||
if dropConnections:
|
||||
await p.stopAllPeers()
|
||||
|
||||
await sleepAsync(connectLoopSleepMs)
|
||||
|
||||
proc start*(p: PeerPool) =
|
||||
if not p.running:
|
||||
asyncCheck p.run()
|
||||
|
||||
# @property
|
||||
# def peers(self) -> List[BasePeer]:
|
||||
# peers = list(self.connected_nodes.values())
|
||||
# # Shuffle the list of peers so that dumb callsites are less likely to send
|
||||
# # all requests to
|
||||
# # a single peer even if they always pick the first one from the list.
|
||||
# random.shuffle(peers)
|
||||
# return peers
|
||||
|
||||
# async def get_random_peer(self) -> BasePeer:
|
||||
# while not self.peers:
|
||||
# self.logger.debug("No connected peers, sleeping a bit")
|
||||
# await asyncio.sleep(0.5)
|
||||
# return random.choice(self.peers)
|
||||
|
||||
proc processIncoming(server: StreamServer,
|
||||
remote: StreamTransport): Future[void] {.async, gcsafe.} =
|
||||
var p2p = getUserData[NetworkConnection](server)
|
||||
let peerfut = remote.rlpxAccept(p2p.keyPair, p2p.clientId)
|
||||
yield peerfut
|
||||
if not peerfut.failed:
|
||||
let peer = peerfut.read()
|
||||
echo "TODO: Add peer to the pool..."
|
||||
else:
|
||||
echo "Could not establish connection with incoming peer ",
|
||||
$remote.remoteAddress()
|
||||
remote.close()
|
||||
|
||||
proc connectToNetwork*(keyPair: KeyPair,
|
||||
address: Address,
|
||||
chainDb: AbstractChainDB,
|
||||
bootstrapNodes: openarray[ENode],
|
||||
clientId: string,
|
||||
networkId: int,
|
||||
startListening = true): NetworkConnection =
|
||||
new result
|
||||
result.id = networkId
|
||||
result.chainDb = chainDb
|
||||
result.keyPair = keyPair
|
||||
result.address = address
|
||||
result.clientId = clientId
|
||||
result.discovery = newDiscoveryProtocol(keyPair.seckey, address,
|
||||
bootstrapNodes)
|
||||
result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery,
|
||||
clientId, address.tcpPort)
|
||||
|
||||
let ta = initTAddress(address.ip, address.tcpPort)
|
||||
result.listeningServer = createStreamServer(ta, processIncoming,
|
||||
{ReuseAddr},
|
||||
udata = result)
|
||||
|
||||
if startListening:
|
||||
result.listeningServer.start()
|
||||
|
||||
proc startListening*(s: NetworkConnection) =
|
||||
s.listeningServer.start()
|
||||
|
||||
proc stopListening*(s: NetworkConnection) =
|
||||
s.listeningServer.stop()
|
||||
|
||||
when isMainModule:
|
||||
import rlp
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
#
|
||||
|
||||
import
|
||||
rlp/types, stint, rlpx, ethereum_types
|
||||
rlp/types, stint, rlpx, eth_common
|
||||
|
||||
type
|
||||
P = UInt256
|
||||
|
@ -9,7 +9,7 @@
|
||||
#
|
||||
|
||||
import
|
||||
rlp/types, rlpx, ethereum_types
|
||||
rlp/types, rlpx, eth_common
|
||||
|
||||
type
|
||||
ProofRequest* = object
|
||||
|
@ -1,60 +0,0 @@
|
||||
#
|
||||
# Ethereum P2P
|
||||
# (c) Copyright 2018
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
#
|
||||
|
||||
import asyncdispatch2, eth_keys
|
||||
import peer_pool, discovery, enode, auth, rlpx
|
||||
|
||||
type
|
||||
P2PServer* = ref object
|
||||
server: StreamServer
|
||||
chainDb: AsyncChainDb
|
||||
keyPair: KeyPair
|
||||
address: Address
|
||||
networkId: int
|
||||
clientId: string
|
||||
discovery: DiscoveryProtocol
|
||||
peerPool: PeerPool
|
||||
|
||||
proc processIncoming(server: StreamServer,
|
||||
remote: StreamTransport): Future[void] {.async, gcsafe.} =
|
||||
var p2p = getUserData[P2PServer](server)
|
||||
let peerfut = remote.rlpxAccept(p2p.keyPair, p2p.clientId)
|
||||
yield peerfut
|
||||
if not peerfut.failed:
|
||||
let peer = peerfut.read()
|
||||
echo "TODO: Add peer to the pool..."
|
||||
else:
|
||||
echo "Could not establish connection with incoming peer ",
|
||||
$remote.remoteAddress()
|
||||
remote.close()
|
||||
|
||||
proc newP2PServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
||||
bootstrapNodes: openarray[ENode], clientId: string,
|
||||
networkId: int): P2PServer =
|
||||
result.new()
|
||||
result.chainDb = chainDb
|
||||
result.keyPair = keyPair
|
||||
result.address = address
|
||||
result.clientId = clientId
|
||||
result.networkId = networkId
|
||||
result.discovery = newDiscoveryProtocol(keyPair.seckey, address,
|
||||
bootstrapNodes)
|
||||
result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery,
|
||||
clientId, address.tcpPort)
|
||||
|
||||
let ta = initTAddress(address.ip, address.tcpPort)
|
||||
result.server = createStreamServer(ta, processIncoming, {ReuseAddr},
|
||||
udata = result)
|
||||
|
||||
proc start*(s: P2PServer) =
|
||||
s.server.start()
|
||||
|
||||
proc stop*(s: P2PServer) =
|
||||
s.server.stop()
|
@ -9,7 +9,7 @@
|
||||
|
||||
import sequtils, logging
|
||||
import eth_keys, asyncdispatch2, byteutils
|
||||
import eth_p2p/[discovery, kademlia, peer_pool, enode]
|
||||
import eth_p2p/[discovery, kademlia, rlpx, enode]
|
||||
|
||||
const clientId = "nim-eth-p2p/0.0.1"
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
import sequtils
|
||||
import eth_keys, asyncdispatch2
|
||||
import eth_p2p/[discovery, kademlia, peer_pool, enode, server, rlpx]
|
||||
import eth_p2p/[discovery, kademlia, enode, rlpx]
|
||||
|
||||
const clientId = "nim-eth-p2p/0.0.1"
|
||||
|
||||
@ -21,8 +21,7 @@ proc test() {.async.} =
|
||||
let kp = newKeyPair()
|
||||
let address = localAddress(20301)
|
||||
|
||||
let s = newP2PServer(kp, address, nil, [], clientId, 1)
|
||||
s.start()
|
||||
let s = connectToNetwork(kp, address, nil, [], clientId, 1)
|
||||
|
||||
let n = newNode(initENode(kp.pubKey, address))
|
||||
let peer = await rlpxConnect(n, newKeyPair(), Port(1234), clientId)
|
||||
|
Loading…
x
Reference in New Issue
Block a user