simple peer manager to handle static peers reconnection

fix #618
This commit is contained in:
jangko 2022-08-26 16:36:04 +07:00
parent caa5e009ff
commit c6f35142a8
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
5 changed files with 155 additions and 10 deletions

View File

@ -277,6 +277,17 @@ type
defaultValue: ""
name: "static-peers-file" }: InputFile
reconnectMaxRetry* {.
desc: "Specifies max number of retries if static peers disconnected/not connected. " &
"0 = infinite."
defaultValue: 0
name: "reconnect-max-retry" }: int
reconnectInterval* {.
desc: "Interval in seconds before next attempt to reconnect to static peers. Min 5 seconds."
defaultValue: 15
name: "reconnect-interval" }: int
listenAddress* {.
desc: "Listening IP address for Ethereum P2P and Discovery traffic"
defaultValue: defaultListenAddress

View File

@ -17,13 +17,13 @@ import
eth/[keys, net/nat, trie/db],
eth/common as eth_common,
eth/p2p as eth_p2p,
eth/p2p/[peer_pool, rlpx_protocols/les_protocol],
eth/p2p/[rlpx_protocols/les_protocol],
json_rpc/rpcserver,
metrics,
metrics/[chronos_httpserver, chronicles_support],
stew/shims/net as stewNet,
websock/websock as ws,
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version],
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version, peers],
./db/[storage_types, db_chain, select_backend],
./graphql/ethapi,
./p2p/[chain, clique/clique_desc, clique/clique_sealer],
@ -57,6 +57,7 @@ type
txPool: TxPoolRef
networkLoop: Future[void]
dbBackend: ChainDB
peerManager: PeerManagerRef
proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
if string(conf.blocksFile).len > 0:
@ -156,8 +157,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# Connect directly to the static nodes
let staticPeers = conf.getStaticPeers()
for enode in staticPeers:
asyncSpawn nimbus.ethNode.peerPool.connectToNode(newNode(enode))
if staticPeers.len > 0:
nimbus.peerManager = PeerManagerRef.new(
nimbus.ethNode.peerPool,
conf.reconnectInterval,
conf.reconnectMaxRetry,
staticPeers
)
nimbus.peerManager.start()
# Start Eth node
if conf.maxPeers > 0:
@ -412,6 +419,10 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
await nimbus.graphqlServer.stop()
if conf.engineSigner != ZERO_ADDRESS:
await nimbus.sealingEngine.stop()
if conf.maxPeers > 0:
await nimbus.networkLoop.cancelAndWait()
if nimbus.peerManager.isNil.not:
await nimbus.peerManager.stop()
proc process*(nimbus: NimbusNode, conf: NimbusConf) =
# Main event loop

122
nimbus/peers.nim Normal file
View File

@ -0,0 +1,122 @@
# Nimbus
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import
std/[tables],
chronicles,
chronos,
eth/p2p,
eth/p2p/peer_pool,
../nimbus/sync/protocol
# Currently, this module only handles static peers,
# but it can be extended to handle trusted peers
# or bootnodes as well.
type
  # Per-peer bookkeeping used by the reconnect logic.
  ReconnectState = ref object
    node: Node        # the static node this entry tracks
    retryCount: int   # counter bumped by `reconnect` for this node
    connected: bool   # last status recorded through `setConnected`

  # Life-cycle of the manager:
  # - Starting: set by `new`, before `start` has been called
  # - Running:  the reconnect loop has been launched
  # - Stopping: set by the loop itself when no peer needs reconnecting
  # - Stopped:  set by `stop`
  PMState = enum
    Starting
    Running
    Stopping
    Stopped

  # Tracks a list of static peers and reconnects to them periodically.
  PeerManagerRef* = ref object
    state: PMState
    pool: PeerPool
    maxRetryCount: int # zero == infinite
    retryInterval: int # in seconds
    reconnectStates: seq[ReconnectState]
    reconnectFut: Future[void]
# Chronicles log scope for this module: sets the `topics` property
# attached to the log statements below to "PeerManagerRef".
logScope:
  topics = "PeerManagerRef"
proc setConnected(pm: PeerManagerRef, peer: Peer, connected: bool) =
  ## Record the connection status of `peer` if it is one of the tracked
  ## static peers.
  ##
  ## `peer` is matched against the tracked nodes by node id. The observer
  ## that calls this proc is registered on the pool per protocol, not per
  ## node, so it may report peers that were never added to
  ## `reconnectStates` (for example peers that are not static peers).
  ## Such peers are ignored; asserting here would abort the whole process
  ## on an ordinary connect/disconnect event.
  for n in pm.reconnectStates:
    if peer.remote.id == n.node.id:
      n.connected = connected
      return

  # Not a tracked peer: nothing to update.
  trace "Ignoring status change of untracked peer", remote=peer.remote.node
proc needReconnect(pm: PeerManagerRef): bool =
  ## Returns true when at least one tracked peer is currently marked as
  ## not connected, false otherwise (including when no peer is tracked).
  result = false
  for entry in pm.reconnectStates:
    if not entry.connected:
      result = true
      break
proc reconnect(pm: PeerManagerRef) {.async, gcsafe.} =
  ## Make one pass over the tracked peers and try to connect to each one
  ## that is marked as not connected, for as long as the manager is in
  ## the `Running` state.
  ##
  ## A peer is attempted while its retry counter is below `maxRetryCount`
  ## (always, when `maxRetryCount` is zero). Once the counter equals the
  ## limit, a single "give up" trace is emitted and the counter is bumped
  ## past the limit so the message is not repeated on later passes.
  for entry in pm.reconnectStates:
    # The state is re-checked per entry: it may change while awaiting.
    if entry.connected or pm.state != Running:
      continue

    let unlimited = pm.maxRetryCount == 0
    if unlimited or entry.retryCount < pm.maxRetryCount:
      trace "Reconnecting to", remote=entry.node.node
      await pm.pool.connectToNode(entry.node)
      inc entry.retryCount
    elif entry.retryCount == pm.maxRetryCount:
      trace "Exceed max retry count, give up reconnecting",
        remote=entry.node.node
      inc entry.retryCount
