diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index caa84db63..78b07ed9b 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -196,6 +196,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = except CatchableError: return err("failed to create switch: " & getCurrentExceptionMsg()) + let netConfig = builder.netConfig.get() + let peerManager = PeerManager.new( switch = switch, storage = builder.peerStorage.get(nil), @@ -203,12 +205,13 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = maxServicePeers = some(builder.maxServicePeers), colocationLimit = builder.colocationLimit, shardedPeerManagement = builder.shardAware, + dnsNameServers = netConfig.dnsNameServers, ) var node: WakuNode try: node = WakuNode.new( - netConfig = builder.netConfig.get(), + netConfig = netConfig, enr = builder.record.get(), switch = switch, peerManager = peerManager, diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 08f11f1c5..b5275d00b 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -155,6 +155,7 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul dns4DomainName = dns4DomainName, discv5UdpPort = discv5UdpPort, wakuFlags = some(wakuFlags), + dnsNameServers = conf.dnsAddrsNameServers, ) return netConfigRes diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9760d1580..c40db3b54 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -59,6 +59,7 @@ type Waku* = ref object wakuDiscv5*: WakuDiscoveryV5 dynamicBootstrapNodes: seq[RemotePeerInfo] dnsRetryLoopHandle: Future[void] + networkConnLoopHandle: Future[void] discoveryMngr: DiscoveryManager node*: WakuNode @@ -363,6 +364,15 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return +# The network connectivity loop checks periodically whether the node is online or not +# and triggers any change that depends on the network connectivity state +proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} = + while true: + await sleepAsync(15.seconds) + + # Update online state + await waku.node.peerManager.updateOnlineState() + proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = debug "Retrieve dynamic bootstrap nodes" @@ -400,6 +410,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() + # Start network connectivity check loop + waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop() + return ok() # Waku shutdown @@ -411,6 +424,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = if not waku.metricsServer.isNil(): await waku.metricsServer.stop() + if not waku.networkConnLoopHandle.isNil(): + await waku.networkConnLoopHandle.cancelAndWait() + if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() diff --git a/waku/node/config.nim b/waku/node/config.nim index 311e26771..51aadb48d 100644 --- a/waku/node/config.nim +++ b/waku/node/config.nim @@ -15,6 +15,7 @@ type NetConfig* = object extIp*: Option[IpAddress] extPort*: Option[Port] dns4DomainName*: Option[string] + dnsNameServers*: seq[IpAddress] announcedAddresses*: seq[MultiAddress] extMultiAddrs*: seq[MultiAddress] enrMultiAddrs*: seq[MultiAddress] @@ -75,6 +76,7 @@ proc init*( discv5UdpPort = none(Port), clusterId: uint16 = 0, wakuFlags = none(CapabilitiesBitfield), + dnsNameServers = newSeq[IpAddress](), ): NetConfigResult = ## Initialize and validate waku node network configuration @@ -165,6 +167,7 @@ proc init*( extPort: extPort, wssEnabled: wssEnabled, dns4DomainName: dns4DomainName, + dnsNameServers: dnsNameServers, announcedAddresses: announcedAddresses, extMultiAddrs: extMultiAddrs, enrMultiaddrs: enrMultiaddrs, diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 602718d5d..75c72449a 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -8,6 +8,7 @@ import libp2p/multistream, libp2p/muxers/muxer, libp2p/nameresolving/nameresolver, + libp2p/nameresolving/dnsresolver, libp2p/peerstore import @@ -73,6 +74,8 @@ const # Max peers that we allow from the same IP DefaultColocationLimit* = 5 + DNSCheckDomain = "one.one.one.one" + type ConnectionChangeHandler* = proc( peerId: PeerId, peerEvent: PeerEventKind ): Future[void] {.gcsafe, raises: [Defect].} @@ -95,11 +98,16 @@ type PeerManager* = ref object of RootObj started: bool shardedPeerManagement: bool # temp feature flag onConnectionChange*: ConnectionChangeHandler + dnsNameServers*: seq[IpAddress] + online: bool #~~~~~~~~~~~~~~~~~~~# # Helper Functions # #~~~~~~~~~~~~~~~~~~~# +template isOnline*(self: PeerManager): bool = + self.online + proc calculateBackoff( initialBackoffInSec: int, backoffFactor: int, failedAttempts: int ): timer.Duration = @@ -535,7 +543,38 @@ proc getStreamByPeerIdAndProtocol*( return ok(streamRes.get()) +proc checkInternetConnectivity( + nameServerIps: seq[IpAddress], timeout = 2.seconds +): Future[bool] {.async.} = + var nameServers: seq[TransportAddress] + for ip in nameServerIps: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + # Resolve domain IP + let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC) + + if resolved.len > 0: + return true + else: + return false + +proc updateOnlineState*(pm: PeerManager) {.async.} = + let numConnectedPeers = + pm.switch.peerStore.peers().countIt(it.connectedness == Connected) + + if numConnectedPeers > 0: + pm.online = true + else: + pm.online = await checkInternetConnectivity(pm.dnsNameServers) + proc connectToRelayPeers*(pm: PeerManager) {.async.} = + # only attempt if current node is online + if not pm.isOnline(): + error "connectToRelayPeers: won't attempt new connections - node is offline" + return + var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let totalRelayPeers = inRelayPeers.len + outRelayPeers.len @@ -778,6 +817,10 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = if pm.wakuMetadata.shards.len == 0: return + if not pm.isOnline(): + error "manageRelayPeers: won't attempt new connections - node is offline" + return + var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects var peersToDisconnect: int @@ -1005,6 +1048,7 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, + dnsNameServers = newSeq[IpAddress](), ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity let maxConnections = switch.connManager.inSema.size @@ -1055,6 +1099,8 @@ proc new*( maxFailedAttempts: maxFailedAttempts, colocationLimit: colocationLimit, shardedPeerManagement: shardedPeerManagement, + dnsNameServers: dnsNameServers, + online: true, ) proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =