mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
first round of fixing NAT
This commit is contained in:
parent
f791a960f2
commit
a18c3a8d2a
@ -57,6 +57,8 @@ type
|
||||
repoStore: RepoStore
|
||||
maintenance: BlockMaintainer
|
||||
taskpool: Taskpool
|
||||
isStarted: bool
|
||||
natManager: NatManager
|
||||
|
||||
CodexPrivateKey* = libp2p.PrivateKey # alias
|
||||
EthWallet = ethers.Wallet
|
||||
@ -167,7 +169,7 @@ proc start*(s: CodexServer) {.async.} =
|
||||
await s.codexNode.switch.start()
|
||||
|
||||
let (announceAddrs, discoveryAddrs) = nattedAddress(
|
||||
s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort
|
||||
s.natManager, s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort
|
||||
)
|
||||
|
||||
s.codexNode.discovery.updateAnnounceRecord(announceAddrs)
|
||||
@ -190,17 +192,34 @@ proc stop*(s: CodexServer) {.async.} =
|
||||
]
|
||||
)
|
||||
|
||||
shutdownNat(s.natManager)
|
||||
|
||||
if res.failure.len > 0:
|
||||
error "Failed to stop codex node", failures = res.failure.len
|
||||
raiseAssert "Failed to stop codex node"
|
||||
|
||||
if not s.taskpool.isNil:
|
||||
s.taskpool.shutdown()
|
||||
try:
|
||||
s.taskpool.shutdown()
|
||||
except Exception as exc:
|
||||
error "Failed to stop the taskpool", failures = res.failure.len
|
||||
raiseAssert("Failure in taskpool shutdown:" & exc.msg)
|
||||
|
||||
shutdownNat(s.natManager)
|
||||
|
||||
if res.failure.len > 0:
|
||||
error "Failed to close codex node", failures = res.failure.len
|
||||
raiseAssert "Failed to close codex node"
|
||||
|
||||
proc shutdown*(server: CodexServer) {.async.} =
|
||||
await server.stop()
|
||||
await server.close()
|
||||
|
||||
proc new*(
|
||||
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
|
||||
): CodexServer =
|
||||
## create CodexServer including setting up datastore, repostore, etc
|
||||
let natManager = newNatManager()
|
||||
let switch = SwitchBuilder
|
||||
.new()
|
||||
.withPrivateKey(privateKey)
|
||||
@ -338,4 +357,5 @@ proc new*(
|
||||
repoStore: repoStore,
|
||||
maintenance: maintenance,
|
||||
taskpool: taskpool,
|
||||
natManager: natManager,
|
||||
)
|
||||
|
||||
245
codex/nat.nim
245
codex/nat.nim
@ -28,15 +28,29 @@ const
|
||||
PORT_MAPPING_INTERVAL = 20 * 60 # seconds
|
||||
NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL
|
||||
|
||||
type PortMappings* = object
|
||||
internalTcpPort: Port
|
||||
externalTcpPort: Port
|
||||
internalUdpPort: Port
|
||||
externalUdpPort: Port
|
||||
description: string
|
||||
type
|
||||
PortMappings* = object
|
||||
internalTcpPort: Port
|
||||
externalTcpPort: Port
|
||||
internalUdpPort: Port
|
||||
externalUdpPort: Port
|
||||
description: string
|
||||
|
||||
type PortMappingArgs =
|
||||
tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string]
|
||||
NatManager* = ref NatManagerObj
|
||||
|
||||
PortMappingArgs = object
|
||||
manager: NatManager
|
||||
strategy: NatStrategy
|
||||
tcpPort: Port
|
||||
udpPort: Port
|
||||
description: string
|
||||
|
||||
NatManagerObj = object
|
||||
selectedStrategy*: NatStrategy
|
||||
extIp*: Option[IpAddress]
|
||||
natClosed*: Atomic[bool]
|
||||
activeMappings*: seq[PortMappings]
|
||||
natThreads*: seq[Thread[PortMappingArgs]]
|
||||
|
||||
type NatConfig* = object
|
||||
case hasExtIp*: bool
|
||||
@ -46,11 +60,6 @@ type NatConfig* = object
|
||||
var
|
||||
upnp {.threadvar.}: Miniupnp
|
||||
npmp {.threadvar.}: NatPmp
|
||||
strategy = NatStrategy.NatNone
|
||||
natClosed: Atomic[bool]
|
||||
extIp: Option[IpAddress]
|
||||
activeMappings: seq[PortMappings]
|
||||
natThreads: seq[Thread[PortMappingArgs]] = @[]
|
||||
|
||||
logScope:
|
||||
topics = "nat"
|
||||
@ -62,11 +71,25 @@ type PrefSrcStatus = enum
|
||||
BindAddressIsPublic
|
||||
BindAddressIsPrivate
|
||||
|
||||
proc shutdownNat*(manager: NatManager)
|
||||
|
||||
proc newNatManager*(): NatManager {.gcsafe.} =
|
||||
new(result)
|
||||
result.selectedStrategy = NatStrategy.NatNone
|
||||
result.extIp = none(IpAddress)
|
||||
result.activeMappings = @[]
|
||||
result.natThreads = @[]
|
||||
result.natClosed.store(false)
|
||||
|
||||
## Also does threadvar initialisation.
|
||||
## Must be called before redirectPorts() in each thread.
|
||||
proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress] =
|
||||
proc getExternalIP*(
|
||||
natStrategy: NatStrategy, quiet = false
|
||||
): tuple[ip: Option[IpAddress], selected: NatStrategy] =
|
||||
var externalIP: IpAddress
|
||||
|
||||
result.selected = NatStrategy.NatNone
|
||||
|
||||
if natStrategy == NatStrategy.NatAny or natStrategy == NatStrategy.NatUpnp:
|
||||
if upnp == nil:
|
||||
upnp = newMiniupnp()
|
||||
@ -103,8 +126,9 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress]
|
||||
# if we got this far, UPnP is working and we don't need to try NAT-PMP
|
||||
try:
|
||||
externalIP = parseIpAddress(ires.value)
|
||||
strategy = NatStrategy.NatUpnp
|
||||
return some(externalIP)
|
||||
result.selected = NatStrategy.NatUpnp
|
||||
result.ip = some(externalIP)
|
||||
return
|
||||
except ValueError as e:
|
||||
error "parseIpAddress() exception", err = e.msg
|
||||
return
|
||||
@ -122,8 +146,9 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress]
|
||||
else:
|
||||
try:
|
||||
externalIP = parseIpAddress($(nires.value))
|
||||
strategy = NatStrategy.NatPmp
|
||||
return some(externalIP)
|
||||
result.selected = NatStrategy.NatPmp
|
||||
result.ip = some(externalIP)
|
||||
return
|
||||
except ValueError as e:
|
||||
error "parseIpAddress() exception", err = e.msg
|
||||
return
|
||||
@ -163,9 +188,9 @@ proc getPublicRoutePrefSrcOrExternalIP*(
|
||||
of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic:
|
||||
return prefSrcIp
|
||||
of PrefSrcIsPrivate, BindAddressIsPrivate:
|
||||
let extIp = getExternalIP(natStrategy, quiet)
|
||||
let (extIp, _) = getExternalIP(natStrategy, quiet)
|
||||
if extIp.isSome:
|
||||
return some(extIp.get)
|
||||
return extIp
|
||||
|
||||
proc doPortMapping(
|
||||
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
|
||||
@ -230,20 +255,28 @@ proc doPortMapping(
|
||||
|
||||
proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
|
||||
ignoreSignalsInThread()
|
||||
|
||||
let
|
||||
(strategy, tcpPort, udpPort, description) = args
|
||||
manager = args.manager
|
||||
strategy = args.strategy
|
||||
tcpPort = args.tcpPort
|
||||
udpPort = args.udpPort
|
||||
description = args.description
|
||||
interval = initDuration(seconds = PORT_MAPPING_INTERVAL)
|
||||
sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C
|
||||
|
||||
if manager.isNil:
|
||||
return
|
||||
|
||||
var lastUpdate = now()
|
||||
|
||||
# We can't use copies of Miniupnp and NatPmp objects in this thread, because they share
|
||||
# C pointers with other instances that have already been garbage collected, so
|
||||
# we use threadvars instead and initialise them again with getExternalIP(),
|
||||
# even though we don't need the external IP's value.
|
||||
let ipres = getExternalIP(strategy, quiet = true)
|
||||
let (ipres, _) = getExternalIP(strategy, quiet = true)
|
||||
if ipres.isSome:
|
||||
while natClosed.load() == false:
|
||||
while manager.natClosed.load() == false:
|
||||
let
|
||||
# we're being silly here with this channel polling because we can't
|
||||
# select on Nim channels like on Go ones
|
||||
@ -252,61 +285,69 @@ proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
|
||||
discard doPortMapping(strategy, tcpPort, udpPort, description)
|
||||
lastUpdate = currTime
|
||||
|
||||
sleep(sleepDuration)
|
||||
sleep(sleepDuration)
|
||||
|
||||
proc stopNatThreads() {.noconv.} =
|
||||
# stop the thread
|
||||
debug "Stopping NAT port mapping renewal threads"
|
||||
try:
|
||||
natClosed.store(true)
|
||||
joinThreads(natThreads)
|
||||
except Exception as exc:
|
||||
warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg
|
||||
proc shutdownNat*(manager: NatManager) =
|
||||
if manager.isNil:
|
||||
return
|
||||
|
||||
# delete our port mappings
|
||||
if manager.natThreads.len > 0:
|
||||
debug "Stopping NAT port mapping renewal threads"
|
||||
manager.natClosed.store(true)
|
||||
|
||||
# FIXME: if the initial port mapping failed because it already existed for the
|
||||
# required external port, we should not delete it. It might have been set up
|
||||
# by another program.
|
||||
for thread in manager.natThreads.mitems:
|
||||
try:
|
||||
joinThread(thread)
|
||||
except Exception as exc:
|
||||
warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg
|
||||
manager.natThreads.setLen(0)
|
||||
|
||||
# In Windows, a new thread is created for the signal handler, so we need to
|
||||
# initialise our threadvars again.
|
||||
let selected = manager.selectedStrategy
|
||||
if selected in {NatStrategy.NatUpnp, NatStrategy.NatPmp} and
|
||||
manager.activeMappings.len > 0:
|
||||
let (ipres, _) = getExternalIP(selected, quiet = true)
|
||||
if ipres.isSome:
|
||||
case selected
|
||||
of NatStrategy.NatUpnp:
|
||||
for entry in manager.activeMappings:
|
||||
for t in [
|
||||
(entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP),
|
||||
(entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
|
||||
if pmres.isErr:
|
||||
error "UPnP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "UPnP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
of NatStrategy.NatPmp:
|
||||
for entry in manager.activeMappings:
|
||||
for t in [
|
||||
(entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP),
|
||||
(entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = npmp.deletePortMapping(
|
||||
eport = eport.cushort, iport = iport.cushort, protocol = protocol
|
||||
)
|
||||
if pmres.isErr:
|
||||
error "NAT-PMP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "NAT-PMP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
else:
|
||||
discard
|
||||
|
||||
let ipres = getExternalIP(strategy, quiet = true)
|
||||
if ipres.isSome:
|
||||
if strategy == NatStrategy.NatUpnp:
|
||||
for entry in activeMappings:
|
||||
for t in [
|
||||
(entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP),
|
||||
(entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
|
||||
if pmres.isErr:
|
||||
error "UPnP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "UPnP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
elif strategy == NatStrategy.NatPmp:
|
||||
for entry in activeMappings:
|
||||
for t in [
|
||||
(entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP),
|
||||
(entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = npmp.deletePortMapping(
|
||||
eport = eport.cushort, iport = iport.cushort, protocol = protocol
|
||||
)
|
||||
if pmres.isErr:
|
||||
error "NAT-PMP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "NAT-PMP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
manager.activeMappings.setLen(0)
|
||||
manager.extIp = none(IpAddress)
|
||||
manager.selectedStrategy = NatStrategy.NatNone
|
||||
manager.natClosed.store(false)
|
||||
|
||||
proc redirectPorts*(
|
||||
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
|
||||
manager: NatManager, strategy: NatStrategy, tcpPort, udpPort: Port, description: string
|
||||
): Option[(Port, Port)] =
|
||||
result = doPortMapping(strategy, tcpPort, udpPort, description)
|
||||
if result.isSome:
|
||||
@ -315,7 +356,8 @@ proc redirectPorts*(
|
||||
# Port mapping works. Let's launch a thread that repeats it, in case the
|
||||
# NAT-PMP lease expires or the router is rebooted and forgets all about
|
||||
# these mappings.
|
||||
activeMappings.add(
|
||||
manager.natClosed.store(false)
|
||||
manager.activeMappings.add(
|
||||
PortMappings(
|
||||
internalTcpPort: tcpPort,
|
||||
externalTcpPort: externalTcpPort,
|
||||
@ -324,33 +366,41 @@ proc redirectPorts*(
|
||||
description: description,
|
||||
)
|
||||
)
|
||||
try:
|
||||
natThreads.add(Thread[PortMappingArgs]())
|
||||
natThreads[^1].createThread(
|
||||
repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description)
|
||||
)
|
||||
# atexit() in disguise
|
||||
if natThreads.len == 1:
|
||||
# we should register the thread termination function only once
|
||||
addQuitProc(stopNatThreads)
|
||||
except Exception as exc:
|
||||
warn "Failed to create NAT port mapping renewal thread", exc = exc.msg
|
||||
# Renewal thread temporarily disabled pending thread-safe reimplementation.
|
||||
|
||||
proc setupNat*(
|
||||
natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string
|
||||
manager: NatManager, natStrategy: NatStrategy, tcpPort, udpPort: Port, clientId: string
|
||||
): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] =
|
||||
## Setup NAT port mapping and get external IP address.
|
||||
## If any of this fails, we don't return any IP address but do return the
|
||||
## original ports as best effort.
|
||||
## TODO: Allow for tcp or udp port mapping to be optional.
|
||||
if extIp.isNone:
|
||||
extIp = getExternalIP(natStrategy)
|
||||
if extIp.isSome:
|
||||
let ip = extIp.get
|
||||
|
||||
if manager.isNil:
|
||||
warn "NAT manager not initialised"
|
||||
return (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
|
||||
|
||||
if manager.extIp.isNone or manager.selectedStrategy == NatStrategy.NatNone:
|
||||
let (ipRes, selected) = getExternalIP(natStrategy)
|
||||
if ipRes.isSome and selected in {NatStrategy.NatUpnp, NatStrategy.NatPmp}:
|
||||
manager.extIp = ipRes
|
||||
manager.selectedStrategy = selected
|
||||
else:
|
||||
warn "UPnP/NAT-PMP not available"
|
||||
manager.extIp = none(IpAddress)
|
||||
manager.selectedStrategy = NatStrategy.NatNone
|
||||
return (ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
|
||||
|
||||
if manager.extIp.isSome:
|
||||
let ip = manager.extIp.get
|
||||
let extPorts = (
|
||||
{.gcsafe.}:
|
||||
redirectPorts(
|
||||
strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId
|
||||
manager,
|
||||
manager.selectedStrategy,
|
||||
tcpPort = tcpPort,
|
||||
udpPort = udpPort,
|
||||
description = clientId,
|
||||
)
|
||||
)
|
||||
if extPorts.isSome:
|
||||
@ -364,7 +414,12 @@ proc setupNat*(
|
||||
(ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
|
||||
|
||||
proc setupAddress*(
|
||||
natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string
|
||||
manager: NatManager,
|
||||
natConfig: NatConfig,
|
||||
bindIp: IpAddress,
|
||||
tcpPort,
|
||||
udpPort: Port,
|
||||
clientId: string
|
||||
): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} =
|
||||
## Set-up of the external address via any of the ways as configured in
|
||||
## `NatConfig`. In case all fails an error is logged and the bind ports are
|
||||
@ -384,7 +439,7 @@ proc setupAddress*(
|
||||
of NoRoutingInfo, PrefSrcIsPublic, BindAddressIsPublic:
|
||||
return (prefSrcIp, some(tcpPort), some(udpPort))
|
||||
of PrefSrcIsPrivate, BindAddressIsPrivate:
|
||||
return setupNat(natConfig.nat, tcpPort, udpPort, clientId)
|
||||
return setupNat(manager, natConfig.nat, tcpPort, udpPort, clientId)
|
||||
of NatStrategy.NatNone:
|
||||
let (prefSrcIp, prefSrcStatus) = getRoutePrefSrc(bindIp)
|
||||
|
||||
@ -398,15 +453,17 @@ proc setupAddress*(
|
||||
error "Bind IP is not a public IP address. Should not use --nat:none option"
|
||||
return (none(IpAddress), some(tcpPort), some(udpPort))
|
||||
of NatStrategy.NatUpnp, NatStrategy.NatPmp:
|
||||
return setupNat(natConfig.nat, tcpPort, udpPort, clientId)
|
||||
return setupNat(manager, natConfig.nat, tcpPort, udpPort, clientId)
|
||||
|
||||
proc nattedAddress*(
|
||||
natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port
|
||||
manager: NatManager, natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port
|
||||
): tuple[libp2p, discovery: seq[MultiAddress]] =
|
||||
## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns:
|
||||
## - Modified multiaddresses with NAT-mapped addresses for libp2p
|
||||
## - Discovery addresses with NAT-mapped UDP ports
|
||||
|
||||
doAssert(not manager.isNil, "NatManager must not be nil")
|
||||
|
||||
var discoveryAddrs = newSeq[MultiAddress](0)
|
||||
let newAddrs = addrs.mapIt:
|
||||
block:
|
||||
@ -415,7 +472,7 @@ proc nattedAddress*(
|
||||
if ipPart.isSome and port.isSome:
|
||||
# Try to setup NAT mapping for the address
|
||||
let (newIP, tcp, udp) =
|
||||
setupAddress(natConfig, ipPart.get, port.get, udpPort, "codex")
|
||||
setupAddress(manager, natConfig, ipPart.get, port.get, udpPort, "codex")
|
||||
if newIP.isSome:
|
||||
# NAT mapping successful - add discovery address with mapped UDP port
|
||||
discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get))
|
||||
|
||||
@ -186,7 +186,11 @@ proc generateNodes*(
|
||||
|
||||
if config.enableBootstrap:
|
||||
waitFor switch.peerInfo.update()
|
||||
let natManager = newNatManager()
|
||||
defer:
|
||||
shutdownNat(natManager)
|
||||
let (announceAddrs, discoveryAddrs) = nattedAddress(
|
||||
natManager,
|
||||
NatConfig(hasExtIp: false, nat: NatNone),
|
||||
switch.peerInfo.addrs,
|
||||
bindPort.Port,
|
||||
|
||||
@ -38,8 +38,12 @@ suite "NAT Address Tests":
|
||||
#ipv6Addr = MultiAddress.init("/ip6/::1/tcp/5000").expect("valid multiaddr")
|
||||
addrs = @[localAddr, anyAddr, publicAddr]
|
||||
|
||||
let natManager = newNatManager()
|
||||
defer:
|
||||
shutdownNat(natManager)
|
||||
|
||||
# Test address remapping
|
||||
let (libp2pAddrs, discoveryAddrs) = nattedAddress(natConfig, addrs, udpPort)
|
||||
let (libp2pAddrs, discoveryAddrs) = nattedAddress(natManager, natConfig, addrs, udpPort)
|
||||
|
||||
# Verify results
|
||||
check(discoveryAddrs == expectedDiscoveryAddrs)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user