# Mirror of https://github.com/status-im/nim-eth.git
# nim-eth
|
|
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
# PeerPool attempts to keep connections to at least min_peers
|
|
# on the given network.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[os, tables, times, random, sequtils, options],
|
|
chronos, chronicles,
|
|
".."/[keys, common],
|
|
./private/p2p_types, "."/[discovery, kademlia, rlpx, enode]
|
|
|
|
logScope:
|
|
topics = "eth p2p peer_pool"
|
|
|
|
|
|
const
|
|
lookupInterval = 5
|
|
connectLoopSleep = chronos.milliseconds(2000)
|
|
maxConcurrentConnectionRequests = 40
|
|
sleepBeforeTryingARandomBootnode = chronos.milliseconds(3000)
|
|
|
|
## Period of time for dead / unreachable peers.
|
|
SeenTableTimeDeadPeer = chronos.minutes(10)
|
|
## Period of time for Useless peers, either because of no matching
|
|
## capabilities or on an irrelevant network.
|
|
SeenTableTimeUselessPeer = chronos.hours(24)
|
|
## Period of time for peers with a protocol error.
|
|
SeenTableTimeProtocolError = chronos.minutes(30)
|
|
## Period of time for peers with general disconnections / transport errors.
|
|
SeenTableTimeReconnect = chronos.minutes(5)
|
|
|
|
proc isSeen(p: PeerPool, nodeId: NodeId): bool =
  ## Reports whether ``nodeId`` has an entry in the SeenTable whose time
  ## period is still running.
  ##
  ## An entry whose period has already elapsed is removed from the table and
  ## ``false`` is returned, exactly as for a node that was never recorded.
  let nowMoment = now(chronos.Moment)
  var expired = false
  p.seenTable.withValue(nodeId, entry):
    if nowMoment < entry.stamp:
      return true
    expired = true
  if expired:
    # Removed outside of ``withValue`` so the table is not modified while
    # ``entry`` still refers to a slot inside it.
    p.seenTable.del(nodeId)
  false
|
|
|
|
proc addSeen(
    p: PeerPool, nodeId: NodeId, period: chronos.Duration) =
  ## Records ``nodeId`` in the SeenTable so that it is skipped until
  ## ``period`` has elapsed from now.
  ##
  ## If the node is already recorded, its expiry is only ever pushed further
  ## into the future; a shorter ``period`` never shortens an existing entry.
  let deadline = now(chronos.Moment) + period
  p.seenTable.withValue(nodeId, known):
    if known.stamp < deadline:
      known.stamp = deadline
  do:
    p.seenTable[nodeId] = SeenNode(nodeId: nodeId, stamp: deadline)
|
|
|
|
proc newPeerPool*(
    network: EthereumNode, networkId: NetworkId, keyPair: KeyPair,
    discovery: DiscoveryProtocol, clientId: string, minPeers = 10): PeerPool =
  ## Creates a PeerPool for ``network`` that will try to keep at least
  ## ``minPeers`` connections open, using ``discovery`` to find candidates.
  ##
  ## The connection queue is bounded by ``maxConcurrentConnectionRequests``;
  ## the tables of connected nodes, connecting nodes and observers start
  ## empty. ``clientId`` is accepted but not used by this constructor.
  PeerPool(
    network: network,
    keyPair: keyPair,
    minPeers: minPeers,
    networkId: networkId,
    discovery: discovery,
    connQueue: newAsyncQueue[Node](maxConcurrentConnectionRequests),
    connectedNodes: initTable[Node, Peer](),
    connectingNodes: initHashSet[Node](),
    observers: initTable[int, PeerObserver]()
  )
|
|
|
|
proc nodesToConnect(p: PeerPool): seq[Node] =
  ## Asks discovery for ``p.minPeers`` random nodes and returns those that
  ## are not among the discovery bootstrap nodes.
  let candidates = p.discovery.randomNodes(p.minPeers)
  for candidate in candidates:
    if candidate notin p.discovery.bootstrapNodes:
      result.add candidate
