feat: drop renewal thread in favor of async polling

This commit is contained in:
Adam Uhlíř 2025-11-06 17:49:23 +01:00
parent 56a1b664b5
commit 1b8537494f
No known key found for this signature in database
GPG Key ID: 0CBD7AA7B5A72FED
5 changed files with 163 additions and 174 deletions

View File

@ -76,10 +76,12 @@ proc start*(s: CodexServer) {.async.} =
await s.codexNode.switch.start()
s.reachabilityManager.getAnnounceRecords = some proc() =
s.codexNode.switch.peerInfo.addrs
s.reachabilityManager.getDiscoveryRecords = some proc() =
s.codexNode.discovery.dhtRecord.data.addresses.mapIt(it.address)
s.reachabilityManager.getAnnounceRecords = some proc(): ?seq[MultiAddress] =
s.codexNode.switch.peerInfo.addrs.some
s.reachabilityManager.getDiscoveryRecords = some proc(): ?seq[MultiAddress] =
if dhtRecord =? s.codexNode.discovery.dhtRecord:
return dhtRecord.data.addresses.mapIt(it.address).some
s.reachabilityManager.updateAnnounceRecords = some proc(records: seq[MultiAddress]) =
s.codexNode.discovery.updateAnnounceRecord(records)
s.reachabilityManager.updateDiscoveryRecords = some proc(records: seq[MultiAddress]) =
@ -148,7 +150,7 @@ proc new*(
): CodexServer =
## create CodexServer including setting up datastore, repostore, etc
let reachabilityManager = ReachabilityManager.new(config.portMappingStrategy)
let reachabilityManager = ReachabilityManager.new(config.forcePortMapping)
let switch = SwitchBuilder
.new()

View File

@ -49,7 +49,7 @@ export units, net, codextypes, logutils, completeCmdArg, parseCmdArg
export
DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval,
DefaultBlockRetries
DefaultBlockRetries, PortMappingStrategy
type ThreadCount* = distinct Natural

View File

