mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-12 05:19:33 +00:00
Fixes from Ivan's review
* fix node info to use announcedAddresses again
* expose ports via Debug API ("MyBoundPorts")
* builders default to port 0 in discv5/metrics/p2pTcp
* add waku/net/bound_ports.nim
* move auto_port and net_config to waku/net/
* extract port-0 retry loop to tryWithAutoPort[T]
* misc refactors and fixes
This commit is contained in:
parent
abaae88df9
commit
2dc8176407
@ -10,7 +10,7 @@ import
|
||||
|
||||
import
|
||||
tests/testlib/[wakunode, wakucore],
|
||||
waku/[waku_node, common/auto_port, discovery/waku_discv5, node/waku_metrics],
|
||||
waku/[waku_node, net/auto_port, discovery/waku_discv5, node/waku_metrics],
|
||||
waku/factory/
|
||||
[node_factory, conf_builder/conf_builder, conf_builder/web_socket_conf_builder]
|
||||
|
||||
@ -89,6 +89,9 @@ suite "Auto-port retry":
|
||||
let takenPort = Port(55100)
|
||||
let freePort = Port(55101)
|
||||
let taken = createStreamServer(initTAddress("127.0.0.1", takenPort))
|
||||
defer:
|
||||
taken.stop()
|
||||
await taken.closeWait()
|
||||
|
||||
proc buildMetricsConf(port: Port): MetricsServerConf =
|
||||
var b = MetricsServerConfBuilder.init()
|
||||
@ -96,18 +99,13 @@ suite "Auto-port retry":
|
||||
b.withHttpPort(port)
|
||||
b.build().value.get()
|
||||
|
||||
try:
|
||||
let failRes =
|
||||
await startMetricsServerAndLogging(buildMetricsConf(takenPort), 0'u16)
|
||||
check failRes.isErr
|
||||
let failRes = await startMetricsServerAndLogging(buildMetricsConf(takenPort), 0'u16)
|
||||
check failRes.isErr()
|
||||
|
||||
let okRes = await startMetricsServerAndLogging(buildMetricsConf(freePort), 0'u16)
|
||||
check okRes.isOk
|
||||
if okRes.isOk:
|
||||
await okRes.get().server.close()
|
||||
finally:
|
||||
taken.stop()
|
||||
await taken.closeWait()
|
||||
let okRes = await startMetricsServerAndLogging(buildMetricsConf(freePort), 0'u16)
|
||||
check okRes.isOk()
|
||||
if okRes.isOk():
|
||||
await okRes.get().server.close()
|
||||
|
||||
asynctest "discv5 binds on free UDP port, fails on taken":
|
||||
let takenPort = Port(55200)
|
||||
@ -120,10 +118,14 @@ suite "Auto-port retry":
|
||||
|
||||
let takenUdp =
|
||||
newDatagramTransport(dummyCb, local = initTAddress("0.0.0.0", takenPort))
|
||||
defer:
|
||||
await takenUdp.closeWait()
|
||||
|
||||
let nodeKey = generateSecp256k1Key()
|
||||
let node = newTestWakuNode(nodeKey, parseIpAddress("0.0.0.0"), Port(0))
|
||||
await node.start()
|
||||
defer:
|
||||
await node.stop()
|
||||
|
||||
proc buildDiscv5Conf(port: Port): Discv5Conf =
|
||||
var b = Discv5ConfBuilder.init()
|
||||
@ -131,85 +133,30 @@ suite "Auto-port retry":
|
||||
b.withUdpPort(port)
|
||||
b.build().value.get()
|
||||
|
||||
try:
|
||||
let failRes = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
buildDiscv5Conf(takenPort),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check failRes.isErr
|
||||
let failRes = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
buildDiscv5Conf(takenPort),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check failRes.isErr()
|
||||
|
||||
let okRes = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
buildDiscv5Conf(freePort),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check okRes.isOk
|
||||
if okRes.isOk:
|
||||
await okRes.get().stop()
|
||||
finally:
|
||||
await takenUdp.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asynctest "exhausted retries err for metrics and discv5":
|
||||
let origMin = autoPortMin
|
||||
let origMax = autoPortMax
|
||||
let pinned = 58888'u16
|
||||
autoPortMin = pinned
|
||||
autoPortMax = pinned
|
||||
|
||||
let takenTcp = createStreamServer(initTAddress("127.0.0.1", Port(pinned)))
|
||||
|
||||
proc dummyCb(
|
||||
transp: DatagramTransport, raddr: TransportAddress
|
||||
): Future[void] {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let takenUdp =
|
||||
newDatagramTransport(dummyCb, local = initTAddress("0.0.0.0", Port(pinned)))
|
||||
|
||||
let nodeKey = generateSecp256k1Key()
|
||||
let node = newTestWakuNode(nodeKey, parseIpAddress("0.0.0.0"), Port(0))
|
||||
await node.start()
|
||||
|
||||
try:
|
||||
var mb = MetricsServerConfBuilder.init()
|
||||
mb.withEnabled(true)
|
||||
mb.withHttpPort(0'u16)
|
||||
let metricsRes = await startMetricsServerAndLogging(mb.build().value.get(), 0'u16)
|
||||
check metricsRes.isErr
|
||||
|
||||
var db = Discv5ConfBuilder.init()
|
||||
db.withEnabled(true)
|
||||
db.withUdpPort(0'u16)
|
||||
let discv5Res = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
db.build().value.get(),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check discv5Res.isErr
|
||||
finally:
|
||||
takenTcp.stop()
|
||||
await takenTcp.closeWait()
|
||||
await takenUdp.closeWait()
|
||||
await node.stop()
|
||||
autoPortMin = origMin
|
||||
autoPortMax = origMax
|
||||
let okRes = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
buildDiscv5Conf(freePort),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check okRes.isOk()
|
||||
if okRes.isOk():
|
||||
await okRes.get().stop()
|
||||
|
||||
@ -18,7 +18,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Setup
|
||||
let networkConf = NetworkConf.TheWakuNetworkConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.discv5Conf.withUdpPort(9000)
|
||||
builder.withRelayServiceRatio("50:50")
|
||||
# Mount all shards in network
|
||||
@ -63,7 +62,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Setup
|
||||
let networkConf = NetworkConf.TheWakuNetworkConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.withRelayServiceRatio("50:50")
|
||||
builder.discv5Conf.withUdpPort(9000)
|
||||
# Mount all shards in network
|
||||
@ -97,8 +95,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Setup
|
||||
let networkConf = NetworkConf.TheWakuNetworkConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.discv5Conf.withUdpPort(0'u16)
|
||||
|
||||
let # Mount all shards in network
|
||||
expectedShards = toSeq[0.uint16 .. 7.uint16]
|
||||
@ -130,8 +126,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Setup
|
||||
let networkConf = NetworkConf.TheWakuNetworkConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.discv5Conf.withUdpPort(0'u16)
|
||||
let shards = @[2.uint16, 3.uint16]
|
||||
|
||||
## Given
|
||||
@ -177,8 +171,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Setup
|
||||
let networkConf = NetworkConf.TheWakuNetworkConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.discv5Conf.withUdpPort(0'u16)
|
||||
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
|
||||
|
||||
# Mount all shards in network
|
||||
@ -225,8 +217,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Setup
|
||||
let networkConf = NetworkConf.LogosDevConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.discv5Conf.withUdpPort(0'u16)
|
||||
|
||||
# Sanity check
|
||||
check networkConf.shardingConf.kind == AutoSharding
|
||||
@ -256,8 +246,6 @@ suite "Waku Conf - build with cluster conf":
|
||||
## Given: emulate --preset=logos.dev --num-shards-in-network=0
|
||||
let networkConf = NetworkConf.LogosDevConf()
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.discv5Conf.withUdpPort(0'u16)
|
||||
builder.withNetworkConf(networkConf)
|
||||
# Note: builder.withNumShardsInCluster() is not called when the
|
||||
# value that comes from the CLI path is 0 (which means it was
|
||||
@ -277,7 +265,6 @@ suite "Waku Conf - node key":
|
||||
test "Node key is generated":
|
||||
## Setup
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.withClusterId(1)
|
||||
|
||||
## Given
|
||||
@ -301,7 +288,6 @@ suite "Waku Conf - node key":
|
||||
let key = SkPrivateKey.init(utils.fromHex(nodeKeyStr)).tryGet()
|
||||
crypto.PrivateKey(scheme: Secp256k1, skkey: key)
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.withClusterId(1)
|
||||
|
||||
## Given
|
||||
@ -323,7 +309,6 @@ suite "Waku Conf - extMultiaddrs":
|
||||
test "Valid multiaddresses are passed and accepted":
|
||||
## Setup
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(0'u16)
|
||||
builder.withClusterId(1)
|
||||
|
||||
## Given
|
||||
@ -361,66 +346,3 @@ suite "Waku Conf Builder - rate limits":
|
||||
|
||||
## Then
|
||||
assert res.isOk(), $res.error
|
||||
|
||||
suite "Waku Conf - port required":
|
||||
test "p2pTcpPort not specified returns err":
|
||||
## Setup: minimal builder with no withP2pTcpPort call
|
||||
var builder = WakuConfBuilder.init()
|
||||
|
||||
## When
|
||||
let res = builder.build()
|
||||
|
||||
## Then
|
||||
check res.isErr()
|
||||
check res.error == "p2pTcpPort is not specified"
|
||||
|
||||
test "discv5 enabled without udpPort returns err":
|
||||
## Setup
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.discv5Conf.withEnabled(true)
|
||||
|
||||
## When
|
||||
let res = builder.build()
|
||||
|
||||
## Then
|
||||
check res.isErr()
|
||||
check res.error == "Discv5 Conf building failed: discv5.udpPort is not specified"
|
||||
|
||||
test "metricsServer enabled without httpPort returns err":
|
||||
## Setup
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.metricsServerConf.withEnabled(true)
|
||||
|
||||
## When
|
||||
let res = builder.build()
|
||||
|
||||
## Then
|
||||
check res.isErr()
|
||||
check res.error ==
|
||||
"Metrics Server Conf building failed: metricsServer.httpPort is not specified"
|
||||
|
||||
test "restServer enabled without port returns err":
|
||||
## Setup: listenAddress must be set (checked before port)
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.restServerConf.withEnabled(true)
|
||||
builder.restServerConf.withListenAddress(parseIpAddress("127.0.0.1"))
|
||||
|
||||
## When
|
||||
let res = builder.build()
|
||||
|
||||
## Then
|
||||
check res.isErr()
|
||||
check res.error ==
|
||||
"REST Server Conf building failed: restServer.port is not specified"
|
||||
|
||||
test "webSocket enabled without port returns err":
|
||||
## Setup
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.webSocketConf.withEnabled(true)
|
||||
|
||||
## When
|
||||
let res = builder.build()
|
||||
|
||||
## Then
|
||||
check res.isErr()
|
||||
check res.error == "WebSocket Conf building failed: websocket.port is not specified"
|
||||
|
||||
@ -5,7 +5,7 @@ import chronos, confutils/toml/std/net, libp2p/multiaddress, testutils/unittests
|
||||
import ./testlib/wakunode, waku/waku_enr/capabilities
|
||||
|
||||
include
|
||||
waku/node/net_config,
|
||||
waku/net/net_config,
|
||||
waku/factory/conf_builder/web_socket_conf_builder,
|
||||
waku/factory/conf_builder/conf_builder
|
||||
|
||||
|
||||
@ -507,7 +507,7 @@ suite "Waku Discovery v5":
|
||||
waku.conf.endpointConf.p2pListenAddress,
|
||||
waku.conf.portsShift,
|
||||
).valueOr:
|
||||
raiseAssert error
|
||||
raiseAssert "failed setup discv5 in test: " & $error
|
||||
|
||||
check:
|
||||
waku.node.peerManager.switch.peerStore.peers().anyIt(
|
||||
@ -539,7 +539,7 @@ suite "Waku Discovery v5":
|
||||
waku.conf.endpointConf.p2pListenAddress,
|
||||
waku.conf.portsShift,
|
||||
).valueOr:
|
||||
raiseAssert error
|
||||
raiseAssert "failed setup discv5 in test: " & $error
|
||||
|
||||
check:
|
||||
not waku.node.peerManager.switch.peerStore.peers().anyIt(
|
||||
|
||||
@ -1,14 +1,13 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/json,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/multiaddress,
|
||||
libp2p/switch
|
||||
import ../testlib/wakucore, ../testlib/wakunode
|
||||
libp2p/[crypto/crypto, crypto/secp, multiaddress, switch],
|
||||
tests/testlib/[wakucore, wakunode],
|
||||
waku/factory/conf_builder/conf_builder
|
||||
|
||||
include waku/factory/waku, waku/common/enr/typed_record
|
||||
|
||||
@ -99,3 +98,36 @@ suite "Wakunode2 - Waku initialization":
|
||||
## Cleanup
|
||||
(waitFor waku.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
test "MyBoundPorts should report a bound port for each enabled service":
|
||||
var builder = defaultTestWakuConfBuilder()
|
||||
builder.discv5Conf.withEnabled(true)
|
||||
builder.restServerConf.withEnabled(true)
|
||||
builder.restServerConf.withPort(Port(0))
|
||||
builder.restServerConf.withRelayCacheCapacity(50'u32)
|
||||
builder.metricsServerConf.withEnabled(true)
|
||||
builder.webSocketConf.withEnabled(true)
|
||||
builder.webSocketConf.withWebSocketPort(Port(0))
|
||||
|
||||
let conf = builder.build().valueOr:
|
||||
raiseAssert error
|
||||
|
||||
var waku = (waitFor Waku.new(conf)).valueOr:
|
||||
raiseAssert error
|
||||
defer:
|
||||
(waitFor waku.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
let portsJson = waku.stateInfo.getNodeInfoItem(NodeInfoId.MyBoundPorts)
|
||||
let parsed = parseJson(portsJson)
|
||||
|
||||
check:
|
||||
parsed.kind == JObject
|
||||
parsed["tcp"].getInt() != 0
|
||||
parsed["webSocket"].getInt() != 0
|
||||
parsed["rest"].getInt() != 0
|
||||
parsed["discv5Udp"].getInt() != 0
|
||||
parsed["metrics"].getInt() != 0
|
||||
|
||||
@ -75,6 +75,10 @@ suite "Waku v2 REST API - Debug":
|
||||
|
||||
let restAddress = parseIpAddress("0.0.0.0")
|
||||
let restServer = WakuRestServerRef.init(restAddress, Port(0)).tryGet()
|
||||
defer:
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
|
||||
installDebugApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
@ -90,9 +94,6 @@ suite "Waku v2 REST API - Debug":
|
||||
response.data.ports.discv5Udp == some(1004'u16)
|
||||
response.data.ports.metrics == some(1005'u16)
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
|
||||
asyncTest "Get node version - GET /version":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
|
||||
@ -192,7 +192,9 @@ type WakuNodeConf* = object
|
||||
name: "listen-address"
|
||||
.}: IpAddress
|
||||
|
||||
tcpPort* {.desc: "TCP listening port.", defaultValue: 0, name: "tcp-port".}: uint16
|
||||
tcpPort* {.
|
||||
desc: "TCP listening port. 0 = auto-assign.", defaultValue: 0, name: "tcp-port"
|
||||
.}: uint16
|
||||
|
||||
portsShift* {.
|
||||
desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift"
|
||||
@ -488,7 +490,7 @@ with the drawback of consuming some more bandwidth.""",
|
||||
.}: IpAddress
|
||||
|
||||
restPort* {.
|
||||
desc: "Listening port of the REST HTTP server.",
|
||||
desc: "Listening port of the REST HTTP server. 0 = auto-assign.",
|
||||
defaultValue: 0,
|
||||
name: "rest-port"
|
||||
.}: uint16
|
||||
@ -529,7 +531,7 @@ with the drawback of consuming some more bandwidth.""",
|
||||
.}: IpAddress
|
||||
|
||||
metricsServerPort* {.
|
||||
desc: "Listening HTTP port of the metrics server.",
|
||||
desc: "Listening HTTP port of the metrics server. 0 = auto-assign.",
|
||||
defaultValue: 0,
|
||||
name: "metrics-server-port"
|
||||
.}: uint16
|
||||
@ -563,7 +565,7 @@ with the drawback of consuming some more bandwidth.""",
|
||||
.}: Option[bool]
|
||||
|
||||
discv5UdpPort* {.
|
||||
desc: "Listening UDP port for Node Discovery v5.",
|
||||
desc: "Listening UDP port for Node Discovery v5. 0 = auto-assign.",
|
||||
defaultValue: 0,
|
||||
name: "discv5-udp-port"
|
||||
.}: uint16
|
||||
@ -663,7 +665,9 @@ with the drawback of consuming some more bandwidth.""",
|
||||
.}: bool
|
||||
|
||||
websocketPort* {.
|
||||
desc: "WebSocket listening port.", defaultValue: 0, name: "websocket-port"
|
||||
desc: "WebSocket listening port. 0 = auto-assign.",
|
||||
defaultValue: 0,
|
||||
name: "websocket-port"
|
||||
.}: uint16
|
||||
|
||||
websocketSecureSupport* {.
|
||||
|
||||
@ -1,13 +0,0 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/random
|
||||
|
||||
const AutoPortRetryCount* = 20
|
||||
|
||||
var
|
||||
autoPortMin* = 50000'u16
|
||||
autoPortMax* = 59000'u16
|
||||
rng = initRand()
|
||||
|
||||
proc getAutoPort*(): uint16 =
|
||||
uint16(rng.rand(autoPortMin.int .. autoPortMax.int))
|
||||
@ -10,7 +10,7 @@ import
|
||||
eth/keys as eth_keys,
|
||||
eth/p2p/discoveryv5/node,
|
||||
eth/p2p/discoveryv5/protocol
|
||||
import waku/[common/auto_port, node/peer_manager/peer_manager, waku_core, waku_enr]
|
||||
import waku/[net/auto_port, node/peer_manager/peer_manager, waku_core, waku_enr]
|
||||
|
||||
export protocol, waku_enr
|
||||
|
||||
@ -447,7 +447,7 @@ proc setupDiscoveryV5*(
|
||||
autoupdateRecord: conf.enrAutoUpdate,
|
||||
)
|
||||
|
||||
ok(
|
||||
return ok(
|
||||
WakuDiscoveryV5.new(
|
||||
rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
|
||||
)
|
||||
@ -466,29 +466,24 @@ proc setupAndStartDiscv5*(
|
||||
): Future[Result[WakuDiscoveryV5, string]] {.async: (raises: []).} =
|
||||
## Construct and start a `WakuDiscoveryV5` instance, handling auto-port
|
||||
## retry when the caller asks for `udpPort == 0`.
|
||||
var c = conf
|
||||
let autoMode = c.udpPort == Port(0)
|
||||
let attempts = if autoMode: AutoPortRetryCount else: 1
|
||||
var lastErr = ""
|
||||
|
||||
for attempt in 1 .. attempts:
|
||||
if autoMode:
|
||||
c.udpPort = Port(getAutoPort())
|
||||
|
||||
proc attempt(
|
||||
port: Port
|
||||
): Future[Result[WakuDiscoveryV5, string]] {.async: (raises: []).} =
|
||||
var c = conf
|
||||
c.udpPort = port
|
||||
let wd = setupDiscoveryV5(
|
||||
myENR, nodePeerManager, nodeTopicSubscriptionQueue, c, dynamicBootstrapNodes, rng,
|
||||
key, p2pListenAddress, portsShift,
|
||||
).valueOr:
|
||||
return err(error)
|
||||
|
||||
let startRes = await wd.start()
|
||||
if startRes.isOk():
|
||||
return ok(wd)
|
||||
lastErr = startRes.error
|
||||
if startRes.isErr():
|
||||
return err(startRes.error)
|
||||
return ok(wd)
|
||||
|
||||
if autoMode:
|
||||
return err("discv5: auto-port bind exhausted; last error: " & lastErr)
|
||||
return err(lastErr)
|
||||
let wd = (await tryWithAutoPort[WakuDiscoveryV5](conf.udpPort, attempt)).valueOr:
|
||||
return err("setupAndStartDiscv5: " & error)
|
||||
return ok(wd)
|
||||
|
||||
proc udpPort*(wd: WakuDiscoveryV5): Port =
|
||||
wd.conf.port
|
||||
|
||||
@ -49,9 +49,6 @@ proc build*(b: Discv5ConfBuilder): Result[Option[Discv5Conf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none(Discv5Conf))
|
||||
|
||||
if b.udpPort.isNone():
|
||||
return err("discv5.udpPort is not specified")
|
||||
|
||||
return ok(
|
||||
some(
|
||||
Discv5Conf(
|
||||
@ -60,7 +57,7 @@ proc build*(b: Discv5ConfBuilder): Result[Option[Discv5Conf], string] =
|
||||
bucketIpLimit: b.bucketIpLimit.get(2),
|
||||
enrAutoUpdate: b.enrAutoUpdate.get(true),
|
||||
tableIpLimit: b.tableIpLimit.get(10),
|
||||
udpPort: b.udpPort.get(),
|
||||
udpPort: b.udpPort.get(Port(0)),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -36,14 +36,11 @@ proc build*(b: MetricsServerConfBuilder): Result[Option[MetricsServerConf], stri
|
||||
if not b.enabled.get(false):
|
||||
return ok(none(MetricsServerConf))
|
||||
|
||||
if b.httpPort.isNone():
|
||||
return err("metricsServer.httpPort is not specified")
|
||||
|
||||
return ok(
|
||||
some(
|
||||
MetricsServerConf(
|
||||
httpAddress: b.httpAddress.get(static parseIpAddress("127.0.0.1")),
|
||||
httpPort: b.httpPort.get(),
|
||||
httpPort: b.httpPort.get(Port(0)),
|
||||
logging: b.logging.get(false),
|
||||
)
|
||||
)
|
||||
|
||||
@ -576,9 +576,7 @@ proc build*(
|
||||
warn "Nat Strategy is not specified, defaulting to none"
|
||||
"none"
|
||||
|
||||
if builder.p2pTcpPort.isNone():
|
||||
return err("p2pTcpPort is not specified")
|
||||
let p2pTcpPort = builder.p2pTcpPort.get()
|
||||
let p2pTcpPort = builder.p2pTcpPort.get(Port(0))
|
||||
|
||||
let p2pListenAddress =
|
||||
if builder.p2pListenAddress.isSome():
|
||||
|
||||
@ -8,7 +8,7 @@ import
|
||||
std/[options, sequtils, net],
|
||||
results
|
||||
|
||||
import ../common/utils/nat, ../node/net_config, ../waku_enr, ../waku_core, ./waku_conf
|
||||
import waku/[common/utils/nat, net/net_config, waku_enr, waku_core], ./waku_conf
|
||||
|
||||
proc enrConfiguration*(
|
||||
conf: WakuConf, netConfig: NetConfig
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
|
||||
import std/[tables, sequtils, strutils]
|
||||
import metrics, eth/p2p/discoveryv5/enr, libp2p/peerid
|
||||
import waku/waku_node
|
||||
import waku/[waku_node, net/bound_ports]
|
||||
|
||||
type
|
||||
NodeInfoId* {.pure.} = enum
|
||||
@ -15,6 +15,7 @@ type
|
||||
MyMultiaddresses
|
||||
MyENR
|
||||
MyPeerId
|
||||
MyBoundPorts
|
||||
|
||||
WakuStateInfo* {.requiresInit.} = object
|
||||
node: WakuNode
|
||||
@ -43,6 +44,8 @@ proc getNodeInfoItem*(self: WakuStateInfo, infoItemId: NodeInfoId): string =
|
||||
return self.node.enr.toURI()
|
||||
of NodeInfoId.MyPeerId:
|
||||
return $PeerId(self.node.peerId())
|
||||
of NodeInfoId.MyBoundPorts:
|
||||
return self.node.ports.toJsonString()
|
||||
else:
|
||||
return "unknown info item id"
|
||||
|
||||
|
||||
38
waku/net/auto_port.nim
Normal file
38
waku/net/auto_port.nim
Normal file
@ -0,0 +1,38 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[net, random]
|
||||
import chronos, results
|
||||
|
||||
const
|
||||
AutoPortRetryCount* = 20
|
||||
AutoPortMin = 50000'u16
|
||||
AutoPortMax = 59000'u16
|
||||
|
||||
var rng = initRand()
|
||||
|
||||
proc getAutoPort*(): uint16 =
|
||||
uint16(rng.rand(AutoPortMin.int .. AutoPortMax.int))
|
||||
|
||||
proc tryWithAutoPort*[T](
|
||||
startingPort: Port,
|
||||
attempt: proc(p: Port): Future[Result[T, string]] {.async: (raises: []).},
|
||||
): Future[Result[T, string]] {.async: (raises: []).} =
|
||||
## If `startingPort == Port(0)`, call `attempt` up to `AutoPortRetryCount`
|
||||
## times with random ports. Otherwise call it once with `startingPort`.
|
||||
## Returns the first ok or the last err.
|
||||
let autoMode = startingPort == Port(0)
|
||||
let attempts = if autoMode: AutoPortRetryCount else: 1
|
||||
var lastErr = ""
|
||||
for i in 1 .. attempts:
|
||||
let port =
|
||||
if autoMode:
|
||||
Port(getAutoPort())
|
||||
else:
|
||||
startingPort
|
||||
let res = await attempt(port)
|
||||
if res.isOk():
|
||||
return ok(res.get())
|
||||
lastErr = res.error
|
||||
if autoMode:
|
||||
return err("auto-port exhausted; last error: " & lastErr)
|
||||
return err(lastErr)
|
||||
27
waku/net/bound_ports.nim
Normal file
27
waku/net/bound_ports.nim
Normal file
@ -0,0 +1,27 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[json, options]
|
||||
|
||||
type BoundPorts* {.requiresInit.} = object
|
||||
## Set by the factory once each service has bound to a port.
|
||||
tcp*: Option[uint16]
|
||||
webSocket*: Option[uint16]
|
||||
rest*: Option[uint16]
|
||||
discv5Udp*: Option[uint16]
|
||||
metrics*: Option[uint16]
|
||||
|
||||
proc init*(T: type BoundPorts): BoundPorts =
|
||||
BoundPorts(
|
||||
tcp: none(uint16),
|
||||
webSocket: none(uint16),
|
||||
rest: none(uint16),
|
||||
discv5Udp: none(uint16),
|
||||
metrics: none(uint16),
|
||||
)
|
||||
|
||||
proc toJsonString*(p: BoundPorts): string =
|
||||
var obj = newJObject()
|
||||
for name, value in fieldPairs(p):
|
||||
if value.isSome():
|
||||
obj[name] = %value.get()
|
||||
return $obj
|
||||
@ -2,8 +2,7 @@
|
||||
|
||||
import chronicles, chronos, metrics, metrics/chronos_httpserver
|
||||
import
|
||||
waku/
|
||||
[common/auto_port, waku_rln_relay/protocol_metrics as rln_metrics, utils/collector],
|
||||
waku/[net/auto_port, waku_rln_relay/protocol_metrics as rln_metrics, utils/collector],
|
||||
./peer_manager,
|
||||
./waku_node
|
||||
|
||||
@ -62,34 +61,25 @@ type StartedMetricsServer* = tuple[server: MetricsHttpServerRef, port: Port]
|
||||
proc startMetricsServer(
|
||||
serverIp: IpAddress, serverPort: Port
|
||||
): Future[Result[StartedMetricsServer, string]] {.async.} =
|
||||
let autoMode = serverPort == Port(0)
|
||||
let attempts = if autoMode: AutoPortRetryCount else: 1
|
||||
var lastErr = ""
|
||||
|
||||
for attempt in 1 .. attempts:
|
||||
let port =
|
||||
if autoMode:
|
||||
Port(getAutoPort())
|
||||
else:
|
||||
serverPort
|
||||
proc attempt(
|
||||
port: Port
|
||||
): Future[Result[StartedMetricsServer, string]] {.async: (raises: []).} =
|
||||
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $port
|
||||
|
||||
let server = MetricsHttpServerRef.new($serverIp, port).valueOr:
|
||||
lastErr = $error
|
||||
continue
|
||||
return err($error)
|
||||
|
||||
try:
|
||||
await server.start()
|
||||
except CatchableError:
|
||||
lastErr = getCurrentExceptionMsg()
|
||||
continue
|
||||
return err(getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $port
|
||||
return ok((server: server, port: port))
|
||||
|
||||
if autoMode:
|
||||
return err("metrics HTTP server: auto-port bind exhausted; last error: " & lastErr)
|
||||
return err("metrics HTTP server start failed: " & lastErr)
|
||||
let started = (await tryWithAutoPort[StartedMetricsServer](serverPort, attempt)).valueOr:
|
||||
return err("metrics HTTP server start failed: " & error)
|
||||
return ok(started)
|
||||
|
||||
proc startMetricsServerAndLogging*(
|
||||
conf: MetricsServerConf, portsShift: uint16
|
||||
|
||||
@ -62,7 +62,7 @@ import
|
||||
events/message_events,
|
||||
],
|
||||
waku/discovery/waku_kademlia,
|
||||
./net_config,
|
||||
waku/net/[bound_ports, net_config],
|
||||
./peer_manager,
|
||||
./health_monitor/health_status,
|
||||
./health_monitor/topic_health
|
||||
@ -96,13 +96,6 @@ const WakuNodeVersionString* = "version / git commit hash: " & git_version
|
||||
|
||||
# key and crypto modules different
|
||||
type
|
||||
BoundPorts* = object ## Set by the factory once each service has bound to a port.
|
||||
tcp*: Option[uint16]
|
||||
webSocket*: Option[uint16]
|
||||
rest*: Option[uint16]
|
||||
discv5Udp*: Option[uint16]
|
||||
metrics*: Option[uint16]
|
||||
|
||||
# TODO: Move to application instance (e.g., `WakuNode2`)
|
||||
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
|
||||
listenAddresses*: seq[string]
|
||||
@ -233,6 +226,7 @@ proc new*(
|
||||
announcedAddresses: netConfig.announcedAddresses,
|
||||
topicSubscriptionQueue: queue,
|
||||
rateLimitSettings: rateLimitSettings,
|
||||
ports: BoundPorts.init(),
|
||||
)
|
||||
|
||||
peerManager.setShardGetter(node.getShardsGetter(@[]))
|
||||
@ -253,9 +247,7 @@ proc info*(node: WakuNode): WakuInfo =
|
||||
let peerInfo = node.switch.peerInfo
|
||||
|
||||
var listenStr: seq[string]
|
||||
# Post-bind: when a transport was given port=0, this reflects the real
|
||||
# OS-assigned port rather than the pre-bind configured 0.
|
||||
for address in peerInfo.listenAddrs:
|
||||
for address in node.announcedAddresses:
|
||||
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
|
||||
listenStr &= fulladdr
|
||||
let enrUri = node.enr.toUri()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, json_serialization, json_serialization/std/options
|
||||
import ../../../waku_node, ../serdes
|
||||
import waku/[waku_node, net/bound_ports, rest_api/endpoint/serdes]
|
||||
import std/typetraits
|
||||
|
||||
#### Types
|
||||
@ -76,7 +76,7 @@ proc readValue*(
|
||||
var
|
||||
listenAddresses: Option[seq[string]]
|
||||
enrUri: Option[string]
|
||||
ports: BoundPorts
|
||||
ports = BoundPorts.init()
|
||||
|
||||
for fieldName in readObjectFields(reader):
|
||||
case fieldName
|
||||
@ -97,7 +97,7 @@ proc readValue*(
|
||||
)
|
||||
value.mixPubKey = some(reader.readValue(string))
|
||||
of "ports":
|
||||
ports = reader.readValue(BoundPorts)
|
||||
reader.readValue(ports)
|
||||
else:
|
||||
unrecognizedFieldWarning(value)
|
||||
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import
|
||||
./node/net_config,
|
||||
./net/net_config,
|
||||
./node/waku_switch as switch,
|
||||
./node/waku_node as node,
|
||||
./node/health_monitor as health_monitor,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user