|
|
|
|
proc addObserver*(p: PeerPool, observerId: int, observer: PeerObserver) =
  ## Registers ``observer`` under ``observerId``; asserts that the id is not
  ## already in use.
  ##
  ## If the observer has an ``onPeerConnected`` callback, it is invoked
  ## immediately for every peer that is already connected and that either
  ## supports the observer's protocol or, when no protocol is set, for all
  ## of them.
  doAssert(observerId notin p.observers)
  p.observers[observerId] = observer

  if observer.onPeerConnected.isNil:
    return

  for peer in p.connectedNodes.values:
    if observer.protocol.isNil or peer.supports(observer.protocol):
      observer.onPeerConnected(peer)
|
|
|
|
proc delObserver*(p: PeerPool, observerId: int) =
  ## Unregisters the observer stored under ``observerId``.
  del(p.observers, observerId)
|
|
|
|
proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) =
  ## Convenience overload that uses the address of the ``observerId``
  ## reference (cast to ``int``) as the observer key.
  let key = cast[int](observerId)
  p.addObserver(key, observer)
|
|
|
|
proc delObserver*(p: PeerPool, observerId: ref) =
  ## Convenience overload that unregisters the observer keyed by the address
  ## of the ``observerId`` reference (cast to ``int``).
  let key = cast[int](observerId)
  p.delObserver(key)
|
|
|
|
template setProtocol*(observer: PeerObserver, Protocol: type) =
  ## Restricts ``observer`` to ``Protocol`` by storing that protocol's
  ## ``protocolInfo`` in ``observer.protocol``.
  observer.protocol = Protocol.protocolInfo
|
|
|
|
proc stopAllPeers(p: PeerPool) {.async.} =
  ## Placeholder: currently only emits a debug log line; no peer is actually
  ## stopped yet (see the TODO below).
  debug "Stopping all peers ..."
  # TODO: ...
  # await asyncio.gather(
  #   *[peer.stop() for peer in self.connected_nodes.values()])
|
|
|
|
# async def stop(self) -> None:
|
|
# self.cancel_token.trigger()
|
|
# await self.stop_all_peers()
|
|
|
|
proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
  ## Connect to the given remote and return a Peer instance when successful.
  ## Returns nil if the remote is unreachable, times out or is useless.
  ##
  ## Nothing is attempted (and nil is returned) when ``remote`` is already
  ## connected, is currently being connected to, or is still listed in the
  ## SeenTable. On failure the remote is added to the SeenTable with a
  ## period that depends on the kind of error.
  if remote in p.connectedNodes:
    trace "skipping_connection_to_already_connected_peer", remote
    return nil

  if remote in p.connectingNodes:
    # debug "skipping connection"
    return nil

  if p.isSeen(remote.id):
    return nil

  trace "Connecting to node", remote
  p.connectingNodes.incl(remote)
  try:
    let res = await p.network.rlpxConnect(remote)

    # TODO: Probably should move all this logic to rlpx.nim
    if res.isOk():
      rlpx_connect_success.inc()
      return res.get()
    else:
      # Counted once without a label and once labelled with the error kind.
      rlpx_connect_failure.inc()
      rlpx_connect_failure.inc(labelValues = [$res.error])
      case res.error():
      of UselessRlpxPeerError:
        p.addSeen(remote.id, SeenTableTimeUselessPeer)
      of TransportConnectError:
        p.addSeen(remote.id, SeenTableTimeDeadPeer)
      of RlpxHandshakeError, ProtocolError, InvalidIdentityError:
        p.addSeen(remote.id, SeenTableTimeProtocolError)
      of RlpxHandshakeTransportError,
         P2PHandshakeError,
         P2PTransportError,
         PeerDisconnectedError,
         TooManyPeersError:
        p.addSeen(remote.id, SeenTableTimeReconnect)

      return nil
  finally:
    # Always clear the in-progress marker, even when the awaited connection
    # attempt raises or is cancelled; otherwise the node would stay in
    # `connectingNodes` and every later attempt would be skipped.
    p.connectingNodes.excl(remote)
|
|
|
|
proc lookupRandomNode(p: PeerPool) {.async.} =
  ## Runs one random discovery lookup, ignoring its result, and then records
  ## the completion time in ``p.lastLookupTime``.
  discard await lookupRandom(p.discovery)
  p.lastLookupTime = epochTime()