@ -8,7 +8,7 @@
{.push raises: [].}
import std/[options, os, strutils, times, net, atomics]
import std/[options, strutils, net]
import pkg/nat_traversal/[miniupnpc, natpmp]
import pkg/json_serialization/std/net
@ -26,7 +26,7 @@ logScope:
const
UPNP_TIMEOUT = 200 # ms
RENEWAL_INTERVAL = 20 * 60 # seconds
RENEWAL_SLEEP = (20 * 60).seconds
Pmp_LIFETIME = 60 * 60 # in seconds, must be longer than RENEWAL_INTERVAL
MAPPING_DESCRIPTION = "codex"
@ -39,7 +39,7 @@ type PortMappingStrategy* = enum
type MappingPort* = ref object of RootObj
value*: Port
proc `$`(p: MappingPort): string =
proc `$`*(p: MappingPort): string =
$(p.value)
type TcpPort* = ref object of MappingPort
@ -51,37 +51,33 @@ proc newTcpMappingPort*(value: Port): TcpPort =
proc newUdpMappingPort*(value: Port): UdpPort =
UdpPort(value: value)
type PortMapping* = tuple[internalPort: MappingPort, externalPort: Option[MappingPort]]
type RenewelThreadArgs =
tuple[strategy: PortMappingStrategy, portMapping: seq[PortMapping]]
type PortMappingEntry* =
tuple[internalPort: MappingPort, externalPort: Option[MappingPort]]
var
upnp {.threadvar.}: Miniupnp
npmp {.threadvar.}: NatPmp
mappings: seq[PortMapping]
portMappingExiting: Atomic[bool]
renewalThread: Thread[RenewelThreadArgs]
type PortMapping* = ref object of RootObj
upnp: Miniupnp
npmp: NatPmp
mappings: seq[PortMappingEntry]
renewalLoop: Future[void]
proc initUpnp(): bool =
proc initUpnp(self: PortMapping) =
logScope:
protocol = "upnp"
if upnp != nil:
if not self.upnp.isNil:
warn "UPnP already initialized!"
return true
upnp = newMiniupnp()
upnp.discoverDelay = UPNP_TIMEOUT
self.upnp = newMiniupnp()
self.upnp.discoverDelay = UPNP_TIMEOUT
if err =? upnp.discover().errorOption:
if err =? self.upnp.discover().errorOption:
warn "UPnP error discoverning Internet Gateway Devices", msg = err
upnp = nil
return false
self.upnp = nil
case upnp.selectIGD()
case self.upnp.selectIGD()
of IGDNotFound:
info "UPnP Internet Gateway Device not found. Giving up."
upnp = nil
self.upnp = nil
# As UPnP is not supported on our network we won't be using it --> lets erase it.
of IGDFound:
info "UPnP Internet Gateway Device found."
@ -92,47 +88,45 @@ proc initUpnp(): bool =
of IGDIpNotRoutable:
info "UPnP Internet Gateway Device found and is connected, but with a reserved or non-routable IP. Trying anyway."
return true
proc initNpmp(): bool =
proc initNpmp(self: PortMapping) =
logScope:
protocol = "npmp"
if npmp != nil:
if not self.npmp.isNil:
warn "NAT-PMP already initialized!"
return true
npmp = newNatPmp()
self.npmp = newNatPmp()
if err =? npmp.init().errorOption:
if err =? self.npmp.init().errorOption:
warn "Error initialization of NAT-PMP", msg = err
npmp = nil
return false
self.npmp = nil
if err =? npmp.externalIPAddress().errorOption:
if err =? self.npmp.externalIPAddress().errorOption:
warn "Fetching of external IP failed.", msg = err
npmp = nil
return false
self.npmp = nil
info "NAT-PMP initialized"
return true
## Try to initilize all the port mapping protocols and returns
## the protocol that will be used.
proc initProtocols(strategy: PortMappingStrategy): PortMappingStrategy =
## Try to initilize all the port mapping protocols based on what is available on the network
proc initProtocols(self: PortMapping, strategy: PortMappingStrategy) =
if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Upnp:
if initUpnp():
return PortMappingStrategy.Upnp
self.initUpnp()
if not self.upnp.isNil:
return # UPnP is available, using that, no need for NAT-PMP.
if strategy == PortMappingStrategy.Any or strategy == PortMappingStrategy.Pmp:
if initNpmp():
return PortMappingStrategy.Pmp
self.initNpmp()
return PortMappingStrategy.None
proc new*(T: type PortMapping, strategy: PortMappingStrategy): PortMapping =
let mapping = PortMapping(upnp: nil, npmp: nil, mappings: @[])
mapping.initProtocols(strategy)
return mapping
proc upnpPortMapping(
internalPort: MappingPort, externalPort: MappingPort
): ?!MappingPort {.gcsafe.} =
self: PortMapping, internalPort: MappingPort, externalPort: MappingPort
): ?!MappingPort =
let protocol = if (internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP
logScope:
@ -141,10 +135,10 @@ proc upnpPortMapping(
internalPort = internalPort.value
protocol = protocol
let pmres = upnp.addPortMapping(
let pmres = self.upnp.addPortMapping(
externalPort = $(externalPort.value),
protocol = protocol,
internalHost = upnp.lanAddr,
internalHost = self.upnp.lanAddr,
internalPort = $(internalPort.value),
desc = MAPPING_DESCRIPTION,
leaseDuration = 0,
@ -155,7 +149,7 @@ proc upnpPortMapping(
return failure($pmres.error)
# let's check it
let cres = upnp.getSpecificPortMapping(
let cres = self.upnp.getSpecificPortMapping(
externalPort = $(externalPort.value), protocol = protocol
)
if cres.isErr:
@ -166,8 +160,8 @@ proc upnpPortMapping(
return success(externalPort)
proc npmpPortMapping(
internalPort: MappingPort, externalPort: MappingPort
): ?!MappingPort {.gcsafe.} =
self: PortMapping, internalPort: MappingPort, externalPort: MappingPort
): ?!MappingPort =
let protocol =
if (internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP
@ -177,7 +171,7 @@ proc npmpPortMapping(
internalPort = internalPort.value
protocol = protocol
let extPortRes = npmp.addPortMapping(
let extPortRes = self.npmp.addPortMapping(
eport = externalPort.value.cushort,
iport = internalPort.value.cushort,
protocol = protocol,
@ -200,75 +194,48 @@ proc npmpPortMapping(
##
## TODO: Add support for trying mapping of random external port.
proc doPortMapping(port: MappingPort): ?!MappingPort {.gcsafe.} =
if upnp != nil:
return upnpPortMapping(port, port)
proc doPortMapping(self: PortMapping, port: MappingPort): ?!MappingPort =
if not self.upnp.isNil:
return self.upnpPortMapping(port, port)
if npmp != nil:
return npmpPortMapping(port, port)
if not self.npmp.isNil:
return self.npmpPortMapping(port, port)
return failure("No active startegy")
proc doPortMapping(
internalPort: MappingPort, externalPort: MappingPort
): ?!MappingPort {.gcsafe.} =
if upnp != nil:
return upnpPortMapping(internalPort, externalPort)
self: PortMapping, internalPort: MappingPort, externalPort: MappingPort
): ?!MappingPort =
if not self.upnp.isNil:
return self.upnpPortMapping(internalPort, externalPort)
if npmp != nil:
return npmpPortMapping(internalPort, externalPort)
if not self.npmp.isNil:
return self.npmpPortMapping(internalPort, externalPort)
return failure("No active startegy")
proc renewPortMapping(args: RenewelThreadArgs) {.thread, raises: [ValueError].} =
ignoreSignalsInThread()
let
(strategy, portMappings) = args
interval = initDuration(seconds = RENEWAL_INTERVAL)
sleepDuration = 1_000 # in ms, also the maximum delay after pressing Ctrl-C
proc renewPortMapping(self: PortMapping) {.async.} =
while true:
for mapping in self.mappings:
if externalPort =? mapping.externalPort:
without renewedExternalPort =?
self.doPortMapping(mapping.internalPort, externalPort), err:
error "Error while renewal of port mapping", msg = err.msg
var lastUpdate = now()
if renewedExternalPort.value != externalPort.value:
error "The renewed external port is not the same as the originally mapped"
# We can't use copies of Miniupnp and Pmp objects in this thread, because they share
# C pointers with other instances that have already been garbage collected, so
# we use threadvars instead and initialise them again with initProtocols(),
# even though we don't need the external IP's value.
if initProtocols(strategy) == PortMappingStrategy.None:
error "Could not initiate protocols in renewal thread"
return
while portMappingExiting.load() == false:
if now() >= (lastUpdate + interval):
for mapping in portMappings:
if externalPort =? mapping.externalPort:
without renewedExternalPort =?
doPortMapping(mapping.internalPort, externalPort), err:
error "Error while renewal of port mapping", msg = err.msg
if renewedExternalPort.value != externalPort.value:
error "The renewed external port is not the same as the originally mapped"
lastUpdate = now()
sleep(sleepDuration)
proc startRenewalThread(strategy: PortMappingStrategy) =
try:
renewalThread = Thread[RenewelThreadArgs]()
renewalThread.createThread(renewPortMapping, (strategy, mappings))
except CatchableError as exc:
warn "Failed to create NAT port mapping renewal thread", exc = exc.msg
await sleepAsync(RENEWAL_SLEEP)
## Gets external IP provided by the port mapping protocols
## Port mapping needs to be succesfully started first using `startPortMapping()`
proc getExternalIP*(): ?IpAddress =
if upnp == nil and npmp == nil:
proc getExternalIP*(self: PortMapping): ?IpAddress =
if self.upnp.isNil and self.npmp.isNil:
warn "No available port-mapping protocol"
return IpAddress.none
if upnp != nil:
let ires = upnp.externalIPAddress
if not self.upnp.isNil:
let ires = self.upnp.externalIPAddress
if ires.isOk():
info "Got externa IP address", ip = ires.value
try:
@ -279,8 +246,8 @@ proc getExternalIP*(): ?IpAddress =
debug "Getting external IP address using UPnP failed",
msg = ires.error, protocol = "upnp"
if npmp != nil:
let nires = npmp.externalIPAddress()
if not self.npmp.isNil:
let nires = self.npmp.externalIPAddress()
if nires.isErr:
debug "Getting external IP address using NAT-PMP failed", msg = nires.error
else:
@ -292,57 +259,62 @@ proc getExternalIP*(): ?IpAddress =
return IpAddress.none
proc startPortMapping*(
strategy: var PortMappingStrategy, internalPorts: seq[MappingPort]
): ?!seq[PortMapping] =
if strategy == PortMappingStrategy.None:
return failure("No port mapping strategy requested")
## Returns true if some supported port mapping protocol
## is available on the local network
proc isAvailable*(self: PortMapping): bool =
return (not self.upnp.isNil) or (not self.npmp.isNil)
proc start*(
self: PortMapping, internalPorts: seq[MappingPort]
): Future[?!seq[PortMappingEntry]] {.async: (raises: [CancelledError]).} =
if internalPorts.len == 0:
return failure("No internal ports to be mapped were supplied")
strategy = initProtocols(strategy)
if strategy == PortMappingStrategy.None:
if not self.isAvailable():
return failure("No available port mapping protocols on the network")
if mappings.len > 0:
if self.mappings.len > 0:
return failure("Port mapping was already started! Stop first before re-starting.")
mappings = newSeqOfCap[PortMapping](internalPorts.len)
for port in internalPorts:
without mappedPort =? doPortMapping(port), err:
without mappedPort =? self.doPortMapping(port), err:
warn "Failed to map port", port = port, msg = err.msg
mappings.add((internalPort: port, externalPort: MappingPort.none))
self.mappings.add((internalPort: port, externalPort: MappingPort.none))
mappings.add((internalPort: port, externalPort: mappedPort.some))
self.mappings.add((internalPort: port, externalPort: mappedPort.some))
startRenewalThread(strategy)
self.renewalLoop = self.renewPortMapping()
asyncSpawn(self.renewalLoop)
return success(mappings)
return success(self.mappings)
proc stopPortMapping*() =
if upnp == nil or npmp == nil:
proc stop*(self: PortMapping) {.async: (raises: [CancelledError]).} =
if self.upnp.isNil or self.npmp.isNil:
debug "Port mapping is not running, nothing to stop"
return
info "Stopping port mapping renewal threads"
try:
portMappingExiting.store(true)
renewalThread.joinThread()
except CatchableError as exc:
warn "Failed to stop port mapping renewal thread", exc = exc.msg
info "Stopping port mapping renewal loop"
if not self.renewalLoop.isNil:
if not self.renewalLoop.finished:
try:
await self.renewalLoop.cancelAndWait()
except CancelledError:
discard
except CatchableError as e:
error "Error during cancellation of renewal loop", msg = e.msg
for mapping in mappings:
self.renewalLoop = nil
for mapping in self.mappings:
if mapping.externalPort.isNone:
continue
if upnp != nil:
if not self.upnp.isNil:
let protocol =
if (mapping.internalPort is TcpPort): UPNPProtocol.TCP else: UPNPProtocol.UDP
if err =?
upnp.deletePortMapping(
self.upnp.deletePortMapping(
externalPort = $((!mapping.externalPort).value), protocol = protocol
).errorOption:
error "UPnP port mapping deletion error", msg = err
@ -352,12 +324,12 @@ proc stopPortMapping*() =
internalPort = mapping.internalPort,
protocol = protocol
if npmp != nil:
if not self.npmp.isNil:
let protocol =
if (mapping.internalPort is TcpPort): NatPmpProtocol.TCP else: NatPmpProtocol.UDP
if err =?
npmp.deletePortMapping(
self.npmp.deletePortMapping(
eport = (!mapping.externalPort).value.cushort,
iport = mapping.internalPort.value.cushort,
protocol = protocol,
@ -369,4 +341,4 @@ proc stopPortMapping*() =
internalPort = mapping.internalPort,
protocol = protocol
mappings = @[]
self.mappings = @[]

View File

@ -19,58 +19,74 @@ logScope:
type
ReachabilityManager* = ref object of RootObj
started = false
portMapping: PortMapping
networkReachability*: NetworkReachability
portMappingStrategy: PortMappingStrategy
getAnnounceRecords*: ?GetRecords
getDiscoveryRecords*: ?GetRecords
updateAnnounceRecords*: ?UpdateRecords
updateDiscoveryRecords*: ?UpdateRecords
started = false
GetRecords* = proc(): seq[MultiAddress] {.raises: [].}
GetRecords* = proc(): ?seq[MultiAddress] {.raises: [].}
UpdateRecords* = proc(records: seq[MultiAddress]) {.raises: [].}
proc new*(
T: typedesc[ReachabilityManager], portMappingStrategy: PortMappingStrategy
): T =
return T(portMappingStrategy: portMappingStrategy)
return T(portMapping: PortMapping.new(portMappingStrategy))
proc startPortMapping(self: ReachabilityManager): bool =
proc startPortMapping(
self: ReachabilityManager
): Future[bool] {.async: (raises: [CancelledError]).} =
# This check guarantees us that the callbacks are set
# and hence we can use ! (Option.get) without fear.
if not self.started:
warn "ReachabilityManager is not started, yet we are trying to map ports already!"
return false
try:
let announceRecords = (!self.getAnnounceRecords)()
let discoveryRecords = (!self.getDiscoveryRecords)()
let portsToBeMapped =
(announceRecords & discoveryRecords).mapIt(getAddressAndPort(it)).mapIt(it.port)
{.gcsafe.}:
let announceRecords = (!self.getAnnounceRecords)()
let discoveryRecords = (!self.getDiscoveryRecords)()
without mappedPorts =? startPortMapping(self.portMappingStrategy, portsToBeMapped),
err:
var records: seq[MultiAddress] = @[]
if announceRecords.isSome:
records.add(!announceRecords)
if discoveryRecords.isSome:
records.add(!discoveryRecords)
let portsToBeMapped = records.mapIt(getAddressAndPort(it)).mapIt(it.port)
without mappedPorts =? (await self.portMapping.start(portsToBeMapped)), err:
warn "Could not start port mapping", msg = err.msg
return false
if mappedPorts.any(
proc(x: PortMapping): bool =
proc(x: PortMappingEntry): bool =
isNone(x.externalPort)
):
warn "Some ports were not mapped - not using port mapping then"
return false
info "Started port mapping"
info "Succesfully exposed ports", ports = portsToBeMapped
let announceMappedRecords = zip(
announceRecords, mappedPorts[0 .. announceRecords.len - 1]
)
.mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort))
(!self.updateAnnounceRecords)(announceMappedRecords)
if announceRecords.isSome:
let announceMappedRecords = zip(
!announceRecords, mappedPorts[0 .. (!announceRecords).len - 1]
)
.mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort))
{.gcsafe.}:
(!self.updateAnnounceRecords)(announceMappedRecords)
let discoveryMappedRecords = zip(
discoveryRecords, mappedPorts[announceRecords.len .. ^1]
)
.mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort))
(!self.updateDiscoveryRecords)(discoveryMappedRecords)
if discoveryRecords.isSome:
let discoveryMappedRecords = zip(
!discoveryRecords,
mappedPorts[(mappedPorts.len - (!discoveryRecords).len) .. ^1],
)
.mapIt(getMultiAddr(getAddressAndPort(it[0]).ip, !it[1].externalPort))
{.gcsafe.}:
(!self.updateDiscoveryRecords)(discoveryMappedRecords)
return true
except ValueError as exc:
@ -80,7 +96,7 @@ proc startPortMapping(self: ReachabilityManager): bool =
proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHandler =
let statusAndConfidenceHandler = proc(
networkReachability: NetworkReachability, confidenceOpt: Opt[float]
): Future[void] {.async: (raises: [CancelledError]).} =
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
if not manager.started:
warn "ReachabilityManager was not started, but we are already getting reachability updates! Ignoring..."
return
@ -101,8 +117,11 @@ proc getReachabilityHandler(manager: ReachabilityManager): StatusAndConfidenceHa
if networkReachability == NetworkReachability.NotReachable:
# Lets first start to expose port using port mapping protocols like NAT-PMP or UPnP
if manager.startPortMapping():
return # We exposed ports so we should be good!
if manager.portMapping.isAvailable():
debug "Port mapping available on the network"
if await manager.startPortMapping():
return # We exposed ports so we should be good!
info "No more options to become reachable"
@ -129,8 +148,10 @@ proc start*(
except CatchableError as exc:
info "Failed to dial bootstrap nodes", err = exc.msg
proc stop*(): Future[void] {.async: (raises: [CancelledError]).} =
stopPortMapping()
proc stop*(
self: ReachabilityManager
): Future[void] {.async: (raises: [CancelledError]).} =
await self.portMapping.stop()
self.started = false
proc getAutonatService*(self: ReachabilityManager): Service =

View File

@ -11,8 +11,6 @@ import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/blockexchange
import pkg/codex/systemclock
import pkg/codex/nat
import pkg/codex/utils/natutils
import pkg/codex/utils/safeasynciter
import pkg/codex/merkletree
import pkg/codex/manifest
@ -219,10 +217,6 @@ proc generateNodes*(
if config.enableBootstrap:
waitFor switch.peerInfo.update()
let (announceAddrs, discoveryAddrs) =
nattedAddress(switch.peerInfo.addrs, bindPort.Port)
blockDiscovery.updateAnnounceRecord(announceAddrs)
blockDiscovery.updateDhtRecord(discoveryAddrs)
if blockDiscovery.dhtRecord.isSome:
bootstrapNodes.add !blockDiscovery.dhtRecord