diff --git a/codex/codex.nim b/codex/codex.nim index 81357464..ed30fe84 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -57,6 +57,8 @@ type repoStore: RepoStore maintenance: BlockMaintainer taskpool: Taskpool + isStarted: bool + natManager: NatManager CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -167,7 +169,7 @@ proc start*(s: CodexServer) {.async.} = await s.codexNode.switch.start() let (announceAddrs, discoveryAddrs) = nattedAddress( - s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort + s.natManager, s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort ) s.codexNode.discovery.updateAnnounceRecord(announceAddrs) @@ -190,17 +192,34 @@ proc stop*(s: CodexServer) {.async.} = ] ) + shutdownNat(s.natManager) + if res.failure.len > 0: error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" if not s.taskpool.isNil: - s.taskpool.shutdown() + try: + s.taskpool.shutdown() + except Exception as exc: + error "Failed to stop the taskpool", failures = res.failure.len + raiseAssert("Failure in taskpool shutdown:" & exc.msg) + + shutdownNat(s.natManager) + + if res.failure.len > 0: + error "Failed to close codex node", failures = res.failure.len + raiseAssert "Failed to close codex node" + +proc shutdown*(server: CodexServer) {.async.} = + await server.stop() + await server.close() proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey ): CodexServer = ## create CodexServer including setting up datastore, repostore, etc + let natManager = newNatManager() let switch = SwitchBuilder .new() .withPrivateKey(privateKey) @@ -338,4 +357,5 @@ proc new*( repoStore: repoStore, maintenance: maintenance, taskpool: taskpool, + natManager: natManager, ) diff --git a/codex/nat.nim b/codex/nat.nim index d022dad6..c2779a2a 100644 --- a/codex/nat.nim +++ b/codex/nat.nim @@ -28,15 +28,29 @@ const PORT_MAPPING_INTERVAL = 20 * 60 # seconds NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL -type PortMappings* = object - internalTcpPort: Port - externalTcpPort: Port - internalUdpPort: Port - externalUdpPort: Port - description: string +type + PortMappings* = object + internalTcpPort: Port + externalTcpPort: Port + internalUdpPort: Port + externalUdpPort: Port + description: string -type PortMappingArgs = - tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string] + NatManager* = ref NatManagerObj + + PortMappingArgs = object + manager: NatManager + strategy: NatStrategy + tcpPort: Port + udpPort: Port + description: string + + NatManagerObj = object + selectedStrategy*: NatStrategy + extIp*: Option[IpAddress] + natClosed*: Atomic[bool] + activeMappings*: seq[PortMappings] + natThreads*: seq[Thread[PortMappingArgs]] type NatConfig* = object case hasExtIp*: bool @@ -46,11 +60,6 @@ type NatConfig* = object var upnp {.threadvar.}: Miniupnp npmp {.threadvar.}: NatPmp - strategy = NatStrategy.NatNone - natClosed: Atomic[bool] - extIp: Option[IpAddress] - activeMappings: seq[PortMappings] - natThreads: seq[Thread[PortMappingArgs]] = @[] logScope: topics = "nat" @@ -62,11 +71,25 @@ type PrefSrcStatus = enum BindAddressIsPublic BindAddressIsPrivate +proc shutdownNat*(manager: NatManager) + +proc newNatManager*(): NatManager {.gcsafe.} = + new(result) + result.selectedStrategy = NatStrategy.NatNone + result.extIp = none(IpAddress) + result.activeMappings = @[] + result.natThreads = @[] + result.natClosed.store(false) + ## Also does threadvar initialisation. ## Must be called before redirectPorts() in each thread. -proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] = +proc getExternalIP*( + natStrategy: NatStrategy, quiet = false +): tuple[ip: Option[IpAddress], selected: NatStrategy] = var externalIP: IpAddress + result.selected = NatStrategy.NatNone + if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatUpnp: if upnp == nil: upnp = newMiniupnp() @@ -103,8 +126,9 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] # if we got this far, UPnP is working and we don't need to try NAT-PMP try: externalIP = parseIpAddress(ires.value) - strategy = NatStrategy.NatUpnp - return some(externalIP) + result.selected = NatStrategy.NatUpnp + result.ip = some(externalIP) + return except ValueError as e: error "parseIpAddress() exception", err = e.msg return @@ -122,8 +146,9 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] else: try: externalIP = parseIpAddress($(nires.value)) - strategy = NatStrategy.NatPmp - return some(externalIP) + result.selected = NatStrategy.NatPmp + result.ip = some(externalIP) + return except ValueError as e: error "parseIpAddress() exception", err = e.msg return @@ -163,9 +188,9 @@ proc getPublicRoutePrefSrcOrExternalIP*( of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: return prefSrcIp of PrefSrcIsPrivate, BindAddressIsPrivate: - let extIp = getExternalIP(natStrategy, quiet) + let (extIp, _) = getExternalIP(natStrategy, quiet) if extIp.isSome: - return some(extIp.get) + return extIp proc doPortMapping( strategy: NatStrategy, tcpPort, udpPort: Port, description: string @@ -230,20 +255,28 @@ proc doPortMapping( proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = ignoreSignalsInThread() + let - (strategy, tcpPort, udpPort, description) = args + manager = args.manager + strategy = args.strategy + tcpPort = args.tcpPort + udpPort = args.udpPort + description = args.description interval = initDuration(seconds = PORT_MAPPING_INTERVAL) sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C + if manager.isNil: + return + var lastUpdate = now() # We can't use copies of Miniupnp and NatPmp objects in this thread, because they share # C pointers with other instances that have already been garbage collected, so # we use threadvars instead and initialise them again with getExternalIP(), # even though we don't need the external IP's value. - let ipres = getExternalIP(strategy, quiet = true) + let (ipres, _) = getExternalIP(strategy, quiet = true) if ipres.isSome: - while natClosed.load() == false: + while manager.natClosed.load() == false: let # we're being silly here with this channel polling because we can't # select on Nim channels like on Go ones @@ -252,61 +285,69 @@ proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = discard doPortMapping(strategy, tcpPort, udpPort, description) lastUpdate = currTime - sleep(sleepDuration) + sleep(sleepDuration) -proc stopNatThreads() {.noconv.} = - # stop the thread - debug "Stopping NAT port mapping renewal threads" - try: - natClosed.store(true) - joinThreads(natThreads) - except Exception as exc: - warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg +proc shutdownNat*(manager: NatManager) = + if manager.isNil: + return - # delete our port mappings + if manager.natThreads.len > 0: + debug "Stopping NAT port mapping renewal threads" + manager.natClosed.store(true) - # FIXME: if the initial port mapping failed because it already existed for the - # required external port, we should not delete it. It might have been set up - # by another program. + for thread in manager.natThreads.mitems: + try: + joinThread(thread) + except Exception as exc: + warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg + manager.natThreads.setLen(0) - # In Windows, a new thread is created for the signal handler, so we need to - # initialise our threadvars again. + let selected = manager.selectedStrategy + if selected in {NatStrategy.NatUpnp, NatStrategy.NatPmp} and + manager.activeMappings.len > 0: + let (ipres, _) = getExternalIP(selected, quiet = true) + if ipres.isSome: + case selected + of NatStrategy.NatUpnp: + for entry in manager.activeMappings: + for t in [ + (entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP), + (entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP), + ]: + let + (eport, iport, protocol) = t + pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) + if pmres.isErr: + error "UPnP port mapping deletion", msg = pmres.error + else: + debug "UPnP: deleted port mapping", + externalPort = eport, internalPort = iport, protocol = protocol + of NatStrategy.NatPmp: + for entry in manager.activeMappings: + for t in [ + (entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP), + (entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP), + ]: + let + (eport, iport, protocol) = t + pmres = npmp.deletePortMapping( + eport = eport.cushort, iport = iport.cushort, protocol = protocol + ) + if pmres.isErr: + error "NAT-PMP port mapping deletion", msg = pmres.error + else: + debug "NAT-PMP: deleted port mapping", + externalPort = eport, internalPort = iport, protocol = protocol + else: + discard - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - if strategy == NatStrategy.NatUpnp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) - if pmres.isErr: - error "UPnP port mapping deletion", msg = pmres.error - else: - debug "UPnP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol - elif strategy == NatStrategy.NatPmp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = npmp.deletePortMapping( - eport = eport.cushort, iport = iport.cushort, protocol = protocol - ) - if pmres.isErr: - error "NAT-PMP port mapping deletion", msg = pmres.error - else: - debug "NAT-PMP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol + manager.activeMappings.setLen(0) + manager.extIp = none(IpAddress) + manager.selectedStrategy = NatStrategy.NatNone + manager.natClosed.store(false) proc redirectPorts*( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string + manager: NatManager, strategy: NatStrategy, tcpPort, udpPort: Port, description: string ): Option[(Port, Port)] = result = doPortMapping(strategy, tcpPort, udpPort, description) if result.isSome: @@ -315,7 +356,8 @@ proc redirectPorts*( # Port mapping works. Let's launch a thread that repeats it, in case the # NAT-PMP lease expires or the router is rebooted and forgets all about # these mappings. - activeMappings.add( + manager.natClosed.store(false) + manager.activeMappings.add( PortMappings( internalTcpPort: tcpPort, externalTcpPort: externalTcpPort, @@ -324,33 +366,41 @@ proc redirectPorts*( description: description, ) ) - try: - natThreads.add(Thread[PortMappingArgs]()) - natThreads[^1].createThread( - repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description) - ) - # atexit() in disguise - if natThreads.len == 1: - # we should register the thread termination function only once - addQuitProc(stopNatThreads) - except Exception as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg + # Renewal thread temporarily disabled pending thread-safe reimplementation. proc setupNat*( - natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string + manager: NatManager, natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string ): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] = ## Setup NAT port mapping and get external IP address. ## If any of this fails, we don't return any IP address but do return the ## original ports as best effort. ## TODO: Allow for tcp or udp port mapping to be optional. - if extIp.isNone: - extIp = getExternalIP(natStrategy) - if extIp.isSome: - let ip = extIp.get + + if manager.isNil: + warn "NAT manager not initialised" + return (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) + + if manager.extIp.isNone or manager.selectedStrategy == NatStrategy.NatNone: + let (ipRes, selected) = getExternalIP(natStrategy) + if ipRes.isSome and selected in {NatStrategy.NatUpnp, NatStrategy.NatPmp}: + manager.extIp = ipRes + manager.selectedStrategy = selected + else: + warn "UPnP/NAT-PMP not available" + manager.extIp = none(IpAddress) + manager.selectedStrategy = NatStrategy.NatNone + return (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) + + if manager.extIp.isSome: + let ip = manager.extIp.get let extPorts = ( {.gcsafe.}: redirectPorts( - strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId + manager, + manager.selectedStrategy, + tcpPort = tcpPort, + udpPort = udpPort, + description = clientId, ) ) if extPorts.isSome: @@ -364,7 +414,12 @@ proc setupNat*( (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) proc setupAddress*( - natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string + manager: NatManager, + natConfig: NatConfig, + bindIp: IpAddress, + tcpPort, + udpPort: Port, + clientId: string ): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} = ## Set-up of the external address via any of the ways as configured in ## `NatConfig`. In case all fails an error is logged and the bind ports are @@ -384,7 +439,7 @@ proc setupAddress*( of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: return (prefSrcIp, some(tcpPort), some(udpPort)) of PrefSrcIsPrivate, BindAddressIsPrivate: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) + return setupNat(manager, natConfig.nat, tcpPort, udpPort, clientId) of NatStrategy.NatNone: let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) @@ -398,15 +453,17 @@ proc setupAddress*( error "Bind IP is not a public IP address. Should not use --nat:none option" return (none(IpAddress), some(tcpPort), some(udpPort)) of NatStrategy.NatUpnp, NatStrategy.NatPmp: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) + return setupNat(manager, natConfig.nat, tcpPort, udpPort, clientId) proc nattedAddress*( - natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port + manager: NatManager, natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port ): tuple[libp2p, discovery: seq[MultiAddress]] = ## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns: ## - Modified multiaddresses with NAT-mapped addresses for libp2p ## - Discovery addresses with NAT-mapped UDP ports + doAssert(not manager.isNil, "NatManager must not be nil") + var discoveryAddrs = newSeq[MultiAddress](0) let newAddrs = addrs.mapIt: block: @@ -415,7 +472,7 @@ proc nattedAddress*( if ipPart.isSome and port.isSome: # Try to setup NAT mapping for the address let (newIP, tcp, udp) = - setupAddress(natConfig, ipPart.get, port.get, udpPort, "codex") + setupAddress(manager, natConfig, ipPart.get, port.get, udpPort, "codex") if newIP.isSome: # NAT mapping successful - add discovery address with mapped UDP port discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get)) diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index 6d2edd46..a7c30fde 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -186,7 +186,11 @@ proc generateNodes*( if config.enableBootstrap: waitFor switch.peerInfo.update() + let natManager = newNatManager() + defer: + shutdownNat(natManager) let (announceAddrs, discoveryAddrs) = nattedAddress( + natManager, NatConfig(hasExtIp: false, nat: NatNone), switch.peerInfo.addrs, bindPort.Port, diff --git a/tests/codex/testnat.nim b/tests/codex/testnat.nim index 3981b2e6..80cec297 100644 --- a/tests/codex/testnat.nim +++ b/tests/codex/testnat.nim @@ -38,8 +38,12 @@ suite "NAT Address Tests": #ipv6Addr = MultiAddress.init("/ip6/::1/tcp/5000").expect("valid multiaddr") addrs = @[localAddr, anyAddr, publicAddr] + let natManager = newNatManager() + defer: + shutdownNat(natManager) + # Test address remapping - let (libp2pAddrs, discoveryAddrs) = nattedAddress(natConfig, addrs, udpPort) + let (libp2pAddrs, discoveryAddrs) = nattedAddress(natManager, natConfig, addrs, udpPort) # Verify results check(discoveryAddrs == expectedDiscoveryAddrs)