|
|
|
|
proc getRandomBootnode(p: PeerPool): Option[Node] =
  ## Picks one of the discovery bootstrap nodes at random. Returns ``none``
  ## when no bootstrap nodes are configured.
  if p.discovery.bootstrapNodes.len == 0:
    return none(Node)
  option(sample(p.discovery.bootstrapNodes))
|
|
|
|
proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe.} =
  ## Registers ``peer`` as connected, bumps the connected-peers metric and
  ## notifies the observers.
  ##
  ## Asserts that the peer's remote is not already connected. An observer is
  ## notified only if it has an ``onPeerConnected`` callback and either has
  ## no protocol set or the peer supports that protocol.
  doAssert(peer.remote notin pool.connectedNodes)
  pool.connectedNodes[peer.remote] = peer
  rlpx_connected_peers.inc()

  for observer in pool.observers.values:
    if observer.onPeerConnected.isNil:
      continue
    if observer.protocol.isNil or peer.supports(observer.protocol):
      observer.onPeerConnected(peer)
|
|
|
|
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
  ## Attempts an outgoing connection to ``n`` and, if a peer is obtained,
  ## adds it to the pool. A failed or skipped attempt is silently ignored.
  let peer = await p.connect(n)
  if peer.isNil:
    return
  trace "Connection established (outgoing)", peer
  p.addPeer(peer)
|
|
|
|
proc connectToNode*(p: PeerPool, n: ENode) {.async.} =
  ## Overload taking an ``ENode``: wraps it in a ``Node`` and connects to it.
  let target = newNode(n)
  await p.connectToNode(target)
|
|
|
|
|
|
# This code is loosely based on code from nimbus-eth2;
|
|
# see eth2_network.nim and search for connQueue.
|
|
proc createConnectionWorker(p: PeerPool, workerId: int): Future[void] {.async.} =
  ## Endless worker loop: takes the next node from the connection queue and
  ## tries to connect to it before taking another one.
  trace "Connection worker started", workerId = workerId
  while true:
    let nextNode = await p.connQueue.popFirst()
    await p.connectToNode(nextNode)
|
|
|
|
# # TODO: Consider changing connect() to raise an exception instead of
|
|
# # returning None, as discussed in
|
|
# # https://github.com/ethereum/py-evm/pull/139#discussion_r152067425
|
|
# echo "Connecting to node: ", node
|
|
# let peer = await p.connect(node)
|
|
# if not peer.isNil:
|
|
# info "Successfully connected to ", peer
|
|
# ensureFuture peer.run(p)
|
|
|
|
# p.connectedNodes[peer.remote] = peer
|
|
# # for subscriber in self._subscribers:
|
|
# # subscriber.register_peer(peer)
|
|
# if p.connectedNodes.len >= p.minPeers:
|
|
# return
|
|
|
|
proc startConnectionWorkerPool(p: PeerPool, workerCount: int) =
  ## Spawns ``workerCount`` connection workers, numbered from 0, that run
  ## detached from the caller.
  for workerId in 0 ..< workerCount:
    asyncSpawn p.createConnectionWorker(workerId)
|
|
|
|
proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
  ## Connect to more peers if we're not yet connected to at least
  ## ``p.minPeers``.
  ##
  ## Triggers a random discovery lookup when the last one is older than
  ## ``lookupInterval`` seconds, queues candidate nodes for the connection
  ## workers (or connects directly to the node named by the
  ## ``ETH_DEBUG_ENODE`` environment variable when it is set), waits a short
  ## while, and finally falls back to a random bootnode if there is still no
  ## connected peer.
  if p.connectedNodes.len >= p.minPeers:
    # debug "pool already connected to enough peers (sleeping)", count = p.connectedNodes
    return

  if p.lastLookupTime + lookupInterval < epochTime():
    asyncSpawn p.lookupRandomNode()

  let debugEnode = getEnv("ETH_DEBUG_ENODE")
  if debugEnode.len == 0:
    for n in p.nodesToConnect():
      await p.connQueue.addLast(n)
  else:
    await p.connectToNode(newNode(debugEnode))

  # The old version of the code (which did all the connection
  # attempts in serial, not parallel) actually *awaited* all
  # the connection attempts before reaching the code at the
  # end of this proc that tries a random bootnode. Should
  # that still be what happens? I don't think so; one of the
  # reasons we're doing the connection attempts concurrently
  # is because sometimes the attempt takes a long time. Still,
  # it seems like we should give the many connection attempts
  # a *chance* to complete before moving on to trying a random
  # bootnode. So let's try just waiting a few seconds. (I am
  # really not sure this makes sense.)
  #
  # --Adam, Dec. 2022
  await sleepAsync(sleepBeforeTryingARandomBootnode)

  # In some cases (e.g. ROPSTEN or private testnets), the discovery table
  # might be full of bad peers, so if we can't connect to any peers we try a
  # random bootstrap node as well.
  if p.connectedNodes.len == 0:
    let bootnode = p.getRandomBootnode()
    if bootnode.isSome:
      await p.connectToNode(bootnode.get())
