Use thread to set the nat ports

This commit is contained in:
Arnaud 2026-05-08 14:56:42 +04:00
parent ae261e3210
commit 082411e050
No known key found for this signature in database
GPG Key ID: A6C7C781817146FA
2 changed files with 76 additions and 11 deletions

View File

@ -12,6 +12,7 @@ import std/[options, net]
import results
import pkg/chronos
import pkg/chronos/threadsync
import pkg/chronicles
import pkg/libp2p
import pkg/libp2p/services/autorelayservice
@ -24,6 +25,8 @@ import ./discovery
logScope:
topics = "nat"
const NatPortMappingTimeout = 5.seconds
type NatConfig* = object
case hasExtIp*: bool
of true: extIp*: IpAddress
@ -35,26 +38,86 @@ type NatMapper* = ref object of RootObj
discoveryPort*: Port
hasUpnpMapping: bool
method mapNatPorts*(m: NatMapper): Option[(Port, Port)] {.base, gcsafe, raises: [].} =
if m.natConfig.hasExtIp:
return none((Port, Port))
type MapNatPortsCtx = object
natConfig: NatConfig
tcpPort: Port
discoveryPort: Port
signal: ThreadSignalPtr
result: Option[(Port, Port)]
hasUpnpMapping: bool
proc mapNatPortsThread(ctx: ptr MapNatPortsCtx) {.thread.} =
if ctx.natConfig.hasExtIp:
discard ctx.signal.fireSync()
return
# Devices are recreated on each call: discover() costs ~200ms but only fires
# when AutoNAT reports NotReachable, which is exactly when we want a fresh scan.
let upnpRes = UpnpDevice.init()
if upnpRes.isOk:
let ports = upnpRes.value.mapPorts(m.tcpPort, m.discoveryPort)
let ports = upnpRes.value.mapPorts(ctx.tcpPort, ctx.discoveryPort)
if ports.isSome:
m.hasUpnpMapping = true
return ports
ctx.hasUpnpMapping = true
ctx.result = ports
discard ctx.signal.fireSync()
return
let pmpRes = PmpDevice.init()
if pmpRes.isOk:
let ports = pmpRes.value.mapPorts(m.tcpPort, m.discoveryPort)
let ports = pmpRes.value.mapPorts(ctx.tcpPort, ctx.discoveryPort)
if ports.isSome:
return ports
ctx.result = ports
none((Port, Port))
discard ctx.signal.fireSync()
method mapNatPorts*(
m: NatMapper
): Future[Option[(Port, Port)]] {.async: (raises: [CancelledError]), base, gcsafe.} =
let signal = ThreadSignalPtr.new().valueOr:
warn "Failed to create ThreadSignalPtr for NAT port mapping"
return none((Port, Port))
var ctx = cast[ptr MapNatPortsCtx](createShared(MapNatPortsCtx))
ctx[] = MapNatPortsCtx(
natConfig: m.natConfig,
tcpPort: m.tcpPort,
discoveryPort: m.discoveryPort,
signal: signal,
)
var thread: Thread[ptr MapNatPortsCtx]
var threadStarted = false
defer:
if threadStarted:
# Blocking the event loop here is acceptable: UPnP discover() is bounded
# by UPNP_TIMEOUT (200ms), so the worst-case stall is ~200ms.
joinThread(thread)
# Always sync hasUpnpMapping back, even on timeout or cancellation.
# If the thread mapped ports just after the timeout, close() will
# still clean them up on the router.
if ctx.hasUpnpMapping:
m.hasUpnpMapping = true
freeShared(ctx)
discard signal.close()
try:
createThread(thread, mapNatPortsThread, ctx)
threadStarted = true
except ValueError, ResourceExhaustedError:
warn "Failed to create thread for NAT port mapping"
return none((Port, Port))
try:
if not await signal.wait().withTimeout(NatPortMappingTimeout):
warn "NAT port mapping thread timed out"
return none((Port, Port))
except CancelledError as exc:
raise exc
except AsyncError as exc:
warn "Error waiting for NAT port mapping thread", error = exc.msg
return none((Port, Port))
return ctx.result
method handleNatStatus*(
m: NatMapper,
@ -85,7 +148,7 @@ method handleNatStatus*(
if dialBackAddr.isNone:
warn "Got empty dialback address in AutoNat when node is NotReachable"
else:
let maybePorts = m.mapNatPorts()
let maybePorts = await m.mapNatPorts()
if maybePorts.isSome:
let (tcpPort, udpPort) = maybePorts.get()

View File

@ -37,7 +37,9 @@ method deletePortMapping*(
type MockNatMapper = ref object of NatMapper
mappedPorts: Option[(Port, Port)]
method mapNatPorts*(m: MockNatMapper): Option[(Port, Port)] {.gcsafe, raises: [].} =
method mapNatPorts*(
m: MockNatMapper
): Future[Option[(Port, Port)]] {.async: (raises: [CancelledError]), gcsafe.} =
m.mappedPorts
suite "NatMapper.close":