diff --git a/.gitmodules b/.gitmodules index e6214d80..ec152ae6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -196,3 +196,6 @@ [submodule "vendor/nim-merkletree"] path = vendor/nim-merkletree url = https://github.com/logos-storage/nim-merkletree +[submodule "vendor/nim-lsquic"] + path = vendor/nim-lsquic + url = https://github.com/vacp2p/nim-lsquic diff --git a/library/storage_thread_requests/requests/node_debug_request.nim b/library/storage_thread_requests/requests/node_debug_request.nim index 8bf3106c..8c7dec7d 100644 --- a/library/storage_thread_requests/requests/node_debug_request.nim +++ b/library/storage_thread_requests/requests/node_debug_request.nim @@ -59,6 +59,13 @@ proc getDebug( if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "", "announceAddresses": node.discovery.announceAddrs, "table": table, + "nat": { + "reachability": + if storage[].autonatService.isSome: + $storage[].autonatService.get.networkReachability + else: + "unknown" + }, } return ok($json) diff --git a/storage/blockexchange/network/network.nim b/storage/blockexchange/network/network.nim index 5f53fe20..95870595 100644 --- a/storage/blockexchange/network/network.nim +++ b/storage/blockexchange/network/network.nim @@ -13,7 +13,7 @@ import std/sequtils import pkg/chronos import pkg/libp2p -import pkg/libp2p/utils/semaphore +import pkg/chronos/asyncsync import pkg/questionable import pkg/questionable/results @@ -107,13 +107,17 @@ proc send*( let peer = b.peers[id] await b.inflightSema.acquire() - await peer.send(msg) + try: + await peer.send(msg) + finally: + try: + b.inflightSema.release() + except AsyncSemaphoreError as err: + error "Failed to release semaphore", msg = err.msg except CancelledError as error: raise error except CatchableError as err: error "Error sending message", peer = id, msg = err.msg - finally: - b.inflightSema.release() proc handleWantList( b: BlockExcNetwork, peer: NetworkPeer, list: WantList diff --git a/storage/blockexchange/peers/peercontext.nim b/storage/blockexchange/peers/peercontext.nim index 9e7c80f6..d81a4161 100644 --- a/storage/blockexchange/peers/peercontext.nim +++ b/storage/blockexchange/peers/peercontext.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/math +import std/[math, options] import pkg/libp2p import pkg/chronos diff --git a/storage/conf.nim b/storage/conf.nim index d24fd3ea..0b6a1c26 100644 --- a/storage/conf.nim +++ b/storage/conf.nim @@ -52,6 +52,8 @@ export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval, DefaultBlockRetries +const DefaultNatScheduleInterval* = 5.minutes + type ThreadCount* = distinct Natural proc `==`*(a, b: ThreadCount): bool {.borrow.} @@ -155,10 +157,9 @@ type nat* {. desc: "Specify method to use for determining public address. " & - "Must be one of: any, none, upnp, pmp, extip:. " & - "If connecting to peers on a local network only, use 'none'.", + "Must be one of: auto, extip:.", defaultValue: defaultNatConfig(), - defaultValueDesc: "any", + defaultValueDesc: "auto", name: "nat" .}: NatConfig @@ -286,11 +287,48 @@ type desc: "Logs to file", defaultValue: string.none, name: "log-file", hidden .}: Option[string] + natScheduleInterval* {. + desc: "Interval between AutoNAT reachability checks", + defaultValue: DefaultNatScheduleInterval, + defaultValueDesc: $DefaultNatScheduleInterval, + name: "nat-schedule-interval" + .}: Duration + + natNumPeersToAsk* {. + desc: "Number of peers to ask per AutoNAT round", + defaultValue: 3, + name: "nat-num-peers-to-ask" + .}: int + + natMaxQueueSize* {. + desc: "Number of past AutoNAT results kept to calculate confidence", + defaultValue: 3, + name: "nat-max-queue-size" + .}: int + + natMinConfidence* {. + desc: "Minimum confidence threshold to confirm reachability", + defaultValue: 0.7, + name: "nat-min-confidence" + .}: float + + natMaxRelays* {. + desc: "Maximum number of relay servers to reserve slots on simultaneously", + defaultValue: 2, + name: "nat-max-relays" + .}: int + + relay* {. + desc: "Enable circuit relay server (hop) - use on publicly reachable nodes only", + defaultValue: false, + name: "relay" + .}: bool + func defaultAddress*(conf: StorageConf): IpAddress = result = static parseIpAddress("127.0.0.1") func defaultNatConfig*(): NatConfig = - result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAny) + result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAuto) proc getStorageVersion(): string = let tag = strip(staticExec("git describe --tags --abbrev=0")) @@ -366,14 +404,8 @@ proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = func parse*(T: type NatConfig, p: string): Result[NatConfig, string] = case p.toLowerAscii - of "any": - return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAny)) - of "none": - return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatNone)) - of "upnp": - return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp)) - of "pmp": - return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp)) + of "auto": + return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAuto)) else: if p.startsWith("extip:"): try: diff --git a/storage/discovery.nim b/storage/discovery.nim index 73be4ee3..d9faf162 100644 --- a/storage/discovery.nim +++ b/storage/discovery.nim @@ -22,6 +22,7 @@ import pkg/codexdht/discv5/[routing_table, protocol as discv5] from pkg/nimcrypto import keccak256 import ./rng +import ./utils/addrutils import ./errors import ./logutils @@ -175,29 +176,47 @@ method removeProvider*( warn "Error removing provider", peerId = peerId, exc = exc.msg raiseAssert("Unexpected Exception in removeProvider") +proc updateRecords*( + d: Discovery, announceAddrs: openArray[MultiAddress], discoveryPort: Port +) = + ## Update both provider and DHT records from TCP announce addresses. + ## Discovery (UDP) addresses are derived by remapping announceAddrs to UDP with discoveryPort. + ## Updates the discv5 SPR once with the full set of addresses. + let tcpAddrs = @announceAddrs + let udpAddrs = + tcpAddrs.mapIt(it.remapAddr(protocol = some("udp"), port = some(discoveryPort))) + + debug "Updating addresses", tcpAddrs, udpAddrs + + d.announceAddrs = tcpAddrs + d.providerRecord = SignedPeerRecord + .init(d.key, PeerRecord.init(d.peerId, tcpAddrs)) + .expect("Should construct signed record").some + d.dhtRecord = SignedPeerRecord + .init(d.key, PeerRecord.init(d.peerId, tcpAddrs & udpAddrs)) + .expect("Should construct signed record").some + + if not d.protocol.isNil: + d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") + proc updateAnnounceRecord*(d: Discovery, addrs: openArray[MultiAddress]) = - ## Update providers record - ## - + # Updates announce addresses only, not the DHT routing record. + # Relay addresses should not pollute DHT routing. d.announceAddrs = @addrs - info "Updating announce record", addrs = d.announceAddrs d.providerRecord = SignedPeerRecord .init(d.key, PeerRecord.init(d.peerId, d.announceAddrs)) .expect("Should construct signed record").some - if not d.protocol.isNil: d.protocol.updateRecord(d.providerRecord).expect("Should update SPR") -proc updateDhtRecord*(d: Discovery, addrs: openArray[MultiAddress]) = - ## Update providers record - ## - +proc updateDhtRecord*( + d: Discovery, addrs: openArray[MultiAddress] +) {.deprecated: "use updateRecords instead".} = info "Updating Dht record", addrs = addrs d.dhtRecord = SignedPeerRecord .init(d.key, PeerRecord.init(d.peerId, @addrs)) .expect("Should construct signed record").some - if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") @@ -237,7 +256,8 @@ proc new*( key: PrivateKey, bindIp = IPv4_any(), bindPort = 0.Port, - announceAddrs: openArray[MultiAddress], + announceAddrs: openArray[MultiAddress] = [], + discoveryPort = 0.Port, bootstrapNodes: openArray[SignedPeerRecord] = [], store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"), ): Discovery = @@ -248,7 +268,7 @@ proc new*( key: key, peerId: PeerId.init(key).expect("Should construct PeerId"), store: store ) - self.updateAnnounceRecord(announceAddrs) + self.updateRecords(announceAddrs, discoveryPort) # -------------------------------------------------------------------------- # FIXME disable IP limits temporarily so we can run our workshop. Re-enable diff --git a/storage/manifest/coders.nim b/storage/manifest/coders.nim index 80a9879e..a774a96a 100644 --- a/storage/manifest/coders.nim +++ b/storage/manifest/coders.nim @@ -13,7 +13,7 @@ import times {.push raises: [].} -import std/tables +import std/[tables, options] import pkg/libp2p import pkg/questionable diff --git a/storage/nat.nim b/storage/nat.nim index 60dedc49..acbcbcbe 100644 --- a/storage/nat.nim +++ b/storage/nat.nim @@ -8,421 +8,187 @@ {.push raises: [].} -import - std/[options, os, times, net, atomics, exitprocs], - nat_traversal/[miniupnpc, natpmp], - json_serialization/std/net, - results +import std/[options, net] +import results import pkg/chronos +import pkg/chronos/threadsync import pkg/chronicles import pkg/libp2p +import pkg/libp2p/services/autorelayservice import ./utils import ./utils/natutils import ./utils/addrutils +import ./discovery -const - UPNP_TIMEOUT = 200 # ms - PORT_MAPPING_INTERVAL = 20 * 60 # seconds - NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL +logScope: + topics = "nat" -type PortMappings* = object - internalTcpPort: Port - externalTcpPort: Port - internalUdpPort: Port - externalUdpPort: Port - description: string - -type PortMappingArgs = - tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string] +const NatPortMappingTimeout = 5.seconds type NatConfig* = object case hasExtIp*: bool of true: extIp*: IpAddress of false: nat*: NatStrategy -var - upnp {.threadvar.}: Miniupnp - npmp {.threadvar.}: NatPmp - strategy = NatStrategy.NatNone - natClosed: Atomic[bool] - extIp: Option[IpAddress] - activeMappings: seq[PortMappings] - natThreads: seq[Thread[PortMappingArgs]] = @[] +type NatMapper* = ref object of RootObj + natConfig*: NatConfig + tcpPort*: Port + discoveryPort*: Port + hasUpnpMapping: bool -logScope: - topics = "nat" +type MapNatPortsCtx = object + natConfig: NatConfig + tcpPort: Port + discoveryPort: Port + signal: ThreadSignalPtr + result: Option[(Port, Port)] + hasUpnpMapping: bool -type PrefSrcStatus = enum - NoRoutingInfo - PrefSrcIsPublic - PrefSrcIsPrivate - BindAddressIsPublic - BindAddressIsPrivate +proc mapNatPortsThread(ctx: ptr MapNatPortsCtx) {.thread.} = + if ctx.natConfig.hasExtIp: + discard ctx.signal.fireSync() + return -## Also does threadvar initialisation. -## Must be called before redirectPorts() in each thread. -proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] = - var externalIP: IpAddress + # Devices are recreated on each call: discover() costs ~200ms but only fires + # when AutoNAT reports NotReachable, which is exactly when we want a fresh scan. + let upnpRes = UpnpDevice.init() + if upnpRes.isOk: + let ports = upnpRes.value.mapPorts(ctx.tcpPort, ctx.discoveryPort) + if ports.isSome: + ctx.hasUpnpMapping = true + ctx.result = ports + discard ctx.signal.fireSync() + return - if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatUpnp: - if upnp == nil: - upnp = newMiniupnp() + let pmpRes = PmpDevice.init() + if pmpRes.isOk: + let ports = pmpRes.value.mapPorts(ctx.tcpPort, ctx.discoveryPort) + if ports.isSome: + ctx.result = ports - upnp.discoverDelay = UPNP_TIMEOUT - let dres = upnp.discover() - if dres.isErr: - debug "UPnP", msg = dres.error - else: - var - msg: cstring - canContinue = true - case upnp.selectIGD() - of IGDNotFound: - msg = "Internet Gateway Device not found. Giving up." - canContinue = false - of IGDFound: - msg = "Internet Gateway Device found." - of IGDNotConnected: - msg = "Internet Gateway Device found but it's not connected. Trying anyway." - of NotAnIGD: - msg = - "Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway." - of IGDIpNotRoutable: - msg = - "Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." - if not quiet: - debug "UPnP", msg - if canContinue: - let ires = upnp.externalIPAddress() - if ires.isErr: - debug "UPnP", msg = ires.error - else: - # if we got this far, UPnP is working and we don't need to try NAT-PMP - try: - externalIP = parseIpAddress(ires.value) - strategy = NatStrategy.NatUpnp - return some(externalIP) - except ValueError as e: - error "parseIpAddress() exception", err = e.msg - return + discard ctx.signal.fireSync() - if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatPmp: - if npmp == nil: - npmp = newNatPmp() - let nres = npmp.init() - if nres.isErr: - debug "NAT-PMP", msg = nres.error - else: - let nires = npmp.externalIPAddress() - if nires.isErr: - debug "NAT-PMP", msg = nires.error - else: - try: - externalIP = parseIpAddress($(nires.value)) - strategy = NatStrategy.NatPmp - return some(externalIP) - except ValueError as e: - error "parseIpAddress() exception", err = e.msg - return +method mapNatPorts*( + m: NatMapper +): Future[Option[(Port, Port)]] {.async: (raises: [CancelledError]), base, gcsafe.} = + let signal = ThreadSignalPtr.new().valueOr: + warn "Failed to create ThreadSignalPtr for NAT port mapping" + return none((Port, Port)) -# This queries the routing table to get the "preferred source" attribute and -# checks if it's a public IP. If so, then it's our public IP. -# -# Further more, we check if the bind address (user provided, or a "0.0.0.0" -# default) is a public IP. That's a long shot, because code paths involving a -# user-provided bind address are not supposed to get here. -proc getRoutePrefSrc(bindIp: IpAddress): (Option[IpAddress], PrefSrcStatus) = - let bindAddress = initTAddress(bindIp, Port(0)) + var ctx = cast[ptr MapNatPortsCtx](createShared(MapNatPortsCtx)) + ctx[] = MapNatPortsCtx( + natConfig: m.natConfig, + tcpPort: m.tcpPort, + discoveryPort: m.discoveryPort, + signal: signal, + ) - if bindAddress.isAnyLocal(): - let ip = getRouteIpv4() - if ip.isErr(): - # No route was found, log error and continue without IP. - error "No routable IP address found, check your network connection", - error = ip.error - return (none(IpAddress), NoRoutingInfo) - elif ip.get().isGlobalUnicast(): - return (some(ip.get()), PrefSrcIsPublic) - else: - return (none(IpAddress), PrefSrcIsPrivate) - elif bindAddress.isGlobalUnicast(): - return (some(bindIp), BindAddressIsPublic) - else: - return (none(IpAddress), BindAddressIsPrivate) + var thread: Thread[ptr MapNatPortsCtx] + var threadStarted = false + defer: + if threadStarted: + # Blocking the event loop here is acceptable: UPnP discover() is bounded + # by UPNP_TIMEOUT (200ms), so the worst-case stall is ~200ms. + joinThread(thread) + # Always sync hasUpnpMapping back, even on timeout or cancellation. + # If the thread mapped ports just after the timeout, close() will + # still clean them up on the router. + if ctx.hasUpnpMapping: + m.hasUpnpMapping = true + freeShared(ctx) + discard signal.close() -# Try to detect a public IP assigned to this host, before trying NAT traversal. -proc getPublicRoutePrefSrcOrExternalIP*( - natStrategy: NatStrategy, bindIp: IpAddress, quiet = true -): Option[IpAddress] = - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) - - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return prefSrcIp - of PrefSrcIsPrivate, BindAddressIsPrivate: - let extIp = getExternalIP(natStrategy, quiet) - if extIp.isSome: - return some(extIp.get) - -proc doPortMapping( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string -): Option[(Port, Port)] {.gcsafe.} = - var - extTcpPort: Port - extUdpPort: Port - - if strategy == NatStrategy.NatUpnp: - for t in [(tcpPort, UPNPProtocol.TCP), (udpPort, UPNPProtocol.UDP)]: - let - (port, protocol) = t - pmres = upnp.addPortMapping( - externalPort = $port, - protocol = protocol, - internalHost = upnp.lanAddr, - internalPort = $port, - desc = description, - leaseDuration = 0, - ) - if pmres.isErr: - error "UPnP port mapping", msg = pmres.error, port - return - else: - # let's check it - let cres = - upnp.getSpecificPortMapping(externalPort = $port, protocol = protocol) - if cres.isErr: - warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.", - msg = cres.error - - info "UPnP: added port mapping", - externalPort = port, internalPort = port, protocol = protocol - case protocol - of UPNPProtocol.TCP: - extTcpPort = port - of UPNPProtocol.UDP: - extUdpPort = port - elif strategy == NatStrategy.NatPmp: - for t in [(tcpPort, NatPmpProtocol.TCP), (udpPort, NatPmpProtocol.UDP)]: - let - (port, protocol) = t - pmres = npmp.addPortMapping( - eport = port.cushort, - iport = port.cushort, - protocol = protocol, - lifetime = NATPMP_LIFETIME, - ) - if pmres.isErr: - error "NAT-PMP port mapping", msg = pmres.error, port - return - else: - let extPort = Port(pmres.value) - info "NAT-PMP: added port mapping", - externalPort = extPort, internalPort = port, protocol = protocol - case protocol - of NatPmpProtocol.TCP: - extTcpPort = extPort - of NatPmpProtocol.UDP: - extUdpPort = extPort - return some((extTcpPort, extUdpPort)) - -proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} = - ignoreSignalsInThread() - let - (strategy, tcpPort, udpPort, description) = args - interval = initDuration(seconds = PORT_MAPPING_INTERVAL) - sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C - - var lastUpdate = now() - - # We can't use copies of Miniupnp and NatPmp objects in this thread, because they share - # C pointers with other instances that have already been garbage collected, so - # we use threadvars instead and initialise them again with getExternalIP(), - # even though we don't need the external IP's value. - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - while natClosed.load() == false: - let currTime = now() - if currTime >= (lastUpdate + interval): - discard doPortMapping(strategy, tcpPort, udpPort, description) - lastUpdate = currTime - - sleep(sleepDuration) - -proc stopNatThreads() {.noconv.} = - # stop the thread - debug "Stopping NAT port mapping renewal threads" try: - natClosed.store(true) - joinThreads(natThreads) - except Exception as exc: - warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg + createThread(thread, mapNatPortsThread, ctx) + threadStarted = true + except ValueError, ResourceExhaustedError: + warn "Failed to create thread for NAT port mapping" + return none((Port, Port)) - # delete our port mappings + try: + if not await signal.wait().withTimeout(NatPortMappingTimeout): + warn "NAT port mapping thread timed out" + return none((Port, Port)) + except CancelledError as exc: + raise exc + except AsyncError as exc: + warn "Error waiting for NAT port mapping thread", error = exc.msg + return none((Port, Port)) - # FIXME: if the initial port mapping failed because it already existed for the - # required external port, we should not delete it. It might have been set up - # by another program. + return ctx.result - # In Windows, a new thread is created for the signal handler, so we need to - # initialise our threadvars again. +method handleNatStatus*( + m: NatMapper, + networkReachability: NetworkReachability, + dialBackAddr: Opt[MultiAddress], + discoveryPort: Port, + discovery: Discovery, + switch: Switch, + autoRelayService: AutoRelayService, +) {.async: (raises: [CancelledError]), base, gcsafe.} = + case networkReachability + of Unknown: + discard + of Reachable: + if dialBackAddr.isNone: + warn "Got empty dialback address in AutoNat when node is Reachable" + return - let ipres = getExternalIP(strategy, quiet = true) - if ipres.isSome: - if strategy == NatStrategy.NatUpnp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol) - if pmres.isErr: - error "UPnP port mapping deletion", msg = pmres.error - else: - debug "UPnP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol - elif strategy == NatStrategy.NatPmp: - for entry in activeMappings: - for t in [ - (entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP), - (entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP), - ]: - let - (eport, iport, protocol) = t - pmres = npmp.deletePortMapping( - eport = eport.cushort, iport = iport.cushort, protocol = protocol - ) - if pmres.isErr: - error "NAT-PMP port mapping deletion", msg = pmres.error - else: - debug "NAT-PMP: deleted port mapping", - externalPort = eport, internalPort = iport, protocol = protocol + if autoRelayService.isRunning: + if not await autoRelayService.stop(switch): + debug "AutoRelayService stop method returned false" -proc redirectPorts*( - strategy: NatStrategy, tcpPort, udpPort: Port, description: string -): Option[(Port, Port)] = - result = doPortMapping(strategy, tcpPort, udpPort, description) - if result.isSome: - let (externalTcpPort, externalUdpPort) = result.get() - # needed by NAT-PMP on port mapping deletion - # Port mapping works. Let's launch a thread that repeats it, in case the - # NAT-PMP lease expires or the router is rebooted and forgets all about - # these mappings. - activeMappings.add( - PortMappings( - internalTcpPort: tcpPort, - externalTcpPort: externalTcpPort, - internalUdpPort: udpPort, - externalUdpPort: externalUdpPort, - description: description, - ) - ) - try: - natThreads.add(Thread[PortMappingArgs]()) - natThreads[^1].createThread( - repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description) - ) - # atexit() in disguise - if natThreads.len == 1: - # we should register the thread termination function only once - addExitProc(stopNatThreads) - except Exception as exc: - warn "Failed to create NAT port mapping renewal thread", exc = exc.msg + discovery.updateRecords(@[dialBackAddr.get], discoveryPort) + # TODO: switch DHT to server mode + of NotReachable: + var hasPortMapping = false -proc setupNat*( - natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string -): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] = - ## Setup NAT port mapping and get external IP address. - ## If any of this fails, we don't return any IP address but do return the - ## original ports as best effort. - ## TODO: Allow for tcp or udp port mapping to be optional. - if extIp.isNone: - extIp = getExternalIP(natStrategy) - if extIp.isSome: - let ip = extIp.get - let extPorts = ( - {.gcsafe.}: - redirectPorts( - strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId - ) - ) - if extPorts.isSome: - let (extTcpPort, extUdpPort) = extPorts.get() - (ip: some(ip), tcpPort: some(extTcpPort), udpPort: some(extUdpPort)) + if dialBackAddr.isNone: + warn "Got empty dialback address in AutoNat when node is NotReachable" else: - warn "UPnP/NAT-PMP available but port forwarding failed" - (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) - else: - warn "UPnP/NAT-PMP not available" - (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort)) + let maybePorts = await m.mapNatPorts() -proc setupAddress*( - natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string -): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} = - ## Set-up of the external address via any of the ways as configured in - ## `NatConfig`. In case all fails an error is logged and the bind ports are - ## selected also as external ports, as best effort and in hope that the - ## external IP can be figured out by other means at a later stage. - ## TODO: Allow for tcp or udp bind ports to be optional. + if maybePorts.isSome: + let (tcpPort, udpPort) = maybePorts.get() + let announceAddress = dialBackAddr.get.remapAddr(port = some(tcpPort)) - if natConfig.hasExtIp: - # any required port redirection must be done by hand - return (some(natConfig.extIp), some(tcpPort), some(udpPort)) + # TODO: Try a dial me to make sure we are reachable - case natConfig.nat - of NatStrategy.NatAny: - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) + if autoRelayService.isRunning: + if not await autoRelayService.stop(switch): + debug "AutoRelayService stop method returned false" - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return (prefSrcIp, some(tcpPort), some(udpPort)) - of PrefSrcIsPrivate, BindAddressIsPrivate: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) - of NatStrategy.NatNone: - let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp) + discovery.updateRecords(@[announceAddress], udpPort) + hasPortMapping = true - case prefSrcStatus - of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic: - return (prefSrcIp, some(tcpPort), some(udpPort)) - of PrefSrcIsPrivate: - error "No public IP address found. Should not use --nat:none option" - return (none(IpAddress), some(tcpPort), some(udpPort)) - of BindAddressIsPrivate: - error "Bind IP is not a public IP address. Should not use --nat:none option" - return (none(IpAddress), some(tcpPort), some(udpPort)) - of NatStrategy.NatUpnp, NatStrategy.NatPmp: - return setupNat(natConfig.nat, tcpPort, udpPort, clientId) + if not hasPortMapping and not autoRelayService.isRunning: + if not await autoRelayService.setup(switch): + debug "AutoRelayService setup method returned false" -proc nattedAddress*( - natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port -): tuple[libp2p, discovery: seq[MultiAddress]] = - ## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns: - ## - Modified multiaddresses with NAT-mapped addresses for libp2p - ## - Discovery addresses with NAT-mapped UDP ports +proc close*(m: NatMapper, device = UpnpDevice()) = + # UPnP mappings are permanent (leaseDuration=0) and must be deleted explicitly. + # NAT-PMP mappings expire automatically after NATPMP_LIFETIME seconds. + if not m.hasUpnpMapping: + return - var discoveryAddrs = newSeq[MultiAddress](0) - let newAddrs = addrs.mapIt: - block: - # Extract IP address and port from the multiaddress - let (ipPart, port) = getAddressAndPort(it) - if ipPart.isSome and port.isSome: - # Try to setup NAT mapping for the address - let (newIP, tcp, udp) = - setupAddress(natConfig, ipPart.get, port.get, udpPort, "storage") - if newIP.isSome: - # NAT mapping successful - add discovery address with mapped UDP port - discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get)) - # Remap original address with NAT IP and TCP port - it.remapAddr(ip = newIP, port = tcp) - else: - # NAT mapping failed - use original address - echo "Failed to get external IP, using original address", it - discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort)) - it - else: - # Invalid multiaddress format - return as is - it - (newAddrs, discoveryAddrs) + # deletePortMapping requires the IGD control URL set during init + let deviceRes = device.init() + if deviceRes.isErr: + warn "UPnP reinit failed during cleanup, port mappings may remain", + msg = deviceRes.error + return + + for (port, proto) in [ + (m.tcpPort, NatIpProtocol.Tcp), (m.discoveryPort, NatIpProtocol.Udp) + ]: + let res = deviceRes.value.deletePortMapping(port, proto) + if res.isErr: + error "UPnP port mapping deletion failed", port, proto, msg = res.error + +proc findReachableNodes*(bootstrapNodes: seq[SignedPeerRecord]): seq[SignedPeerRecord] = + ## Returns the list of nodes known to be directly reachable. + ## Currently returns bootstrap nodes. In the future, any network participant + ## confirmed reachable by AutoNAT could be included. + bootstrapNodes diff --git a/storage/rest/api.nim b/storage/rest/api.nim index 865591fc..5047686d 100644 --- a/storage/rest/api.nim +++ b/storage/rest/api.nim @@ -23,6 +23,7 @@ import pkg/confutils import pkg/libp2p import pkg/libp2p/routing_record +import pkg/libp2p/protocols/connectivity/autonatv2/service import pkg/codexdht/discv5/spr as spr import ../logutils @@ -557,7 +558,12 @@ proc initNodeApi(node: StorageNodeRef, conf: StorageConf, router: var RestRouter return RestApiResponse.error(Http500, "Unknown error dialling peer", headers = headers) -proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRouter) = +proc initDebugApi( + node: StorageNodeRef, + conf: StorageConf, + autonat: Option[AutonatV2Service], + router: var RestRouter, +) = let allowedOrigin = router.allowedOrigin router.api(MethodGet, "/api/storage/v1/debug/info") do() -> RestApiResponse: @@ -577,6 +583,13 @@ proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRoute "announceAddresses": node.discovery.announceAddrs, "table": table, "storage": {"version": $storageVersion, "revision": $storageRevision}, + "nat": { + "reachability": + if autonat.isSome: + $autonat.get.networkReachability + else: + "unknown" + }, } # return pretty json for human readability @@ -637,12 +650,13 @@ proc initRestApi*( node: StorageNodeRef, conf: StorageConf, repoStore: RepoStore, + autonat: Option[AutonatV2Service], corsAllowedOrigin: ?string, ): RestRouter = var router = RestRouter.init(validate, corsAllowedOrigin) initDataApi(node, repoStore, router) initNodeApi(node, conf, router) - initDebugApi(node, conf, router) + initDebugApi(node, conf, autonat, router) return router diff --git a/storage/storage.nim b/storage/storage.nim index f332f530..26aa205d 100644 --- a/storage/storage.nim +++ b/storage/storage.nim @@ -17,6 +17,9 @@ import pkg/chronos import pkg/taskpools import pkg/presto import pkg/libp2p +import pkg/libp2p/protocols/connectivity/autonatv2/[service, client] +import pkg/libp2p/protocols/connectivity/relay/client as relayClientModule +import pkg/libp2p/services/autorelayservice import pkg/confutils import pkg/confutils/defs import pkg/stew/io2 @@ -33,7 +36,6 @@ import ./blockexchange import ./utils/fileutils import ./discovery import ./utils/addrutils -import ./utils/natutils import ./namespaces import ./storagetypes import ./logutils @@ -51,6 +53,9 @@ type repoStore: RepoStore maintenance: BlockMaintainer taskpool: Taskpool + autonatService*: Option[AutonatV2Service] + autoRelayService: AutoRelayService + natMapper: NatMapper isStarted: bool StoragePrivateKey* = libp2p.PrivateKey # alias @@ -76,25 +81,40 @@ proc start*(s: StorageServer) {.async.} = await s.storageNode.switch.start() - let (announceAddrs, discoveryAddrs) = nattedAddress( - s.config.nat, s.storageNode.switch.peerInfo.addrs, s.config.discoveryPort - ) + let announceAddrs = + if s.config.nat.hasExtIp: + # extip means that we assume the IP is reachable + # So we just take the first peer addr and remap it with extip to keep the port only + if s.storageNode.switch.peerInfo.addrs.len == 0: + raise + newException(StorageError, "extip is set but switch has no listen addresses") + @[ + s.storageNode.switch.peerInfo.addrs[0].remapAddr( + ip = some(s.config.nat.extIp), port = none(Port) + ) + ] + else: + # If extip is not set, we have 2 choices: + # 1- Announce the peer addrs contains detected addresses on the machine. + # 2- Wait for AutoNat + # The problem with 1 is that you will certainly announce private addresses + # and if you advertise a CID, you will advertise these private addresses. + # TODO: DHT client mode + #s.storageNode.switch.peerInfo.addrs + @[] - var hasPublicAddr = false - for announceAddr in announceAddrs: - let (maybeIp, _) = getAddressAndPort(announceAddr) - if maybeIp.isSome and maybeIp.get.isGlobalUnicast(): - hasPublicAddr = true - break - - if not hasPublicAddr: - warn "Unable to determine a public IP address. This node will only be reachable on a private network." - - s.storageNode.discovery.updateAnnounceRecord(announceAddrs) - s.storageNode.discovery.updateDhtRecord(discoveryAddrs) + s.storageNode.discovery.updateRecords(announceAddrs, s.config.discoveryPort) await s.storageNode.start() + for spr in findReachableNodes(s.config.bootstrapNodes): + try: + let addrs = spr.data.addresses.mapIt(it.address) + await s.storageNode.switch.connect(spr.data.peerId, addrs) + except CatchableError as e: + warn "Cannot connect to bootstrap node", error = e.msg + discard + if s.restServer != nil: s.restServer.start() @@ -107,6 +127,9 @@ proc stop*(s: StorageServer) {.async.} = notice "Stopping Storage node" + if s.natMapper != nil: + s.natMapper.close() + var futures = @[ s.storageNode.switch.stop(), s.storageNode.stop(), @@ -171,6 +194,27 @@ proc new*( ## create StorageServer including setting up datastore, repostore, etc let listenMultiAddr = getMultiAddrWithIpAndTcpPort(config.listenIp, config.listenPort) + let relayClient = relayClientModule.RelayClient.new(canHop = config.relay) + + let autonatClient = AutonatV2Client.new(random.Rng.instance()) + let autonatService = + if config.nat.hasExtIp: + none(AutonatV2Service) + else: + some( + AutonatV2Service.new( + rng = random.Rng.instance(), + client = autonatClient, + config = AutonatV2ServiceConfig.new( + scheduleInterval = Opt.some(config.natScheduleInterval), + askNewConnectedPeers = true, + numPeersToAsk = config.natNumPeersToAsk, + maxQueueSize = config.natMaxQueueSize, + minConfidence = config.natMinConfidence, + ), + ) + ) + let switch = SwitchBuilder .new() .withPrivateKey(privateKey) @@ -182,8 +226,19 @@ proc new*( .withAgentVersion(config.agentString) .withSignedPeerRecord(true) .withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}) + .withAutonatV2Server() + .withCircuitRelay(relayClient) + .withServices( + if autonatService.isSome: + @[Service(autonatService.get)] + else: + @[] + ) .build() + autonatClient.setup(switch) + switch.mount(autonatClient) + var cache: CacheStore = nil taskPool: Taskpool @@ -222,8 +277,9 @@ proc new*( discovery = Discovery.new( switch.peerInfo.privateKey, - announceAddrs = @[listenMultiAddr], + announceAddrs = @[], bindPort = config.discoveryPort, + discoveryPort = config.discoveryPort, bootstrapNodes = config.bootstrapNodes, store = discoveryStore, ) @@ -285,12 +341,24 @@ proc new*( taskPool = taskPool, ) + autoRelayService = AutoRelayService.new( + maxNumRelays = config.natMaxRelays, + client = relayClient, + onReservation = proc(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} = + debug "Relay reservation updated", addresses + # relay addresses are for download traffic only, not DHT routing + discovery.updateAnnounceRecord(addresses), + rng = random.Rng.instance(), + ) + var restServer: RestServerRef = nil if config.apiBindAddress.isSome: restServer = RestServerRef .new( - storageNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), + storageNode.initRestApi( + config, repoStore, autonatService, config.apiCorsAllowedOrigin + ), initTAddress(config.apiBindAddress.get(), config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high, @@ -300,6 +368,25 @@ proc new*( switch.mount(network) switch.mount(manifestProto) + let natMapper = NatMapper( + natConfig: config.nat, + tcpPort: config.listenPort, + discoveryPort: config.discoveryPort, + ) + if autonatService.isSome: + autonatService.get.setStatusAndConfidenceHandler( + proc( + networkReachability: NetworkReachability, + confidence: Opt[float], + addrs: Opt[MultiAddress], + ) {.async: (raises: [CancelledError]).} = + debug "AutoNAT status", reachability = networkReachability, confidence + await natMapper.handleNatStatus( + networkReachability, addrs, config.discoveryPort, discovery, switch, + autoRelayService, + ) + ) + StorageServer( config: config, storageNode: storageNode, @@ -308,4 +395,7 @@ proc new*( maintenance: maintenance, taskPool: taskPool, logFile: logFile, + autonatService: autonatService, + autoRelayService: autoRelayService, + natMapper: natMapper, ) diff --git a/storage/stores/networkstore.nim b/storage/stores/networkstore.nim index 8c781f6b..6767f042 100644 --- a/storage/stores/networkstore.nim +++ b/storage/stores/networkstore.nim @@ -9,6 +9,8 @@ {.push raises: [].} +import std/options + import pkg/chronos import pkg/libp2p import pkg/questionable/results diff --git a/storage/utils/addrutils.nim b/storage/utils/addrutils.nim index 31570e06..47204889 100644 --- a/storage/utils/addrutils.nim +++ b/storage/utils/addrutils.nim @@ -14,14 +14,14 @@ import std/strutils import std/options import pkg/libp2p -import pkg/stew/endians2 func remapAddr*( address: MultiAddress, ip: Option[IpAddress] = IpAddress.none, port: Option[Port] = Port.none, + protocol: Option[string] = string.none, ): MultiAddress = - ## Remap addresses to new IP and/or Port + ## Remap addresses to new IP, port, and/or transport protocol (e.g. "tcp" → "udp") ## var parts = ($address).split("/") @@ -32,6 +32,12 @@ func remapAddr*( else: parts[2] + parts[3] = + if protocol.isSome: + protocol.get + else: + parts[3] + parts[4] = if port.isSome: $port.get @@ -67,38 +73,3 @@ proc getMultiAddrWithIpAndTcpPort*(ip: IpAddress, port: Port): MultiAddress = return MultiAddress.init(ipFamily & $ip & "/tcp/" & $port).expect( "Failed to construct multiaddress with IP and TCP port" ) - -proc getAddressAndPort*( - ma: MultiAddress -): tuple[ip: Option[IpAddress], port: Option[Port]] = - try: - # Try IPv4 first - let ipv4Result = ma[multiCodec("ip4")] - let ip = - if ipv4Result.isOk: - let ipBytes = ipv4Result.get().protoArgument().expect("Invalid IPv4 format") - let ipArray = [ipBytes[0], ipBytes[1], ipBytes[2], ipBytes[3]] - some(IpAddress(family: IPv4, address_v4: ipArray)) - else: - # Try IPv6 if IPv4 not found - let ipv6Result = ma[multiCodec("ip6")] - if ipv6Result.isOk: - let ipBytes = ipv6Result.get().protoArgument().expect("Invalid IPv6 format") - var ipArray: array[16, byte] - for i in 0 .. 15: - ipArray[i] = ipBytes[i] - some(IpAddress(family: IPv6, address_v6: ipArray)) - else: - none(IpAddress) - - # Get TCP Port - let portResult = ma[multiCodec("tcp")] - let port = - if portResult.isOk: - let portBytes = portResult.get().protoArgument().expect("Invalid port format") - some(Port(fromBytesBE(uint16, portBytes))) - else: - none(Port) - (ip: ip, port: port) - except Exception: - (ip: none(IpAddress), port: none(Port)) diff --git a/storage/utils/natutils.nim b/storage/utils/natutils.nim index 45ad7589..45abd178 100644 --- a/storage/utils/natutils.nim +++ b/storage/utils/natutils.nim @@ -1,66 +1,205 @@ {.push raises: [].} -import std/[net, tables, hashes], pkg/results, chronos, chronicles +import std/[options, net] +import nat_traversal/[miniupnpc, natpmp] +import pkg/chronicles +import results -import pkg/libp2p +export miniupnpc, natpmp, results, options, net + +logScope: + topics = "nat" + +const UPNP_TIMEOUT* = 200 # ms +const NATPMP_LIFETIME* = 60 * 60 # seconds type NatStrategy* = enum - NatAny - NatUpnp - NatPmp - NatNone + NatAuto -type IpLimits* = object - limit*: uint - ips: Table[IpAddress, uint] +type NatIpProtocol* = enum + Tcp + Udp -func hash*(ip: IpAddress): Hash = - case ip.family - of IpAddressFamily.IPv6: - hash(ip.address_v6) - of IpAddressFamily.IPv4: - hash(ip.address_v4) +# Generic Nat device can be UPnP or PmP +type NatDevice* = ref object of RootObj -func inc*(ipLimits: var IpLimits, ip: IpAddress): bool = - let val = ipLimits.ips.getOrDefault(ip, 0) - if val < ipLimits.limit: - ipLimits.ips[ip] = val + 1 - true - else: - false +type UpnpDevice* = ref object of NatDevice + upnp: Miniupnp -func dec*(ipLimits: var IpLimits, ip: IpAddress) = - let val = ipLimits.ips.getOrDefault(ip, 0) - if val == 1: - ipLimits.ips.del(ip) - elif val > 1: - ipLimits.ips[ip] = val - 1 +type PmpDevice* = ref object of NatDevice + npmp: NatPmp -func isGlobalUnicast*(address: TransportAddress): bool = - if address.isGlobal() and address.isUnicast(): true else: false +# appPortMapping is specific to the type of Nat device +method addPortMapping*( + d: NatDevice, port: Port, proto: NatIpProtocol +): Result[Port, string] {.base, gcsafe.} = + return err("not implemented") -func isGlobalUnicast*(address: IpAddress): bool = - let a = initTAddress(address, Port(0)) - a.isGlobalUnicast() +# Creates the mapping the the router and +# returns the opened ports. +method mapPorts*( + d: NatDevice, tcpPort, udpPort: Port +): Option[(Port, Port)] {.base, gcsafe.} = + var extTcpPort, extUdpPort: Port -proc getRouteIpv4*(): Result[IpAddress, cstring] = - # Avoiding Exception with initTAddress and can't make it work with static. - # Note: `publicAddress` is only used an "example" IP to find the best route, - # no data is send over the network to this IP! - let - publicAddress = TransportAddress( - family: AddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1], port: Port(0) - ) - route = getBestRoute(publicAddress) + for t in [(tcpPort, NatIpProtocol.Tcp), (udpPort, NatIpProtocol.Udp)]: + let (port, proto) = t + let pmres = d.addPortMapping(port, proto) - if route.source.isUnspecified(): - err("No best ipv4 route found") - else: - let ip = - try: - route.source.address() - except ValueError as e: - # This should not occur really. - error "Address conversion error", exception = e.name, msg = e.msg - return err("Invalid IP address") - ok(ip) + if pmres.isErr: + error "port mapping failed", msg = pmres.error + return none((Port, Port)) + + case proto + of Tcp: + extTcpPort = pmres.value + of Udp: + extUdpPort = pmres.value + + return some((extTcpPort, extUdpPort)) + +method getSpecificPortMapping*( + d: UpnpDevice, externalPort: string, protocol: UPNPProtocol +): Result[PortMappingRes, cstring] {.base, gcsafe.} = + if d.upnp == nil: + return err(cstring("upnp not initialized")) + + d.upnp.getSpecificPortMapping(externalPort = externalPort, protocol = protocol) + +method discover*(d: UpnpDevice): Result[int, cstring] {.base, gcsafe.} = + if d.upnp == nil: + return err(cstring("upnp not initialized")) + + return d.upnp.discover() + +method selectIGD*(d: UpnpDevice): SelectIGDResult {.base, gcsafe.} = + if d.upnp == nil: + return IGDNotFound + + return d.upnp.selectIGD() + +proc init*(T: type UpnpDevice): Result[UpnpDevice, string] {.gcsafe.} = + UpnpDevice().init() + +# Init UPnP device and create miniupnp instance. +# It call "discover" to retrieve the UPnP devices on the network, +# and then "selectIGD" to select a suitable device. +proc init*(d: UpnpDevice): Result[UpnpDevice, string] {.gcsafe.} = + if d.upnp == nil: + d.upnp = newMiniupnp() + + d.upnp.discoverDelay = UPNP_TIMEOUT + + let dres = d.discover() + if dres.isErr: + debug "UPnP", msg = dres.error + return err($dres.error) + + case d.selectIGD() + of IGDNotFound: + debug "UPnP", msg = "Internet Gateway Device not found. Giving up." + return err("IGD not found") + of IGDFound: + debug "UPnP", msg = "Internet Gateway Device found." + of IGDNotConnected: + debug "UPnP", + msg = "Internet Gateway Device found but it's not connected. Trying anyway." + of NotAnIGD: + debug "UPnP", + msg = + "Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway." + of IGDIpNotRoutable: + debug "UPnP", + msg = + "Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway." + + return ok(d) + +# For UPnP, the external port is the same as the application port. +# This should work for most of the case. +# We could change this by using addAnyPortMapping for IGD2 compatible routers +# if needed. +method addPortMapping*( + d: UpnpDevice, port: Port, proto: NatIpProtocol +): Result[Port, string] {.gcsafe.} = + if d.upnp == nil: + return err("upnp not initialized") + + let protocol = if proto == NatIpProtocol.Tcp: UPNPProtocol.TCP else: UPNPProtocol.UDP + let pmres = d.upnp.addPortMapping( + externalPort = $port, + protocol = protocol, + internalHost = d.upnp.lanAddr, + internalPort = $port, + desc = "logos-storage", + leaseDuration = 0, + ) + if pmres.isErr: + return err($pmres.error) + + let cres = d.getSpecificPortMapping(externalPort = $port, protocol = protocol) + if cres.isErr: + # Eventually, the check could fail on some router even if the router is successful. + # So we log a warning but we still want to continue because it is not sure it is a failure. + warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.", + msg = cres.error + + info "UPnP: added port mapping", externalPort = port, internalPort = port + + return ok(port) + +method deletePortMapping*( + d: UpnpDevice, port: Port, proto: NatIpProtocol +): Result[void, string] {.base, gcsafe.} = + if d.upnp == nil: + return err("upnp not initialized") + + let protocol = if proto == NatIpProtocol.Tcp: UPNPProtocol.TCP else: UPNPProtocol.UDP + let res = d.upnp.deletePortMapping(externalPort = $port, protocol = protocol) + if res.isErr: + return err($res.error) + + debug "UPnP: deleted port mapping", port, proto + + ok() + +proc init*(T: type PmpDevice): Result[PmpDevice, string] {.gcsafe.} = + PmpDevice().init() + +# Create a NatPmP instance. +proc init*(d: PmpDevice): Result[PmpDevice, string] {.gcsafe.} = + if d.npmp == nil: + d.npmp = newNatPmp() + + let res = d.npmp.init() + if res.isErr: + debug "NAT-PMP", msg = res.error + return err($res.error) + + return ok(d) + +# Add a port mapping on NAT-PMP device. +# The application port might not be the external port. +# The latter is returned. +method addPortMapping*( + d: PmpDevice, port: Port, proto: NatIpProtocol +): Result[Port, string] {.gcsafe.} = + if d.npmp == nil: + return err("npmp not initialized") + + let protocol = + if proto == NatIpProtocol.Tcp: NatPmpProtocol.TCP else: NatPmpProtocol.UDP + let pmres = d.npmp.addPortMapping( + eport = port.cushort, + iport = port.cushort, + protocol = protocol, + lifetime = NATPMP_LIFETIME, + ) + if pmres.isErr: + return err(pmres.error) + + let extPort = Port(pmres.value) + + info "NAT-PMP: added port mapping", externalPort = extPort, internalPort = port + + return ok(extPort) diff --git a/tests/integration/1_minute/testnat.nim b/tests/integration/1_minute/testnat.nim new file mode 100644 index 00000000..50c0467d --- /dev/null +++ b/tests/integration/1_minute/testnat.nim @@ -0,0 +1,32 @@ +import std/json +import std/options +import pkg/chronos +import pkg/questionable/results + +import ../multinodes +import ../storageclient +import ../storageconfig + +multinodesuite "AutoNAT integration": + let natConfig = NodeConfigs( + clients: StorageConfigs + .init(nodes = 2) + .withNatNumPeersToAsk(1) + .withNatMinConfidence(0.5) + .withNatScheduleInterval(10.seconds) + .withNatMaxQueueSize(1) + # .withLogFile() + # .withLogLevel("DEBUG") + .some + ) + + # Reminder: multinodesuite setup the first node as bootstrap node + test "node is reachable when using bootstrap node on same network", natConfig: + let node1 = clients()[0] + let node2 = clients()[1] + + check eventuallySafe( + (await node2.client.natReachability()).get() == "Reachable", + timeout = 30_000, + pollInterval = 500, + ) diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 9d4153bd..4d6d6f62 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -131,7 +131,6 @@ template multinodesuite*(suiteName: string, body: untyped) = config.addCliOption("--bootstrap-node", bootstrapNode) config.addCliOption("--data-dir", datadir) - config.addCliOption("--nat", "none") except StorageConfigError as e: raiseMultiNodeSuiteError "invalid cli option, error: " & e.msg @@ -214,6 +213,9 @@ template multinodesuite*(suiteName: string, body: untyped) = trace "Setting up test", suite = suiteName, test = currentTestName, nodeConfigs if var clients =? nodeConfigs.clients: failAndTeardownOnError "failed to start client nodes": + # Only the first node (bootstrap) gets a known extip. Other nodes use + # nat=auto so AutoNAT can run and determine their reachability. + clients = clients.withExtIp(0) for config in clients.configs: let node = await startClientNode(config) running.add RunningNode(role: Role.Client, node: node) diff --git a/tests/integration/storageclient.nim b/tests/integration/storageclient.nim index ec990bb9..62d72917 100644 --- a/tests/integration/storageclient.nim +++ b/tests/integration/storageclient.nim @@ -1,4 +1,5 @@ import std/strutils +import std/sequtils from pkg/libp2p import Cid, `$`, init import pkg/questionable/results @@ -260,3 +261,23 @@ proc hasBlockRaw*( .} = let url = client.baseurl & "/data/" & cid & "/exists" return client.get(url) + +proc connectPeer*( + client: StorageClient, peerId: string, addrs: seq[string] +): Future[void] {.async: (raises: [CancelledError, HttpError]).} = + var url = client.baseurl & "/connect/" & peerId + if addrs.len > 0: + url &= "?" & addrs.mapIt("addrs=" & it).join("&") + let response = await client.get(url) + assert response.status == 200 + +proc natReachability*( + client: StorageClient +): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} = + let info = await client.info() + if info.isErr: + return failure "Failed to get node info" + try: + return info.get()["nat"]["reachability"].getStr().success + except KeyError as e: + return failure e.msg diff --git a/tests/integration/storageconfig.nim b/tests/integration/storageconfig.nim index 4aeb6d60..7b218bd3 100644 --- a/tests/integration/storageconfig.nim +++ b/tests/integration/storageconfig.nim @@ -5,6 +5,7 @@ import std/strutils import std/sugar import std/tables from pkg/chronicles import LogLevel +import pkg/chronos import pkg/storage/conf import pkg/storage/units import pkg/confutils @@ -280,3 +281,60 @@ proc withStorageQuota*( for config in startConfig.configs.mitems: config.addCliOption("--storage-quota", $quota) return startConfig + +proc withListenIp*( + self: StorageConfigs, ip: string +): StorageConfigs {.raises: [StorageConfigError].} = + var startConfig = self + for config in startConfig.configs.mitems: + config.addCliOption("--listen-ip", ip) + return startConfig + +proc withNatNumPeersToAsk*( + self: StorageConfigs, numPeersToAsk: int +): StorageConfigs {.raises: [StorageConfigError].} = + var startConfig = self + for config in startConfig.configs.mitems: + config.addCliOption("--nat-num-peers-to-ask", $numPeersToAsk) + return startConfig + +proc withNatMaxQueueSize*( + self: StorageConfigs, maxQueueSize: int +): StorageConfigs {.raises: [StorageConfigError].} = + var startConfig = self + for config in startConfig.configs.mitems: + config.addCliOption("--nat-max-queue-size", $maxQueueSize) + return startConfig + +proc withNatMinConfidence*( + self: StorageConfigs, minConfidence: float +): StorageConfigs {.raises: [StorageConfigError].} = + var startConfig = self + for config in startConfig.configs.mitems: + config.addCliOption("--nat-min-confidence", $minConfidence) + return startConfig + +proc withNatScheduleInterval*( + self: StorageConfigs, scheduleInterval: Duration +): StorageConfigs {.raises: [StorageConfigError].} = + var startConfig = self + for config in startConfig.configs.mitems: + config.addCliOption("--nat-schedule-interval", $scheduleInterval) + return startConfig + +proc withExtIp*( + self: StorageConfigs, idx: int, ip = "127.0.0.1" +): StorageConfigs {.raises: [StorageConfigError].} = + self.checkBounds idx + + var startConfig = self + startConfig.configs[idx].addCliOption("--nat", "extip:" & ip) + return startConfig + +proc withExtIp*( + self: StorageConfigs, ip = "127.0.0.1" +): StorageConfigs {.raises: [StorageConfigError].} = + var startConfig = self + for config in startConfig.configs.mitems: + config.addCliOption("--nat", "extip:" & ip) + return startConfig diff --git a/tests/storage/blockexchange/discovery/testdiscovery.nim b/tests/storage/blockexchange/discovery/testdiscovery.nim index d467361e..471f9e88 100644 --- a/tests/storage/blockexchange/discovery/testdiscovery.nim +++ b/tests/storage/blockexchange/discovery/testdiscovery.nim @@ -1,6 +1,4 @@ -import std/sequtils -import std/sugar -import std/tables +import std/[sequtils, sugar, tables, options] import pkg/chronos diff --git a/tests/storage/blockexchange/testnetwork.nim b/tests/storage/blockexchange/testnetwork.nim index dd035a44..638f6735 100644 --- a/tests/storage/blockexchange/testnetwork.nim +++ b/tests/storage/blockexchange/testnetwork.nim @@ -1,3 +1,4 @@ +import std/importutils import std/[sequtils, tables] import pkg/chronos @@ -12,6 +13,8 @@ import ../../asynctest import ../examples import ../helpers +privateAccess(BlockExcNetwork) + asyncchecksuite "Network - Handlers": let rng = Rng.instance() @@ -185,7 +188,7 @@ asyncchecksuite "Network - Test Limits": switch1 = newStandardSwitch() switch2 = newStandardSwitch() - network1 = BlockExcNetwork.new(switch = switch1, maxInflight = 0) + network1 = BlockExcNetwork.new(switch = switch1, maxInflight = 1) switch1.mount(network1) network2 = BlockExcNetwork.new(switch = switch2) @@ -205,6 +208,8 @@ asyncchecksuite "Network - Test Limits": ): Future[void] {.async: (raises: []).} = check false + await network1.inflightSema.acquire() + let fut = network1.send(switch2.peerInfo.peerId, Message()) await sleepAsync(100.millis) diff --git a/tests/storage/helpers/nodeutils.nim b/tests/storage/helpers/nodeutils.nim index e1951b83..ad06995c 100644 --- a/tests/storage/helpers/nodeutils.nim +++ b/tests/storage/helpers/nodeutils.nim @@ -10,8 +10,6 @@ import pkg/storage/stores import pkg/storage/blocktype as bt import pkg/storage/blockexchange import pkg/storage/systemclock -import pkg/storage/nat -import pkg/storage/utils/natutils import pkg/storage/merkletree import pkg/storage/manifest @@ -218,13 +216,7 @@ proc generateNodes*( if config.enableBootstrap: waitFor switch.peerInfo.update() - let (announceAddrs, discoveryAddrs) = nattedAddress( - NatConfig(hasExtIp: false, nat: NatNone), - switch.peerInfo.addrs, - bindPort.Port, - ) - blockDiscovery.updateAnnounceRecord(announceAddrs) - blockDiscovery.updateDhtRecord(discoveryAddrs) + blockDiscovery.updateRecords(switch.peerInfo.addrs, bindPort.Port) if blockDiscovery.dhtRecord.isSome: bootstrapNodes.add !blockDiscovery.dhtRecord diff --git a/tests/storage/testnat.nim b/tests/storage/testnat.nim index 21faa156..ff3da3cb 100644 --- a/tests/storage/testnat.nim +++ b/tests/storage/testnat.nim @@ -1,43 +1,160 @@ -import std/[unittest, net] +import std/[net, importutils, envvars] import pkg/chronos +import ../../storage/utils/natutils import pkg/libp2p/[multiaddress, multihash, multicodec] +import pkg/libp2p/protocols/connectivity/autonat/types +import pkg/libp2p/protocols/connectivity/relay/client as relayClientModule +import pkg/libp2p/services/autorelayservice except setup + import pkg/results +import ./helpers +import ../asynctest import ../../storage/nat +import ../../storage/discovery +import ../../storage/rng import ../../storage/utils -suite "NAT Address Tests": - test "nattedAddress with local addresses": - # Setup test data - let - udpPort = Port(1234) - natConfig = NatConfig(hasExtIp: true, extIp: parseIpAddress("8.8.8.8")) +privateAccess(NatMapper) - # Create test addresses - localAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/5000").expect("valid multiaddr") - anyAddr = MultiAddress.init("/ip4/0.0.0.0/tcp/5000").expect("valid multiaddr") - publicAddr = - MultiAddress.init("/ip4/192.168.1.1/tcp/5000").expect("valid multiaddr") +type MockUpnpDevice = ref object of UpnpDevice + deletedPorts: seq[(Port, NatIpProtocol)] - # Expected results - let - expectedDiscoveryAddrs = @[ - MultiAddress.init("/ip4/8.8.8.8/udp/1234").expect("valid multiaddr"), - MultiAddress.init("/ip4/8.8.8.8/udp/1234").expect("valid multiaddr"), - MultiAddress.init("/ip4/8.8.8.8/udp/1234").expect("valid multiaddr"), - ] - expectedlibp2pAddrs = @[ - MultiAddress.init("/ip4/8.8.8.8/tcp/5000").expect("valid multiaddr"), - MultiAddress.init("/ip4/8.8.8.8/tcp/5000").expect("valid multiaddr"), - MultiAddress.init("/ip4/8.8.8.8/tcp/5000").expect("valid multiaddr"), - ] +method discover*(d: MockUpnpDevice): Result[int, cstring] {.gcsafe.} = + ok(1) - #ipv6Addr = MultiAddress.init("/ip6/::1/tcp/5000").expect("valid multiaddr") - addrs = @[localAddr, anyAddr, publicAddr] +method selectIGD*(d: MockUpnpDevice): SelectIGDResult {.gcsafe.} = + IGDFound - # Test address remapping - let (libp2pAddrs, discoveryAddrs) = nattedAddress(natConfig, addrs, udpPort) +method deletePortMapping*( + d: MockUpnpDevice, port: Port, proto: NatIpProtocol +): Result[void, string] {.gcsafe.} = + d.deletedPorts.add((port, proto)) + ok() - # Verify results - check(discoveryAddrs == expectedDiscoveryAddrs) - check(libp2pAddrs == expectedlibp2pAddrs) +type MockNatMapper = ref object of NatMapper + mappedPorts: Option[(Port, Port)] + +method mapNatPorts*( + m: MockNatMapper +): Future[Option[(Port, Port)]] {.async: (raises: [CancelledError]), gcsafe.} = + m.mappedPorts + +suite "NAT - NatMapper.close": + test "does nothing when no upnp mapping": + let mapper = MockNatMapper( + natConfig: NatConfig(hasExtIp: false, nat: NatAuto), + tcpPort: Port(8080), + discoveryPort: Port(8090), + ) + let device = MockUpnpDevice() + mapper.close(device) + check device.deletedPorts.len == 0 + + test "deletes tcp and udp ports when upnp mapping exists": + let mapper = MockNatMapper( + natConfig: NatConfig(hasExtIp: false, nat: NatAuto), + tcpPort: Port(8080), + discoveryPort: Port(8090), + ) + mapper.hasUpnpMapping = true + let device = MockUpnpDevice() + mapper.close(device) + check device.deletedPorts == + @[(Port(8080), NatIpProtocol.Tcp), (Port(8090), NatIpProtocol.Udp)] + +asyncchecksuite "NAT - handleNatStatus": + var sw: Switch + var key: PrivateKey + var disc: Discovery + var autoRelay: AutoRelayService + + setup: + autoRelay = + AutoRelayService.new(1, relayClientModule.RelayClient.new(), nil, Rng.instance()) + key = PrivateKey.random(Rng.instance[]).get() + disc = Discovery.new(key, announceAddrs = @[]) + sw = newStandardSwitch() + await sw.start() + + teardown: + await sw.stop() + + if autoRelay.isRunning: + discard await autoRelay.stop(sw) + + let discoveryPort = Port(8090) + + test "handleNatStatus announces mapped address when NotReachable and UPnP succeeds": + let dialBack = MultiAddress.init("/ip4/1.2.3.4/tcp/8080").expect("valid") + let mapper = MockNatMapper(mappedPorts: some((Port(9000), Port(9001)))) + + await mapper.handleNatStatus( + NotReachable, Opt.some(dialBack), discoveryPort, disc, sw, autoRelay + ) + + check disc.announceAddrs == + @[MultiAddress.init("/ip4/1.2.3.4/tcp/9000").expect("valid")] + check not autoRelay.isRunning + + test "handleNatStatus starts autoRelay when NotReachable and UPnP failed": + let mapper = MockNatMapper(mappedPorts: none((Port, Port))) + + await mapper.handleNatStatus( + NotReachable, Opt.none(MultiAddress), discoveryPort, disc, sw, autoRelay + ) + + check autoRelay.isRunning + + test "handleNatStatus starts autoRelay when NotReachable and mapping fails": + let dialBack = MultiAddress.init("/ip4/1.2.3.4/tcp/8080").expect("valid") + let mapper = MockNatMapper(mappedPorts: none((Port, Port))) + + await mapper.handleNatStatus( + NotReachable, Opt.some(dialBack), discoveryPort, disc, sw, autoRelay + ) + + check autoRelay.isRunning + check disc.announceAddrs == newSeq[MultiAddress]() + + test "handleNatStatus does not announce address when Reachable and no dialBackAddr": + let mapper = MockNatMapper(mappedPorts: none((Port, Port))) + + await mapper.handleNatStatus( + Reachable, Opt.none(MultiAddress), discoveryPort, disc, sw, autoRelay + ) + + check disc.announceAddrs == newSeq[MultiAddress]() + check not autoRelay.isRunning + + test "handleNatStatus stops relay and announces dialBackAddr when Reachable": + let dialBack = MultiAddress.init("/ip4/1.2.3.4/tcp/8080").expect("valid") + let mapper = MockNatMapper(mappedPorts: none((Port, Port))) + + discard await autorelayservice.setup(autoRelay, sw) + await mapper.handleNatStatus( + Reachable, Opt.some(dialBack), discoveryPort, disc, sw, autoRelay + ) + + check not autoRelay.isRunning + check disc.announceAddrs == @[dialBack] + +suite "NAT - UPnP port mapping (requires NAT_TEST_UPNP=1)": + test "mapPorts and cleanup": + if getEnv("NAT_TEST_UPNP") != "1": + skip() + return + + let res = UpnpDevice.init() + check res.isOk + + let device = res.value + let ports = device.mapPorts(Port(8101), Port(8090)) + check ports.isSome + + let (tcp, udp) = ports.get() + check tcp == Port(8101) + check udp == Port(8090) + + check device.deletePortMapping(Port(8101), NatIpProtocol.Tcp).isOk + check device.deletePortMapping(Port(8090), NatIpProtocol.Udp).isOk diff --git a/tests/storage/testnatutils.nim b/tests/storage/testnatutils.nim new file mode 100644 index 00000000..10de4d18 --- /dev/null +++ b/tests/storage/testnatutils.nim @@ -0,0 +1,93 @@ +import std/[options, net] +import nat_traversal/[miniupnpc, natpmp] +import pkg/results +import ../asynctest +import ../../storage/utils/natutils + +type MockUpnpDev = ref object of UpnpDevice + discoverOk: bool + igdResult: SelectIGDResult + addPortMappingOk: bool + failOnProto: Option[NatIpProtocol] + +type MockPmpDev = ref object of PmpDevice + addPortMappingOk: bool + mappedPort: Port + +method discover*(d: MockUpnpDev): Result[int, cstring] {.gcsafe.} = + if d.discoverOk: + ok(1) + else: + err(cstring("discover failed")) + +method selectIGD*(d: MockUpnpDev): SelectIGDResult {.gcsafe.} = + d.igdResult + +method addPortMapping*( + d: MockUpnpDev, port: Port, proto: NatIpProtocol +): Result[Port, string] {.gcsafe.} = + if d.failOnProto == some(proto): + err("mapping failed") + elif d.addPortMappingOk: + ok(port) + else: + err("mapping failed") + +method getSpecificPortMapping*( + d: MockUpnpDev, externalPort: string, protocol: UPNPProtocol +): Result[PortMappingRes, cstring] {.gcsafe.} = + ok(PortMappingRes()) + +method addPortMapping*( + d: MockPmpDev, port: Port, proto: NatIpProtocol +): Result[Port, string] {.gcsafe.} = + if d.addPortMappingOk: + ok(d.mappedPort) + else: + err("mapping failed") + +suite "NAT - UpnpDevice.init": + test "returns err when discover fails": + check MockUpnpDev(discoverOk: false).init().isErr + + test "returns err when IGD not found": + check MockUpnpDev(discoverOk: true, igdResult: IGDNotFound).init().isErr + + test "returns ok when IGD found": + check MockUpnpDev(discoverOk: true, igdResult: IGDFound).init().isOk + + test "returns ok when IGD not connected": + check MockUpnpDev(discoverOk: true, igdResult: IGDNotConnected).init().isOk + + test "returns ok when not an IGD": + check MockUpnpDev(discoverOk: true, igdResult: NotAnIGD).init().isOk + + test "returns ok when IP not routable": + check MockUpnpDev(discoverOk: true, igdResult: IGDIpNotRoutable).init().isOk + +suite "NAT - UpnpDevice.mapPorts": + test "returns none when addPortMapping fails": + check MockUpnpDev(addPortMappingOk: false).mapPorts(Port(8080), Port(8090)).isNone + + test "returns mapped ports": + let res = MockUpnpDev(addPortMappingOk: true).mapPorts(Port(8080), Port(8090)) + check res.isSome + check res.get() == (Port(8080), Port(8090)) + + test "returns none when tcp mapping fails": + let d = MockUpnpDev(addPortMappingOk: true, failOnProto: some(NatIpProtocol.Tcp)) + check d.mapPorts(Port(8080), Port(8090)).isNone + + test "returns none when udp mapping fails": + let d = MockUpnpDev(addPortMappingOk: true, failOnProto: some(NatIpProtocol.Udp)) + check d.mapPorts(Port(8080), Port(8090)).isNone + +suite "NAT - PmpDevice.mapPorts": + test "returns none when mapping fails": + check MockPmpDev(addPortMappingOk: false).mapPorts(Port(8080), Port(8090)).isNone + + test "returns assigned external ports": + let d = MockPmpDev(addPortMappingOk: true, mappedPort: Port(9000)) + let res = d.mapPorts(Port(8080), Port(8090)) + check res.isSome + check res.get() == (Port(9000), Port(9000)) diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 785fcf4d..45f43a9a 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 785fcf4ddec1101a3df1f044d6331504d7ab95c6 +Subproject commit 45f43a9ad8bd8bcf5903b42f365c1c879bd54240 diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index e82080f7..425e7248 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52 +Subproject commit 425e72487ea4bd5766b5a0590a05039406f46a2f diff --git a/vendor/nim-lsquic b/vendor/nim-lsquic new file mode 160000 index 00000000..a776eced --- /dev/null +++ b/vendor/nim-lsquic @@ -0,0 +1 @@ +Subproject commit a776eced48d1f3c630d8f3a8a3e976171dd1f9c1