123 lines
3.4 KiB
Nim
123 lines
3.4 KiB
Nim
|
# Nimbus
|
||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||
|
# Licensed under either of
|
||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||
|
# at your option.
|
||
|
# This file may not be copied, modified, or distributed except according to
|
||
|
# those terms.
|
||
|
|
||
|
import
|
||
|
std/[tables],
|
||
|
chronicles,
|
||
|
chronos,
|
||
|
eth/p2p,
|
||
|
eth/p2p/peer_pool,
|
||
|
../nimbus/sync/protocol
|
||
|
|
||
|
# Currently, this module only handles static peers
|
||
|
# but we can extend it to handles trusted peers as well
|
||
|
# or bootnodes
|
||
|
|
||
|
type
|
||
|
ReconnectState = ref object
|
||
|
node: Node
|
||
|
retryCount: int
|
||
|
connected: bool
|
||
|
|
||
|
PMState = enum
|
||
|
Starting, Running, Stopping, Stopped
|
||
|
|
||
|
PeerManagerRef* = ref object
|
||
|
state: PMState
|
||
|
pool: PeerPool
|
||
|
maxRetryCount: int # zero == infinite
|
||
|
retryInterval: int # in seconds
|
||
|
reconnectStates: seq[ReconnectState]
|
||
|
reconnectFut: Future[void]
|
||
|
|
||
|
logScope:
|
||
|
topics = "PeerManagerRef"
|
||
|
|
||
|
proc setConnected(pm: PeerManagerRef, peer: Peer, connected: bool) =
|
||
|
for n in pm.reconnectStates:
|
||
|
if peer.remote.id == n.node.id:
|
||
|
n.connected = connected
|
||
|
return
|
||
|
|
||
|
doAssert(false, "unreachable code")
|
||
|
|
||
|
proc needReconnect(pm: PeerManagerRef): bool =
|
||
|
for n in pm.reconnectStates:
|
||
|
if not n.connected:
|
||
|
return true
|
||
|
|
||
|
proc reconnect(pm: PeerManagerRef) {.async, gcsafe.} =
|
||
|
for n in pm.reconnectStates:
|
||
|
if not n.connected and pm.state == Running:
|
||
|
if n.retryCount < pm.maxRetryCount or pm.maxRetryCount == 0:
|
||
|
trace "Reconnecting to", remote=n.node.node
|
||
|
await pm.pool.connectToNode(n.node)
|
||
|
inc n.retryCount
|
||
|
elif n.retryCount == pm.maxRetryCount:
|
||
|
trace "Exceed max retry count, give up reconnecting", remote=n.node.node
|
||
|
inc n.retryCount
|
||
|
|
||
|
proc runReconnectLoop(pm: PeerManagerRef) {.async, gcsafe.} =
|
||
|
while pm.state == Running:
|
||
|
if pm.needReconnect:
|
||
|
await pm.reconnect
|
||
|
else:
|
||
|
pm.state = Stopping
|
||
|
break
|
||
|
await sleepAsync(pm.retryInterval.seconds)
|
||
|
|
||
|
proc setupManager(pm: PeerManagerRef, enodes: openArray[ENode]) =
|
||
|
var po: PeerObserver
|
||
|
po.onPeerConnected = proc(peer: Peer) {.gcsafe.} =
|
||
|
trace "Peer connected", remote=peer.remote.node
|
||
|
pm.setConnected(peer, true)
|
||
|
|
||
|
po.onPeerDisconnected = proc(peer: Peer) {.gcsafe.} =
|
||
|
trace "Peer disconnected", remote=peer.remote.node
|
||
|
pm.setConnected(peer, false)
|
||
|
if pm.state notin {Running, Stopped}:
|
||
|
pm.state = Running
|
||
|
pm.reconnectFut = pm.runReconnectLoop()
|
||
|
|
||
|
po.setProtocol eth
|
||
|
pm.pool.addObserver(pm, po)
|
||
|
|
||
|
for enode in enodes:
|
||
|
let state = ReconnectState(
|
||
|
node: newNode(enode),
|
||
|
retryCount: 0,
|
||
|
connected: false
|
||
|
)
|
||
|
pm.reconnectStates.add(state)
|
||
|
|
||
|
proc new*(_: type PeerManagerRef,
|
||
|
pool: PeerPool,
|
||
|
retryInterval: int,
|
||
|
maxRetryCount: int,
|
||
|
enodes: openArray[ENode]): PeerManagerRef =
|
||
|
result = PeerManagerRef(
|
||
|
pool: pool,
|
||
|
state: Starting,
|
||
|
maxRetryCount: max(0, maxRetryCount),
|
||
|
retryInterval: max(5, retryInterval)
|
||
|
)
|
||
|
result.setupManager(enodes)
|
||
|
|
||
|
proc start*(pm: PeerManagerRef) =
|
||
|
if pm.state notin {Stopped, Running} and pm.needReconnect:
|
||
|
pm.state = Running
|
||
|
pm.reconnectFut = pm.runReconnectLoop()
|
||
|
info "Reconnecting to static peers"
|
||
|
|
||
|
proc stop*(pm: PeerManagerRef) {.async.} =
|
||
|
if pm.state notin {Stopped, Stopping}:
|
||
|
pm.state = Stopped
|
||
|
await pm.reconnectFut.cancelAndWait()
|
||
|
info "Peer manager stopped"
|