mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: improve disconnection handling (#3385)
This commit is contained in:
parent
98c3979119
commit
7c7ed5634f
@ -196,6 +196,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
||||
except CatchableError:
|
||||
return err("failed to create switch: " & getCurrentExceptionMsg())
|
||||
|
||||
let netConfig = builder.netConfig.get()
|
||||
|
||||
let peerManager = PeerManager.new(
|
||||
switch = switch,
|
||||
storage = builder.peerStorage.get(nil),
|
||||
@ -203,12 +205,13 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
||||
maxServicePeers = some(builder.maxServicePeers),
|
||||
colocationLimit = builder.colocationLimit,
|
||||
shardedPeerManagement = builder.shardAware,
|
||||
dnsNameServers = netConfig.dnsNameServers,
|
||||
)
|
||||
|
||||
var node: WakuNode
|
||||
try:
|
||||
node = WakuNode.new(
|
||||
netConfig = builder.netConfig.get(),
|
||||
netConfig = netConfig,
|
||||
enr = builder.record.get(),
|
||||
switch = switch,
|
||||
peerManager = peerManager,
|
||||
|
||||
@ -155,6 +155,7 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul
|
||||
dns4DomainName = dns4DomainName,
|
||||
discv5UdpPort = discv5UdpPort,
|
||||
wakuFlags = some(wakuFlags),
|
||||
dnsNameServers = conf.dnsAddrsNameServers,
|
||||
)
|
||||
|
||||
return netConfigRes
|
||||
|
||||
@ -59,6 +59,7 @@ type Waku* = ref object
|
||||
wakuDiscv5*: WakuDiscoveryV5
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo]
|
||||
dnsRetryLoopHandle: Future[void]
|
||||
networkConnLoopHandle: Future[void]
|
||||
discoveryMngr: DiscoveryManager
|
||||
|
||||
node*: WakuNode
|
||||
@ -363,6 +364,15 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
|
||||
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
# The network connectivity loop checks periodically whether the node is online or not
|
||||
# and triggers any change that depends on the network connectivity state
|
||||
proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} =
|
||||
while true:
|
||||
await sleepAsync(15.seconds)
|
||||
|
||||
# Update online state
|
||||
await waku.node.peerManager.updateOnlineState()
|
||||
|
||||
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
debug "Retrieve dynamic bootstrap nodes"
|
||||
|
||||
@ -400,6 +410,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
if not waku[].deliveryMonitor.isNil():
|
||||
waku[].deliveryMonitor.startDeliveryMonitor()
|
||||
|
||||
# Start network connectivity check loop
|
||||
waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop()
|
||||
|
||||
return ok()
|
||||
|
||||
# Waku shutdown
|
||||
@ -411,6 +424,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
|
||||
if not waku.metricsServer.isNil():
|
||||
await waku.metricsServer.stop()
|
||||
|
||||
if not waku.networkConnLoopHandle.isNil():
|
||||
await waku.networkConnLoopHandle.cancelAndWait()
|
||||
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
await waku.wakuDiscv5.stop()
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@ type NetConfig* = object
|
||||
extIp*: Option[IpAddress]
|
||||
extPort*: Option[Port]
|
||||
dns4DomainName*: Option[string]
|
||||
dnsNameServers*: seq[IpAddress]
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
extMultiAddrs*: seq[MultiAddress]
|
||||
enrMultiAddrs*: seq[MultiAddress]
|
||||
@ -75,6 +76,7 @@ proc init*(
|
||||
discv5UdpPort = none(Port),
|
||||
clusterId: uint16 = 0,
|
||||
wakuFlags = none(CapabilitiesBitfield),
|
||||
dnsNameServers = newSeq[IpAddress](),
|
||||
): NetConfigResult =
|
||||
## Initialize and validate waku node network configuration
|
||||
|
||||
@ -165,6 +167,7 @@ proc init*(
|
||||
extPort: extPort,
|
||||
wssEnabled: wssEnabled,
|
||||
dns4DomainName: dns4DomainName,
|
||||
dnsNameServers: dnsNameServers,
|
||||
announcedAddresses: announcedAddresses,
|
||||
extMultiAddrs: extMultiAddrs,
|
||||
enrMultiaddrs: enrMultiaddrs,
|
||||
|
||||
@ -8,6 +8,7 @@ import
|
||||
libp2p/multistream,
|
||||
libp2p/muxers/muxer,
|
||||
libp2p/nameresolving/nameresolver,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/peerstore
|
||||
|
||||
import
|
||||
@ -73,6 +74,8 @@ const
|
||||
# Max peers that we allow from the same IP
|
||||
DefaultColocationLimit* = 5
|
||||
|
||||
DNSCheckDomain = "one.one.one.one"
|
||||
|
||||
type ConnectionChangeHandler* = proc(
|
||||
peerId: PeerId, peerEvent: PeerEventKind
|
||||
): Future[void] {.gcsafe, raises: [Defect].}
|
||||
@ -95,11 +98,16 @@ type PeerManager* = ref object of RootObj
|
||||
started: bool
|
||||
shardedPeerManagement: bool # temp feature flag
|
||||
onConnectionChange*: ConnectionChangeHandler
|
||||
dnsNameServers*: seq[IpAddress]
|
||||
online: bool
|
||||
|
||||
#~~~~~~~~~~~~~~~~~~~#
|
||||
# Helper Functions #
|
||||
#~~~~~~~~~~~~~~~~~~~#
|
||||
|
||||
template isOnline*(self: PeerManager): bool =
|
||||
self.online
|
||||
|
||||
proc calculateBackoff(
|
||||
initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
|
||||
): timer.Duration =
|
||||
@ -535,7 +543,38 @@ proc getStreamByPeerIdAndProtocol*(
|
||||
|
||||
return ok(streamRes.get())
|
||||
|
||||
proc checkInternetConnectivity(
|
||||
nameServerIps: seq[IpAddress], timeout = 2.seconds
|
||||
): Future[bool] {.async.} =
|
||||
var nameServers: seq[TransportAddress]
|
||||
for ip in nameServerIps:
|
||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||
|
||||
let dnsResolver = DnsResolver.new(nameServers)
|
||||
|
||||
# Resolve domain IP
|
||||
let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC)
|
||||
|
||||
if resolved.len > 0:
|
||||
return true
|
||||
else:
|
||||
return false
|
||||
|
||||
proc updateOnlineState*(pm: PeerManager) {.async.} =
|
||||
let numConnectedPeers =
|
||||
pm.switch.peerStore.peers().countIt(it.connectedness == Connected)
|
||||
|
||||
if numConnectedPeers > 0:
|
||||
pm.online = true
|
||||
else:
|
||||
pm.online = await checkInternetConnectivity(pm.dnsNameServers)
|
||||
|
||||
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||
# only attempt if current node is online
|
||||
if not pm.isOnline():
|
||||
error "connectToRelayPeers: won't attempt new connections - node is offline"
|
||||
return
|
||||
|
||||
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
|
||||
|
||||
@ -778,6 +817,10 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
|
||||
if pm.wakuMetadata.shards.len == 0:
|
||||
return
|
||||
|
||||
if not pm.isOnline():
|
||||
error "manageRelayPeers: won't attempt new connections - node is offline"
|
||||
return
|
||||
|
||||
var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
|
||||
var peersToDisconnect: int
|
||||
|
||||
@ -1005,6 +1048,7 @@ proc new*(
|
||||
maxFailedAttempts = MaxFailedAttempts,
|
||||
colocationLimit = DefaultColocationLimit,
|
||||
shardedPeerManagement = false,
|
||||
dnsNameServers = newSeq[IpAddress](),
|
||||
): PeerManager {.gcsafe.} =
|
||||
let capacity = switch.peerStore.capacity
|
||||
let maxConnections = switch.connManager.inSema.size
|
||||
@ -1055,6 +1099,8 @@ proc new*(
|
||||
maxFailedAttempts: maxFailedAttempts,
|
||||
colocationLimit: colocationLimit,
|
||||
shardedPeerManagement: shardedPeerManagement,
|
||||
dnsNameServers: dnsNameServers,
|
||||
online: true,
|
||||
)
|
||||
|
||||
proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user