proc runReconnectLoop(pm: PeerManagerRef) {.async, gcsafe.} =
  ## Background loop: while the manager is `Running`, perform a reconnect
  ## pass and then sleep for `retryInterval` seconds. When no tracked peer
  ## needs reconnecting, the loop switches the state to `Stopping` and
  ## ends.
  while pm.state == Running:
    if not pm.needReconnect():
      # Nothing left to do; park the manager and leave the loop.
      pm.state = Stopping
      break

    await pm.reconnect()
    await sleepAsync(seconds(pm.retryInterval))
proc setupManager(pm: PeerManagerRef, enodes: openArray[ENode]) =
  ## Register a peer observer for the `eth` protocol on the manager's pool
  ## and create one `ReconnectState` entry per node in `enodes`.
  var observer: PeerObserver

  observer.onPeerConnected = proc(peer: Peer) {.gcsafe.} =
    trace "Peer connected", remote=peer.remote.node
    pm.setConnected(peer, true)

  observer.onPeerDisconnected = proc(peer: Peer) {.gcsafe.} =
    trace "Peer disconnected", remote=peer.remote.node
    pm.setConnected(peer, false)
    # Relaunch the reconnect loop unless it is already running or the
    # manager has been stopped.
    if pm.state != Running and pm.state != Stopped:
      pm.state = Running
      pm.reconnectFut = pm.runReconnectLoop()

  observer.setProtocol eth
  pm.pool.addObserver(pm, observer)

  # Every static node starts out as "not connected, never retried".
  for enode in enodes:
    pm.reconnectStates.add ReconnectState(
      node: newNode(enode),
      retryCount: 0,
      connected: false
    )
proc new*(_: type PeerManagerRef,
          pool: PeerPool,
          retryInterval: int,
          maxRetryCount: int,
          enodes: openArray[ENode]): PeerManagerRef =
  ## Create a peer manager for the static peers in `enodes`.
  ##
  ## `retryInterval` is in seconds and is raised to at least 5.
  ## `maxRetryCount` is raised to at least 0; zero means unlimited retries.
  ## The manager is returned in the `Starting` state; call `start` to
  ## launch the reconnect loop.
  let
    retryLimit = max(0, maxRetryCount)
    intervalSecs = max(5, retryInterval)
    pm = PeerManagerRef(
      pool: pool,
      state: Starting,
      maxRetryCount: retryLimit,
      retryInterval: intervalSecs
    )

  pm.setupManager(enodes)
  pm
proc start*(pm: PeerManagerRef) =
  ## Launch the reconnect loop. Does nothing when the manager is already
  ## `Running`, has been `Stopped`, or when no tracked peer needs
  ## reconnecting.
  if pm.state in {Stopped, Running}:
    return
  if not pm.needReconnect():
    return

  pm.state = Running
  pm.reconnectFut = pm.runReconnectLoop()
  info "Reconnecting to static peers"
proc stop*(pm: PeerManagerRef) {.async.} =
  ## Stop the peer manager and wait for the reconnect loop to finish.
  ##
  ## Safe to call in any state:
  ## - already `Stopped`: no-op;
  ## - `Starting` (loop never launched): only the state is updated, there
  ##   is no future to cancel;
  ## - `Stopping` (the loop parked itself because every peer was
  ##   connected): the state is still moved to `Stopped`, otherwise the
  ##   next disconnect notification would relaunch the loop after
  ##   shutdown.
  if pm.state == Stopped:
    return

  # Flip the state before awaiting so that disconnect notifications
  # delivered in the meantime cannot relaunch the loop.
  pm.state = Stopped

  let fut = pm.reconnectFut
  if not fut.isNil and not fut.finished:
    await fut.cancelAndWait()

  info "Peer manager stopped"

View File

@ -9,8 +9,9 @@
import
std/[strutils, tables],
nimcrypto, eth/common as eth_common, stint, json_rpc/server, json_rpc/errors,
eth/p2p, eth/p2p/enode, eth/p2p/peer_pool,
nimcrypto, eth/common as eth_common,
stint, json_rpc/server, json_rpc/errors,
eth/p2p, eth/p2p/enode,
../config, ./hexstrings
type
@ -37,11 +38,11 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
result = $conf.networkId
server.rpc("net_listening") do() -> bool:
let numPeers = node.peerPool.connectedNodes.len
let numPeers = node.numPeers
result = numPeers < conf.maxPeers
server.rpc("net_peerCount") do() -> HexQuantityStr:
let peerCount = uint node.peerPool.connectedNodes.len
let peerCount = uint node.numPeers
result = encodeQuantity(peerCount)
server.rpc("net_nodeInfo") do() -> NodeInfo:
@ -61,6 +62,6 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
server.rpc("nimbus_addPeer") do(enode: string) -> bool:
var res = ENode.fromString(enode)
if res.isOk:
asyncSpawn node.peerPool.connectToNode(newNode(res.get()))
asyncSpawn node.connectToNode(res.get())
return true
raise (ref InvalidRequest)(code: -32602, msg: "Invalid ENode")

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 9f1d5ef1a06d5ba3294d2e751dea8297b76b4a14
Subproject commit 4f0155e626a9f7b0c8a71d66328618eb6a780453