mirror of https://github.com/status-im/nim-eth.git
Attempt to connect to many peers concurrently, not in sequence. (#575)
The old code, which attempted connections in sequence, took a very long time to accumulate peers in the peer pool, particularly because a connection attempt would occasionally hang for a couple of minutes before timing out.
parent 8f0ae55353
commit 58284ffeda
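The change replaces the serial connect loop with a bounded AsyncQueue drained by a fixed pool of worker tasks. Below is a minimal, self-contained sketch of that producer/consumer pattern, assuming only chronos; the names (connectionWorker, demo) and the string payload are illustrative, not part of the commit:

import chronos

const
  workerCount = 4    # the commit uses maxConcurrentConnectionRequests = 40
  queueSize = 4      # bounded, so the producer feels back-pressure

proc connectionWorker(q: AsyncQueue[string], id: int) {.async.} =
  ## Each worker blocks on popFirst until work is queued, so at most
  ## workerCount items are processed concurrently.
  while true:
    let node = await q.popFirst()
    echo "worker ", id, " connecting to ", node

proc demo() {.async.} =
  let q = newAsyncQueue[string](queueSize)
  for i in 0 ..< workerCount:
    asyncSpawn connectionWorker(q, i)
  for node in ["enode-a", "enode-b", "enode-c"]:
    # addLast suspends when the queue is full, so slow consumers
    # throttle the producer instead of letting the queue grow unboundedly.
    await q.addLast(node)
  # Give the workers a moment to drain the queue before exiting.
  await sleepAsync(chronos.milliseconds(100))

waitFor demo()

The diff itself follows.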
@@ -23,6 +23,8 @@ logScope:
 const
   lookupInterval = 5
   connectLoopSleep = chronos.milliseconds(2000)
+  maxConcurrentConnectionRequests = 40
+  sleepBeforeTryingARandomBootnode = chronos.milliseconds(3000)
 
 proc newPeerPool*(
     network: EthereumNode, networkId: NetworkId, keyPair: KeyPair,
@@ -33,6 +35,7 @@ proc newPeerPool*(
   result.minPeers = minPeers
   result.networkId = networkId
   result.discovery = discovery
+  result.connQueue = newAsyncQueue[Node](maxConcurrentConnectionRequests)
   result.connectedNodes = initTable[Node, Peer]()
   result.connectingNodes = initHashSet[Node]()
   result.observers = initTable[int, PeerObserver]()
@@ -130,9 +133,14 @@ proc connectToNode*(p: PeerPool, n: Node) {.async.} =
 proc connectToNode*(p: PeerPool, n: ENode) {.async.} =
   await p.connectToNode(newNode(n))
 
-proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
-  for node in nodes:
-    await p.connectToNode(node)
+# This code is loosely based on code from nimbus-eth2;
+# see eth2_network.nim and search for connQueue.
+proc createConnectionWorker(p: PeerPool, workerId: int): Future[void] {.async.} =
+  trace "Connection worker started", workerId = workerId
+  while true:
+    let n = await p.connQueue.popFirst()
+    await connectToNode(p, n)
 
 # # TODO: Consider changing connect() to raise an exception instead of
 # # returning None, as discussed in
@@ -149,6 +157,10 @@ proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
 # if p.connectedNodes.len >= p.minPeers:
 #   return
 
+proc startConnectionWorkerPool(p: PeerPool, workerCount: int) =
+  for i in 0 ..< workerCount:
+    asyncSpawn createConnectionWorker(p, i)
+
 proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
   ## Connect to more peers if we're not yet connected to at least self.minPeers.
   if p.connectedNodes.len >= p.minPeers:
@@ -162,8 +174,24 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
   if debugEnode.len != 0:
     await p.connectToNode(newNode(debugEnode))
   else:
-    await p.connectToNodes(p.nodesToConnect())
+    for n in p.nodesToConnect():
+      await p.connQueue.addLast(n)
+
+  # The old version of the code (which did all the connection
+  # attempts in serial, not parallel) actually *awaited* all
+  # the connection attempts before reaching the code at the
+  # end of this proc that tries a random bootnode. Should
+  # that still be what happens? I don't think so; one of the
+  # reasons we're doing the connection attempts concurrently
+  # is because sometimes the attempt takes a long time. Still,
+  # it seems like we should give the many connection attempts
+  # a *chance* to complete before moving on to trying a random
+  # bootnode. So let's try just waiting a few seconds. (I am
+  # really not sure this makes sense.)
+  #
+  # --Adam, Dec. 2022
+  await sleepAsync(sleepBeforeTryingARandomBootnode)
 
   # In some cases (e.g ROPSTEN or private testnets), the discovery table might
   # be full of bad peers, so if we can't connect to any peers we try a random
   # bootstrap node as well.
@@ -173,6 +201,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
 proc run(p: PeerPool) {.async.} =
   trace "Running PeerPool..."
   p.running = true
+  p.startConnectionWorkerPool(maxConcurrentConnectionRequests)
   while p.running:
 
     debug "Amount of peers", amount = p.connectedNodes.len()
@@ -62,6 +62,7 @@ type
     clientId*: string
     discovery*: DiscoveryProtocol
     lastLookupTime*: float
+    connQueue*: AsyncQueue[Node]
     connectedNodes*: Table[Node, Peer]
     connectingNodes*: HashSet[Node]
    running*: bool
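A note on the queue sizing: newAsyncQueue[Node](maxConcurrentConnectionRequests) makes the queue bounded, so once forty nodes are queued the await p.connQueue.addLast(n) calls in maybeConnectToMorePeers suspend until a worker pops an entry. The producer is throttled by the same constant that sizes the worker pool, which keeps the number of queued plus in-flight connection attempts bounded.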