mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-02-25 07:53:19 +00:00
fix: Support for mapping multiple listener address (#1236)
* multi address mapping support * fix thread issues * fix local thread var issue * chore: rename stopNatThread to stopNatThreads --------- Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
parent
25a8077e80
commit
71422f0d3d
167
codex/nat.nim
167
codex/nat.nim
@ -9,7 +9,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, os, strutils, times, net],
|
||||
std/[options, os, strutils, times, net, atomics],
|
||||
stew/shims/net as stewNet,
|
||||
stew/[objects, results],
|
||||
nat_traversal/[miniupnpc, natpmp],
|
||||
@ -28,14 +28,29 @@ const
|
||||
PORT_MAPPING_INTERVAL = 20 * 60 # seconds
|
||||
NATPMP_LIFETIME = 60 * 60 # in seconds, must be longer than PORT_MAPPING_INTERVAL
|
||||
|
||||
var
|
||||
upnp {.threadvar.}: Miniupnp
|
||||
npmp {.threadvar.}: NatPmp
|
||||
strategy = NatStrategy.NatNone
|
||||
type PortMappings* = object
|
||||
internalTcpPort: Port
|
||||
externalTcpPort: Port
|
||||
internalUdpPort: Port
|
||||
externalUdpPort: Port
|
||||
description: string
|
||||
|
||||
type PortMappingArgs =
|
||||
tuple[strategy: NatStrategy, tcpPort, udpPort: Port, description: string]
|
||||
|
||||
type NatConfig* = object
|
||||
case hasExtIp*: bool
|
||||
of true: extIp*: IpAddress
|
||||
of false: nat*: NatStrategy
|
||||
|
||||
var
|
||||
upnp {.threadvar.}: Miniupnp
|
||||
npmp {.threadvar.}: NatPmp
|
||||
strategy = NatStrategy.NatNone
|
||||
natClosed: Atomic[bool]
|
||||
extIp: Option[IpAddress]
|
||||
activeMappings: seq[PortMappings]
|
||||
natThreads: seq[Thread[PortMappingArgs]] = @[]
|
||||
|
||||
logScope:
|
||||
topics = "nat"
|
||||
@ -107,7 +122,7 @@ proc getExternalIP*(natStrategy: NatStrategy, quiet = false): Option[IpAddress]
|
||||
else:
|
||||
try:
|
||||
externalIP = parseIpAddress($(nires.value))
|
||||
strategy = NatPmp
|
||||
strategy = NatStrategy.NatPmp
|
||||
return some(externalIP)
|
||||
except ValueError as e:
|
||||
error "parseIpAddress() exception", err = e.msg
|
||||
@ -153,7 +168,7 @@ proc getPublicRoutePrefSrcOrExternalIP*(
|
||||
return some(extIp.get)
|
||||
|
||||
proc doPortMapping(
|
||||
tcpPort, udpPort: Port, description: string
|
||||
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
|
||||
): Option[(Port, Port)] {.gcsafe.} =
|
||||
var
|
||||
extTcpPort: Port
|
||||
@ -213,15 +228,10 @@ proc doPortMapping(
|
||||
extUdpPort = extPort
|
||||
return some((extTcpPort, extUdpPort))
|
||||
|
||||
type PortMappingArgs = tuple[tcpPort, udpPort: Port, description: string]
|
||||
var
|
||||
natThread: Thread[PortMappingArgs]
|
||||
natCloseChan: Channel[bool]
|
||||
|
||||
proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
|
||||
ignoreSignalsInThread()
|
||||
let
|
||||
(tcpPort, udpPort, description) = args
|
||||
(strategy, tcpPort, udpPort, description) = args
|
||||
interval = initDuration(seconds = PORT_MAPPING_INTERVAL)
|
||||
sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C
|
||||
|
||||
@ -233,30 +243,23 @@ proc repeatPortMapping(args: PortMappingArgs) {.thread, raises: [ValueError].} =
|
||||
# even though we don't need the external IP's value.
|
||||
let ipres = getExternalIP(strategy, quiet = true)
|
||||
if ipres.isSome:
|
||||
while true:
|
||||
# we're being silly here with this channel polling because we can't
|
||||
# select on Nim channels like on Go ones
|
||||
let (dataAvailable, _) =
|
||||
try:
|
||||
natCloseChan.tryRecv()
|
||||
except Exception:
|
||||
(false, false)
|
||||
if dataAvailable:
|
||||
return
|
||||
else:
|
||||
let currTime = now()
|
||||
if currTime >= (lastUpdate + interval):
|
||||
discard doPortMapping(tcpPort, udpPort, description)
|
||||
lastUpdate = currTime
|
||||
while natClosed.load() == false:
|
||||
let
|
||||
# we're being silly here with this channel polling because we can't
|
||||
# select on Nim channels like on Go ones
|
||||
currTime = now()
|
||||
if currTime >= (lastUpdate + interval):
|
||||
discard doPortMapping(strategy, tcpPort, udpPort, description)
|
||||
lastUpdate = currTime
|
||||
|
||||
sleep(sleepDuration)
|
||||
|
||||
proc stopNatThread() {.noconv.} =
|
||||
proc stopNatThreads() {.noconv.} =
|
||||
# stop the thread
|
||||
|
||||
debug "Stopping NAT port mapping renewal threads"
|
||||
try:
|
||||
natCloseChan.send(true)
|
||||
natThread.joinThread()
|
||||
natCloseChan.close()
|
||||
natClosed.store(true)
|
||||
joinThreads(natThreads)
|
||||
except Exception as exc:
|
||||
warn "Failed to stop NAT port mapping renewal thread", exc = exc.msg
|
||||
|
||||
@ -268,54 +271,68 @@ proc stopNatThread() {.noconv.} =
|
||||
|
||||
# In Windows, a new thread is created for the signal handler, so we need to
|
||||
# initialise our threadvars again.
|
||||
|
||||
let ipres = getExternalIP(strategy, quiet = true)
|
||||
if ipres.isSome:
|
||||
if strategy == NatStrategy.NatUpnp:
|
||||
for t in [
|
||||
(externalTcpPort, internalTcpPort, UPNPProtocol.TCP),
|
||||
(externalUdpPort, internalUdpPort, UPNPProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
|
||||
if pmres.isErr:
|
||||
error "UPnP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "UPnP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
for entry in activeMappings:
|
||||
for t in [
|
||||
(entry.externalTcpPort, entry.internalTcpPort, UPNPProtocol.TCP),
|
||||
(entry.externalUdpPort, entry.internalUdpPort, UPNPProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = upnp.deletePortMapping(externalPort = $eport, protocol = protocol)
|
||||
if pmres.isErr:
|
||||
error "UPnP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "UPnP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
elif strategy == NatStrategy.NatPmp:
|
||||
for t in [
|
||||
(externalTcpPort, internalTcpPort, NatPmpProtocol.TCP),
|
||||
(externalUdpPort, internalUdpPort, NatPmpProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = npmp.deletePortMapping(
|
||||
eport = eport.cushort, iport = iport.cushort, protocol = protocol
|
||||
)
|
||||
if pmres.isErr:
|
||||
error "NAT-PMP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "NAT-PMP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
for entry in activeMappings:
|
||||
for t in [
|
||||
(entry.externalTcpPort, entry.internalTcpPort, NatPmpProtocol.TCP),
|
||||
(entry.externalUdpPort, entry.internalUdpPort, NatPmpProtocol.UDP),
|
||||
]:
|
||||
let
|
||||
(eport, iport, protocol) = t
|
||||
pmres = npmp.deletePortMapping(
|
||||
eport = eport.cushort, iport = iport.cushort, protocol = protocol
|
||||
)
|
||||
if pmres.isErr:
|
||||
error "NAT-PMP port mapping deletion", msg = pmres.error
|
||||
else:
|
||||
debug "NAT-PMP: deleted port mapping",
|
||||
externalPort = eport, internalPort = iport, protocol = protocol
|
||||
|
||||
proc redirectPorts*(tcpPort, udpPort: Port, description: string): Option[(Port, Port)] =
|
||||
result = doPortMapping(tcpPort, udpPort, description)
|
||||
proc redirectPorts*(
|
||||
strategy: NatStrategy, tcpPort, udpPort: Port, description: string
|
||||
): Option[(Port, Port)] =
|
||||
result = doPortMapping(strategy, tcpPort, udpPort, description)
|
||||
if result.isSome:
|
||||
(externalTcpPort, externalUdpPort) = result.get()
|
||||
let (externalTcpPort, externalUdpPort) = result.get()
|
||||
# needed by NAT-PMP on port mapping deletion
|
||||
internalTcpPort = tcpPort
|
||||
internalUdpPort = udpPort
|
||||
# Port mapping works. Let's launch a thread that repeats it, in case the
|
||||
# NAT-PMP lease expires or the router is rebooted and forgets all about
|
||||
# these mappings.
|
||||
natCloseChan.open()
|
||||
activeMappings.add(
|
||||
PortMappings(
|
||||
internalTcpPort: tcpPort,
|
||||
externalTcpPort: externalTcpPort,
|
||||
internalUdpPort: udpPort,
|
||||
externalUdpPort: externalUdpPort,
|
||||
description: description,
|
||||
)
|
||||
)
|
||||
try:
|
||||
natThread.createThread(
|
||||
repeatPortMapping, (externalTcpPort, externalUdpPort, description)
|
||||
natThreads.add(Thread[PortMappingArgs]())
|
||||
natThreads[^1].createThread(
|
||||
repeatPortMapping, (strategy, externalTcpPort, externalUdpPort, description)
|
||||
)
|
||||
# atexit() in disguise
|
||||
addQuitProc(stopNatThread)
|
||||
if natThreads.len == 1:
|
||||
# we should register the thread termination function only once
|
||||
addQuitProc(stopNatThreads)
|
||||
except Exception as exc:
|
||||
warn "Failed to create NAT port mapping renewal thread", exc = exc.msg
|
||||
|
||||
@ -326,12 +343,15 @@ proc setupNat*(
|
||||
## If any of this fails, we don't return any IP address but do return the
|
||||
## original ports as best effort.
|
||||
## TODO: Allow for tcp or udp port mapping to be optional.
|
||||
let extIp = getExternalIP(natStrategy)
|
||||
if extIp.isNone:
|
||||
extIp = getExternalIP(natStrategy)
|
||||
if extIp.isSome:
|
||||
let ip = extIp.get
|
||||
let extPorts = (
|
||||
{.gcsafe.}:
|
||||
redirectPorts(tcpPort = tcpPort, udpPort = udpPort, description = clientId)
|
||||
redirectPorts(
|
||||
strategy, tcpPort = tcpPort, udpPort = udpPort, description = clientId
|
||||
)
|
||||
)
|
||||
if extPorts.isSome:
|
||||
let (extTcpPort, extUdpPort) = extPorts.get()
|
||||
@ -343,11 +363,6 @@ proc setupNat*(
|
||||
warn "UPnP/NAT-PMP not available"
|
||||
(ip: none(IpAddress), tcpPort: some(tcpPort), udpPort: some(udpPort))
|
||||
|
||||
type NatConfig* = object
|
||||
case hasExtIp*: bool
|
||||
of true: extIp*: IpAddress
|
||||
of false: nat*: NatStrategy
|
||||
|
||||
proc setupAddress*(
|
||||
natConfig: NatConfig, bindIp: IpAddress, tcpPort, udpPort: Port, clientId: string
|
||||
): tuple[ip: Option[IpAddress], tcpPort, udpPort: Option[Port]] {.gcsafe.} =
|
||||
@ -389,7 +404,7 @@ proc nattedAddress*(
|
||||
natConfig: NatConfig, addrs: seq[MultiAddress], udpPort: Port
|
||||
): tuple[libp2p, discovery: seq[MultiAddress]] =
|
||||
## Takes a NAT configuration, sequence of multiaddresses and UDP port and returns:
|
||||
## - Modified multiaddresses with NAT-mapped addresses for libp2p
|
||||
## - Modified multiaddresses with NAT-mapped addresses for libp2p
|
||||
## - Discovery addresses with NAT-mapped UDP ports
|
||||
|
||||
var discoveryAddrs = newSeq[MultiAddress](0)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user