Merge 14c934676e1922626c93c70eaad7a215081d192a into cf2f40f5591ce9e75e49b5c7e70d2ec53d296cfd

This commit is contained in:
Arnaud 2026-05-08 13:20:12 +00:00 committed by GitHub
commit d0951c4002
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 932 additions and 565 deletions

3
.gitmodules vendored
View File

@ -196,3 +196,6 @@
[submodule "vendor/nim-merkletree"]
path = vendor/nim-merkletree
url = https://github.com/logos-storage/nim-merkletree
[submodule "vendor/nim-lsquic"]
path = vendor/nim-lsquic
url = https://github.com/vacp2p/nim-lsquic

View File

@ -59,6 +59,13 @@ proc getDebug(
if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "",
"announceAddresses": node.discovery.announceAddrs,
"table": table,
"nat": {
"reachability":
if storage[].autonatService.isSome:
$storage[].autonatService.get.networkReachability
else:
"unknown"
},
}
return ok($json)

View File

@ -13,7 +13,7 @@ import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/utils/semaphore
import pkg/chronos/asyncsync
import pkg/questionable
import pkg/questionable/results
@ -107,13 +107,17 @@ proc send*(
let peer = b.peers[id]
await b.inflightSema.acquire()
await peer.send(msg)
try:
await peer.send(msg)
finally:
try:
b.inflightSema.release()
except AsyncSemaphoreError as err:
error "Failed to release semaphore", msg = err.msg
except CancelledError as error:
raise error
except CatchableError as err:
error "Error sending message", peer = id, msg = err.msg
finally:
b.inflightSema.release()
proc handleWantList(
b: BlockExcNetwork, peer: NetworkPeer, list: WantList

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/math
import std/[math, options]
import pkg/libp2p
import pkg/chronos

View File

@ -52,6 +52,8 @@ export
DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval,
DefaultBlockRetries
const DefaultNatScheduleInterval* = 5.minutes
type ThreadCount* = distinct Natural
proc `==`*(a, b: ThreadCount): bool {.borrow.}
@ -155,10 +157,9 @@ type
nat* {.
desc:
"Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>. " &
"If connecting to peers on a local network only, use 'none'.",
"Must be one of: auto, extip:<IP>.",
defaultValue: defaultNatConfig(),
defaultValueDesc: "any",
defaultValueDesc: "auto",
name: "nat"
.}: NatConfig
@ -286,11 +287,48 @@ type
desc: "Logs to file", defaultValue: string.none, name: "log-file", hidden
.}: Option[string]
natScheduleInterval* {.
desc: "Interval between AutoNAT reachability checks",
defaultValue: DefaultNatScheduleInterval,
defaultValueDesc: $DefaultNatScheduleInterval,
name: "nat-schedule-interval"
.}: Duration
natNumPeersToAsk* {.
desc: "Number of peers to ask per AutoNAT round",
defaultValue: 3,
name: "nat-num-peers-to-ask"
.}: int
natMaxQueueSize* {.
desc: "Number of past AutoNAT results kept to calculate confidence",
defaultValue: 3,
name: "nat-max-queue-size"
.}: int
natMinConfidence* {.
desc: "Minimum confidence threshold to confirm reachability",
defaultValue: 0.7,
name: "nat-min-confidence"
.}: float
natMaxRelays* {.
desc: "Maximum number of relay servers to reserve slots on simultaneously",
defaultValue: 2,
name: "nat-max-relays"
.}: int
relay* {.
desc: "Enable circuit relay server (hop) - use on publicly reachable nodes only",
defaultValue: false,
name: "relay"
.}: bool
func defaultAddress*(conf: StorageConf): IpAddress =
result = static parseIpAddress("127.0.0.1")
func defaultNatConfig*(): NatConfig =
result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAny)
result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAuto)
proc getStorageVersion(): string =
let tag = strip(staticExec("git describe --tags --abbrev=0"))
@ -366,14 +404,8 @@ proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T =
func parse*(T: type NatConfig, p: string): Result[NatConfig, string] =
case p.toLowerAscii
of "any":
return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAny))
of "none":
return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatNone))
of "upnp":
return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatUpnp))
of "pmp":
return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatPmp))
of "auto":
return ok(NatConfig(hasExtIp: false, nat: NatStrategy.NatAuto))
else:
if p.startsWith("extip:"):
try:

View File

@ -22,6 +22,7 @@ import pkg/codexdht/discv5/[routing_table, protocol as discv5]
from pkg/nimcrypto import keccak256
import ./rng
import ./utils/addrutils
import ./errors
import ./logutils
@ -175,29 +176,47 @@ method removeProvider*(
warn "Error removing provider", peerId = peerId, exc = exc.msg
raiseAssert("Unexpected Exception in removeProvider")
proc updateRecords*(
d: Discovery, announceAddrs: openArray[MultiAddress], discoveryPort: Port
) =
## Update both provider and DHT records from TCP announce addresses.
## Discovery (UDP) addresses are derived by remapping announceAddrs to UDP with discoveryPort.
## Updates the discv5 SPR once with the full set of addresses.
let tcpAddrs = @announceAddrs
let udpAddrs =
tcpAddrs.mapIt(it.remapAddr(protocol = some("udp"), port = some(discoveryPort)))
debug "Updating addresses", tcpAddrs, udpAddrs
d.announceAddrs = tcpAddrs
d.providerRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, tcpAddrs))
.expect("Should construct signed record").some
d.dhtRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, tcpAddrs & udpAddrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
proc updateAnnounceRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
## Update providers record
##
# Updates announce addresses only, not the DHT routing record.
# Relay addresses should not pollute DHT routing.
d.announceAddrs = @addrs
info "Updating announce record", addrs = d.announceAddrs
d.providerRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, d.announceAddrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.providerRecord).expect("Should update SPR")
proc updateDhtRecord*(d: Discovery, addrs: openArray[MultiAddress]) =
## Update providers record
##
proc updateDhtRecord*(
d: Discovery, addrs: openArray[MultiAddress]
) {.deprecated: "use updateRecords instead".} =
info "Updating Dht record", addrs = addrs
d.dhtRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, @addrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
@ -237,7 +256,8 @@ proc new*(
key: PrivateKey,
bindIp = IPv4_any(),
bindPort = 0.Port,
announceAddrs: openArray[MultiAddress],
announceAddrs: openArray[MultiAddress] = [],
discoveryPort = 0.Port,
bootstrapNodes: openArray[SignedPeerRecord] = [],
store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"),
): Discovery =
@ -248,7 +268,7 @@ proc new*(
key: key, peerId: PeerId.init(key).expect("Should construct PeerId"), store: store
)
self.updateAnnounceRecord(announceAddrs)
self.updateRecords(announceAddrs, discoveryPort)
# --------------------------------------------------------------------------
# FIXME disable IP limits temporarily so we can run our workshop. Re-enable

View File

@ -13,7 +13,7 @@ import times
{.push raises: [].}
import std/tables
import std/[tables, options]
import pkg/libp2p
import pkg/questionable

View File

