diff --git a/eth/p2p/peer_pool.nim b/eth/p2p/peer_pool.nim index 6158191..98a587b 100644 --- a/eth/p2p/peer_pool.nim +++ b/eth/p2p/peer_pool.nim @@ -23,6 +23,8 @@ logScope: const lookupInterval = 5 connectLoopSleep = chronos.milliseconds(2000) + maxConcurrentConnectionRequests = 40 + sleepBeforeTryingARandomBootnode = chronos.milliseconds(3000) proc newPeerPool*( network: EthereumNode, networkId: NetworkId, keyPair: KeyPair, @@ -33,6 +35,7 @@ proc newPeerPool*( result.minPeers = minPeers result.networkId = networkId result.discovery = discovery + result.connQueue = newAsyncQueue[Node](maxConcurrentConnectionRequests) result.connectedNodes = initTable[Node, Peer]() result.connectingNodes = initHashSet[Node]() result.observers = initTable[int, PeerObserver]() @@ -130,9 +133,14 @@ proc connectToNode*(p: PeerPool, n: Node) {.async.} = proc connectToNode*(p: PeerPool, n: ENode) {.async.} = await p.connectToNode(newNode(n)) -proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = - for node in nodes: - await p.connectToNode(node) + +# This code is loosely based on code from nimbus-eth2; +# see eth2_network.nim and search for connQueue. +proc createConnectionWorker(p: PeerPool, workerId: int): Future[void] {.async.} = + trace "Connection worker started", workerId = workerId + while true: + let n = await p.connQueue.popFirst() + await connectToNode(p, n) # # TODO: Consider changing connect() to raise an exception instead of # # returning None, as discussed in @@ -149,6 +157,10 @@ proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = # if p.connectedNodes.len >= p.minPeers: # return +proc startConnectionWorkerPool(p: PeerPool, workerCount: int) = + for i in 0 ..< workerCount: + asyncSpawn createConnectionWorker(p, i) + proc maybeConnectToMorePeers(p: PeerPool) {.async.} = ## Connect to more peers if we're not yet connected to at least self.minPeers. if p.connectedNodes.len >= p.minPeers: @@ -162,8 +174,24 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} = if debugEnode.len != 0: await p.connectToNode(newNode(debugEnode)) else: - await p.connectToNodes(p.nodesToConnect()) - + for n in p.nodesToConnect(): + await p.connQueue.addLast(n) + + # The old version of the code (which did all the connection + # attempts in serial, not parallel) actually *awaited* all + # the connection attempts before reaching the code at the + # end of this proc that tries a random bootnode. Should + # that still be what happens? I don't think so; one of the + # reasons we're doing the connection attempts concurrently + # is because sometimes the attempt takes a long time. Still, + # it seems like we should give the many connection attempts + # a *chance* to complete before moving on to trying a random + # bootnode. So let's try just waiting a few seconds. (I am + # really not sure this makes sense.) + # + # --Adam, Dec. 2022 + await sleepAsync(sleepBeforeTryingARandomBootnode) + # In some cases (e.g ROPSTEN or private testnets), the discovery table might # be full of bad peers, so if we can't connect to any peers we try a random # bootstrap node as well. @@ -173,6 +201,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} = proc run(p: PeerPool) {.async.} = trace "Running PeerPool..." p.running = true + p.startConnectionWorkerPool(maxConcurrentConnectionRequests) while p.running: debug "Amount of peers", amount = p.connectedNodes.len() diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index 8dde77c..91d1ead 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -62,6 +62,7 @@ type clientId*: string discovery*: DiscoveryProtocol lastLookupTime*: float + connQueue*: AsyncQueue[Node] connectedNodes*: Table[Node, Peer] connectingNodes*: HashSet[Node] running*: bool