From 1b8537494f297cbaa32e57970a55c59e1a82cbf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Thu, 6 Nov 2025 17:49:23 +0100 Subject: [PATCH] feat: drop renewal thread in favor of async polling --- codex/codex.nim | 12 +- codex/conf.nim | 2 +- codex/nat/port_mapping.nim | 240 +++++++++++++----------------- codex/nat/reachabilitymanager.nim | 77 ++++++---- tests/codex/helpers/nodeutils.nim | 6 - 5 files changed, 163 insertions(+), 174 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index 9bb6882c..364306e3 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -76,10 +76,12 @@ proc start*(s: CodexServer) {.async.} = await s.codexNode.switch.start() - s.reachabilityManager.getAnnounceRecords = some proc() = - s.codexNode.switch.peerInfo.addrs - s.reachabilityManager.getDiscoveryRecords = some proc() = - s.codexNode.discovery.dhtRecord.data.addresses.mapIt(it.address) + s.reachabilityManager.getAnnounceRecords = some proc(): ?seq[MultiAddress] = + s.codexNode.switch.peerInfo.addrs.some + s.reachabilityManager.getDiscoveryRecords = some proc(): ?seq[MultiAddress] = + if dhtRecord =? s.codexNode.discovery.dhtRecord: + return dhtRecord.data.addresses.mapIt(it.address).some + s.reachabilityManager.updateAnnounceRecords = some proc(records: seq[MultiAddress]) = s.codexNode.discovery.updateAnnounceRecord(records) s.reachabilityManager.updateDiscoveryRecords = some proc(records: seq[MultiAddress]) = @@ -148,7 +150,7 @@ proc new*( ): CodexServer = ## create CodexServer including setting up datastore, repostore, etc - let reachabilityManager = ReachabilityManager.new(config.portMappingStrategy) + let reachabilityManager = ReachabilityManager.new(config.forcePortMapping) let switch = SwitchBuilder .new() diff --git a/codex/conf.nim b/codex/conf.nim index 00122f73..e98d6361 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -49,7 +49,7 @@ export units, net, codextypes, logutils, completeCmdArg, parseCmdArg export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, - DefaultBlockRetries + DefaultBlockRetries, PortMappingStrategy type ThreadCount* = distinct Natural diff --git a/codex/nat/port_mapping.nim b/codex/nat/port_mapping.nim index 2409da20..57944d08 100644 --- a/codex/nat/port_mapping.nim +++ b/codex/nat/port_mapping.nim @@ -8,7 +8,7 @@ {.push raises: [].} -import std/[options, os, strutils, times, net, atomics] +import std/[options, strutils, net] import pkg/nat_traversal/[miniupnpc, natpmp] import pkg/json_serialization/std/net @@ -26,7 +26,7 @@ logScope: const UPNP_TIMEOUT = 200 # ms - RENEWAL_INTERVAL = 20 * 60 # seconds + RENEWAL_SLEEP = (20 * 60).seconds Pmp_LIFETIME = 60 * 60 # in seconds, must be longer than RENEWAL_INTERVAL MAPPING_DESCRIPTION = "codex" @@ -39,7 +39,7 @@ type PortMappingStrategy* = enum type MappingPort* = ref object of RootObj value*: Port -proc `$`(p: MappingPort): string = +proc `$`*(p: MappingPort): string = $(p.value) type TcpPort* = ref object of MappingPort @@ -51,37 +51,33 @@ proc newTcpMappingPort*(value: Port): TcpPort = proc newUdpMappingPort*(value: Port): UdpPort = UdpPort(value: value) -type PortMapping* = tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] -type RenewelThreadArgs = - tuple[strategy: PortMappingStrategy, portMapping: seq[PortMapping]] +type PortMappingEntry* = + tuple[internalPort: MappingPort, externalPort: Option[MappingPort]] -var - upnp {.threadvar.}: Miniupnp - npmp {.threadvar.}: NatPmp - mappings: seq[PortMapping] - portMappingExiting: Atomic[bool] - renewalThread: Thread[RenewelThreadArgs] +type PortMapping* = ref object of RootObj + upnp: Miniupnp + npmp: NatPmp + mappings: seq[PortMappingEntry] + renewalLoop: Future[void] -proc initUpnp(): bool = +proc initUpnp(self: PortMapping) = logScope: protocol = "upnp" - if upnp != nil: + if not self.upnp.isNil: warn "UPnP already initialized!" - return true - upnp = newMiniupnp() - upnp.discoverDelay = UPNP_TIMEOUT + self.upnp = newMiniupnp() + self.upnp.discoverDelay = UPNP_TIMEOUT - if err =? upnp.discover().errorOption: + if err =? self.upnp.discover().errorOption: warn "UPnP error discoverning Internet Gateway Devices", msg = err - upnp = nil - return false + self.upnp = nil - case upnp.selectIGD() + case self.upnp.selectIGD() of IGDNotFound: info "UPnP Internet Gateway Device not found. Giving up." - upnp = nil + self.upnp = nil # As UPnP is not supported on our network we won't be using it --> lets erase it. of IGDFound: info "UPnP Internet Gateway Device found." @@ -92,47 +88,45 @@ proc initUpnp(): bool = of IGDIpNotRoutable: info "UPnP Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." - return true - -proc initNpmp(): bool = +proc initNpmp(self: PortMapping) = logScope: protocol = "npmp" - if npmp != nil: + if not self.npmp.isNil: warn "NAT-PMP already initialized!" - return true - npmp = newNatPmp() + self.npmp = newNatPmp() - if err =? npmp.init().errorOption: + if err =? self.npmp.init().errorOption: warn "Error initialization of NAT-PMP", msg = err - npmp = nil - return false + self.npmp = nil - if err =? npmp.externalIPAddress().errorOption: + if err =? self.npmp.externalIPAddress().errorOption: warn "Fetching of external IP failed.", msg = err - npmp = nil - return false + self.npmp = nil info "NAT-PMP initialized" - return true -## Try to initilize all the port mapping protocols and returns -## the protocol that will be used. -proc initProtocols(strategy: PortMappingStrategy): PortMappingStrategy = +## Try to initilize all the port mapping protocols based on what is available on the network +proc initProtocols(self: PortMapping, strategy: PortMappingStrategy) = if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Upnp: - if initUpnp(): - return PortMappingStrategy.Upnp + self.initUpnp() + + if not self.upnp.isNil: + return # UPnP is available, using that, no need for NAT-PMP. if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Pmp: - if initNpmp(): - return PortMappingStrategy.Pmp + self.initNpmp() - return PortMappingStrategy.None +proc new*(T: type PortMapping, strategy: PortMappingStrategy): PortMapping = + let mapping = PortMapping(upnp: nil, npmp: nil, mappings: @[]) + mapping.initProtocols(strategy) + + return mapping proc upnpPortMapping( - internalPort: MappingPort, externalPort: MappingPort -): ?!MappingPort {.gcsafe.} = + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = let protocol = if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP logScope: @@ -141,10 +135,10 @@ proc upnpPortMapping( internalPort = internalPort.value protocol = protocol - let pmres = upnp.addPortMapping( + let pmres = self.upnp.addPortMapping( externalPort = $(externalPort.value), protocol = protocol, - internalHost = upnp.lanAddr, + internalHost = self.upnp.lanAddr, internalPort = $(internalPort.value), desc = MAPPING_DESCRIPTION, leaseDuration = 0, @@ -155,7 +149,7 @@ proc upnpPortMapping( return failure($pmres.error) # let's check it - let cres = upnp.getSpecificPortMapping( + let cres = self.upnp.getSpecificPortMapping( externalPort = $(externalPort.value), protocol = protocol ) if cres.isErr: @@ -166,8 +160,8 @@ proc upnpPortMapping( return success(externalPort) proc npmpPortMapping( - internalPort: MappingPort, externalPort: MappingPort -): ?!MappingPort {.gcsafe.} = + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = let protocol = if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP @@ -177,7 +171,7 @@ proc npmpPortMapping( internalPort = internalPort.value protocol = protocol - let extPortRes = npmp.addPortMapping( + let extPortRes = self.npmp.addPortMapping( eport = externalPort.value.cushort, iport = internalPort.value.cushort, protocol = protocol, @@ -200,75 +194,48 @@ proc npmpPortMapping( ## ## TODO: Add support for trying mapping of random external port. -proc doPortMapping(port: MappingPort): ?!MappingPort {.gcsafe.} = - if upnp != nil: - return upnpPortMapping(port, port) +proc doPortMapping(self: PortMapping, port: MappingPort): ?!MappingPort = + if not self.upnp.isNil: + return self.upnpPortMapping(port, port) - if npmp != nil: - return npmpPortMapping(port, port) + if not self.npmp.isNil: + return self.npmpPortMapping(port, port) return failure("No active startegy") proc doPortMapping( - internalPort: MappingPort, externalPort: MappingPort -): ?!MappingPort {.gcsafe.} = - if upnp != nil: - return upnpPortMapping(internalPort, externalPort) + self: PortMapping, internalPort: MappingPort, externalPort: MappingPort +): ?!MappingPort = + if not self.upnp.isNil: + return self.upnpPortMapping(internalPort, externalPort) - if npmp != nil: - return npmpPortMapping(internalPort, externalPort) + if not self.npmp.isNil: + return self.npmpPortMapping(internalPort, externalPort) return failure("No active startegy") -proc renewPortMapping(args: RenewelThreadArgs) {.thread, raises: [ValueError].} = - ignoreSignalsInThread() - let - (strategy, portMappings) = args - interval = initDuration(seconds = RENEWAL_INTERVAL) - sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C +proc renewPortMapping(self: PortMapping) {.async.} = + while true: + for mapping in self.mappings: + if externalPort =? mapping.externalPort: + without renewedExternalPort =? + self.doPortMapping(mapping.internalPort, externalPort), err: + error "Error while renewal of port mapping", msg = err.msg - var lastUpdate = now() + if renewedExternalPort.value != externalPort.value: + error "The renewed external port is not the same as the originally mapped" - # We can't use copies of Miniupnp and Pmp objects in this thread, because they share - # C pointers with other instances that have already been garbage collected, so - # we use threadvars instead and initialise them again with initProtocols(), - # even though we don't need the external IP's value. - - if initProtocols(strategy) == PortMappingStrategy.None: - error "Could not initiate protocols in renewal thread" - return - - while portMappingExiting.load() == false: - if now() >= (lastUpdate + interval): - for mapping in portMappings: - if externalPort =? mapping.externalPort: - without renewedExternalPort =? - doPortMapping(mapping.internalPort, externalPort), err: - error "Error while renewal of port mapping", msg = err.msg - - if renewedExternalPort.value != externalPort.value: - error "The renewed external port is not the same as the originally mapped" - - lastUpdate = now() - - sleep(sleepDuration) - -proc startRenewalThread(strategy: PortMappingStrategy) = - try: - renewalThread = Thread[RenewelThreadArgs]() - renewalThread.createThread(renewPortMapping, (strategy, mappings)) - except CatchableError as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg + await sleepAsync(RENEWAL_SLEEP) ## Gets external IP provided by the port mapping protocols ## Port mapping needs to be succesfully started first using `startPortMapping()` -proc getExternalIP*(): ?IpAddress = - if upnp == nil and npmp == nil: +proc getExternalIP*(self: PortMapping): ?IpAddress = + if self.upnp.isNil and self.npmp.isNil: warn "No available port-mapping protocol" return IpAddress.none - if upnp != nil: - let ires = upnp.externalIPAddress + if not self.upnp.isNil: + let ires = self.upnp.externalIPAddress if ires.isOk(): info "Got externa IP address", ip = ires.value try: @@ -279,8 +246,8 @@ proc getExternalIP*(): ?IpAddress = debug "Getting external IP address using UPnP failed", msg = ires.error, protocol = "upnp" - if npmp != nil: - let nires = npmp.externalIPAddress() + if not self.npmp.isNil: + let nires = self.npmp.externalIPAddress() if nires.isErr: debug "Getting external IP address using NAT-PMP failed", msg = nires.error else: @@ -292,57 +259,62 @@ proc getExternalIP*(): ?IpAddress = return IpAddress.none -proc startPortMapping*( - strategy: var PortMappingStrategy, internalPorts: seq[MappingPort] -): ?!seq[PortMapping] = - if strategy == PortMappingStrategy.None: - return failure("No port mapping strategy requested") +## Returns true if some supported port mapping protocol +## is available on the local network +proc isAvailable*(self: PortMapping): bool = + return (not self.upnp.isNil) or (not self.npmp.isNil) +proc start*( + self: PortMapping, internalPorts: seq[MappingPort] +): Future[?!seq[PortMappingEntry]] {.async: (raises: [CancelledError]).} = if internalPorts.len == 0: return failure("No internal ports to be mapped were supplied") - strategy = initProtocols(strategy) - if strategy == PortMappingStrategy.None: + if not self.isAvailable(): return failure("No available port mapping protocols on the network") - if mappings.len > 0: + if self.mappings.len > 0: return failure("Port mapping was already started! Stop first before re-starting.") - mappings = newSeqOfCap[PortMapping](internalPorts.len) - for port in internalPorts: - without mappedPort =? doPortMapping(port), err: + without mappedPort =? self.doPortMapping(port), err: warn "Failed to map port", port = port, msg = err.msg - mappings.add((internalPort: port, externalPort: MappingPort.none)) + self.mappings.add((internalPort: port, externalPort: MappingPort.none)) - mappings.add((internalPort: port, externalPort: mappedPort.some)) + self.mappings.add((internalPort: port, externalPort: mappedPort.some)) - startRenewalThread(strategy) + self.renewalLoop = self.renewPortMapping() + asyncSpawn(self.renewalLoop) - return success(mappings) + return success(self.mappings) -proc stopPortMapping*() = - if upnp == nil or npmp == nil: +proc stop*(self: PortMapping) {.async: (raises: [CancelledError]).} = + if self.upnp.isNil or self.npmp.isNil: debug "Port mapping is not running, nothing to stop" return - info "Stopping port mapping renewal threads" - try: - portMappingExiting.store(true) - renewalThread.joinThread() - except CatchableError as exc: - warn "Failed to stop port mapping renewal thread", exc = exc.msg + info "Stopping port mapping renewal loop" + if not self.renewalLoop.isNil: + if not self.renewalLoop.finished: + try: + await self.renewalLoop.cancelAndWait() + except CancelledError: + discard + except CatchableError as e: + error "Error during cancellation of renewal loop", msg = e.msg - for mapping in mappings: + self.renewalLoop = nil + + for mapping in self.mappings: if mapping.externalPort.isNone: continue - if upnp != nil: + if not self.upnp.isNil: let protocol = if (mapping.internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP if err =? - upnp.deletePortMapping( + self.upnp.deletePortMapping( externalPort = $((!mapping.externalPort).value), protocol = protocol ).errorOption: error "UPnP port mapping deletion error", msg = err @@ -352,12 +324,12 @@ proc stopPortMapping*() = internalPort = mapping.internalPort, protocol = protocol - if npmp != nil: + if not self.npmp.isNil: let protocol = if (mapping.internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP if err =? - npmp.deletePortMapping( + self.npmp.deletePortMapping( eport = (!mapping.externalPort).value.cushort, iport = mapping.internalPort.value.cushort, protocol = protocol, @@ -369,4 +341,4 @@ proc stopPortMapping*() = internalPort = mapping.internalPort, protocol = protocol - mappings = @[] + self.mappings = @[] diff --git a/codex/nat/reachabilitymanager.nim b/codex/nat/reachabilitymanager.nim index 864a2d75..bbc63198 100644 --- a/codex/nat/reachabilitymanager.nim +++ b/codex/nat/reachabilitymanager.nim @@ -19,58 +19,74 @@ logScope: type ReachabilityManager* = ref object of RootObj + started = false + portMapping: PortMapping networkReachability*: NetworkReachability - portMappingStrategy: PortMappingStrategy getAnnounceRecords*: ?GetRecords getDiscoveryRecords*: ?GetRecords updateAnnounceRecords*: ?UpdateRecords updateDiscoveryRecords*: ?UpdateRecords - started = false - GetRecords* = proc(): seq[MultiAddress] {.raises: [].} + GetRecords* = proc(): ?seq[MultiAddress] {.raises: [].} UpdateRecords* = proc(records: seq[MultiAddress]) {.raises: [].} proc new*( T: typedesc[ReachabilityManager], portMappingStrategy: PortMappingStrategy ): T = - return T(portMappingStrategy: portMappingStrategy) + return T(portMapping: PortMapping.new(portMappingStrategy)) -proc startPortMapping(self: ReachabilityManager): bool = +proc startPortMapping( + self: ReachabilityManager +): Future[bool] {.async: (raises: [CancelledError]).} = + # This check guarantees us that the callbacks are set + # and hence we can use ! (Option.get) without fear. if not self.started: warn "ReachabilityManager is not started, yet we are trying to map ports already!" return false try: - let announceRecords = (!self.getAnnounceRecords)() - let discoveryRecords = (!self.getDiscoveryRecords)() - let portsToBeMapped = - (announceRecords & discoveryRecords).mapIt(getAddressAndPort(it)).mapIt(it.port) + {.gcsafe.}: + let announceRecords = (!self.getAnnounceRecords)() + let discoveryRecords = (!self.getDiscoveryRecords)() - without mappedPorts =? startPortMapping(self.portMappingStrategy, portsToBeMapped), - err: + var records: seq[MultiAddress] = @[] + + if announceRecords.isSome: + records.add(!announceRecords) + if discoveryRecords.isSome: + records.add(!discoveryRecords) + + let portsToBeMapped = records.mapIt(getAddressAndPort(it)).mapIt(it.port) + + without mappedPorts =? (await self.portMapping.start(portsToBeMapped)), err: warn "Could not start port mapping", msg = err.msg return false if mappedPorts.any( - proc(x: PortMapping): bool = + proc(x: PortMappingEntry): bool = isNone(x.externalPort) ): warn "Some ports were not mapped - not using port mapping then" return false - info "Started port mapping" + info "Succesfully exposed ports", ports = portsToBeMapped - let announceMappedRecords = zip( - announceRecords, mappedPorts[0 .. announceRecords.len - 1] - ) - .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) - (!self.updateAnnounceRecords)(announceMappedRecords) + if announceRecords.isSome: + let announceMappedRecords = zip( + !announceRecords, mappedPorts[0 .. (!announceRecords).len - 1] + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + {.gcsafe.}: + (!self.updateAnnounceRecords)(announceMappedRecords) - let discoveryMappedRecords = zip( - discoveryRecords, mappedPorts[announceRecords.len .. ^1] - ) - .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) - (!self.updateDiscoveryRecords)(discoveryMappedRecords) + if discoveryRecords.isSome: + let discoveryMappedRecords = zip( + !discoveryRecords, + mappedPorts[(mappedPorts.len - (!discoveryRecords).len) .. ^1], + ) + .mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort)) + {.gcsafe.}: + (!self.updateDiscoveryRecords)(discoveryMappedRecords) return true except ValueError as exc: @@ -80,7 +96,7 @@ proc startPortMapping(self: ReachabilityManager): bool = proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHandler = let statusAndConfidenceHandler = proc( networkReachability: NetworkReachability, confidenceOpt: Opt[float] - ): Future[void] {.async: (raises: [CancelledError]).} = + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = if not manager.started: warn "ReachabilityManager was not started, but we are already getting reachability updates! Ignoring..." return @@ -101,8 +117,11 @@ proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHa if networkReachability == NetworkReachability.NotReachable: # Lets first start to expose port using port mapping protocols like NAT-PMP or UPnP - if manager.startPortMapping(): - return # We exposed ports so we should be good! + if manager.portMapping.isAvailable(): + debug "Port mapping available on the network" + + if await manager.startPortMapping(): + return # We exposed ports so we should be good! info "No more options to become reachable" @@ -129,8 +148,10 @@ proc start*( except CatchableError as exc: info "Failed to dial bootstrap nodes", err = exc.msg -proc stop*(): Future[void] {.async: (raises: [CancelledError]).} = - stopPortMapping() +proc stop*( + self: ReachabilityManager +): Future[void] {.async: (raises: [CancelledError]).} = + await self.portMapping.stop() self.started = false proc getAutonatService*(self: ReachabilityManager): Service = diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 857092bb..7374d094 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -11,8 +11,6 @@ import pkg/codex/stores import pkg/codex/blocktype as bt import pkg/codex/blockexchange import pkg/codex/systemclock -import pkg/codex/nat -import pkg/codex/utils/natutils import pkg/codex/utils/safeasynciter import pkg/codex/merkletree import pkg/codex/manifest @@ -219,10 +217,6 @@ proc generateNodes*( if config.enableBootstrap: waitFor switch.peerInfo.update() - let (announceAddrs, discoveryAddrs) = - nattedAddress(switch.peerInfo.addrs, bindPort.Port) - blockDiscovery.updateAnnounceRecord(announceAddrs) - blockDiscovery.updateDhtRecord(discoveryAddrs) if blockDiscovery.dhtRecord.isSome: bootstrapNodes.add !blockDiscovery.dhtRecord