mirror of https://github.com/waku-org/nwaku.git
refactor(wakunode2): flatten and simplify app setup (#1705)
This commit is contained in:
parent
b397ed6cbd
commit
ce92fc1aed
|
@ -12,7 +12,9 @@ else:
|
|||
import std/[strformat, strutils, times, json, options, random]
|
||||
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
||||
eth/keys, bearssl, stew/[byteutils, results],
|
||||
nimcrypto/pbkdf2
|
||||
nimcrypto/pbkdf2,
|
||||
metrics,
|
||||
metrics/chronos_httpserver
|
||||
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
||||
crypto/crypto, # cryptographic functions
|
||||
stream/connection, # create and close stream read / write connections
|
||||
|
@ -205,6 +207,24 @@ proc readNick(transp: StreamTransport): Future[string] {.async.} =
|
|||
stdout.flushFile()
|
||||
return await transp.readLine()
|
||||
|
||||
|
||||
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): Result[MetricsHttpServerRef, string] =
|
||||
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
||||
|
||||
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||
if metricsServerRes.isErr():
|
||||
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||
|
||||
let server = metricsServerRes.value
|
||||
try:
|
||||
waitFor server.start()
|
||||
except CatchableError:
|
||||
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
||||
ok(metricsServerRes.value)
|
||||
|
||||
|
||||
proc publish(c: Chat, line: string) =
|
||||
# First create a Chat2Message protobuf with this line of text
|
||||
let time = getTime().toUnix()
|
||||
|
@ -604,8 +624,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|||
startMetricsLog()
|
||||
|
||||
if conf.metricsServer:
|
||||
startMetricsServer(conf.metricsServerAddress,
|
||||
Port(conf.metricsServerPort + conf.portsShift))
|
||||
let metricsServer = startMetricsServer(
|
||||
conf.metricsServerAddress,
|
||||
Port(conf.metricsServerPort + conf.portsShift)
|
||||
)
|
||||
|
||||
|
||||
await chat.readWriteLoop()
|
||||
|
|
|
@ -15,7 +15,9 @@ import
|
|||
eth/keys,
|
||||
eth/net/nat,
|
||||
json_rpc/rpcserver,
|
||||
presto
|
||||
presto,
|
||||
metrics,
|
||||
metrics/chronos_httpserver
|
||||
import
|
||||
../../waku/common/sqlite,
|
||||
../../waku/v2/waku_core,
|
||||
|
@ -40,6 +42,18 @@ import
|
|||
../../waku/v2/waku_lightpush,
|
||||
../../waku/v2/waku_filter,
|
||||
./config
|
||||
import
|
||||
../../waku/v2/node/message_cache,
|
||||
../../waku/v2/node/rest/server,
|
||||
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
|
||||
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
|
||||
../../waku/v2/node/rest/relay/topic_cache,
|
||||
../../waku/v2/node/rest/store/handlers as rest_store_api,
|
||||
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
|
||||
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
|
||||
../../waku/v2/node/jsonrpc/filter/handlers as rpc_filter_api,
|
||||
../../waku/v2/node/jsonrpc/relay/handlers as rpc_relay_api,
|
||||
../../waku/v2/node/jsonrpc/store/handlers as rpc_store_api
|
||||
|
||||
when defined(rln):
|
||||
import ../../waku/v2/waku_rln_relay
|
||||
|
@ -66,6 +80,7 @@ type
|
|||
|
||||
rpcServer: Option[RpcHttpServer]
|
||||
restServer: Option[RestServerRef]
|
||||
metricsServer: Option[MetricsHttpServerRef]
|
||||
|
||||
AppResult*[T] = Result[T, string]
|
||||
|
||||
|
@ -713,17 +728,71 @@ proc startNode*(app: App): Future[AppResult[void]] {.async.} =
|
|||
|
||||
## Monitoring and external interfaces
|
||||
|
||||
# TODO: Merge the `wakunode_setup_*.nim` files here. Once the encapsulating
|
||||
# type (e.g., App) is implemented. Hold both servers instances to support
|
||||
# a graceful shutdown.
|
||||
import
|
||||
./wakunode2_setup_rpc,
|
||||
./wakunode2_setup_rest
|
||||
proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] =
|
||||
let server = ? newRestHttpServer(address, port)
|
||||
|
||||
## Debug REST API
|
||||
installDebugApiHandlers(server.router, app.node)
|
||||
|
||||
proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): AppResult[void] =
|
||||
startMetricsServer(address, Port(port + portsShift))
|
||||
ok()
|
||||
## Relay REST API
|
||||
if conf.relay:
|
||||
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
||||
installRelayApiHandlers(server.router, app.node, relayCache)
|
||||
|
||||
## Store REST API
|
||||
installStoreApiHandlers(server.router, app.node)
|
||||
|
||||
server.start()
|
||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||
|
||||
ok(server)
|
||||
|
||||
proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RpcHttpServer] =
|
||||
let ta = initTAddress(address, port)
|
||||
|
||||
var server: RpcHttpServer
|
||||
try:
|
||||
server = newRpcHttpServer([ta])
|
||||
except CatchableError:
|
||||
return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg())
|
||||
|
||||
installDebugApiHandlers(app.node, server)
|
||||
|
||||
if conf.relay:
|
||||
let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30)
|
||||
installRelayApiHandlers(app.node, server, relayMessageCache)
|
||||
if conf.rpcPrivate:
|
||||
installRelayPrivateApiHandlers(app.node, server, relayMessageCache)
|
||||
|
||||
if conf.filternode != "":
|
||||
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)
|
||||
installFilterApiHandlers(app.node, server, filterMessageCache)
|
||||
|
||||
installStoreApiHandlers(app.node, server)
|
||||
|
||||
if conf.rpcAdmin:
|
||||
installAdminApiHandlers(app.node, server)
|
||||
|
||||
server.start()
|
||||
info "RPC Server started", address=ta
|
||||
|
||||
ok(server)
|
||||
|
||||
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): AppResult[MetricsHttpServerRef] =
|
||||
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
||||
|
||||
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||
if metricsServerRes.isErr():
|
||||
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||
|
||||
let server = metricsServerRes.value
|
||||
try:
|
||||
waitFor server.start()
|
||||
except CatchableError:
|
||||
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||
|
||||
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
||||
ok(server)
|
||||
|
||||
proc startMetricsLogging(): AppResult[void] =
|
||||
startMetricsLog()
|
||||
|
@ -731,14 +800,14 @@ proc startMetricsLogging(): AppResult[void] =
|
|||
|
||||
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
||||
if app.conf.rpc:
|
||||
let startRpcServerRes = startRpcServer(app.node, app.conf.rpcAddress, app.conf.rpcPort, app.conf.portsShift, app.conf)
|
||||
let startRpcServerRes = startRpcServer(app, app.conf.rpcAddress, Port(app.conf.rpcPort + app.conf.portsShift), app.conf)
|
||||
if startRpcServerRes.isErr():
|
||||
error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error
|
||||
else:
|
||||
app.rpcServer = some(startRpcServerRes.value)
|
||||
|
||||
if app.conf.rest:
|
||||
let startRestServerRes = startRestServer(app.node, app.conf.restAddress, app.conf.restPort, app.conf.portsShift, app.conf)
|
||||
let startRestServerRes = startRestServer(app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf)
|
||||
if startRestServerRes.isErr():
|
||||
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
||||
else:
|
||||
|
@ -746,9 +815,11 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
|||
|
||||
|
||||
if app.conf.metricsServer:
|
||||
let startMetricsServerRes = startMetricsServer(app.node, app.conf.metricsServerAddress, app.conf.metricsServerPort, app.conf.portsShift)
|
||||
let startMetricsServerRes = startMetricsServer(app.conf.metricsServerAddress, Port(app.conf.metricsServerPort + app.conf.portsShift))
|
||||
if startMetricsServerRes.isErr():
|
||||
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
||||
else:
|
||||
app.metricsServer = some(startMetricsServerRes.value)
|
||||
|
||||
if app.conf.metricsLogging:
|
||||
let startMetricsLoggingRes = startMetricsLogging()
|
||||
|
@ -767,5 +838,8 @@ proc stop*(app: App): Future[void] {.async.} =
|
|||
if app.rpcServer.isSome():
|
||||
await app.rpcServer.get().stop()
|
||||
|
||||
if app.metricsServer.isSome():
|
||||
await app.metricsServer.get().stop()
|
||||
|
||||
if not app.node.isNil():
|
||||
await app.node.stop()
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
presto
|
||||
import
|
||||
../../waku/v2/waku_node,
|
||||
../../waku/v2/node/rest/server,
|
||||
../../waku/v2/node/rest/debug/handlers as debug_api,
|
||||
../../waku/v2/node/rest/relay/handlers as relay_api,
|
||||
../../waku/v2/node/rest/relay/topic_cache,
|
||||
../../waku/v2/node/rest/store/handlers as store_api,
|
||||
./config
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "wakunode rest"
|
||||
|
||||
|
||||
proc startRestServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): RestServerResult[RestServerRef] =
|
||||
let server = ? newRestHttpServer(address, port)
|
||||
|
||||
## Debug REST API
|
||||
installDebugApiHandlers(server.router, node)
|
||||
|
||||
## Relay REST API
|
||||
if conf.relay:
|
||||
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
||||
installRelayApiHandlers(server.router, node, relayCache)
|
||||
|
||||
## Store REST API
|
||||
installStoreApiHandlers(server.router, node)
|
||||
|
||||
server.start()
|
||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||
|
||||
ok(server)
|
||||
|
||||
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): RestServerResult[RestServerRef] =
|
||||
return startRestServer(node, address, Port(port + portsShift), conf)
|
|
@ -1,58 +0,0 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
json_rpc/rpcserver
|
||||
import
|
||||
../../waku/v2/node/message_cache,
|
||||
../../waku/v2/waku_node,
|
||||
../../waku/v2/node/jsonrpc/admin/handlers as admin_api,
|
||||
../../waku/v2/node/jsonrpc/debug/handlers as debug_api,
|
||||
../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
|
||||
../../waku/v2/node/jsonrpc/relay/handlers as relay_api,
|
||||
../../waku/v2/node/jsonrpc/store/handlers as store_api,
|
||||
./config
|
||||
|
||||
logScope:
|
||||
topics = "wakunode jsonrpc"
|
||||
|
||||
proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): Result[RpcHttpServer, string] =
|
||||
let ta = initTAddress(address, port)
|
||||
|
||||
var server: RpcHttpServer
|
||||
try:
|
||||
server = newRpcHttpServer([ta])
|
||||
except CatchableError:
|
||||
return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg())
|
||||
|
||||
installDebugApiHandlers(node, server)
|
||||
|
||||
# TODO: Move to setup protocols proc
|
||||
if conf.relay:
|
||||
let relayMessageCache = relay_api.MessageCache.init(capacity=30)
|
||||
installRelayApiHandlers(node, server, relayMessageCache)
|
||||
if conf.rpcPrivate:
|
||||
installRelayPrivateApiHandlers(node, server, relayMessageCache)
|
||||
|
||||
# TODO: Move to setup protocols proc
|
||||
if conf.filternode != "":
|
||||
let filterMessageCache = filter_api.MessageCache.init(capacity=30)
|
||||
installFilterApiHandlers(node, server, filterMessageCache)
|
||||
|
||||
installStoreApiHandlers(node, server)
|
||||
|
||||
if conf.rpcAdmin:
|
||||
installAdminApiHandlers(node, server)
|
||||
|
||||
server.start()
|
||||
info "RPC Server started", address=ta
|
||||
|
||||
ok(server)
|
||||
|
||||
proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): Result[RpcHttpServer, string] =
|
||||
return startRpcServer(node, address, Port(port + portsShift), conf)
|
|
@ -4,7 +4,6 @@ else:
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
chronos,
|
||||
metrics,
|
||||
|
@ -25,16 +24,6 @@ logScope:
|
|||
topics = "waku node metrics"
|
||||
|
||||
|
||||
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) =
|
||||
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
||||
|
||||
try:
|
||||
startMetricsHttpServer($serverIp, serverPort)
|
||||
except Exception as e:
|
||||
raiseAssert("Exception while starting metrics HTTP server: " & e.msg)
|
||||
|
||||
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
||||
|
||||
type
|
||||
# https://github.com/nim-lang/Nim/issues/17369
|
||||
MetricsLogger = proc(udata: pointer) {.gcsafe, raises: [Defect].}
|
||||
|
|
Loading…
Reference in New Issue