@ -8,421 +8,187 @@
{.push raises: [].}
import
std/[options, os, times, net, atomics, exitprocs],
nat_traversal/[miniupnpc, natpmp],
json_serialization/std/net,
results
import std/[options, net]
import results
import pkg/chronos
import pkg/chronos/threadsync
import pkg/chronicles
import pkg/libp2p
import pkg/libp2p/services/autorelayservice
import ./utils
import ./utils/natutils
import ./utils/addrutils
import ./discovery
const
UPNP_TIMEOUT = 200 # ms
PORT_MAPPING_INTERVAL = 20 * 60 # seconds
NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL
logScope:
topics = "nat"
type PortMappings* = object
internalTcpPort: Port
externalTcpPort: Port
internalUdpPort: Port
externalUdpPort: Port
description: string
type PortMappingArgs =
tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string]
const NatPortMappingTimeout = 5.seconds
type NatConfig* = object
case hasExtIp*: bool
of true: extIp*: IpAddress
of false: nat*: NatStrategy
var
upnp {.threadvar.}: Miniupnp
npmp {.threadvar.}: NatPmp
strategy = NatStrategy.NatNone
natClosed: Atomic[bool]
extIp: Option[IpAddress]
activeMappings: seq[PortMappings]
natThreads: seq[Thread[PortMappingArgs]] = @[]
type NatMapper* = ref object of RootObj
natConfig*: NatConfig
tcpPort*: Port
discoveryPort*: Port
hasUpnpMapping: bool
logScope:
topics = "nat"
type MapNatPortsCtx = object
natConfig: NatConfig
tcpPort: Port
discoveryPort: Port
signal: ThreadSignalPtr
result: Option[(Port, Port)]
hasUpnpMapping: bool
type PrefSrcStatus = enum
NoRoutingInfo
PrefSrcIsPublic
PrefSrcIsPrivate
BindAddressIsPublic
BindAddressIsPrivate
proc mapNatPortsThread(ctx: ptr MapNatPortsCtx) {.thread.} =
if ctx.natConfig.hasExtIp:
discard ctx.signal.fireSync()
return
## Also does threadvar initialisation.
## Must be called before redirectPorts() in each thread.
proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] =
var externalIP: IpAddress
# Devices are recreated on each call: discover() costs ~200ms but only fires
# when AutoNAT reports NotReachable, which is exactly when we want a fresh scan.
let upnpRes = UpnpDevice.init()
if upnpRes.isOk:
let ports = upnpRes.value.mapPorts(ctx.tcpPort, ctx.discoveryPort)
if ports.isSome:
ctx.hasUpnpMapping = true
ctx.result = ports
discard ctx.signal.fireSync()
return
if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatUpnp:
if upnp == nil:
upnp = newMiniupnp()
let pmpRes = PmpDevice.init()
if pmpRes.isOk:
let ports = pmpRes.value.mapPorts(ctx.tcpPort, ctx.discoveryPort)
if ports.isSome:
ctx.result = ports
upnp.discoverDelay = UPNP_TIMEOUT
let dres = upnp.discover()
if dres.isErr:
debug "UPnP", msg = dres.error
else:
var
msg: cstring
canContinue = true
case upnp.selectIGD()
of IGDNotFound:
msg = "Internet Gateway Device not found. Giving up."
canContinue = false
of IGDFound:
msg = "Internet Gateway Device found."
of IGDNotConnected:
msg = "Internet Gateway Device found but it's not connected. Trying anyway."
of NotAnIGD:
msg =
"Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway."
of IGDIpNotRoutable:
msg =
"Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway."
if not quiet:
debug "UPnP", msg
if canContinue:
let ires = upnp.externalIPAddress()
if ires.isErr:
debug "UPnP", msg = ires.error
else:
# if we got this far, UPnP is working and we don't need to try NAT-PMP
try:
externalIP = parseIpAddress(ires.value)
strategy = NatStrategy.NatUpnp
return some(externalIP)
except ValueError as e:
error "parseIpAddress() exception", err = e.msg
return
discard ctx.signal.fireSync()
if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatPmp:
if npmp == nil:
npmp = newNatPmp()
let nres = npmp.init()
if nres.isErr:
debug "NAT-PMP", msg = nres.error
else:
let nires = npmp.externalIPAddress()
if nires.isErr:
debug "NAT-PMP", msg = nires.error
else:
try:
externalIP = parseIpAddress($(nires.value))
strategy = NatStrategy.NatPmp
return some(externalIP)
except ValueError as e:
error "parseIpAddress() exception", err = e.msg
return
method mapNatPorts*(
m: NatMapper
): Future[Option[(Port, Port)]] {.async: (raises: [CancelledError]), base, gcsafe.} =
let signal = ThreadSignalPtr.new().valueOr:
warn "Failed to create ThreadSignalPtr for NAT port mapping"
return none((Port, Port))
# This queries the routing table to get the "preferred source" attribute and
# checks if it's a public IP. If so, then it's our public IP.
#
# Further more, we check if the bind address (user provided, or a "0.0.0.0"
# default) is a public IP. That's a long shot, because code paths involving a
# user-provided bind address are not supposed to get here.
proc getRoutePrefSrc(bindIp: IpAddress): (Option[IpAddress], PrefSrcStatus) =
let bindAddress = initTAddress(bindIp, Port(0))
var ctx = cast[ptr MapNatPortsCtx](createShared(MapNatPortsCtx))
ctx[] = MapNatPortsCtx(
natConfig: m.natConfig,
tcpPort: m.tcpPort,
discoveryPort: m.discoveryPort,
signal: signal,
)
if bindAddress.isAnyLocal():
let ip = getRouteIpv4()
if ip.isErr():
# No route was found, log error and continue without IP.
error "No routable IP address found, check your network connection",
error = ip.error
return (none(IpAddress), NoRoutingInfo)
elif ip.get().isGlobalUnicast():
return (some(ip.get()), PrefSrcIsPublic)
else:
return (none(IpAddress), PrefSrcIsPrivate)
elif bindAddress.isGlobalUnicast():
return (some(bindIp), BindAddressIsPublic)
else:
return (none(IpAddress), BindAddressIsPrivate)
var thread: Thread[ptr MapNatPortsCtx]
var threadStarted = false
defer:
if threadStarted:
# Blocking the event loop here is acceptable: UPnP discover() is bounded
# by UPNP_TIMEOUT (200ms), so the worst-case stall is ~200ms.
joinThread(thread)
# Always sync hasUpnpMapping back, even on timeout or cancellation.
# If the thread mapped ports just after the timeout, close() will
# still clean them up on the router.
if ctx.hasUpnpMapping:
m.hasUpnpMapping = true
freeShared(ctx)
discard signal.close()
# Try to detect a public IP assigned to this host, before trying NAT traversal.
proc getPublicRoutePrefSrcOrExternalIP*(
natStrategy: NatStrategy, bindIp: IpAddress, quiet = true
): Option[IpAddress] =
let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp)
case prefSrcStatus
of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic:
return prefSrcIp
of PrefSrcIsPrivate, BindAddressIsPrivate:
let extIp = getExternalIP(natStrategy, quiet)
if extIp.isSome:
return some(extIp.get)
proc doPortMapping(
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
): Option[(Port, Port)] {.gcsafe.} =
var
extTcpPort: Port
extUdpPort: Port
if strategy == NatStrategy.NatUpnp:
for t in [(tcpPort, UPNPProtocol.TCP), (udpPort, UPNPProtocol.UDP)]:
let
(port, protocol) = t
pmres = upnp.addPortMapping(
externalPort = $port,
protocol = protocol,
internalHost = upnp.lanAddr,
internalPort = $port,
desc = description,
leaseDuration = 0,
)
if pmres.isErr:
error "UPnP port mapping", msg = pmres.error, port
return
else:
# let's check it
let cres =
upnp.getSpecificPortMapping(externalPort = $port, protocol = protocol)
if cres.isErr:
warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.",
msg = cres.error
info "UPnP: added port mapping",
externalPort = port, internalPort = port, protocol = protocol
case protocol
of UPNPProtocol.TCP:
extTcpPort = port
of UPNPProtocol.UDP:
extUdpPort = port
elif strategy == NatStrategy.NatPmp:
for t in [(tcpPort, NatPmpProtocol.TCP), (udpPort, NatPmpProtocol.UDP)]:
let
(port, protocol) = t
pmres = npmp.addPortMapping(
eport = port.cushort,
iport = port.cushort,
protocol = protocol,
lifetime = NATPMP_LIFETIME,
)
if pmres.isErr:
error "NAT-PMP port mapping", msg = pmres.error, port
return
else:
let extPort = Port(pmres.value)
info "NAT-PMP: added port mapping",
externalPort = extPort, internalPort = port, protocol = protocol
case protocol
of NatPmpProtocol.TCP:
extTcpPort = extPort
of NatPmpProtocol.UDP:
extUdpPort = extPort
return some((extTcpPort, extUdpPort))
proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
ignoreSignalsInThread()
let
(strategy, tcpPort, udpPort, description) = args
interval = initDuration(seconds = PORT_MAPPING_INTERVAL)
sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C
var lastUpdate = now()
# We can't use copies of Miniupnp and NatPmp objects in this thread, because they share
# C pointers with other instances that have already been garbage collected, so
# we use threadvars instead and initialise them again with getExternalIP(),
# even though we don't need the external IP's value.
let ipres = getExternalIP(strategy, quiet = true)
if ipres.isSome:
while natClosed.load() == false:
let currTime = now()
if currTime >= (lastUpdate + interval):
discard doPortMapping(strategy, tcpPort, udpPort, description)
lastUpdate = currTime
sleep(sleepDuration)
proc stopNatThreads() {.noconv.} =
# stop the thread
debug "Stopping NAT port mapping renewal threads"
try:
natClosed.store(true)
joinThreads(natThreads)
except Exception as exc:
warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg
createThread(thread, mapNatPortsThread, ctx)
threadStarted = true
except ValueError, ResourceExhaustedError:
warn "Failed to create thread for NAT port mapping"
return none((Port, Port))
# delete our port mappings
try:
if not await signal.wait().withTimeout(NatPortMappingTimeout):
warn "NAT port mapping thread timed out"
return none((Port, Port))
except CancelledError as exc:
raise exc
except AsyncError as exc:
warn "Error waiting for NAT port mapping thread", error = exc.msg
return none((Port, Port))
# FIXME: if the initial port mapping failed because it already existed for the
# required external port, we should not delete it. It might have been set up
# by another program.
return ctx.result
# In Windows, a new thread is created for the signal handler, so we need to
# initialise our threadvars again.
method handleNatStatus*(
m: NatMapper,
networkReachability: NetworkReachability,
dialBackAddr: Opt[MultiAddress],
discoveryPort: Port,
discovery: Discovery,
switch: Switch,
autoRelayService: AutoRelayService,
) {.async: (raises: [CancelledError]), base, gcsafe.} =
case networkReachability
of Unknown:
discard
of Reachable:
if dialBackAddr.isNone:
warn "Got empty dialback address in AutoNat when node is Reachable"
return
let ipres = getExternalIP(strategy, quiet = true)
if ipres.isSome:
if strategy == NatStrategy.NatUpnp:
for entry in activeMappings:
for t in [
(entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP),
(entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP),
]:
let
(eport, iport, protocol) = t
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
if pmres.isErr:
error "UPnP port mapping deletion", msg = pmres.error
else:
debug "UPnP: deleted port mapping",
externalPort = eport, internalPort = iport, protocol = protocol
elif strategy == NatStrategy.NatPmp:
for entry in activeMappings:
for t in [
(entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP),
(entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP),
]:
let
(eport, iport, protocol) = t
pmres = npmp.deletePortMapping(
eport = eport.cushort, iport = iport.cushort, protocol = protocol
)
if pmres.isErr:
error "NAT-PMP port mapping deletion", msg = pmres.error
else:
debug "NAT-PMP: deleted port mapping",
externalPort = eport, internalPort = iport, protocol = protocol
if autoRelayService.isRunning:
if not await autoRelayService.stop(switch):
debug "AutoRelayService stop method returned false"
proc redirectPorts*(
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
): Option[(Port, Port)] =
result = doPortMapping(strategy, tcpPort, udpPort, description)
if result.isSome:
let (externalTcpPort, externalUdpPort) = result.get()
# needed by NAT-PMP on port mapping deletion
# Port mapping works. Let's launch a thread that repeats it, in case the
# NAT-PMP lease expires or the router is rebooted and forgets all about
# these mappings.
activeMappings.add(
PortMappings(
internalTcpPort: tcpPort,
externalTcpPort: externalTcpPort,
internalUdpPort: udpPort,
externalUdpPort: externalUdpPort,
description: description,
)
)
try:
natThreads.add(Thread[PortMappingArgs]())
natThreads[^1].createThread(
repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description)
)
# atexit() in disguise
if natThreads.len == 1:
# we should register the thread termination function only once
addExitProc(stopNatThreads)
except Exception as exc:
warn "Failed to create NAT port mapping renewal thread", exc = exc.msg
discovery.updateRecords(@[dialBackAddr.get], discoveryPort)
# TODO: switch DHT to server mode
of NotReachable:
var hasPortMapping = false
proc setupNat*(
natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string
): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] =
## Setup NAT port mapping and get external IP address.
## If any of this fails, we don't return any IP address but do return the
## original ports as best effort.
## TODO: Allow for tcp or udp port mapping to be optional.
if extIp.isNone:
extIp = getExternalIP(natStrategy)
if extIp.isSome:
let ip = extIp.get
let extPorts = (
{.gcsafe.}:
redirectPorts(
strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId
)
)
if extPorts.isSome:
let (extTcpPort, extUdpPort) = extPorts.get()
(ip: some(ip), tcpPort: some(extTcpPort), udpPort: some(extUdpPort))
if dialBackAddr.isNone:
warn "Got empty dialback address in AutoNat when node is NotReachable"
else:
warn "UPnP/NAT-PMP available but port forwarding failed"
(ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
else:
warn "UPnP/NAT-PMP not available"
(ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
let maybePorts = await m.mapNatPorts()
proc setupAddress*(
natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string
): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} =
## Set-up of the external address via any of the ways as configured in
## `NatConfig`. In case all fails an error is logged and the bind ports are
## selected also as external ports, as best effort and in hope that the
## external IP can be figured out by other means at a later stage.
## TODO: Allow for tcp or udp bind ports to be optional.
if maybePorts.isSome:
let (tcpPort, udpPort) = maybePorts.get()
let announceAddress = dialBackAddr.get.remapAddr(port = some(tcpPort))
if natConfig.hasExtIp:
# any required port redirection must be done by hand
return (some(natConfig.extIp), some(tcpPort), some(udpPort))
# TODO: Try a dial me to make sure we are reachable
case natConfig.nat
of NatStrategy.NatAny:
let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp)
if autoRelayService.isRunning:
if not await autoRelayService.stop(switch):
debug "AutoRelayService stop method returned false"
case prefSrcStatus
of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic:
return (prefSrcIp, some(tcpPort), some(udpPort))
of PrefSrcIsPrivate, BindAddressIsPrivate:
return setupNat(natConfig.nat, tcpPort, udpPort, clientId)
of NatStrategy.NatNone:
let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp)
discovery.updateRecords(@[announceAddress], udpPort)
hasPortMapping = true
case prefSrcStatus
of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic:
return (prefSrcIp, some(tcpPort), some(udpPort))
of PrefSrcIsPrivate:
error "No public IP address found. Should not use --nat:none option"
return (none(IpAddress), some(tcpPort), some(udpPort))
of BindAddressIsPrivate:
error "Bind IP is not a public IP address. Should not use --nat:none option"
return (none(IpAddress), some(tcpPort), some(udpPort))
of NatStrategy.NatUpnp, NatStrategy.NatPmp:
return setupNat(natConfig.nat, tcpPort, udpPort, clientId)
if not hasPortMapping and not autoRelayService.isRunning:
if not await autoRelayService.setup(switch):
debug "AutoRelayService setup method returned false"
proc nattedAddress*(
natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port
): tuple[libp2p, discovery: seq[MultiAddress]] =
## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns:
## - Modified multiaddresses with NAT-mapped addresses for libp2p
## - Discovery addresses with NAT-mapped UDP ports
proc close*(m: NatMapper, device = UpnpDevice()) =
# UPnP mappings are permanent (leaseDuration=0) and must be deleted explicitly.
# NAT-PMP mappings expire automatically after NATPMP_LIFETIME seconds.
if not m.hasUpnpMapping:
return
var discoveryAddrs = newSeq[MultiAddress](0)
let newAddrs = addrs.mapIt:
block:
# Extract IP address and port from the multiaddress
let (ipPart, port) = getAddressAndPort(it)
if ipPart.isSome and port.isSome:
# Try to setup NAT mapping for the address
let (newIP, tcp, udp) =
setupAddress(natConfig, ipPart.get, port.get, udpPort, "storage")
if newIP.isSome:
# NAT mapping successful - add discovery address with mapped UDP port
discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get))
# Remap original address with NAT IP and TCP port
it.remapAddr(ip = newIP, port = tcp)
else:
# NAT mapping failed - use original address
echo "Failed to get external IP, using original address", it
discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(ipPart.get, udpPort))
it
else:
# Invalid multiaddress format - return as is
it
(newAddrs, discoveryAddrs)
# deletePortMapping requires the IGD control URL set during init
let deviceRes = device.init()
if deviceRes.isErr:
warn "UPnP reinit failed during cleanup, port mappings may remain",
msg = deviceRes.error
return
for (port, proto) in [
(m.tcpPort, NatIpProtocol.Tcp), (m.discoveryPort, NatIpProtocol.Udp)
]:
let res = deviceRes.value.deletePortMapping(port, proto)
if res.isErr:
error "UPnP port mapping deletion failed", port, proto, msg = res.error
proc findReachableNodes*(bootstrapNodes: seq[SignedPeerRecord]): seq[SignedPeerRecord] =
## Returns the list of nodes known to be directly reachable.
## Currently returns bootstrap nodes. In the future, any network participant
## confirmed reachable by AutoNAT could be included.
bootstrapNodes

