mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-12 05:19:33 +00:00
feat: allow a port value of zero for service ports (auto-assign port) (#3828)
* any port set to 0 on conf results in a random port bound * Debug API MyBoundPorts reports actually bound ports for all services, reports 0 if disabled * write back bound values to both WakuConf and WakuNode.ports * setupDiscoveryV5 returns Result and errors out on port 0 * rename setupAndStartDiscv5WithAutoPort to setupAndStartDiscv5 * updateWaku ENR rebuild now runs after discv5 startup * Add DefaultP2pTcpPort, DefaultDiscv5UdpPort, DefaultWebSocketPort, DefaultRestPort, DefaultMetricsHttpPort * add tests
This commit is contained in:
parent
a62ab1e7b1
commit
71a369ffad
@ -376,6 +376,7 @@ suite "WakuConfBuilder - store retention policies":
|
||||
test "Multiple retention policies":
|
||||
## Given
|
||||
var b = WakuConfBuilder.init()
|
||||
b.withP2pTcpPort(0'u16)
|
||||
b.storeServiceConf.withEnabled(true)
|
||||
b.storeServiceConf.withDbUrl("sqlite://test.db")
|
||||
b.storeServiceConf.withRetentionPolicies(
|
||||
@ -420,6 +421,7 @@ suite "WakuConfBuilder - store retention policies":
|
||||
test "Store disabled - no retention policy applied":
|
||||
## Given
|
||||
var b = WakuConfBuilder.init()
|
||||
b.withP2pTcpPort(0'u16)
|
||||
# storeServiceConf not enabled
|
||||
|
||||
## When
|
||||
|
||||
@ -1,21 +1,23 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sequtils],
|
||||
std/[net, options, sequtils, strutils],
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
libp2p/multiaddress,
|
||||
libp2p/protocols/connectivity/relay/relay
|
||||
import eth/p2p/discoveryv5/enr
|
||||
chronos/transports/[stream, datagram, common],
|
||||
metrics/chronos_httpserver,
|
||||
libp2p/[crypto/crypto, multiaddress, protocols/connectivity/relay/relay],
|
||||
eth/p2p/discoveryv5/enr
|
||||
|
||||
import
|
||||
../testlib/wakunode,
|
||||
waku/waku_node,
|
||||
waku/waku_enr,
|
||||
waku/factory/node_factory,
|
||||
waku/factory/internal_config,
|
||||
waku/factory/conf_builder/conf_builder,
|
||||
waku/factory/conf_builder/web_socket_conf_builder
|
||||
tests/testlib/[wakunode, wakucore],
|
||||
waku/[waku_node, waku_enr, net/auto_port, discovery/waku_discv5, node/waku_metrics],
|
||||
waku/factory/[
|
||||
node_factory,
|
||||
internal_config,
|
||||
conf_builder/conf_builder,
|
||||
conf_builder/web_socket_conf_builder,
|
||||
]
|
||||
|
||||
suite "Node Factory":
|
||||
asynctest "Set up a node based on default configurations":
|
||||
@ -115,5 +117,90 @@ asynctest "Start a node based on default test configuration":
|
||||
check:
|
||||
node.started == true
|
||||
|
||||
# Default conf has p2pTcpPort=0, so the OS must have assigned a real port.
|
||||
var hasNonZeroTcp = false
|
||||
for a in node.switch.peerInfo.listenAddrs:
|
||||
let s = $a
|
||||
if ("/tcp/" in s) and not ("/tcp/0" in s):
|
||||
hasNonZeroTcp = true
|
||||
check hasNonZeroTcp
|
||||
|
||||
## Cleanup
|
||||
await node.stop()
|
||||
|
||||
suite "Auto-port retry":
|
||||
asynctest "metrics binds on free TCP port, fails on taken":
|
||||
let takenPort = Port(55100)
|
||||
let freePort = Port(55101)
|
||||
let taken = createStreamServer(initTAddress("127.0.0.1", takenPort))
|
||||
defer:
|
||||
taken.stop()
|
||||
await taken.closeWait()
|
||||
|
||||
proc buildMetricsConf(port: Port): MetricsServerConf =
|
||||
var b = MetricsServerConfBuilder.init()
|
||||
b.withEnabled(true)
|
||||
b.withHttpPort(port)
|
||||
b.build().value.get()
|
||||
|
||||
let failRes = await startMetricsServerAndLogging(buildMetricsConf(takenPort), 0'u16)
|
||||
check failRes.isErr()
|
||||
|
||||
let okRes = await startMetricsServerAndLogging(buildMetricsConf(freePort), 0'u16)
|
||||
check okRes.isOk()
|
||||
if okRes.isOk():
|
||||
await okRes.get().server.close()
|
||||
|
||||
asynctest "discv5 binds on free UDP port, fails on taken":
|
||||
let takenPort = Port(55200)
|
||||
let freePort = Port(55201)
|
||||
|
||||
proc dummyCb(
|
||||
transp: DatagramTransport, raddr: TransportAddress
|
||||
): Future[void] {.async: (raises: []).} =
|
||||
discard
|
||||
|
||||
let takenUdp =
|
||||
newDatagramTransport(dummyCb, local = initTAddress("0.0.0.0", takenPort))
|
||||
defer:
|
||||
await takenUdp.closeWait()
|
||||
|
||||
let nodeKey = generateSecp256k1Key()
|
||||
let node = newTestWakuNode(nodeKey, parseIpAddress("0.0.0.0"), Port(0))
|
||||
await node.start()
|
||||
defer:
|
||||
await node.stop()
|
||||
|
||||
proc buildDiscv5Conf(port: Port): Discv5Conf =
|
||||
var b = Discv5ConfBuilder.init()
|
||||
b.withEnabled(true)
|
||||
b.withUdpPort(port)
|
||||
b.build().value.get()
|
||||
|
||||
let failRes = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
buildDiscv5Conf(takenPort),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check failRes.isErr()
|
||||
|
||||
let okRes = await setupAndStartDiscv5(
|
||||
node.enr,
|
||||
node.peerManager,
|
||||
node.topicSubscriptionQueue,
|
||||
buildDiscv5Conf(freePort),
|
||||
@[],
|
||||
node.rng,
|
||||
nodeKey,
|
||||
parseIpAddress("0.0.0.0"),
|
||||
0'u16,
|
||||
)
|
||||
check okRes.isOk()
|
||||
if okRes.isOk():
|
||||
await okRes.get().stop()
|
||||
|
||||
@ -4,7 +4,7 @@ import
|
||||
libp2p/crypto/[crypto, secp],
|
||||
libp2p/multiaddress,
|
||||
nimcrypto/utils,
|
||||
std/[options, random, sequtils],
|
||||
std/[net, options, random, sequtils],
|
||||
results,
|
||||
testutils/unittests
|
||||
import
|
||||
|
||||
@ -5,7 +5,7 @@ import chronos, confutils/toml/std/net, libp2p/multiaddress, testutils/unittests
|
||||
import ./testlib/wakunode, waku/waku_enr/capabilities
|
||||
|
||||
include
|
||||
waku/node/net_config,
|
||||
waku/net/net_config,
|
||||
waku/factory/conf_builder/web_socket_conf_builder,
|
||||
waku/factory/conf_builder/conf_builder
|
||||
|
||||
|
||||
@ -27,7 +27,6 @@ import
|
||||
# TODO: migrate to usage of a test cluster conf
|
||||
proc defaultTestWakuConfBuilder*(): WakuConfBuilder =
|
||||
var builder = WakuConfBuilder.init()
|
||||
builder.withP2pTcpPort(Port(0))
|
||||
builder.withP2pListenAddress(parseIpAddress("0.0.0.0"))
|
||||
builder.restServerConf.withListenAddress(parseIpAddress("127.0.0.1"))
|
||||
builder.withDnsAddrsNameServers(
|
||||
|
||||
@ -506,7 +506,8 @@ suite "Waku Discovery v5":
|
||||
waku.conf.nodeKey,
|
||||
waku.conf.endpointConf.p2pListenAddress,
|
||||
waku.conf.portsShift,
|
||||
)
|
||||
).valueOr:
|
||||
raiseAssert "failed setup discv5 in test: " & $error
|
||||
|
||||
check:
|
||||
waku.node.peerManager.switch.peerStore.peers().anyIt(
|
||||
@ -537,7 +538,8 @@ suite "Waku Discovery v5":
|
||||
waku.conf.nodeKey,
|
||||
waku.conf.endpointConf.p2pListenAddress,
|
||||
waku.conf.portsShift,
|
||||
)
|
||||
).valueOr:
|
||||
raiseAssert "failed setup discv5 in test: " & $error
|
||||
|
||||
check:
|
||||
not waku.node.peerManager.switch.peerStore.peers().anyIt(
|
||||
|
||||
@ -1,14 +1,13 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/json,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/multiaddress,
|
||||
libp2p/switch
|
||||
import ../testlib/wakucore, ../testlib/wakunode
|
||||
libp2p/[crypto/crypto, crypto/secp, multiaddress, switch],
|
||||
tests/testlib/[wakucore, wakunode],
|
||||
waku/factory/conf_builder/conf_builder
|
||||
|
||||
include waku/factory/waku, waku/common/enr/typed_record
|
||||
|
||||
@ -99,3 +98,46 @@ suite "Wakunode2 - Waku initialization":
|
||||
## Cleanup
|
||||
(waitFor waku.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
test "explicit port=0 triggers auto-bind across all services":
|
||||
var builder = defaultTestWakuConfBuilder()
|
||||
builder.withP2pTcpPort(Port(0))
|
||||
builder.discv5Conf.withEnabled(true)
|
||||
builder.discv5Conf.withUdpPort(Port(0))
|
||||
builder.restServerConf.withEnabled(true)
|
||||
builder.restServerConf.withRelayCacheCapacity(50'u32)
|
||||
builder.restServerConf.withPort(Port(0))
|
||||
builder.metricsServerConf.withEnabled(true)
|
||||
builder.metricsServerConf.withHttpPort(Port(0))
|
||||
builder.webSocketConf.withEnabled(true)
|
||||
builder.webSocketConf.withWebSocketPort(Port(0))
|
||||
|
||||
let conf = builder.build().valueOr:
|
||||
raiseAssert error
|
||||
|
||||
check:
|
||||
conf.endpointConf.p2pTcpPort == Port(0)
|
||||
conf.discv5Conf.get().udpPort == Port(0)
|
||||
conf.restServerConf.get().port == Port(0)
|
||||
conf.metricsServerConf.get().httpPort == Port(0)
|
||||
conf.webSocketConf.get().port == Port(0)
|
||||
|
||||
var waku = (waitFor Waku.new(conf)).valueOr:
|
||||
raiseAssert error
|
||||
defer:
|
||||
(waitFor waku.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
let portsJson = waku.stateInfo.getNodeInfoItem(NodeInfoId.MyBoundPorts)
|
||||
let parsed = parseJson(portsJson)
|
||||
|
||||
check:
|
||||
parsed.kind == JObject
|
||||
parsed["tcp"].getInt() != 0
|
||||
parsed["webSocket"].getInt() != 0
|
||||
parsed["rest"].getInt() != 0
|
||||
parsed["discv5Udp"].getInt() != 0
|
||||
parsed["metrics"].getInt() != 0
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/options,
|
||||
testutils/unittests,
|
||||
presto,
|
||||
presto/client as presto_client,
|
||||
|
||||
@ -10,7 +10,7 @@ import
|
||||
eth/keys as eth_keys,
|
||||
eth/p2p/discoveryv5/node,
|
||||
eth/p2p/discoveryv5/protocol
|
||||
import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr
|
||||
import waku/[net/auto_port, node/peer_manager/peer_manager, waku_core, waku_enr]
|
||||
|
||||
export protocol, waku_enr
|
||||
|
||||
@ -409,7 +409,15 @@ proc setupDiscoveryV5*(
|
||||
key: crypto.PrivateKey,
|
||||
p2pListenAddress: IpAddress,
|
||||
portsShift: uint16,
|
||||
): WakuDiscoveryV5 =
|
||||
): Result[WakuDiscoveryV5, string] =
|
||||
## Public only for testing. Callers should use `setupAndStartDiscv5`, which
|
||||
## additionally handles `udpPort == 0` via auto-port retry.
|
||||
if conf.udpPort == Port(0):
|
||||
return err(
|
||||
"setupDiscoveryV5: udpPort must be non-zero; " &
|
||||
"use setupAndStartDiscv5 for port=0 auto-port retry"
|
||||
)
|
||||
|
||||
let dynamicBootstrapEnrs =
|
||||
dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())
|
||||
|
||||
@ -441,10 +449,47 @@ proc setupDiscoveryV5*(
|
||||
autoupdateRecord: conf.enrAutoUpdate,
|
||||
)
|
||||
|
||||
WakuDiscoveryV5.new(
|
||||
rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
|
||||
return ok(
|
||||
WakuDiscoveryV5.new(
|
||||
rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
|
||||
)
|
||||
)
|
||||
|
||||
proc setupAndStartDiscv5*(
|
||||
myENR: enr.Record,
|
||||
nodePeerManager: PeerManager,
|
||||
nodeTopicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent],
|
||||
conf: Discv5Conf,
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo],
|
||||
rng: ref HmacDrbgContext,
|
||||
key: crypto.PrivateKey,
|
||||
p2pListenAddress: IpAddress,
|
||||
portsShift: uint16,
|
||||
): Future[Result[WakuDiscoveryV5, string]] {.async: (raises: []).} =
|
||||
## Construct and start a `WakuDiscoveryV5` instance, handling auto-port
|
||||
## retry when the caller asks for `udpPort == 0`.
|
||||
proc attempt(
|
||||
port: Port
|
||||
): Future[Result[WakuDiscoveryV5, string]] {.async: (raises: []).} =
|
||||
var c = conf
|
||||
c.udpPort = port
|
||||
let wd = setupDiscoveryV5(
|
||||
myENR, nodePeerManager, nodeTopicSubscriptionQueue, c, dynamicBootstrapNodes, rng,
|
||||
key, p2pListenAddress, portsShift,
|
||||
).valueOr:
|
||||
return err(error)
|
||||
let startRes = await wd.start()
|
||||
if startRes.isErr():
|
||||
return err("failed to start discovery, attempt: " & startRes.error)
|
||||
return ok(wd)
|
||||
|
||||
let wd = (await tryWithAutoPort[WakuDiscoveryV5](conf.udpPort, attempt)).valueOr:
|
||||
return err("setupAndStartDiscv5: " & error)
|
||||
return ok(wd)
|
||||
|
||||
proc udpPort*(wd: WakuDiscoveryV5): Port =
|
||||
wd.conf.port
|
||||
|
||||
proc updateBootstrapRecords*(
|
||||
self: var WakuDiscoveryV5, newRecordsString: string
|
||||
): Result[void, string] =
|
||||
|
||||
@ -4,6 +4,8 @@ import ../waku_conf
|
||||
logScope:
|
||||
topics = "waku conf builder discv5"
|
||||
|
||||
const DefaultDiscv5UdpPort*: Port = Port(9000)
|
||||
|
||||
###########################
|
||||
## Discv5 Config Builder ##
|
||||
###########################
|
||||
@ -38,8 +40,8 @@ proc withTableIpLimit*(b: var Discv5ConfBuilder, tableIpLimit: uint) =
|
||||
proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: Port) =
|
||||
b.udpPort = some(udpPort)
|
||||
|
||||
proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: uint) =
|
||||
b.udpPort = some(Port(udpPort.uint16))
|
||||
proc withUdpPort*(b: var Discv5ConfBuilder, udpPort: uint16) =
|
||||
b.udpPort = some(Port(udpPort))
|
||||
|
||||
proc withBootstrapNodes*(b: var Discv5ConfBuilder, bootstrapNodes: seq[string]) =
|
||||
# TODO: validate ENRs?
|
||||
@ -57,7 +59,7 @@ proc build*(b: Discv5ConfBuilder): Result[Option[Discv5Conf], string] =
|
||||
bucketIpLimit: b.bucketIpLimit.get(2),
|
||||
enrAutoUpdate: b.enrAutoUpdate.get(true),
|
||||
tableIpLimit: b.tableIpLimit.get(10),
|
||||
udpPort: b.udpPort.get(9000.Port),
|
||||
udpPort: b.udpPort.get(DefaultDiscv5UdpPort),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -4,6 +4,8 @@ import ../waku_conf
|
||||
logScope:
|
||||
topics = "waku conf builder metrics server"
|
||||
|
||||
const DefaultMetricsHttpPort*: Port = Port(8008)
|
||||
|
||||
###################################
|
||||
## Metrics Server Config Builder ##
|
||||
###################################
|
||||
@ -40,7 +42,7 @@ proc build*(b: MetricsServerConfBuilder): Result[Option[MetricsServerConf], stri
|
||||
some(
|
||||
MetricsServerConf(
|
||||
httpAddress: b.httpAddress.get(static parseIpAddress("127.0.0.1")),
|
||||
httpPort: b.httpPort.get(8008.Port),
|
||||
httpPort: b.httpPort.get(DefaultMetricsHttpPort),
|
||||
logging: b.logging.get(false),
|
||||
)
|
||||
)
|
||||
|
||||
@ -4,6 +4,8 @@ import ../waku_conf
|
||||
logScope:
|
||||
topics = "waku conf builder rest server"
|
||||
|
||||
const DefaultRestPort*: Port = Port(8645)
|
||||
|
||||
################################
|
||||
## REST Server Config Builder ##
|
||||
################################
|
||||
@ -46,8 +48,6 @@ proc build*(b: RestServerConfBuilder): Result[Option[RestServerConf], string] =
|
||||
|
||||
if b.listenAddress.isNone():
|
||||
return err("restServer.listenAddress is not specified")
|
||||
if b.port.isNone():
|
||||
return err("restServer.port is not specified")
|
||||
if b.relayCacheCapacity.isNone():
|
||||
return err("restServer.relayCacheCapacity is not specified")
|
||||
|
||||
@ -56,7 +56,7 @@ proc build*(b: RestServerConfBuilder): Result[Option[RestServerConf], string] =
|
||||
RestServerConf(
|
||||
allowOrigin: b.allowOrigin,
|
||||
listenAddress: b.listenAddress.get(),
|
||||
port: b.port.get(),
|
||||
port: b.port.get(DefaultRestPort),
|
||||
admin: b.admin.get(false),
|
||||
relayCacheCapacity: b.relayCacheCapacity.get(),
|
||||
)
|
||||
|
||||
@ -8,11 +8,13 @@ import
|
||||
results
|
||||
|
||||
import
|
||||
../waku_conf,
|
||||
../networks_config,
|
||||
../../common/logging,
|
||||
../../common/utils/parse_size_units,
|
||||
../../waku_enr/capabilities,
|
||||
waku/[
|
||||
factory/waku_conf,
|
||||
factory/networks_config,
|
||||
common/logging,
|
||||
common/utils/parse_size_units,
|
||||
waku_enr/capabilities,
|
||||
],
|
||||
tools/confutils/entry_nodes
|
||||
|
||||
import
|
||||
@ -32,7 +34,9 @@ import
|
||||
logScope:
|
||||
topics = "waku conf builder"
|
||||
|
||||
const DefaultMaxConnections* = 150
|
||||
const
|
||||
DefaultMaxConnections* = 150
|
||||
DefaultP2pTcpPort*: Port = Port(60000)
|
||||
|
||||
type MaxMessageSizeKind* = enum
|
||||
mmskNone
|
||||
@ -574,12 +578,7 @@ proc build*(
|
||||
warn "Nat Strategy is not specified, defaulting to none"
|
||||
"none"
|
||||
|
||||
let p2pTcpPort =
|
||||
if builder.p2pTcpPort.isSome():
|
||||
builder.p2pTcpPort.get()
|
||||
else:
|
||||
warn "P2P Listening TCP Port is not specified, listening on 60000"
|
||||
60000.Port
|
||||
let p2pTcpPort = builder.p2pTcpPort.get(DefaultP2pTcpPort)
|
||||
|
||||
let p2pListenAddress =
|
||||
if builder.p2pListenAddress.isSome():
|
||||
|
||||
@ -4,6 +4,8 @@ import waku/factory/waku_conf
|
||||
logScope:
|
||||
topics = "waku conf builder websocket"
|
||||
|
||||
const DefaultWebSocketPort*: Port = Port(8000)
|
||||
|
||||
##############################
|
||||
## WebSocket Config Builder ##
|
||||
##############################
|
||||
@ -41,14 +43,12 @@ proc build*(b: WebSocketConfBuilder): Result[Option[WebSocketConf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none(WebSocketConf))
|
||||
|
||||
if b.webSocketPort.isNone():
|
||||
return err("websocket.port is not specified")
|
||||
|
||||
if not b.secureEnabled.get(false):
|
||||
return ok(
|
||||
some(
|
||||
WebSocketConf(
|
||||
port: b.websocketPort.get(), secureConf: none(WebSocketSecureConf)
|
||||
port: b.webSocketPort.get(DefaultWebSocketPort),
|
||||
secureConf: none(WebSocketSecureConf),
|
||||
)
|
||||
)
|
||||
)
|
||||
@ -61,7 +61,7 @@ proc build*(b: WebSocketConfBuilder): Result[Option[WebSocketConf], string] =
|
||||
return ok(
|
||||
some(
|
||||
WebSocketConf(
|
||||
port: b.webSocketPort.get(),
|
||||
port: b.webSocketPort.get(DefaultWebSocketPort),
|
||||
secureConf: some(
|
||||
WebSocketSecureConf(keyPath: b.keyPath.get(), certPath: b.certPath.get())
|
||||
),
|
||||
|
||||
@ -8,7 +8,7 @@ import
|
||||
std/[options, sequtils, net],
|
||||
results
|
||||
|
||||
import ../common/utils/nat, ../node/net_config, ../waku_enr, ../waku_core, ./waku_conf
|
||||
import waku/[common/utils/nat, net/net_config, waku_enr, waku_core], ./waku_conf
|
||||
|
||||
proc tryBuildEnrRecord(
|
||||
conf: WakuConf, netConfig: NetConfig, multiaddrs: seq[MultiAddress]
|
||||
|
||||
@ -202,6 +202,11 @@ proc new*(
|
||||
else:
|
||||
nil
|
||||
|
||||
if not restServer.isNil():
|
||||
let boundRestPort = restServer.httpServer.address.port
|
||||
node.ports.rest = boundRestPort.uint16
|
||||
wakuConf.restServerConf.get().port = boundRestPort
|
||||
|
||||
# Set the extMultiAddrsOnly flag so the node knows not to replace explicit addresses
|
||||
node.extMultiAddrsOnly = wakuConf.endpointConf.extMultiAddrsOnly
|
||||
|
||||
@ -249,7 +254,7 @@ proc getPorts(
|
||||
return ok((tcpPort: tcpPort, websocketPort: websocketPort))
|
||||
|
||||
proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.async.} =
|
||||
var conf = waku[].conf
|
||||
let conf = waku[].conf
|
||||
let (tcpPort, websocketPort) = getPorts(waku[].node.switch.peerInfo.listenAddrs).valueOr:
|
||||
return err("Could not retrieve ports: " & error)
|
||||
|
||||
@ -281,6 +286,10 @@ proc updateEnr(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
|
||||
waku[].node.enr = record
|
||||
|
||||
# If TCP/WS was configured with port 0, node.announcedAddresses was built
|
||||
# pre-bind with a port value of 0. In any case, the resync is harmless.
|
||||
waku[].node.announcedAddresses = netConf.announcedAddresses
|
||||
|
||||
return ok()
|
||||
|
||||
proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
|
||||
@ -312,11 +321,8 @@ proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
|
||||
return ok()
|
||||
|
||||
proc updateWaku(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
let conf = waku[].conf
|
||||
if conf.endpointConf.p2pTcpPort == Port(0) or
|
||||
(conf.websocketConf.isSome() and conf.websocketConf.get.port == Port(0)):
|
||||
(await updateEnr(waku)).isOkOr:
|
||||
return err("error calling updateEnr: " & $error)
|
||||
(await updateEnr(waku)).isOkOr:
|
||||
return err("error calling updateEnr: " & $error)
|
||||
|
||||
?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node)
|
||||
|
||||
@ -390,29 +396,37 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
(await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr:
|
||||
return err("error while calling startNode: " & $error)
|
||||
|
||||
## Update waku data that is set dynamically on node start
|
||||
try:
|
||||
(await updateWaku(waku)).isOkOr:
|
||||
return err("Error in updateApp: " & $error)
|
||||
except CatchableError:
|
||||
return err("Caught exception in updateApp: " & getCurrentExceptionMsg())
|
||||
let bound = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr:
|
||||
return err("failed to read bound ports from switch: " & $error)
|
||||
waku[].node.ports.tcp = bound.tcpPort.get(Port(0)).uint16
|
||||
waku[].node.ports.webSocket = bound.websocketPort.get(Port(0)).uint16
|
||||
|
||||
## Discv5
|
||||
if conf.discv5Conf.isSome():
|
||||
waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5(
|
||||
waku.node.enr,
|
||||
waku.node.peerManager,
|
||||
waku.node.topicSubscriptionQueue,
|
||||
conf.discv5Conf.get(),
|
||||
waku.dynamicBootstrapNodes,
|
||||
waku.rng,
|
||||
conf.nodeKey,
|
||||
conf.endpointConf.p2pListenAddress,
|
||||
conf.portsShift,
|
||||
)
|
||||
waku[].wakuDiscV5 = (
|
||||
await waku_discv5.setupAndStartDiscv5(
|
||||
waku.node.enr,
|
||||
waku.node.peerManager,
|
||||
waku.node.topicSubscriptionQueue,
|
||||
conf.discv5Conf.get(),
|
||||
waku.dynamicBootstrapNodes,
|
||||
waku.rng,
|
||||
conf.nodeKey,
|
||||
conf.endpointConf.p2pListenAddress,
|
||||
conf.portsShift,
|
||||
)
|
||||
).valueOr:
|
||||
return err("failed to start waku discovery v5: " & error)
|
||||
|
||||
(await waku.wakuDiscV5.start()).isOkOr:
|
||||
return err("failed to start waku discovery v5: " & $error)
|
||||
waku[].node.ports.discv5Udp = waku[].wakuDiscV5.udpPort.uint16
|
||||
waku[].conf.discv5Conf.get().udpPort = waku[].wakuDiscV5.udpPort
|
||||
|
||||
## Update waku data that is set dynamically on node start
|
||||
try:
|
||||
(await updateWaku(waku)).isOkOr:
|
||||
return err("Error in startWaku: " & $error)
|
||||
except CatchableError:
|
||||
return err("Caught exception in startWaku: " & getCurrentExceptionMsg())
|
||||
|
||||
## Reliability
|
||||
if not waku[].deliveryService.isNil():
|
||||
@ -482,14 +496,15 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
|
||||
if conf.metricsServerConf.isSome():
|
||||
try:
|
||||
waku[].metricsServer = (
|
||||
await (
|
||||
waku_metrics.startMetricsServerAndLogging(
|
||||
conf.metricsServerConf.get(), conf.portsShift
|
||||
)
|
||||
let (server, port) = (
|
||||
await waku_metrics.startMetricsServerAndLogging(
|
||||
conf.metricsServerConf.get(), conf.portsShift
|
||||
)
|
||||
).valueOr:
|
||||
return err("Starting monitoring and external interfaces failed: " & error)
|
||||
waku[].metricsServer = server
|
||||
waku[].node.ports.metrics = port.uint16
|
||||
waku[].conf.metricsServerConf.get().httpPort = port
|
||||
except CatchableError:
|
||||
return err(
|
||||
"Caught exception starting monitoring and external interfaces failed: " &
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
|
||||
import std/[tables, sequtils, strutils]
|
||||
import metrics, eth/p2p/discoveryv5/enr, libp2p/peerid
|
||||
import waku/waku_node
|
||||
import waku/[waku_node, net/bound_ports]
|
||||
|
||||
type
|
||||
NodeInfoId* {.pure.} = enum
|
||||
@ -15,6 +15,7 @@ type
|
||||
MyMultiaddresses
|
||||
MyENR
|
||||
MyPeerId
|
||||
MyBoundPorts
|
||||
|
||||
WakuStateInfo* {.requiresInit.} = object
|
||||
node: WakuNode
|
||||
@ -43,6 +44,8 @@ proc getNodeInfoItem*(self: WakuStateInfo, infoItemId: NodeInfoId): string =
|
||||
return self.node.enr.toURI()
|
||||
of NodeInfoId.MyPeerId:
|
||||
return $PeerId(self.node.peerId())
|
||||
of NodeInfoId.MyBoundPorts:
|
||||
return $self.node.ports
|
||||
else:
|
||||
return "unknown info item id"
|
||||
|
||||
|
||||
48
waku/net/auto_port.nim
Normal file
48
waku/net/auto_port.nim
Normal file
@ -0,0 +1,48 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[net, random]
|
||||
import chronos, results
|
||||
|
||||
const
|
||||
AutoPortRetryCount* = 20
|
||||
AutoPortMin = 50000'u16
|
||||
AutoPortMax = 59000'u16
|
||||
AutoPortAttemptTimeout = chronos.seconds(30)
|
||||
|
||||
proc getAutoPort*(): uint16 =
|
||||
var rng = initRand()
|
||||
uint16(rng.rand(AutoPortMin.int .. AutoPortMax.int))
|
||||
|
||||
proc tryWithAutoPort*[T](
|
||||
startingPort: Port,
|
||||
attempt: proc(p: Port): Future[Result[T, string]] {.async: (raises: []).},
|
||||
): Future[Result[T, string]] {.async: (raises: []).} =
|
||||
## If `startingPort == Port(0)`, call `attempt` up to `AutoPortRetryCount`
|
||||
## times with random ports. Otherwise call it once with `startingPort`.
|
||||
## Returns the first ok or the last err.
|
||||
let autoMode = startingPort == Port(0)
|
||||
let attempts = if autoMode: AutoPortRetryCount else: 1
|
||||
var lastErr = ""
|
||||
for i in 1 .. attempts:
|
||||
let port =
|
||||
if autoMode:
|
||||
Port(getAutoPort())
|
||||
else:
|
||||
startingPort
|
||||
let fut = attempt(port)
|
||||
let res =
|
||||
try:
|
||||
if await fut.withTimeout(AutoPortAttemptTimeout):
|
||||
await fut
|
||||
else:
|
||||
fut.cancelSoon()
|
||||
Result[T, string].err("bind attempt timed out")
|
||||
except CancelledError:
|
||||
fut.cancelSoon()
|
||||
Result[T, string].err("bind attempt cancelled")
|
||||
if res.isOk():
|
||||
return ok(res.get())
|
||||
lastErr = res.error
|
||||
if autoMode:
|
||||
return err("auto-port exhausted; last error: " & lastErr)
|
||||
return err("port bind failed: " & lastErr)
|
||||
20
waku/net/bound_ports.nim
Normal file
20
waku/net/bound_ports.nim
Normal file
@ -0,0 +1,20 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/json
|
||||
|
||||
type BoundPorts* {.requiresInit.} = object
|
||||
## Set by the factory once each service has bound to a port.
|
||||
## A value of 0 means the service was not enabled or did not bind.
|
||||
tcp*: uint16
|
||||
webSocket*: uint16
|
||||
rest*: uint16
|
||||
discv5Udp*: uint16
|
||||
metrics*: uint16
|
||||
|
||||
proc init*(T: type BoundPorts): BoundPorts =
|
||||
return BoundPorts(
|
||||
tcp: 0'u16, webSocket: 0'u16, rest: 0'u16, discv5Udp: 0'u16, metrics: 0'u16
|
||||
)
|
||||
|
||||
proc `$`*(p: BoundPorts): string =
|
||||
return $(%*p)
|
||||
@ -2,8 +2,7 @@
|
||||
|
||||
import chronicles, chronos, metrics, metrics/chronos_httpserver
|
||||
import
|
||||
../waku_rln_relay/protocol_metrics as rln_metrics,
|
||||
../utils/collector,
|
||||
waku/[net/auto_port, waku_rln_relay/protocol_metrics as rln_metrics, utils/collector],
|
||||
./peer_manager,
|
||||
./waku_node
|
||||
|
||||
@ -57,27 +56,36 @@ proc startMetricsLog*() =
|
||||
|
||||
discard setTimer(Moment.fromNow(LogInterval), logMetrics)
|
||||
|
||||
type StartedMetricsServer* = tuple[server: MetricsHttpServerRef, port: Port]
|
||||
|
||||
proc startMetricsServer(
|
||||
serverIp: IpAddress, serverPort: Port
|
||||
): Future[Result[MetricsHttpServerRef, string]] {.async.} =
|
||||
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort
|
||||
): Future[Result[StartedMetricsServer, string]] {.async.} =
|
||||
proc attempt(
|
||||
port: Port
|
||||
): Future[Result[StartedMetricsServer, string]] {.async: (raises: []).} =
|
||||
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $port
|
||||
|
||||
let server = MetricsHttpServerRef.new($serverIp, serverPort).valueOr:
|
||||
return err("metrics HTTP server start failed: " & $error)
|
||||
let server = MetricsHttpServerRef.new($serverIp, port).valueOr:
|
||||
return err("fail to start service metrics server, attempt:" & $error)
|
||||
|
||||
try:
|
||||
await server.start()
|
||||
except CatchableError:
|
||||
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||
try:
|
||||
await server.start()
|
||||
except CatchableError:
|
||||
return
|
||||
err("exception while startMetricsServer, attempt: " & getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
|
||||
return ok(server)
|
||||
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $port
|
||||
return ok((server: server, port: port))
|
||||
|
||||
let started = (await tryWithAutoPort[StartedMetricsServer](serverPort, attempt)).valueOr:
|
||||
return err("metrics HTTP server start failed: " & error)
|
||||
return ok(started)
|
||||
|
||||
proc startMetricsServerAndLogging*(
|
||||
conf: MetricsServerConf, portsShift: uint16
|
||||
): Future[Result[MetricsHttpServerRef, string]] {.async.} =
|
||||
var metricsServer: MetricsHttpServerRef
|
||||
metricsServer = (
|
||||
): Future[Result[StartedMetricsServer, string]] {.async.} =
|
||||
let started = (
|
||||
await (
|
||||
startMetricsServer(conf.httpAddress, Port(conf.httpPort.uint16 + portsShift))
|
||||
)
|
||||
@ -87,4 +95,4 @@ proc startMetricsServerAndLogging*(
|
||||
if conf.logging:
|
||||
startMetricsLog()
|
||||
|
||||
return ok(metricsServer)
|
||||
return ok(started)
|
||||
|
||||
@ -62,7 +62,7 @@ import
|
||||
events/message_events,
|
||||
],
|
||||
waku/discovery/waku_kademlia,
|
||||
./net_config,
|
||||
waku/net/[bound_ports, net_config],
|
||||
./peer_manager,
|
||||
./health_monitor/health_status,
|
||||
./health_monitor/topic_health
|
||||
@ -140,6 +140,7 @@ type
|
||||
wakuMix*: WakuMix
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
ports*: BoundPorts
|
||||
|
||||
proc deduceRelayShard(
|
||||
node: WakuNode,
|
||||
@ -224,6 +225,7 @@ proc new*(
|
||||
announcedAddresses: netConfig.announcedAddresses,
|
||||
topicSubscriptionQueue: queue,
|
||||
rateLimitSettings: rateLimitSettings,
|
||||
ports: BoundPorts.init(),
|
||||
)
|
||||
|
||||
peerManager.setShardGetter(node.getShardsGetter(@[]))
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import
|
||||
./node/net_config,
|
||||
./net/net_config,
|
||||
./node/waku_switch as switch,
|
||||
./node/waku_node as node,
|
||||
./node/health_monitor as health_monitor,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user