mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-07-02 14:29:46 +00:00
fix: fix portsShift feature (#4006)
* Fold portsShift into each port in WakuConfBuilder.build() * Drop portsShift field from WakuConf * Drop portsShift param from networkConfiguration, discv5 setup, rest/metrics start * Keep Port(0) auto-assign sentinel (also fixes port-0 + shift) * Add regression test: announced port == bound port under portsShift * Fix lint
This commit is contained in:
parent
a7f893555d
commit
a763a59ac9
@ -413,7 +413,6 @@ proc setupDiscoveryV5*(
|
||||
rng: crypto.Rng,
|
||||
key: crypto.PrivateKey,
|
||||
p2pListenAddress: IpAddress,
|
||||
portsShift: uint16,
|
||||
): Result[WakuDiscoveryV5, string] =
|
||||
## Public only for testing. Callers should use `setupAndStartDiscv5`, which
|
||||
## additionally handles `udpPort == 0` via auto-port retry.
|
||||
@ -443,7 +442,7 @@ proc setupDiscoveryV5*(
|
||||
let discv5Config =
|
||||
DiscoveryConfig.init(conf.tableIpLimit, conf.bucketIpLimit, conf.bitsPerHop)
|
||||
|
||||
let discv5UdpPort = Port(uint16(conf.udpPort) + portsShift)
|
||||
let discv5UdpPort = conf.udpPort
|
||||
|
||||
let discv5Conf = WakuDiscoveryV5Config(
|
||||
discv5Config: some(discv5Config),
|
||||
@ -469,7 +468,6 @@ proc setupAndStartDiscv5*(
|
||||
rng: crypto.Rng,
|
||||
key: crypto.PrivateKey,
|
||||
p2pListenAddress: IpAddress,
|
||||
portsShift: uint16,
|
||||
): Future[Result[WakuDiscoveryV5, string]] {.async: (raises: []).} =
|
||||
## Construct and start a `WakuDiscoveryV5` instance, handling auto-port
|
||||
## retry when the caller asks for `udpPort == 0`.
|
||||
@ -480,7 +478,7 @@ proc setupAndStartDiscv5*(
|
||||
c.udpPort = port
|
||||
let wd = setupDiscoveryV5(
|
||||
myENR, nodePeerManager, nodeTopicSubscriptionQueue, c, dynamicBootstrapNodes, rng,
|
||||
key, p2pListenAddress, portsShift,
|
||||
key, p2pListenAddress,
|
||||
).valueOr:
|
||||
return err(error)
|
||||
let startRes = await wd.start()
|
||||
|
||||
@ -529,6 +529,13 @@ proc enforceSecurityConstraints(builder: WakuConfBuilder): Result[void, string]
|
||||
|
||||
ok()
|
||||
|
||||
func resolvePortsShift(configured: Port, portsShift: uint16): Port =
|
||||
## Fold portsShift into a configured port. Port(0) (auto-assign) is left as-is.
|
||||
if configured == Port(0):
|
||||
configured
|
||||
else:
|
||||
Port(configured.uint16 + portsShift)
|
||||
|
||||
proc build*(
|
||||
builder: var WakuConfBuilder, rng: crypto.Rng = crypto.newRng()
|
||||
): Result[WakuConf, string] =
|
||||
@ -624,7 +631,7 @@ proc build*(
|
||||
let contentTopics = builder.contentTopics.get(@[])
|
||||
|
||||
# Build sub-configs
|
||||
let discv5Conf = builder.discv5Conf.build().valueOr:
|
||||
var discv5Conf = builder.discv5Conf.build().valueOr:
|
||||
return err("Discv5 Conf building failed: " & $error)
|
||||
|
||||
let dnsDiscoveryConf = builder.dnsDiscoveryConf.build().valueOr:
|
||||
@ -633,10 +640,10 @@ proc build*(
|
||||
let filterServiceConf = builder.filterServiceConf.build().valueOr:
|
||||
return err("Filter Service Conf building failed: " & $error)
|
||||
|
||||
let metricsServerConf = builder.metricsServerConf.build().valueOr:
|
||||
var metricsServerConf = builder.metricsServerConf.build().valueOr:
|
||||
return err("Metrics Server Conf building failed: " & $error)
|
||||
|
||||
let restServerConf = builder.restServerConf.build().valueOr:
|
||||
var restServerConf = builder.restServerConf.build().valueOr:
|
||||
return err("REST Server Conf building failed: " & $error)
|
||||
|
||||
let rlnRelayConf = builder.rlnRelayConf.build().valueOr:
|
||||
@ -648,10 +655,10 @@ proc build*(
|
||||
let mixConf = builder.mixConf.build().valueOr:
|
||||
return err("Mix Conf building failed: " & $error)
|
||||
|
||||
let webSocketConf = builder.webSocketConf.build().valueOr:
|
||||
var webSocketConf = builder.webSocketConf.build().valueOr:
|
||||
return err("WebSocket Conf building failed: " & $error)
|
||||
|
||||
let quicConf = builder.quicConf.build().valueOr:
|
||||
var quicConf = builder.quicConf.build().valueOr:
|
||||
return err("QUIC Conf building failed: " & $error)
|
||||
|
||||
let rateLimit = builder.rateLimitConf.build().valueOr:
|
||||
@ -683,7 +690,7 @@ proc build*(
|
||||
warn "Nat Strategy is not specified, defaulting to none"
|
||||
DefaultNatStrategy
|
||||
|
||||
let p2pTcpPort = builder.p2pTcpPort.get(DefaultP2pTcpPort)
|
||||
var p2pTcpPort = builder.p2pTcpPort.get(DefaultP2pTcpPort)
|
||||
|
||||
let p2pListenAddress =
|
||||
if builder.p2pListenAddress.isSome():
|
||||
@ -765,6 +772,20 @@ proc build*(
|
||||
mix = mix,
|
||||
)
|
||||
|
||||
# portsShift is consumed here, WakuConf carries final bind ports.
|
||||
p2pTcpPort = resolvePortsShift(p2pTcpPort, portsShift)
|
||||
if webSocketConf.isSome():
|
||||
webSocketConf.get().port = resolvePortsShift(webSocketConf.get().port, portsShift)
|
||||
if quicConf.isSome():
|
||||
quicConf.get().port = resolvePortsShift(quicConf.get().port, portsShift)
|
||||
if discv5Conf.isSome():
|
||||
discv5Conf.get().udpPort = resolvePortsShift(discv5Conf.get().udpPort, portsShift)
|
||||
if restServerConf.isSome():
|
||||
restServerConf.get().port = resolvePortsShift(restServerConf.get().port, portsShift)
|
||||
if metricsServerConf.isSome():
|
||||
metricsServerConf.get().httpPort =
|
||||
resolvePortsShift(metricsServerConf.get().httpPort, portsShift)
|
||||
|
||||
let wakuConf = WakuConf(
|
||||
# confs
|
||||
storeServiceConf: storeServiceConf,
|
||||
@ -806,7 +827,6 @@ proc build*(
|
||||
extMultiAddrs: extMultiAddrs,
|
||||
extMultiAddrsOnly: extMultiAddrsOnly,
|
||||
),
|
||||
portsShift: portsShift,
|
||||
webSocketConf: webSocketConf,
|
||||
quicConf: quicConf,
|
||||
dnsAddrsNameServers: dnsAddrsNameServers,
|
||||
|
||||
@ -90,15 +90,14 @@ proc networkConfiguration*(
|
||||
quicConf: Option[QuicConf],
|
||||
wakuFlags: CapabilitiesBitfield,
|
||||
dnsAddrsNameServers: seq[IpAddress],
|
||||
portsShift: uint16,
|
||||
clientId: string,
|
||||
): Future[NetConfigResult] {.async.} =
|
||||
let tcpBindPort = Port(uint16(conf.p2pTcpPort) + portsShift)
|
||||
let tcpBindPort = conf.p2pTcpPort
|
||||
|
||||
let (quicEnabled, quicBindPort) =
|
||||
if quicConf.isSome():
|
||||
let qConf = quicConf.get()
|
||||
(true, some(Port(qConf.port.uint16 + portsShift)))
|
||||
(true, some(qConf.port))
|
||||
else:
|
||||
(false, none(Port))
|
||||
|
||||
@ -111,7 +110,7 @@ proc networkConfiguration*(
|
||||
let
|
||||
discv5UdpPort =
|
||||
if discv5Conf.isSome():
|
||||
some(Port(uint16(discv5Conf.get().udpPort) + portsShift))
|
||||
some(discv5Conf.get().udpPort)
|
||||
else:
|
||||
none(Port)
|
||||
|
||||
@ -145,7 +144,7 @@ proc networkConfiguration*(
|
||||
let (wsEnabled, wsBindPort, wssEnabled) =
|
||||
if webSocketConf.isSome:
|
||||
let wsConf = webSocketConf.get()
|
||||
(true, some(Port(wsConf.port.uint16 + portsShift)), wsConf.secureConf.isSome)
|
||||
(true, some(wsConf.port), wsConf.secureConf.isSome)
|
||||
else:
|
||||
(false, none(Port), false)
|
||||
|
||||
|
||||
@ -462,7 +462,7 @@ proc setupNode*(
|
||||
await networkConfiguration(
|
||||
wakuConf.clusterId, wakuConf.endpointConf, wakuConf.discv5Conf,
|
||||
wakuConf.webSocketConf, wakuConf.quicConf, wakuConf.wakuFlags,
|
||||
wakuConf.dnsAddrsNameServers, wakuConf.portsShift, clientId,
|
||||
wakuConf.dnsAddrsNameServers, clientId,
|
||||
)
|
||||
).valueOr:
|
||||
error "failed to create internal config", error = error
|
||||
|
||||
@ -121,7 +121,6 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
mixConf*: Option[MixConf]
|
||||
kademliaDiscoveryConf*: Option[KademliaDiscoveryConf]
|
||||
|
||||
portsShift*: uint16
|
||||
dnsAddrsNameServers*: seq[IpAddress]
|
||||
endpointConf*: EndpointConf
|
||||
wakuFlags*: CapabilitiesBitfield
|
||||
|
||||
@ -86,13 +86,9 @@ proc startMetricsServer(
|
||||
return ok(started)
|
||||
|
||||
proc startMetricsServerAndLogging*(
|
||||
conf: MetricsServerConf, portsShift: uint16
|
||||
conf: MetricsServerConf
|
||||
): Future[Result[StartedMetricsServer, string]] {.async.} =
|
||||
let started = (
|
||||
await (
|
||||
startMetricsServer(conf.httpAddress, Port(conf.httpPort.uint16 + portsShift))
|
||||
)
|
||||
).valueOr:
|
||||
let started = (await (startMetricsServer(conf.httpAddress, conf.httpPort))).valueOr:
|
||||
return err("Starting metrics server failed. Continuing in current state:" & $error)
|
||||
|
||||
if conf.logging:
|
||||
|
||||
@ -41,7 +41,7 @@ type RestServerConf* = object
|
||||
relayCacheCapacity*: uint32
|
||||
|
||||
proc startRestServerEssentials*(
|
||||
nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf, portsShift: uint16
|
||||
nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf
|
||||
): Result[WakuRestServerRef, string] =
|
||||
if restServerNotInstalledTab.isNil:
|
||||
restServerNotInstalledTab = newTable[string, string]()
|
||||
@ -87,7 +87,7 @@ proc startRestServerEssentials*(
|
||||
none(string)
|
||||
|
||||
let address = conf.listenAddress
|
||||
let port = Port(conf.port.uint16 + portsShift)
|
||||
let port = conf.port
|
||||
let server = ?newRestHttpServer(
|
||||
address,
|
||||
port,
|
||||
|
||||
@ -214,7 +214,7 @@ proc new*(
|
||||
let restServer: WakuRestServerRef =
|
||||
if wakuConf.restServerConf.isSome():
|
||||
let restServer = startRestServerEssentials(
|
||||
healthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
|
||||
healthMonitor, wakuConf.restServerConf.get()
|
||||
).valueOr:
|
||||
error "Starting essential REST server failed", error = $error
|
||||
return err("Failed to start essential REST server in Waku.new: " & $error)
|
||||
@ -290,11 +290,11 @@ proc getRunningNetConfig(waku: Waku): Future[Result[NetConfig, string]] {.async.
|
||||
if quicPort.isSome() and conf.quicConf.isSome():
|
||||
conf.quicConf.get().port = quicPort.get()
|
||||
|
||||
# Rebuild NetConfig with bound port values
|
||||
# Rebuild NetConfig from the bound ports already read back into `conf`.
|
||||
let netConf = (
|
||||
await networkConfiguration(
|
||||
conf.clusterId, conf.endpointConf, conf.discv5Conf, conf.webSocketConf,
|
||||
conf.quicConf, conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId,
|
||||
conf.quicConf, conf.wakuFlags, conf.dnsAddrsNameServers, clientId,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Could not update NetConfig: " & error)
|
||||
@ -443,7 +443,6 @@ proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
waku.rng,
|
||||
conf.nodeKey,
|
||||
conf.endpointConf.p2pListenAddress,
|
||||
conf.portsShift,
|
||||
)
|
||||
).valueOr:
|
||||
return err("failed to start waku discovery v5: " & error)
|
||||
@ -525,9 +524,7 @@ proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if conf.metricsServerConf.isSome():
|
||||
try:
|
||||
let (server, port) = (
|
||||
await waku_metrics.startMetricsServerAndLogging(
|
||||
conf.metricsServerConf.get(), conf.portsShift
|
||||
)
|
||||
await waku_metrics.startMetricsServerAndLogging(conf.metricsServerConf.get())
|
||||
).valueOr:
|
||||
return err("Starting monitoring and external interfaces failed: " & error)
|
||||
waku.metricsServer = server
|
||||
|
||||
@ -156,10 +156,10 @@ suite "Auto-port retry":
|
||||
b.withHttpPort(port)
|
||||
b.build().value.get()
|
||||
|
||||
let failRes = await startMetricsServerAndLogging(buildMetricsConf(takenPort), 0'u16)
|
||||
let failRes = await startMetricsServerAndLogging(buildMetricsConf(takenPort))
|
||||
check failRes.isErr()
|
||||
|
||||
let okRes = await startMetricsServerAndLogging(buildMetricsConf(freePort), 0'u16)
|
||||
let okRes = await startMetricsServerAndLogging(buildMetricsConf(freePort))
|
||||
check okRes.isOk()
|
||||
if okRes.isOk():
|
||||
await okRes.get().server.close()
|
||||
@ -204,7 +204,6 @@ suite "Auto-port retry":
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check failRes.isErr()
|
||||
|
||||
@ -217,7 +216,6 @@ suite "Auto-port retry":
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check okRes.isOk()
|
||||
if okRes.isOk():
|
||||
|
||||
@ -505,7 +505,6 @@ suite "Waku Discovery v5":
|
||||
waku.rng,
|
||||
waku.conf.nodeKey,
|
||||
waku.conf.endpointConf.p2pListenAddress,
|
||||
waku.conf.portsShift,
|
||||
).valueOr:
|
||||
raiseAssert "failed setup discv5 in test: " & $error
|
||||
|
||||
@ -537,7 +536,6 @@ suite "Waku Discovery v5":
|
||||
waku.rng,
|
||||
waku.conf.nodeKey,
|
||||
waku.conf.endpointConf.p2pListenAddress,
|
||||
waku.conf.portsShift,
|
||||
).valueOr:
|
||||
raiseAssert "failed setup discv5 in test: " & $error
|
||||
|
||||
|
||||
@ -170,3 +170,42 @@ suite "Wakunode2 - Waku initialization":
|
||||
check:
|
||||
quicAddrs.len >= 1
|
||||
quicAddrs.allIt("/udp/0/quic-v1" notin $it)
|
||||
|
||||
test "portsShift is applied exactly once":
|
||||
# The announced port must equal the bound port, not bound + portsShift.
|
||||
const shift = 5'u16
|
||||
|
||||
# Reserve a free port, then set base = port - shift so base + shift binds onto it.
|
||||
let boundTarget = block:
|
||||
let sock = newSocket()
|
||||
defer:
|
||||
sock.close()
|
||||
sock.bindAddr(Port(0), "127.0.0.1")
|
||||
sock.getLocalAddr()[1]
|
||||
doAssert boundTarget.uint16 > shift, "ephemeral port unexpectedly low"
|
||||
|
||||
var builder = defaultTestWakuConfBuilder()
|
||||
builder.withP2pListenAddress(parseIpAddress("127.0.0.1"))
|
||||
builder.withP2pTcpPort(Port(boundTarget.uint16 - shift))
|
||||
builder.withPortsShift(shift)
|
||||
|
||||
let conf = builder.build().valueOr:
|
||||
raiseAssert error
|
||||
|
||||
var waku = (waitFor Waku.new(conf)).valueOr:
|
||||
raiseAssert error
|
||||
defer:
|
||||
(waitFor waku.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
(waitFor waku.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
let typedEnr = waku.node.enr.toTyped().valueOr:
|
||||
raiseAssert $error
|
||||
let announcedTcp = typedEnr.tcp()
|
||||
|
||||
check:
|
||||
announcedTcp.isSome()
|
||||
waku.node.ports.tcp == boundTarget.uint16
|
||||
announcedTcp.get() == waku.node.ports.tcp
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user