View File

@ -23,6 +23,7 @@ import pkg/confutils
import pkg/libp2p
import pkg/libp2p/routing_record
import pkg/libp2p/protocols/connectivity/autonatv2/service
import pkg/codexdht/discv5/spr as spr
import ../logutils
@ -557,7 +558,12 @@ proc initNodeApi(node: StorageNodeRef, conf: StorageConf, router: var RestRouter
return
RestApiResponse.error(Http500, "Unknown error dialling peer", headers = headers)
proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRouter) =
proc initDebugApi(
node: StorageNodeRef,
conf: StorageConf,
autonat: Option[AutonatV2Service],
router: var RestRouter,
) =
let allowedOrigin = router.allowedOrigin
router.api(MethodGet, "/api/storage/v1/debug/info") do() -> RestApiResponse:
@ -577,6 +583,13 @@ proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRoute
"announceAddresses": node.discovery.announceAddrs,
"table": table,
"storage": {"version": $storageVersion, "revision": $storageRevision},
"nat": {
"reachability":
if autonat.isSome:
$autonat.get.networkReachability
else:
"unknown"
},
}
# return pretty json for human readability
@ -637,12 +650,13 @@ proc initRestApi*(
node: StorageNodeRef,
conf: StorageConf,
repoStore: RepoStore,
autonat: Option[AutonatV2Service],
corsAllowedOrigin: ?string,
): RestRouter =
var router = RestRouter.init(validate, corsAllowedOrigin)
initDataApi(node, repoStore, router)
initNodeApi(node, conf, router)
initDebugApi(node, conf, router)
initDebugApi(node, conf, autonat, router)
return router

