From c6f35142a8e738d880572968899111350e7cd599 Mon Sep 17 00:00:00 2001
From: jangko
Date: Fri, 26 Aug 2022 16:36:04 +0700
Subject: [PATCH] simple peer manager to handle static peers reconnection

fix #618
---
 nimbus/config.nim     |  11 ++++
 nimbus/nimbus.nim     |  19 +++++--
 nimbus/peers.nim      | 147 ++++++++++++++++++++++++++++++++++++++++++
 nimbus/rpc/common.nim |  11 ++--
 vendor/nim-eth        |   2 +-
 5 files changed, 180 insertions(+), 10 deletions(-)
 create mode 100644 nimbus/peers.nim

diff --git a/nimbus/config.nim b/nimbus/config.nim
index be5da6fea..4adf92370 100644
--- a/nimbus/config.nim
+++ b/nimbus/config.nim
@@ -277,6 +277,17 @@ type
       defaultValue: ""
       name: "static-peers-file" }: InputFile
 
+    reconnectMaxRetry* {.
+      desc: "Specifies max number of retries if static peers disconnected/not connected. " &
+            "0 = infinite."
+      defaultValue: 0
+      name: "reconnect-max-retry" }: int
+
+    reconnectInterval* {.
+      desc: "Interval in seconds before next attempt to reconnect to static peers. Min 5 seconds."
+      defaultValue: 15
+      name: "reconnect-interval" }: int
+
     listenAddress* {.
       desc: "Listening IP address for Ethereum P2P and Discovery traffic"
       defaultValue: defaultListenAddress
diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim
index 47da1dd27..bf4a6fe2d 100644
--- a/nimbus/nimbus.nim
+++ b/nimbus/nimbus.nim
@@ -17,13 +17,13 @@ import
   eth/[keys, net/nat, trie/db],
   eth/common as eth_common,
   eth/p2p as eth_p2p,
-  eth/p2p/[peer_pool, rlpx_protocols/les_protocol],
+  eth/p2p/[rlpx_protocols/les_protocol],
   json_rpc/rpcserver,
   metrics,
   metrics/[chronos_httpserver, chronicles_support],
   stew/shims/net as stewNet,
   websock/websock as ws,
-  "."/[conf_utils, config, constants, context, genesis, sealer, utils, version],
+  "."/[conf_utils, config, constants, context, genesis, sealer, utils, version, peers],
   ./db/[storage_types, db_chain, select_backend],
   ./graphql/ethapi,
   ./p2p/[chain, clique/clique_desc, clique/clique_sealer],
@@ -57,6 +57,7 @@ type
     txPool: TxPoolRef
     networkLoop: Future[void]
     dbBackend: ChainDB
+    peerManager: PeerManagerRef
 
 proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
   if string(conf.blocksFile).len > 0:
@@ -156,8 +157,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
 
   # Connect directly to the static nodes
   let staticPeers = conf.getStaticPeers()
-  for enode in staticPeers:
-    asyncSpawn nimbus.ethNode.peerPool.connectToNode(newNode(enode))
+  if staticPeers.len > 0:
+    nimbus.peerManager = PeerManagerRef.new(
+      nimbus.ethNode.peerPool,
+      conf.reconnectInterval,
+      conf.reconnectMaxRetry,
+      staticPeers
+    )
+    nimbus.peerManager.start()
 
   # Start Eth node
   if conf.maxPeers > 0:
@@ -412,6 +419,10 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
     await nimbus.graphqlServer.stop()
   if conf.engineSigner != ZERO_ADDRESS:
     await nimbus.sealingEngine.stop()
+  if conf.maxPeers > 0:
+    await nimbus.networkLoop.cancelAndWait()
+  if nimbus.peerManager.isNil.not:
+    await nimbus.peerManager.stop()
 
 proc process*(nimbus: NimbusNode, conf: NimbusConf) =
   # Main event loop
diff --git a/nimbus/peers.nim b/nimbus/peers.nim
new file mode 100644
index 000000000..052b3f403
--- /dev/null
+++ b/nimbus/peers.nim
@@ -0,0 +1,147 @@
+# Nimbus
+# Copyright (c) 2022 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+# at your option.
+# This file may not be copied, modified, or distributed except according to
+# those terms.
+
+import
+  std/[tables],
+  chronicles,
+  chronos,
+  eth/p2p,
+  eth/p2p/peer_pool,
+  ../nimbus/sync/protocol
+
+# Currently, this module only handles static peers
+# but we can extend it to handle trusted peers as well
+# or bootnodes
+
+type
+  ReconnectState = ref object
+    ## Bookkeeping for a single static peer.
+    node: Node
+    retryCount: int
+    connected: bool
+
+  PMState = enum
+    Starting, Running, Stopping, Stopped
+
+  PeerManagerRef* = ref object
+    ## Keeps a list of static peers and periodically retries connecting
+    ## to those that are not marked as connected.
+    state: PMState
+    pool: PeerPool
+    maxRetryCount: int # zero == infinite
+    retryInterval: int # in seconds
+    reconnectStates: seq[ReconnectState]
+    reconnectFut: Future[void]
+
+logScope:
+  topics = "PeerManagerRef"
+
+proc setConnected(pm: PeerManagerRef, peer: Peer, connected: bool) =
+  ## Record the connection status of `peer` if it is one of the managed
+  ## static peers; peers that are not managed are ignored.
+  for n in pm.reconnectStates:
+    if peer.remote.id == n.node.id:
+      n.connected = connected
+      return
+
+  # The observer is added to the whole pool (see `setupManager`), so this
+  # can be reached for a peer that is not in the static list. That is not
+  # an error and must not abort the node.
+  trace "Peer is not a static peer", remote=peer.remote.node
+
+proc needReconnect(pm: PeerManagerRef): bool =
+  ## True if at least one static peer is not marked as connected.
+  for n in pm.reconnectStates:
+    if not n.connected:
+      return true
+
+proc reconnect(pm: PeerManagerRef) {.async, gcsafe.} =
+  ## Make one connection attempt to every static peer that is not connected
+  ## and still has retries left, as long as the manager stays `Running`.
+  for n in pm.reconnectStates:
+    if not n.connected and pm.state == Running:
+      if n.retryCount < pm.maxRetryCount or pm.maxRetryCount == 0:
+        trace "Reconnecting to", remote=n.node.node
+        await pm.pool.connectToNode(n.node)
+        inc n.retryCount
+      elif n.retryCount == pm.maxRetryCount:
+        trace "Exceed max retry count, give up reconnecting", remote=n.node.node
+        # Bump the counter past the limit so this is logged only once.
+        inc n.retryCount
+
+proc runReconnectLoop(pm: PeerManagerRef) {.async, gcsafe.} =
+  ## Call `reconnect` every `retryInterval` seconds while the manager is
+  ## `Running`; switch to `Stopping` once no static peer needs reconnecting.
+  while pm.state == Running:
+    if pm.needReconnect:
+      await pm.reconnect
+    else:
+      pm.state = Stopping
+      break
+    await sleepAsync(pm.retryInterval.seconds)
+
+proc setupManager(pm: PeerManagerRef, enodes: openArray[ENode]) =
+  ## Register a peer observer on the pool and create the reconnect state
+  ## for every enode in `enodes`.
+  var po: PeerObserver
+  po.onPeerConnected = proc(peer: Peer) {.gcsafe.} =
+    trace "Peer connected", remote=peer.remote.node
+    pm.setConnected(peer, true)
+
+  po.onPeerDisconnected = proc(peer: Peer) {.gcsafe.} =
+    trace "Peer disconnected", remote=peer.remote.node
+    pm.setConnected(peer, false)
+    # Restart the reconnect loop unless it is already running or the
+    # manager has been stopped.
+    if pm.state notin {Running, Stopped}:
+      pm.state = Running
+      pm.reconnectFut = pm.runReconnectLoop()
+
+  po.setProtocol eth
+  pm.pool.addObserver(pm, po)
+
+  for enode in enodes:
+    let state = ReconnectState(
+      node: newNode(enode),
+      retryCount: 0,
+      connected: false
+    )
+    pm.reconnectStates.add(state)
+
+proc new*(_: type PeerManagerRef,
+          pool: PeerPool,
+          retryInterval: int,
+          maxRetryCount: int,
+          enodes: openArray[ENode]): PeerManagerRef =
+  ## Create a peer manager for the static peers `enodes`.
+  ## `retryInterval` is in seconds and is raised to at least 5; a negative
+  ## `maxRetryCount` is clamped to zero, which means retry forever.
+  result = PeerManagerRef(
+    pool: pool,
+    state: Starting,
+    maxRetryCount: max(0, maxRetryCount),
+    retryInterval: max(5, retryInterval)
+  )
+  result.setupManager(enodes)
+
+proc start*(pm: PeerManagerRef) =
+  ## Start the reconnect loop if any static peer is not connected.
+  if pm.state notin {Stopped, Running} and pm.needReconnect:
+    pm.state = Running
+    pm.reconnectFut = pm.runReconnectLoop()
+    info "Reconnecting to static peers"
+
+proc stop*(pm: PeerManagerRef) {.async.} =
+  ## Stop the manager and cancel the reconnect loop, if one was started.
+  if pm.state notin {Stopped, Stopping}:
+    pm.state = Stopped
+    # `reconnectFut` is nil when the loop has never been started.
+    if not pm.reconnectFut.isNil:
+      await pm.reconnectFut.cancelAndWait()
+    info "Peer manager stopped"
diff --git a/nimbus/rpc/common.nim b/nimbus/rpc/common.nim
index ca1ebf8bf..6b43c0669 100644
--- a/nimbus/rpc/common.nim
+++ b/nimbus/rpc/common.nim
@@ -9,8 +9,9 @@
 
 import
   std/[strutils, tables],
-  nimcrypto, eth/common as eth_common, stint, json_rpc/server, json_rpc/errors,
-  eth/p2p, eth/p2p/enode, eth/p2p/peer_pool,
+  nimcrypto, eth/common as eth_common,
+  stint, json_rpc/server, json_rpc/errors,
+  eth/p2p, eth/p2p/enode,
   ../config, ./hexstrings
 
 type
@@ -37,11 +38,11 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
     result = $conf.networkId
 
   server.rpc("net_listening") do() -> bool:
-    let numPeers = node.peerPool.connectedNodes.len
+    let numPeers = node.numPeers
     result = numPeers < conf.maxPeers
 
   server.rpc("net_peerCount") do() -> HexQuantityStr:
-    let peerCount = uint node.peerPool.connectedNodes.len
+    let peerCount = uint node.numPeers
     result = encodeQuantity(peerCount)
 
   server.rpc("net_nodeInfo") do() -> NodeInfo:
@@ -61,6 +62,6 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
   server.rpc("nimbus_addPeer") do(enode: string) -> bool:
     var res = ENode.fromString(enode)
     if res.isOk:
-      asyncSpawn node.peerPool.connectToNode(newNode(res.get()))
+      asyncSpawn node.connectToNode(res.get())
       return true
     raise (ref InvalidRequest)(code: -32602, msg: "Invalid ENode")
diff --git a/vendor/nim-eth b/vendor/nim-eth
index 9f1d5ef1a..4f0155e62 160000
--- a/vendor/nim-eth
+++ b/vendor/nim-eth
@@ -1 +1 @@
-Subproject commit 9f1d5ef1a06d5ba3294d2e751dea8297b76b4a14
+Subproject commit 4f0155e626a9f7b0c8a71d66328618eb6a780453