From 3db31473dd33ff40e9f4a3cffd0bf12386cf3f51 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Wed, 26 Apr 2023 19:25:18 +0200 Subject: [PATCH] refactor(wakunode2): flatten and simplify app setup (#1705) --- apps/chat2/chat2.nim | 28 ++++++- apps/wakunode2/app.nim | 100 +++++++++++++++++++++--- apps/wakunode2/wakunode2_setup_rest.nim | 45 ----------- apps/wakunode2/wakunode2_setup_rpc.nim | 58 -------------- waku/v2/node/waku_metrics.nim | 11 --- 5 files changed, 112 insertions(+), 130 deletions(-) delete mode 100644 apps/wakunode2/wakunode2_setup_rest.nim delete mode 100644 apps/wakunode2/wakunode2_setup_rpc.nim diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index aac21c293..61bac6105 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -12,7 +12,9 @@ else: import std/[strformat, strutils, times, json, options, random] import confutils, chronicles, chronos, stew/shims/net as stewNet, eth/keys, bearssl, stew/[byteutils, results], - nimcrypto/pbkdf2 + nimcrypto/pbkdf2, + metrics, + metrics/chronos_httpserver import libp2p/[switch, # manage transports, a single entry point for dialing and listening crypto/crypto, # cryptographic functions stream/connection, # create and close stream read / write connections @@ -205,6 +207,24 @@ proc readNick(transp: StreamTransport): Future[string] {.async.} = stdout.flushFile() return await transp.readLine() + +proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): Result[MetricsHttpServerRef, string] = + info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort + + let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort) + if metricsServerRes.isErr(): + return err("metrics HTTP server start failed: " & $metricsServerRes.error) + + let server = metricsServerRes.value + try: + waitFor server.start() + except CatchableError: + return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) + + info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort + ok(metricsServerRes.value) + + proc publish(c: Chat, line: string) = # First create a Chat2Message protobuf with this line of text let time = getTime().toUnix() @@ -604,8 +624,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = startMetricsLog() if conf.metricsServer: - startMetricsServer(conf.metricsServerAddress, - Port(conf.metricsServerPort + conf.portsShift)) + let metricsServer = startMetricsServer( + conf.metricsServerAddress, + Port(conf.metricsServerPort + conf.portsShift) + ) await chat.readWriteLoop() diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 2e9c0d155..b9f3cae5b 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -15,7 +15,9 @@ import eth/keys, eth/net/nat, json_rpc/rpcserver, - presto + presto, + metrics, + metrics/chronos_httpserver import ../../waku/common/sqlite, ../../waku/v2/waku_core, @@ -40,6 +42,18 @@ import ../../waku/v2/waku_lightpush, ../../waku/v2/waku_filter, ./config +import + ../../waku/v2/node/message_cache, + ../../waku/v2/node/rest/server, + ../../waku/v2/node/rest/debug/handlers as rest_debug_api, + ../../waku/v2/node/rest/relay/handlers as rest_relay_api, + ../../waku/v2/node/rest/relay/topic_cache, + ../../waku/v2/node/rest/store/handlers as rest_store_api, + ../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api, + ../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api, + ../../waku/v2/node/jsonrpc/filter/handlers as rpc_filter_api, + ../../waku/v2/node/jsonrpc/relay/handlers as rpc_relay_api, + ../../waku/v2/node/jsonrpc/store/handlers as rpc_store_api when defined(rln): import ../../waku/v2/waku_rln_relay @@ -66,6 +80,7 @@ type rpcServer: Option[RpcHttpServer] restServer: Option[RestServerRef] + metricsServer: Option[MetricsHttpServerRef] AppResult*[T] = Result[T, string] @@ -713,17 +728,71 @@ proc startNode*(app: App): Future[AppResult[void]] {.async.} = ## Monitoring and external interfaces -# TODO: Merge the `wakunode_setup_*.nim` files here. Once the encapsulating -# type (e.g., App) is implemented. Hold both servers instances to support -# a graceful shutdown. -import - ./wakunode2_setup_rpc, - ./wakunode2_setup_rest +proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] = + let server = ? newRestHttpServer(address, port) + ## Debug REST API + installDebugApiHandlers(server.router, app.node) -proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): AppResult[void] = - startMetricsServer(address, Port(port + portsShift)) - ok() + ## Relay REST API + if conf.relay: + let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) + installRelayApiHandlers(server.router, app.node, relayCache) + + ## Store REST API + installStoreApiHandlers(server.router, app.node) + + server.start() + info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" + + ok(server) + +proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RpcHttpServer] = + let ta = initTAddress(address, port) + + var server: RpcHttpServer + try: + server = newRpcHttpServer([ta]) + except CatchableError: + return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg()) + + installDebugApiHandlers(app.node, server) + + if conf.relay: + let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30) + installRelayApiHandlers(app.node, server, relayMessageCache) + if conf.rpcPrivate: + installRelayPrivateApiHandlers(app.node, server, relayMessageCache) + + if conf.filternode != "": + let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30) + installFilterApiHandlers(app.node, server, filterMessageCache) + + installStoreApiHandlers(app.node, server) + + if conf.rpcAdmin: + installAdminApiHandlers(app.node, server) + + server.start() + info "RPC Server started", address=ta + + ok(server) + +proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): AppResult[MetricsHttpServerRef] = + info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort + + let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort) + if metricsServerRes.isErr(): + return err("metrics HTTP server start failed: " & $metricsServerRes.error) + + let server = metricsServerRes.value + try: + waitFor server.start() + except CatchableError: + return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) + + info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort + ok(server) proc startMetricsLogging(): AppResult[void] = startMetricsLog() @@ -731,14 +800,14 @@ proc startMetricsLogging(): AppResult[void] = proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = if app.conf.rpc: - let startRpcServerRes = startRpcServer(app.node, app.conf.rpcAddress, app.conf.rpcPort, app.conf.portsShift, app.conf) + let startRpcServerRes = startRpcServer(app, app.conf.rpcAddress, Port(app.conf.rpcPort + app.conf.portsShift), app.conf) if startRpcServerRes.isErr(): error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error else: app.rpcServer = some(startRpcServerRes.value) if app.conf.rest: - let startRestServerRes = startRestServer(app.node, app.conf.restAddress, app.conf.restPort, app.conf.portsShift, app.conf) + let startRestServerRes = startRestServer(app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf) if startRestServerRes.isErr(): error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error else: @@ -746,9 +815,11 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = if app.conf.metricsServer: - let startMetricsServerRes = startMetricsServer(app.node, app.conf.metricsServerAddress, app.conf.metricsServerPort, app.conf.portsShift) + let startMetricsServerRes = startMetricsServer(app.conf.metricsServerAddress, Port(app.conf.metricsServerPort + app.conf.portsShift)) if startMetricsServerRes.isErr(): error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error + else: + app.metricsServer = some(startMetricsServerRes.value) if app.conf.metricsLogging: let startMetricsLoggingRes = startMetricsLogging() @@ -767,5 +838,8 @@ proc stop*(app: App): Future[void] {.async.} = if app.rpcServer.isSome(): await app.rpcServer.get().stop() + if app.metricsServer.isSome(): + await app.metricsServer.get().stop() + if not app.node.isNil(): await app.node.stop() diff --git a/apps/wakunode2/wakunode2_setup_rest.nim b/apps/wakunode2/wakunode2_setup_rest.nim deleted file mode 100644 index 072fd10f1..000000000 --- a/apps/wakunode2/wakunode2_setup_rest.nim +++ /dev/null @@ -1,45 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - stew/results, - stew/shims/net, - chronicles, - presto -import - ../../waku/v2/waku_node, - ../../waku/v2/node/rest/server, - ../../waku/v2/node/rest/debug/handlers as debug_api, - ../../waku/v2/node/rest/relay/handlers as relay_api, - ../../waku/v2/node/rest/relay/topic_cache, - ../../waku/v2/node/rest/store/handlers as store_api, - ./config - - -logScope: - topics = "wakunode rest" - - -proc startRestServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): RestServerResult[RestServerRef] = - let server = ? newRestHttpServer(address, port) - - ## Debug REST API - installDebugApiHandlers(server.router, node) - - ## Relay REST API - if conf.relay: - let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) - installRelayApiHandlers(server.router, node, relayCache) - - ## Store REST API - installStoreApiHandlers(server.router, node) - - server.start() - info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" - - ok(server) - -proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): RestServerResult[RestServerRef] = - return startRestServer(node, address, Port(port + portsShift), conf) diff --git a/apps/wakunode2/wakunode2_setup_rpc.nim b/apps/wakunode2/wakunode2_setup_rpc.nim deleted file mode 100644 index ea174261e..000000000 --- a/apps/wakunode2/wakunode2_setup_rpc.nim +++ /dev/null @@ -1,58 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - stew/results, - stew/shims/net, - chronicles, - json_rpc/rpcserver -import - ../../waku/v2/node/message_cache, - ../../waku/v2/waku_node, - ../../waku/v2/node/jsonrpc/admin/handlers as admin_api, - ../../waku/v2/node/jsonrpc/debug/handlers as debug_api, - ../../waku/v2/node/jsonrpc/filter/handlers as filter_api, - ../../waku/v2/node/jsonrpc/relay/handlers as relay_api, - ../../waku/v2/node/jsonrpc/store/handlers as store_api, - ./config - -logScope: - topics = "wakunode jsonrpc" - -proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): Result[RpcHttpServer, string] = - let ta = initTAddress(address, port) - - var server: RpcHttpServer - try: - server = newRpcHttpServer([ta]) - except CatchableError: - return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg()) - - installDebugApiHandlers(node, server) - - # TODO: Move to setup protocols proc - if conf.relay: - let relayMessageCache = relay_api.MessageCache.init(capacity=30) - installRelayApiHandlers(node, server, relayMessageCache) - if conf.rpcPrivate: - installRelayPrivateApiHandlers(node, server, relayMessageCache) - - # TODO: Move to setup protocols proc - if conf.filternode != "": - let filterMessageCache = filter_api.MessageCache.init(capacity=30) - installFilterApiHandlers(node, server, filterMessageCache) - - installStoreApiHandlers(node, server) - - if conf.rpcAdmin: - installAdminApiHandlers(node, server) - - server.start() - info "RPC Server started", address=ta - - ok(server) - -proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): Result[RpcHttpServer, string] = - return startRpcServer(node, address, Port(port + portsShift), conf) diff --git a/waku/v2/node/waku_metrics.nim b/waku/v2/node/waku_metrics.nim index 2a5fdf04b..9e857fa3e 100644 --- a/waku/v2/node/waku_metrics.nim +++ b/waku/v2/node/waku_metrics.nim @@ -4,7 +4,6 @@ else: {.push raises: [].} import - stew/shims/net, chronicles, chronos, metrics, @@ -25,16 +24,6 @@ logScope: topics = "waku node metrics" -proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) = - info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort - - try: - startMetricsHttpServer($serverIp, serverPort) - except Exception as e: - raiseAssert("Exception while starting metrics HTTP server: " & e.msg) - - info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort - type # https://github.com/nim-lang/Nim/issues/17369 MetricsLogger = proc(udata: pointer) {.gcsafe, raises: [Defect].}