View File

@ -17,6 +17,9 @@ import pkg/chronos
import pkg/taskpools
import pkg/presto
import pkg/libp2p
import pkg/libp2p/protocols/connectivity/autonatv2/[service, client]
import pkg/libp2p/protocols/connectivity/relay/client as relayClientModule
import pkg/libp2p/services/autorelayservice
import pkg/confutils
import pkg/confutils/defs
import pkg/stew/io2
@ -33,7 +36,6 @@ import ./blockexchange
import ./utils/fileutils
import ./discovery
import ./utils/addrutils
import ./utils/natutils
import ./namespaces
import ./storagetypes
import ./logutils
@ -51,6 +53,9 @@ type
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
autonatService*: Option[AutonatV2Service]
autoRelayService: AutoRelayService
natMapper: NatMapper
isStarted: bool
StoragePrivateKey* = libp2p.PrivateKey # alias
@ -76,25 +81,40 @@ proc start*(s: StorageServer) {.async.} =
await s.storageNode.switch.start()
let (announceAddrs, discoveryAddrs) = nattedAddress(
s.config.nat, s.storageNode.switch.peerInfo.addrs, s.config.discoveryPort
)
let announceAddrs =
if s.config.nat.hasExtIp:
# extip means that we assume the IP is reachable
# So we just take the first peer addr and remap it with extip to keep the port only
if s.storageNode.switch.peerInfo.addrs.len == 0:
raise
newException(StorageError, "extip is set but switch has no listen addresses")
@[
s.storageNode.switch.peerInfo.addrs[0].remapAddr(
ip = some(s.config.nat.extIp), port = none(Port)
)
]
else:
# If extip is not set, we have 2 choices:
# 1- Announce the peer addrs contains detected addresses on the machine.
# 2- Wait for AutoNat
# The problem with 1 is that you will certainly announce private addresses
# and if you advertise a CID, you will advertise these private addresses.
# TODO: DHT client mode
#s.storageNode.switch.peerInfo.addrs
@[]
var hasPublicAddr = false
for announceAddr in announceAddrs:
let (maybeIp, _) = getAddressAndPort(announceAddr)
if maybeIp.isSome and maybeIp.get.isGlobalUnicast():
hasPublicAddr = true
break
if not hasPublicAddr:
warn "Unable to determine a public IP address. This node will only be reachable on a private network."
s.storageNode.discovery.updateAnnounceRecord(announceAddrs)
s.storageNode.discovery.updateDhtRecord(discoveryAddrs)
s.storageNode.discovery.updateRecords(announceAddrs, s.config.discoveryPort)
await s.storageNode.start()
for spr in findReachableNodes(s.config.bootstrapNodes):
try:
let addrs = spr.data.addresses.mapIt(it.address)
await s.storageNode.switch.connect(spr.data.peerId, addrs)
except CatchableError as e:
warn "Cannot connect to bootstrap node", error = e.msg
discard
if s.restServer != nil:
s.restServer.start()
@ -107,6 +127,9 @@ proc stop*(s: StorageServer) {.async.} =
notice "Stopping Storage node"
if s.natMapper != nil:
s.natMapper.close()
var futures = @[
s.storageNode.switch.stop(),
s.storageNode.stop(),
@ -171,6 +194,27 @@ proc new*(
## create StorageServer including setting up datastore, repostore, etc
let listenMultiAddr = getMultiAddrWithIpAndTcpPort(config.listenIp, config.listenPort)
let relayClient = relayClientModule.RelayClient.new(canHop = config.relay)
let autonatClient = AutonatV2Client.new(random.Rng.instance())
let autonatService =
if config.nat.hasExtIp:
none(AutonatV2Service)
else:
some(
AutonatV2Service.new(
rng = random.Rng.instance(),
client = autonatClient,
config = AutonatV2ServiceConfig.new(
scheduleInterval = Opt.some(config.natScheduleInterval),
askNewConnectedPeers = true,
numPeersToAsk = config.natNumPeersToAsk,
maxQueueSize = config.natMaxQueueSize,
minConfidence = config.natMinConfidence,
),
)
)
let switch = SwitchBuilder
.new()
.withPrivateKey(privateKey)
@ -182,8 +226,19 @@ proc new*(
.withAgentVersion(config.agentString)
.withSignedPeerRecord(true)
.withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
.withAutonatV2Server()
.withCircuitRelay(relayClient)
.withServices(
if autonatService.isSome:
@[Service(autonatService.get)]
else:
@[]
)
.build()
autonatClient.setup(switch)
switch.mount(autonatClient)
var
cache: CacheStore = nil
taskPool: Taskpool
@ -222,8 +277,9 @@ proc new*(
discovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = @[listenMultiAddr],
announceAddrs = @[],
bindPort = config.discoveryPort,
discoveryPort = config.discoveryPort,
bootstrapNodes = config.bootstrapNodes,
store = discoveryStore,
)
@ -285,12 +341,24 @@ proc new*(
taskPool = taskPool,
)
autoRelayService = AutoRelayService.new(
maxNumRelays = config.natMaxRelays,
client = relayClient,
onReservation = proc(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} =
debug "Relay reservation updated", addresses
# relay addresses are for download traffic only, not DHT routing
discovery.updateAnnounceRecord(addresses),
rng = random.Rng.instance(),
)
var restServer: RestServerRef = nil
if config.apiBindAddress.isSome:
restServer = RestServerRef
.new(
storageNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
storageNode.initRestApi(
config, repoStore, autonatService, config.apiCorsAllowedOrigin
),
initTAddress(config.apiBindAddress.get(), config.apiPort),
bufferSize = (1024 * 64),
maxRequestBodySize = int.high,
@ -300,6 +368,25 @@ proc new*(
switch.mount(network)
switch.mount(manifestProto)
let natMapper = NatMapper(
natConfig: config.nat,
tcpPort: config.listenPort,
discoveryPort: config.discoveryPort,
)
if autonatService.isSome:
autonatService.get.setStatusAndConfidenceHandler(
proc(
networkReachability: NetworkReachability,
confidence: Opt[float],
addrs: Opt[MultiAddress],
) {.async: (raises: [CancelledError]).} =
debug "AutoNAT status", reachability = networkReachability, confidence
await natMapper.handleNatStatus(
networkReachability, addrs, config.discoveryPort, discovery, switch,
autoRelayService,
)
)
StorageServer(
config: config,
storageNode: storageNode,
@ -308,4 +395,7 @@ proc new*(
maintenance: maintenance,
taskPool: taskPool,
logFile: logFile,
autonatService: autonatService,
autoRelayService: autoRelayService,
natMapper: natMapper,
)

View File

@ -9,6 +9,8 @@
{.push raises: [].}
import std/options
import pkg/chronos
import pkg/libp2p
import pkg/questionable/results

View File

@ -14,14 +14,14 @@ import std/strutils
import std/options
import pkg/libp2p
import pkg/stew/endians2
func remapAddr*(
address: MultiAddress,
ip: Option[IpAddress] = IpAddress.none,
port: Option[Port] = Port.none,
protocol: Option[string] = string.none,
): MultiAddress =
## Remap addresses to new IP and/or Port
## Remap addresses to new IP, port, and/or transport protocol (e.g. "tcp" → "udp")
##
var parts = ($address).split("/")
@ -32,6 +32,12 @@ func remapAddr*(
else:
parts[2]
parts[3] =
if protocol.isSome:
protocol.get
else:
parts[3]
parts[4] =
if port.isSome:
$port.get
@ -67,38 +73,3 @@ proc getMultiAddrWithIpAndTcpPort*(ip: IpAddress, port: Port): MultiAddress =
return MultiAddress.init(ipFamily & $ip & "/tcp/" & $port).expect(
"Failed to construct multiaddress with IP and TCP port"
)
proc getAddressAndPort*(
ma: MultiAddress
): tuple[ip: Option[IpAddress], port: Option[Port]] =
try:
# Try IPv4 first
let ipv4Result = ma[multiCodec("ip4")]
let ip =
if ipv4Result.isOk:
let ipBytes = ipv4Result.get().protoArgument().expect("Invalid IPv4 format")
let ipArray = [ipBytes[0], ipBytes[1], ipBytes[2], ipBytes[3]]
some(IpAddress(family: IPv4, address_v4: ipArray))
else:
# Try IPv6 if IPv4 not found
let ipv6Result = ma[multiCodec("ip6")]
if ipv6Result.isOk:
let ipBytes = ipv6Result.get().protoArgument().expect("Invalid IPv6 format")
var ipArray: array[16, byte]
for i in 0 .. 15:
ipArray[i] = ipBytes[i]
some(IpAddress(family: IPv6, address_v6: ipArray))
else:
none(IpAddress)
# Get TCP Port
let portResult = ma[multiCodec("tcp")]
let port =
if portResult.isOk:
let portBytes = portResult.get().protoArgument().expect("Invalid port format")
some(Port(fromBytesBE(uint16, portBytes)))
else:
none(Port)
(ip: ip, port: port)
except Exception:
(ip: none(IpAddress), port: none(Port))

View File

@ -1,66 +1,205 @@
{.push raises: [].}
import std/[net, tables, hashes], pkg/results, chronos, chronicles
import std/[options, net]
import nat_traversal/[miniupnpc, natpmp]
import pkg/chronicles
import results
import pkg/libp2p
export miniupnpc, natpmp, results, options, net
logScope:
topics = "nat"
const UPNP_TIMEOUT* = 200 # ms
const NATPMP_LIFETIME* = 60 * 60 # seconds
type NatStrategy* = enum
NatAny
NatUpnp
NatPmp
NatNone
NatAuto
type IpLimits* = object
limit*: uint
ips: Table[IpAddress, uint]
type NatIpProtocol* = enum
Tcp
Udp
func hash*(ip: IpAddress): Hash =
case ip.family
of IpAddressFamily.IPv6:
hash(ip.address_v6)
of IpAddressFamily.IPv4:
hash(ip.address_v4)
# Generic Nat device can be UPnP or PmP
type NatDevice* = ref object of RootObj
func inc*(ipLimits: var IpLimits, ip: IpAddress): bool =
let val = ipLimits.ips.getOrDefault(ip, 0)
if val < ipLimits.limit:
ipLimits.ips[ip] = val + 1
true
else:
false
type UpnpDevice* = ref object of NatDevice
upnp: Miniupnp
func dec*(ipLimits: var IpLimits, ip: IpAddress) =
let val = ipLimits.ips.getOrDefault(ip, 0)
if val == 1:
ipLimits.ips.del(ip)
elif val > 1:
ipLimits.ips[ip] = val - 1
type PmpDevice* = ref object of NatDevice
npmp: NatPmp
func isGlobalUnicast*(address: TransportAddress): bool =
if address.isGlobal() and address.isUnicast(): true else: false
# appPortMapping is specific to the type of Nat device
method addPortMapping*(
d: NatDevice, port: Port, proto: NatIpProtocol
): Result[Port, string] {.base, gcsafe.} =
return err("not implemented")
func isGlobalUnicast*(address: IpAddress): bool =
let a = initTAddress(address, Port(0))
a.isGlobalUnicast()
# Creates the mapping the the router and
# returns the opened ports.
method mapPorts*(
d: NatDevice, tcpPort, udpPort: Port
): Option[(Port, Port)] {.base, gcsafe.} =
var extTcpPort, extUdpPort: Port
proc getRouteIpv4*(): Result[IpAddress, cstring] =
# Avoiding Exception with initTAddress and can't make it work with static.
# Note: `publicAddress` is only used an "example" IP to find the best route,
# no data is send over the network to this IP!
let
publicAddress = TransportAddress(
family: AddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1], port: Port(0)
)
route = getBestRoute(publicAddress)
for t in [(tcpPort, NatIpProtocol.Tcp), (udpPort, NatIpProtocol.Udp)]:
let (port, proto) = t
let pmres = d.addPortMapping(port, proto)
if route.source.isUnspecified():
err("No best ipv4 route found")
else:
let ip =
try:
route.source.address()
except ValueError as e:
# This should not occur really.
error "Address conversion error", exception = e.name, msg = e.msg
return err("Invalid IP address")
ok(ip)
if pmres.isErr:
error "port mapping failed", msg = pmres.error
return none((Port, Port))
case proto
of Tcp:
extTcpPort = pmres.value
of Udp:
extUdpPort = pmres.value
return some((extTcpPort, extUdpPort))
method getSpecificPortMapping*(
d: UpnpDevice, externalPort: string, protocol: UPNPProtocol
): Result[PortMappingRes, cstring] {.base, gcsafe.} =
if d.upnp == nil:
return err(cstring("upnp not initialized"))
d.upnp.getSpecificPortMapping(externalPort = externalPort, protocol = protocol)
method discover*(d: UpnpDevice): Result[int, cstring] {.base, gcsafe.} =
if d.upnp == nil:
return err(cstring("upnp not initialized"))
return d.upnp.discover()
method selectIGD*(d: UpnpDevice): SelectIGDResult {.base, gcsafe.} =
if d.upnp == nil:
return IGDNotFound
return d.upnp.selectIGD()
proc init*(T: type UpnpDevice): Result[UpnpDevice, string] {.gcsafe.} =
UpnpDevice().init()
# Init UPnP device and create miniupnp instance.
# It call "discover" to retrieve the UPnP devices on the network,
# and then "selectIGD" to select a suitable device.
proc init*(d: UpnpDevice): Result[UpnpDevice, string] {.gcsafe.} =
if d.upnp == nil:
d.upnp = newMiniupnp()
d.upnp.discoverDelay = UPNP_TIMEOUT
let dres = d.discover()
if dres.isErr:
debug "UPnP", msg = dres.error
return err($dres.error)
case d.selectIGD()
of IGDNotFound:
debug "UPnP", msg = "Internet Gateway Device not found. Giving up."
return err("IGD not found")
of IGDFound:
debug "UPnP", msg = "Internet Gateway Device found."
of IGDNotConnected:
debug "UPnP",
msg = "Internet Gateway Device found but it's not connected. Trying anyway."
of NotAnIGD:
debug "UPnP",
msg =
"Some device found, but it's not recognised as an Internet Gateway Device. Trying anyway."
of IGDIpNotRoutable:
debug "UPnP",
msg =
"Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway."
return ok(d)
# For UPnP, the external port is the same as the application port.
# This should work for most of the case.
# We could change this by using addAnyPortMapping for IGD2 compatible routers
# if needed.
method addPortMapping*(
d: UpnpDevice, port: Port, proto: NatIpProtocol
): Result[Port, string] {.gcsafe.} =
if d.upnp == nil:
return err("upnp not initialized")
let protocol = if proto == NatIpProtocol.Tcp: UPNPProtocol.TCP else: UPNPProtocol.UDP
let pmres = d.upnp.addPortMapping(
externalPort = $port,
protocol = protocol,
internalHost = d.upnp.lanAddr,
internalPort = $port,
desc = "logos-storage",
leaseDuration = 0,
)
if pmres.isErr:
return err($pmres.error)
let cres = d.getSpecificPortMapping(externalPort = $port, protocol = protocol)
if cres.isErr:
# Eventually, the check could fail on some router even if the router is successful.
# So we log a warning but we still want to continue because it is not sure it is a failure.
warn "UPnP port mapping check failed. Assuming the check itself is broken and the port mapping was done.",
msg = cres.error
info "UPnP: added port mapping", externalPort = port, internalPort = port
return ok(port)
method deletePortMapping*(
d: UpnpDevice, port: Port, proto: NatIpProtocol
): Result[void, string] {.base, gcsafe.} =
if d.upnp == nil:
return err("upnp not initialized")
let protocol = if proto == NatIpProtocol.Tcp: UPNPProtocol.TCP else: UPNPProtocol.UDP
let res = d.upnp.deletePortMapping(externalPort = $port, protocol = protocol)
if res.isErr:
return err($res.error)
debug "UPnP: deleted port mapping", port, proto
ok()
proc init*(T: type PmpDevice): Result[PmpDevice, string] {.gcsafe.} =
PmpDevice().init()
# Create a NatPmP instance.
proc init*(d: PmpDevice): Result[PmpDevice, string] {.gcsafe.} =
if d.npmp == nil:
d.npmp = newNatPmp()
let res = d.npmp.init()
if res.isErr:
debug "NAT-PMP", msg = res.error
return err($res.error)
return ok(d)
# Add a port mapping on NAT-PMP device.
# The application port might not be the external port.
# The latter is returned.
method addPortMapping*(
d: PmpDevice, port: Port, proto: NatIpProtocol
): Result[Port, string] {.gcsafe.} =
if d.npmp == nil:
return err("npmp not initialized")
let protocol =
if proto == NatIpProtocol.Tcp: NatPmpProtocol.TCP else: NatPmpProtocol.UDP
let pmres = d.npmp.addPortMapping(
eport = port.cushort,
iport = port.cushort,
protocol = protocol,
lifetime = NATPMP_LIFETIME,
)
if pmres.isErr:
return err(pmres.error)
let extPort = Port(pmres.value)
info "NAT-PMP: added port mapping", externalPort = extPort, internalPort = port
return ok(extPort)

View File

@ -0,0 +1,32 @@
import std/json
import std/options
import pkg/chronos
import pkg/questionable/results
import ../multinodes
import ../storageclient
import ../storageconfig
multinodesuite "AutoNAT integration":
let natConfig = NodeConfigs(
clients: StorageConfigs
.init(nodes = 2)
.withNatNumPeersToAsk(1)
.withNatMinConfidence(0.5)
.withNatScheduleInterval(10.seconds)
.withNatMaxQueueSize(1)
# .withLogFile()
# .withLogLevel("DEBUG")
.some
)
# Reminder: multinodesuite setup the first node as bootstrap node
test "node is reachable when using bootstrap node on same network", natConfig:
let node1 = clients()[0]
let node2 = clients()[1]
check eventuallySafe(
(await node2.client.natReachability()).get() == "Reachable",
timeout = 30_000,
pollInterval = 500,
)

View File

@ -131,7 +131,6 @@ template multinodesuite*(suiteName: string, body: untyped) =
config.addCliOption("--bootstrap-node", bootstrapNode)
config.addCliOption("--data-dir", datadir)
config.addCliOption("--nat", "none")
except StorageConfigError as e:
raiseMultiNodeSuiteError "invalid cli option, error: " & e.msg
@ -214,6 +213,9 @@ template multinodesuite*(suiteName: string, body: untyped) =
trace "Setting up test", suite = suiteName, test = currentTestName, nodeConfigs
if var clients =? nodeConfigs.clients:
failAndTeardownOnError "failed to start client nodes":
# Only the first node (bootstrap) gets a known extip. Other nodes use
# nat=auto so AutoNAT can run and determine their reachability.
clients = clients.withExtIp(0)
for config in clients.configs:
let node = await startClientNode(config)
running.add RunningNode(role: Role.Client, node: node)

View File

@ -1,4 +1,5 @@
import std/strutils
import std/sequtils
from pkg/libp2p import Cid, `$`, init
import pkg/questionable/results
@ -260,3 +261,23 @@ proc hasBlockRaw*(
.} =
let url = client.baseurl & "/data/" & cid & "/exists"
return client.get(url)
proc connectPeer*(
client: StorageClient, peerId: string, addrs: seq[string]
): Future[void] {.async: (raises: [CancelledError, HttpError]).} =
var url = client.baseurl & "/connect/" & peerId
if addrs.len > 0:
url &= "?" & addrs.mapIt("addrs=" & it).join("&")
let response = await client.get(url)
assert response.status == 200
proc natReachability*(
client: StorageClient
): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} =
let info = await client.info()
if info.isErr:
return failure "Failed to get node info"
try:
return info.get()["nat"]["reachability"].getStr().success
except KeyError as e:
return failure e.msg

View File

@ -5,6 +5,7 @@ import std/strutils
import std/sugar
import std/tables
from pkg/chronicles import LogLevel
import pkg/chronos
import pkg/storage/conf
import pkg/storage/units
import pkg/confutils
@ -280,3 +281,60 @@ proc withStorageQuota*(
for config in startConfig.configs.mitems:
config.addCliOption("--storage-quota", $quota)
return startConfig
proc withListenIp*(
self: StorageConfigs, ip: string
): StorageConfigs {.raises: [StorageConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption("--listen-ip", ip)
return startConfig
proc withNatNumPeersToAsk*(
self: StorageConfigs, numPeersToAsk: int
): StorageConfigs {.raises: [StorageConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption("--nat-num-peers-to-ask", $numPeersToAsk)
return startConfig
proc withNatMaxQueueSize*(
self: StorageConfigs, maxQueueSize: int
): StorageConfigs {.raises: [StorageConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption("--nat-max-queue-size", $maxQueueSize)
return startConfig
proc withNatMinConfidence*(
self: StorageConfigs, minConfidence: float
): StorageConfigs {.raises: [StorageConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption("--nat-min-confidence", $minConfidence)
return startConfig
proc withNatScheduleInterval*(
self: StorageConfigs, scheduleInterval: Duration
): StorageConfigs {.raises: [StorageConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption("--nat-schedule-interval", $scheduleInterval)
return startConfig
proc withExtIp*(
self: StorageConfigs, idx: int, ip = "127.0.0.1"
): StorageConfigs {.raises: [StorageConfigError].} =
self.checkBounds idx
var startConfig = self
startConfig.configs[idx].addCliOption("--nat", "extip:" & ip)
return startConfig
proc withExtIp*(
self: StorageConfigs, ip = "127.0.0.1"
): StorageConfigs {.raises: [StorageConfigError].} =
var startConfig = self
for config in startConfig.configs.mitems:
config.addCliOption("--nat", "extip:" & ip)
return startConfig

View File

@ -1,6 +1,4 @@
import std/sequtils
import std/sugar
import std/tables
import std/[sequtils, sugar, tables, options]
import pkg/chronos

View File

@ -1,3 +1,4 @@
import std/importutils
import std/[sequtils, tables]
import pkg/chronos
@ -12,6 +13,8 @@ import ../../asynctest
import ../examples
import ../helpers
privateAccess(BlockExcNetwork)
asyncchecksuite "Network - Handlers":
let
rng = Rng.instance()
@ -185,7 +188,7 @@ asyncchecksuite "Network - Test Limits":
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
network1 = BlockExcNetwork.new(switch = switch1, maxInflight = 0)
network1 = BlockExcNetwork.new(switch = switch1, maxInflight = 1)
switch1.mount(network1)
network2 = BlockExcNetwork.new(switch = switch2)
@ -205,6 +208,8 @@ asyncchecksuite "Network - Test Limits":
): Future[void] {.async: (raises: []).} =
check false
await network1.inflightSema.acquire()
let fut = network1.send(switch2.peerInfo.peerId, Message())
await sleepAsync(100.millis)

View File

@ -10,8 +10,6 @@ import pkg/storage/stores
import pkg/storage/blocktype as bt
import pkg/storage/blockexchange
import pkg/storage/systemclock
import pkg/storage/nat
import pkg/storage/utils/natutils
import pkg/storage/merkletree
import pkg/storage/manifest
@ -218,13 +216,7 @@ proc generateNodes*(
if config.enableBootstrap:
waitFor switch.peerInfo.update()
let (announceAddrs, discoveryAddrs) = nattedAddress(
NatConfig(hasExtIp: false, nat: NatNone),
switch.peerInfo.addrs,
bindPort.Port,
)
blockDiscovery.updateAnnounceRecord(announceAddrs)
blockDiscovery.updateDhtRecord(discoveryAddrs)
blockDiscovery.updateRecords(switch.peerInfo.addrs, bindPort.Port)
if blockDiscovery.dhtRecord.isSome:
bootstrapNodes.add !blockDiscovery.dhtRecord

View File

@ -1,43 +1,160 @@
import std/[unittest, net]
import std/[net, importutils, envvars]
import pkg/chronos
import ../../storage/utils/natutils
import pkg/libp2p/[multiaddress, multihash, multicodec]
import pkg/libp2p/protocols/connectivity/autonat/types
import pkg/libp2p/protocols/connectivity/relay/client as relayClientModule
import pkg/libp2p/services/autorelayservice except setup
import pkg/results
import ./helpers
import ../asynctest
import ../../storage/nat
import ../../storage/discovery
import ../../storage/rng
import ../../storage/utils
suite "NAT Address Tests":
test "nattedAddress with local addresses":
# Setup test data
let
udpPort = Port(1234)
natConfig = NatConfig(hasExtIp: true, extIp: parseIpAddress("8.8.8.8"))
privateAccess(NatMapper)
# Create test addresses
localAddr = MultiAddress.init("/ip4/127.0.0.1/tcp/5000").expect("valid multiaddr")
anyAddr = MultiAddress.init("/ip4/0.0.0.0/tcp/5000").expect("valid multiaddr")
publicAddr =
MultiAddress.init("/ip4/192.168.1.1/tcp/5000").expect("valid multiaddr")
type MockUpnpDevice = ref object of UpnpDevice
deletedPorts: seq[(Port, NatIpProtocol)]
# Expected results
let
expectedDiscoveryAddrs = @[
MultiAddress.init("/ip4/8.8.8.8/udp/1234").expect("valid multiaddr"),
MultiAddress.init("/ip4/8.8.8.8/udp/1234").expect("valid multiaddr"),
MultiAddress.init("/ip4/8.8.8.8/udp/1234").expect("valid multiaddr"),
]
expectedlibp2pAddrs = @[
MultiAddress.init("/ip4/8.8.8.8/tcp/5000").expect("valid multiaddr"),
MultiAddress.init("/ip4/8.8.8.8/tcp/5000").expect("valid multiaddr"),
MultiAddress.init("/ip4/8.8.8.8/tcp/5000").expect("valid multiaddr"),
]
method discover*(d: MockUpnpDevice): Result[int, cstring] {.gcsafe.} =
ok(1)
#ipv6Addr = MultiAddress.init("/ip6/::1/tcp/5000").expect("valid multiaddr")
addrs = @[localAddr, anyAddr, publicAddr]
method selectIGD*(d: MockUpnpDevice): SelectIGDResult {.gcsafe.} =
IGDFound
# Test address remapping
let (libp2pAddrs, discoveryAddrs) = nattedAddress(natConfig, addrs, udpPort)
method deletePortMapping*(
d: MockUpnpDevice, port: Port, proto: NatIpProtocol
): Result[void, string] {.gcsafe.} =
d.deletedPorts.add((port, proto))
ok()
# Verify results
check(discoveryAddrs == expectedDiscoveryAddrs)
check(libp2pAddrs == expectedlibp2pAddrs)
type MockNatMapper = ref object of NatMapper
mappedPorts: Option[(Port, Port)]
method mapNatPorts*(
m: MockNatMapper
): Future[Option[(Port, Port)]] {.async: (raises: [CancelledError]), gcsafe.} =
m.mappedPorts
suite "NAT - NatMapper.close":
test "does nothing when no upnp mapping":
let mapper = MockNatMapper(
natConfig: NatConfig(hasExtIp: false, nat: NatAuto),
tcpPort: Port(8080),
discoveryPort: Port(8090),
)
let device = MockUpnpDevice()
mapper.close(device)
check device.deletedPorts.len == 0
test "deletes tcp and udp ports when upnp mapping exists":
let mapper = MockNatMapper(
natConfig: NatConfig(hasExtIp: false, nat: NatAuto),
tcpPort: Port(8080),
discoveryPort: Port(8090),
)
mapper.hasUpnpMapping = true
let device = MockUpnpDevice()
mapper.close(device)
check device.deletedPorts ==
@[(Port(8080), NatIpProtocol.Tcp), (Port(8090), NatIpProtocol.Udp)]
asyncchecksuite "NAT - handleNatStatus":
var sw: Switch
var key: PrivateKey
var disc: Discovery
var autoRelay: AutoRelayService
setup:
autoRelay =
AutoRelayService.new(1, relayClientModule.RelayClient.new(), nil, Rng.instance())
key = PrivateKey.random(Rng.instance[]).get()
disc = Discovery.new(key, announceAddrs = @[])
sw = newStandardSwitch()
await sw.start()
teardown:
await sw.stop()
if autoRelay.isRunning:
discard await autoRelay.stop(sw)
let discoveryPort = Port(8090)
test "handleNatStatus announces mapped address when NotReachable and UPnP succeeds":
let dialBack = MultiAddress.init("/ip4/1.2.3.4/tcp/8080").expect("valid")
let mapper = MockNatMapper(mappedPorts: some((Port(9000), Port(9001))))
await mapper.handleNatStatus(
NotReachable, Opt.some(dialBack), discoveryPort, disc, sw, autoRelay
)
check disc.announceAddrs ==
@[MultiAddress.init("/ip4/1.2.3.4/tcp/9000").expect("valid")]
check not autoRelay.isRunning
test "handleNatStatus starts autoRelay when NotReachable and UPnP failed":
let mapper = MockNatMapper(mappedPorts: none((Port, Port)))
await mapper.handleNatStatus(
NotReachable, Opt.none(MultiAddress), discoveryPort, disc, sw, autoRelay
)
check autoRelay.isRunning
test "handleNatStatus starts autoRelay when NotReachable and mapping fails":
let dialBack = MultiAddress.init("/ip4/1.2.3.4/tcp/8080").expect("valid")
let mapper = MockNatMapper(mappedPorts: none((Port, Port)))
await mapper.handleNatStatus(
NotReachable, Opt.some(dialBack), discoveryPort, disc, sw, autoRelay
)
check autoRelay.isRunning
check disc.announceAddrs == newSeq[MultiAddress]()
test "handleNatStatus does not announce address when Reachable and no dialBackAddr":
let mapper = MockNatMapper(mappedPorts: none((Port, Port)))
await mapper.handleNatStatus(
Reachable, Opt.none(MultiAddress), discoveryPort, disc, sw, autoRelay
)
check disc.announceAddrs == newSeq[MultiAddress]()
check not autoRelay.isRunning
test "handleNatStatus stops relay and announces dialBackAddr when Reachable":
let dialBack = MultiAddress.init("/ip4/1.2.3.4/tcp/8080").expect("valid")
let mapper = MockNatMapper(mappedPorts: none((Port, Port)))
discard await autorelayservice.setup(autoRelay, sw)
await mapper.handleNatStatus(
Reachable, Opt.some(dialBack), discoveryPort, disc, sw, autoRelay
)
check not autoRelay.isRunning
check disc.announceAddrs == @[dialBack]
suite "NAT - UPnP port mapping (requires NAT_TEST_UPNP=1)":
test "mapPorts and cleanup":
if getEnv("NAT_TEST_UPNP") != "1":
skip()
return
let res = UpnpDevice.init()
check res.isOk
let device = res.value
let ports = device.mapPorts(Port(8101), Port(8090))
check ports.isSome
let (tcp, udp) = ports.get()
check tcp == Port(8101)
check udp == Port(8090)
check device.deletePortMapping(Port(8101), NatIpProtocol.Tcp).isOk
check device.deletePortMapping(Port(8090), NatIpProtocol.Udp).isOk

View File

@ -0,0 +1,93 @@
import std/[options, net]
import nat_traversal/[miniupnpc, natpmp]
import pkg/results
import ../asynctest
import ../../storage/utils/natutils
type MockUpnpDev = ref object of UpnpDevice
discoverOk: bool
igdResult: SelectIGDResult
addPortMappingOk: bool
failOnProto: Option[NatIpProtocol]
type MockPmpDev = ref object of PmpDevice
addPortMappingOk: bool
mappedPort: Port
method discover*(d: MockUpnpDev): Result[int, cstring] {.gcsafe.} =
if d.discoverOk:
ok(1)
else:
err(cstring("discover failed"))
method selectIGD*(d: MockUpnpDev): SelectIGDResult {.gcsafe.} =
d.igdResult
method addPortMapping*(
d: MockUpnpDev, port: Port, proto: NatIpProtocol
): Result[Port, string] {.gcsafe.} =
if d.failOnProto == some(proto):
err("mapping failed")
elif d.addPortMappingOk:
ok(port)
else:
err("mapping failed")
method getSpecificPortMapping*(
d: MockUpnpDev, externalPort: string, protocol: UPNPProtocol
): Result[PortMappingRes, cstring] {.gcsafe.} =
ok(PortMappingRes())
method addPortMapping*(
d: MockPmpDev, port: Port, proto: NatIpProtocol
): Result[Port, string] {.gcsafe.} =
if d.addPortMappingOk:
ok(d.mappedPort)
else:
err("mapping failed")
suite "NAT - UpnpDevice.init":
test "returns err when discover fails":
check MockUpnpDev(discoverOk: false).init().isErr
test "returns err when IGD not found":
check MockUpnpDev(discoverOk: true, igdResult: IGDNotFound).init().isErr
test "returns ok when IGD found":
check MockUpnpDev(discoverOk: true, igdResult: IGDFound).init().isOk
test "returns ok when IGD not connected":
check MockUpnpDev(discoverOk: true, igdResult: IGDNotConnected).init().isOk
test "returns ok when not an IGD":
check MockUpnpDev(discoverOk: true, igdResult: NotAnIGD).init().isOk
test "returns ok when IP not routable":
check MockUpnpDev(discoverOk: true, igdResult: IGDIpNotRoutable).init().isOk
suite "NAT - UpnpDevice.mapPorts":
test "returns none when addPortMapping fails":
check MockUpnpDev(addPortMappingOk: false).mapPorts(Port(8080), Port(8090)).isNone
test "returns mapped ports":
let res = MockUpnpDev(addPortMappingOk: true).mapPorts(Port(8080), Port(8090))
check res.isSome
check res.get() == (Port(8080), Port(8090))
test "returns none when tcp mapping fails":
let d = MockUpnpDev(addPortMappingOk: true, failOnProto: some(NatIpProtocol.Tcp))
check d.mapPorts(Port(8080), Port(8090)).isNone
test "returns none when udp mapping fails":
let d = MockUpnpDev(addPortMappingOk: true, failOnProto: some(NatIpProtocol.Udp))
check d.mapPorts(Port(8080), Port(8090)).isNone
suite "NAT - PmpDevice.mapPorts":
test "returns none when mapping fails":
check MockPmpDev(addPortMappingOk: false).mapPorts(Port(8080), Port(8090)).isNone
test "returns assigned external ports":
let d = MockPmpDev(addPortMappingOk: true, mappedPort: Port(9000))
let res = d.mapPorts(Port(8080), Port(8090))
check res.isSome
check res.get() == (Port(9000), Port(9000))

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 785fcf4ddec1101a3df1f044d6331504d7ab95c6
Subproject commit 45f43a9ad8bd8bcf5903b42f365c1c879bd54240

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52
Subproject commit 425e72487ea4bd5766b5a0590a05039406f46a2f

1
vendor/nim-lsquic vendored Submodule

@ -0,0 +1 @@
Subproject commit a776eced48d1f3c630d8f3a8a3e976171dd1f9c1