mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-04 22:13:07 +00:00
refactor(wakunode2): flatten and simplify app setup (#1705)
This commit is contained in:
parent
a4d22fadc3
commit
3db31473dd
@ -12,7 +12,9 @@ else:
|
|||||||
import std/[strformat, strutils, times, json, options, random]
|
import std/[strformat, strutils, times, json, options, random]
|
||||||
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
||||||
eth/keys, bearssl, stew/[byteutils, results],
|
eth/keys, bearssl, stew/[byteutils, results],
|
||||||
nimcrypto/pbkdf2
|
nimcrypto/pbkdf2,
|
||||||
|
metrics,
|
||||||
|
metrics/chronos_httpserver
|
||||||
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
||||||
crypto/crypto, # cryptographic functions
|
crypto/crypto, # cryptographic functions
|
||||||
stream/connection, # create and close stream read / write connections
|
stream/connection, # create and close stream read / write connections
|
||||||
@ -205,6 +207,24 @@ proc readNick(transp: StreamTransport): Future[string] {.async.} =
|
|||||||
stdout.flushFile()
|
stdout.flushFile()
|
||||||
return await transp.readLine()
|
return await transp.readLine()
|
||||||
|
|
||||||
|
|
||||||
|
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): Result[MetricsHttpServerRef, string] =
|
||||||
|
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
||||||
|
|
||||||
|
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||||
|
if metricsServerRes.isErr():
|
||||||
|
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||||
|
|
||||||
|
let server = metricsServerRes.value
|
||||||
|
try:
|
||||||
|
waitFor server.start()
|
||||||
|
except CatchableError:
|
||||||
|
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
||||||
|
ok(metricsServerRes.value)
|
||||||
|
|
||||||
|
|
||||||
proc publish(c: Chat, line: string) =
|
proc publish(c: Chat, line: string) =
|
||||||
# First create a Chat2Message protobuf with this line of text
|
# First create a Chat2Message protobuf with this line of text
|
||||||
let time = getTime().toUnix()
|
let time = getTime().toUnix()
|
||||||
@ -604,8 +624,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
startMetricsLog()
|
startMetricsLog()
|
||||||
|
|
||||||
if conf.metricsServer:
|
if conf.metricsServer:
|
||||||
startMetricsServer(conf.metricsServerAddress,
|
let metricsServer = startMetricsServer(
|
||||||
Port(conf.metricsServerPort + conf.portsShift))
|
conf.metricsServerAddress,
|
||||||
|
Port(conf.metricsServerPort + conf.portsShift)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
await chat.readWriteLoop()
|
await chat.readWriteLoop()
|
||||||
|
|||||||
@ -15,7 +15,9 @@ import
|
|||||||
eth/keys,
|
eth/keys,
|
||||||
eth/net/nat,
|
eth/net/nat,
|
||||||
json_rpc/rpcserver,
|
json_rpc/rpcserver,
|
||||||
presto
|
presto,
|
||||||
|
metrics,
|
||||||
|
metrics/chronos_httpserver
|
||||||
import
|
import
|
||||||
../../waku/common/sqlite,
|
../../waku/common/sqlite,
|
||||||
../../waku/v2/waku_core,
|
../../waku/v2/waku_core,
|
||||||
@ -40,6 +42,18 @@ import
|
|||||||
../../waku/v2/waku_lightpush,
|
../../waku/v2/waku_lightpush,
|
||||||
../../waku/v2/waku_filter,
|
../../waku/v2/waku_filter,
|
||||||
./config
|
./config
|
||||||
|
import
|
||||||
|
../../waku/v2/node/message_cache,
|
||||||
|
../../waku/v2/node/rest/server,
|
||||||
|
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
|
||||||
|
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
|
||||||
|
../../waku/v2/node/rest/relay/topic_cache,
|
||||||
|
../../waku/v2/node/rest/store/handlers as rest_store_api,
|
||||||
|
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
|
||||||
|
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
|
||||||
|
../../waku/v2/node/jsonrpc/filter/handlers as rpc_filter_api,
|
||||||
|
../../waku/v2/node/jsonrpc/relay/handlers as rpc_relay_api,
|
||||||
|
../../waku/v2/node/jsonrpc/store/handlers as rpc_store_api
|
||||||
|
|
||||||
when defined(rln):
|
when defined(rln):
|
||||||
import ../../waku/v2/waku_rln_relay
|
import ../../waku/v2/waku_rln_relay
|
||||||
@ -66,6 +80,7 @@ type
|
|||||||
|
|
||||||
rpcServer: Option[RpcHttpServer]
|
rpcServer: Option[RpcHttpServer]
|
||||||
restServer: Option[RestServerRef]
|
restServer: Option[RestServerRef]
|
||||||
|
metricsServer: Option[MetricsHttpServerRef]
|
||||||
|
|
||||||
AppResult*[T] = Result[T, string]
|
AppResult*[T] = Result[T, string]
|
||||||
|
|
||||||
@ -713,17 +728,71 @@ proc startNode*(app: App): Future[AppResult[void]] {.async.} =
|
|||||||
|
|
||||||
## Monitoring and external interfaces
|
## Monitoring and external interfaces
|
||||||
|
|
||||||
# TODO: Merge the `wakunode_setup_*.nim` files here. Once the encapsulating
|
proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] =
|
||||||
# type (e.g., App) is implemented. Hold both servers instances to support
|
let server = ? newRestHttpServer(address, port)
|
||||||
# a graceful shutdown.
|
|
||||||
import
|
|
||||||
./wakunode2_setup_rpc,
|
|
||||||
./wakunode2_setup_rest
|
|
||||||
|
|
||||||
|
## Debug REST API
|
||||||
|
installDebugApiHandlers(server.router, app.node)
|
||||||
|
|
||||||
proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): AppResult[void] =
|
## Relay REST API
|
||||||
startMetricsServer(address, Port(port + portsShift))
|
if conf.relay:
|
||||||
ok()
|
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
||||||
|
installRelayApiHandlers(server.router, app.node, relayCache)
|
||||||
|
|
||||||
|
## Store REST API
|
||||||
|
installStoreApiHandlers(server.router, app.node)
|
||||||
|
|
||||||
|
server.start()
|
||||||
|
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||||
|
|
||||||
|
ok(server)
|
||||||
|
|
||||||
|
proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RpcHttpServer] =
|
||||||
|
let ta = initTAddress(address, port)
|
||||||
|
|
||||||
|
var server: RpcHttpServer
|
||||||
|
try:
|
||||||
|
server = newRpcHttpServer([ta])
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
installDebugApiHandlers(app.node, server)
|
||||||
|
|
||||||
|
if conf.relay:
|
||||||
|
let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30)
|
||||||
|
installRelayApiHandlers(app.node, server, relayMessageCache)
|
||||||
|
if conf.rpcPrivate:
|
||||||
|
installRelayPrivateApiHandlers(app.node, server, relayMessageCache)
|
||||||
|
|
||||||
|
if conf.filternode != "":
|
||||||
|
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)
|
||||||
|
installFilterApiHandlers(app.node, server, filterMessageCache)
|
||||||
|
|
||||||
|
installStoreApiHandlers(app.node, server)
|
||||||
|
|
||||||
|
if conf.rpcAdmin:
|
||||||
|
installAdminApiHandlers(app.node, server)
|
||||||
|
|
||||||
|
server.start()
|
||||||
|
info "RPC Server started", address=ta
|
||||||
|
|
||||||
|
ok(server)
|
||||||
|
|
||||||
|
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): AppResult[MetricsHttpServerRef] =
|
||||||
|
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
||||||
|
|
||||||
|
let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort)
|
||||||
|
if metricsServerRes.isErr():
|
||||||
|
return err("metrics HTTP server start failed: " & $metricsServerRes.error)
|
||||||
|
|
||||||
|
let server = metricsServerRes.value
|
||||||
|
try:
|
||||||
|
waitFor server.start()
|
||||||
|
except CatchableError:
|
||||||
|
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
||||||
|
ok(server)
|
||||||
|
|
||||||
proc startMetricsLogging(): AppResult[void] =
|
proc startMetricsLogging(): AppResult[void] =
|
||||||
startMetricsLog()
|
startMetricsLog()
|
||||||
@ -731,14 +800,14 @@ proc startMetricsLogging(): AppResult[void] =
|
|||||||
|
|
||||||
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
||||||
if app.conf.rpc:
|
if app.conf.rpc:
|
||||||
let startRpcServerRes = startRpcServer(app.node, app.conf.rpcAddress, app.conf.rpcPort, app.conf.portsShift, app.conf)
|
let startRpcServerRes = startRpcServer(app, app.conf.rpcAddress, Port(app.conf.rpcPort + app.conf.portsShift), app.conf)
|
||||||
if startRpcServerRes.isErr():
|
if startRpcServerRes.isErr():
|
||||||
error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error
|
error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error
|
||||||
else:
|
else:
|
||||||
app.rpcServer = some(startRpcServerRes.value)
|
app.rpcServer = some(startRpcServerRes.value)
|
||||||
|
|
||||||
if app.conf.rest:
|
if app.conf.rest:
|
||||||
let startRestServerRes = startRestServer(app.node, app.conf.restAddress, app.conf.restPort, app.conf.portsShift, app.conf)
|
let startRestServerRes = startRestServer(app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf)
|
||||||
if startRestServerRes.isErr():
|
if startRestServerRes.isErr():
|
||||||
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
||||||
else:
|
else:
|
||||||
@ -746,9 +815,11 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
|||||||
|
|
||||||
|
|
||||||
if app.conf.metricsServer:
|
if app.conf.metricsServer:
|
||||||
let startMetricsServerRes = startMetricsServer(app.node, app.conf.metricsServerAddress, app.conf.metricsServerPort, app.conf.portsShift)
|
let startMetricsServerRes = startMetricsServer(app.conf.metricsServerAddress, Port(app.conf.metricsServerPort + app.conf.portsShift))
|
||||||
if startMetricsServerRes.isErr():
|
if startMetricsServerRes.isErr():
|
||||||
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
||||||
|
else:
|
||||||
|
app.metricsServer = some(startMetricsServerRes.value)
|
||||||
|
|
||||||
if app.conf.metricsLogging:
|
if app.conf.metricsLogging:
|
||||||
let startMetricsLoggingRes = startMetricsLogging()
|
let startMetricsLoggingRes = startMetricsLogging()
|
||||||
@ -767,5 +838,8 @@ proc stop*(app: App): Future[void] {.async.} =
|
|||||||
if app.rpcServer.isSome():
|
if app.rpcServer.isSome():
|
||||||
await app.rpcServer.get().stop()
|
await app.rpcServer.get().stop()
|
||||||
|
|
||||||
|
if app.metricsServer.isSome():
|
||||||
|
await app.metricsServer.get().stop()
|
||||||
|
|
||||||
if not app.node.isNil():
|
if not app.node.isNil():
|
||||||
await app.node.stop()
|
await app.node.stop()
|
||||||
|
|||||||
@ -1,45 +0,0 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
|
||||||
stew/results,
|
|
||||||
stew/shims/net,
|
|
||||||
chronicles,
|
|
||||||
presto
|
|
||||||
import
|
|
||||||
../../waku/v2/waku_node,
|
|
||||||
../../waku/v2/node/rest/server,
|
|
||||||
../../waku/v2/node/rest/debug/handlers as debug_api,
|
|
||||||
../../waku/v2/node/rest/relay/handlers as relay_api,
|
|
||||||
../../waku/v2/node/rest/relay/topic_cache,
|
|
||||||
../../waku/v2/node/rest/store/handlers as store_api,
|
|
||||||
./config
|
|
||||||
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "wakunode rest"
|
|
||||||
|
|
||||||
|
|
||||||
proc startRestServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): RestServerResult[RestServerRef] =
|
|
||||||
let server = ? newRestHttpServer(address, port)
|
|
||||||
|
|
||||||
## Debug REST API
|
|
||||||
installDebugApiHandlers(server.router, node)
|
|
||||||
|
|
||||||
## Relay REST API
|
|
||||||
if conf.relay:
|
|
||||||
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
|
||||||
installRelayApiHandlers(server.router, node, relayCache)
|
|
||||||
|
|
||||||
## Store REST API
|
|
||||||
installStoreApiHandlers(server.router, node)
|
|
||||||
|
|
||||||
server.start()
|
|
||||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
|
||||||
|
|
||||||
ok(server)
|
|
||||||
|
|
||||||
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): RestServerResult[RestServerRef] =
|
|
||||||
return startRestServer(node, address, Port(port + portsShift), conf)
|
|
||||||
@ -1,58 +0,0 @@
|
|||||||
when (NimMajor, NimMinor) < (1, 4):
|
|
||||||
{.push raises: [Defect].}
|
|
||||||
else:
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
import
|
|
||||||
stew/results,
|
|
||||||
stew/shims/net,
|
|
||||||
chronicles,
|
|
||||||
json_rpc/rpcserver
|
|
||||||
import
|
|
||||||
../../waku/v2/node/message_cache,
|
|
||||||
../../waku/v2/waku_node,
|
|
||||||
../../waku/v2/node/jsonrpc/admin/handlers as admin_api,
|
|
||||||
../../waku/v2/node/jsonrpc/debug/handlers as debug_api,
|
|
||||||
../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
|
|
||||||
../../waku/v2/node/jsonrpc/relay/handlers as relay_api,
|
|
||||||
../../waku/v2/node/jsonrpc/store/handlers as store_api,
|
|
||||||
./config
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "wakunode jsonrpc"
|
|
||||||
|
|
||||||
proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): Result[RpcHttpServer, string] =
|
|
||||||
let ta = initTAddress(address, port)
|
|
||||||
|
|
||||||
var server: RpcHttpServer
|
|
||||||
try:
|
|
||||||
server = newRpcHttpServer([ta])
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
installDebugApiHandlers(node, server)
|
|
||||||
|
|
||||||
# TODO: Move to setup protocols proc
|
|
||||||
if conf.relay:
|
|
||||||
let relayMessageCache = relay_api.MessageCache.init(capacity=30)
|
|
||||||
installRelayApiHandlers(node, server, relayMessageCache)
|
|
||||||
if conf.rpcPrivate:
|
|
||||||
installRelayPrivateApiHandlers(node, server, relayMessageCache)
|
|
||||||
|
|
||||||
# TODO: Move to setup protocols proc
|
|
||||||
if conf.filternode != "":
|
|
||||||
let filterMessageCache = filter_api.MessageCache.init(capacity=30)
|
|
||||||
installFilterApiHandlers(node, server, filterMessageCache)
|
|
||||||
|
|
||||||
installStoreApiHandlers(node, server)
|
|
||||||
|
|
||||||
if conf.rpcAdmin:
|
|
||||||
installAdminApiHandlers(node, server)
|
|
||||||
|
|
||||||
server.start()
|
|
||||||
info "RPC Server started", address=ta
|
|
||||||
|
|
||||||
ok(server)
|
|
||||||
|
|
||||||
proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): Result[RpcHttpServer, string] =
|
|
||||||
return startRpcServer(node, address, Port(port + portsShift), conf)
|
|
||||||
@ -4,7 +4,6 @@ else:
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
stew/shims/net,
|
|
||||||
chronicles,
|
chronicles,
|
||||||
chronos,
|
chronos,
|
||||||
metrics,
|
metrics,
|
||||||
@ -25,16 +24,6 @@ logScope:
|
|||||||
topics = "waku node metrics"
|
topics = "waku node metrics"
|
||||||
|
|
||||||
|
|
||||||
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) =
|
|
||||||
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
|
||||||
|
|
||||||
try:
|
|
||||||
startMetricsHttpServer($serverIp, serverPort)
|
|
||||||
except Exception as e:
|
|
||||||
raiseAssert("Exception while starting metrics HTTP server: " & e.msg)
|
|
||||||
|
|
||||||
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
|
||||||
|
|
||||||
type
|
type
|
||||||
# https://github.com/nim-lang/Nim/issues/17369
|
# https://github.com/nim-lang/Nim/issues/17369
|
||||||
MetricsLogger = proc(udata: pointer) {.gcsafe, raises: [Defect].}
|
MetricsLogger = proc(udata: pointer) {.gcsafe, raises: [Defect].}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user