|
|
|
|
proc run(p: PeerPool) {.async.} =
  ## Main loop of the pool: marks the pool as running, starts the connection
  ## workers, then keeps trying to reach the desired number of peers for as
  ## long as ``p.running`` stays true, pausing ``connectLoopSleep`` between
  ## iterations.
  ##
  ## A catchable error from an iteration is logged and followed by a call to
  ## ``stopAllPeers`` before the loop continues.
  trace "Running PeerPool..."
  p.running = true
  p.startConnectionWorkerPool(maxConcurrentConnectionRequests)

  while p.running:
    debug "Amount of peers", amount = p.connectedNodes.len()

    var restartNeeded = false
    try:
      await p.maybeConnectToMorePeers()
    except CatchableError as e:
      # Most unexpected errors should be transient, so we log and restart
      # from scratch.
      error "Unexpected PeerPool error, restarting",
        err = e.msg, stackTrace = e.getStackTrace()
      restartNeeded = true

    if restartNeeded:
      await p.stopAllPeers()

    await sleepAsync(connectLoopSleep)
|
|
|
|
proc start*(p: PeerPool) =
  ## Launches the pool's main loop in the background; does nothing when the
  ## pool is already running.
  if p.running:
    return
  asyncSpawn p.run()
|
|
|
|
proc len*(p: PeerPool): int =
  ## Number of currently connected peers.
  len(p.connectedNodes)
|
|
# @property
|
|
# def peers(self) -> List[BasePeer]:
|
|
# peers = list(self.connected_nodes.values())
|
|
# # Shuffle the list of peers so that dumb callsites are less likely to send
|
|
# # all requests to
|
|
# # a single peer even if they always pick the first one from the list.
|
|
# random.shuffle(peers)
|
|
# return peers
|
|
|
|
# async def get_random_peer(self) -> BasePeer:
|
|
# while not self.peers:
|
|
# self.logger.debug("No connected peers, sleeping a bit")
|
|
# await asyncio.sleep(0.5)
|
|
# return random.choice(self.peers)
|
|
|
|
iterator peers*(p: PeerPool): Peer =
  ## Yields every currently connected peer.
  for peer in p.connectedNodes.values:
    yield peer
|
|
|
|
iterator peers*(p: PeerPool, Protocol: type): Peer =
  ## Yields the currently connected peers that support ``Protocol``.
  for peer in p.peers:
    if not peer.supports(Protocol):
      continue
    yield peer
|
|
|
|
func numPeers*(p: PeerPool): int =
  ## Number of currently connected peers.
  len(p.connectedNodes)
|
|
|
|
func contains*(p: PeerPool, n: ENode): bool =
  ## True if some connected node's ``node`` field equals ``n``. This is a
  ## linear scan over the connected nodes.
  for remote in p.connectedNodes.keys:
    if remote.node == n:
      return true
  false
|
|
|
|
func contains*(p: PeerPool, n: Node): bool =
  ## True if ``n`` is a key of the connected-nodes table.
  p.connectedNodes.hasKey(n)
|
|
|
|
func contains*(p: PeerPool, n: Peer): bool =
  ## True if the remote node of peer ``n`` is a key of the connected-nodes
  ## table.
  p.connectedNodes.hasKey(n.remote)
|