mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 14:33:12 +00:00
refactor: addition of waku_api/rest/builder.nim and reduce app.nim (#2623)
This commit is contained in:
parent
34aa2c372d
commit
963d79aee7
@ -19,7 +19,8 @@ import
|
||||
../../waku/factory/external_config,
|
||||
../../waku/factory/networks_config,
|
||||
../../waku/factory/app,
|
||||
../../waku/node/health_monitor
|
||||
../../waku/node/health_monitor,
|
||||
../../waku/waku_api/rest/builder as rest_server_builder
|
||||
|
||||
logScope:
|
||||
topics = "wakunode main"
|
||||
@ -127,25 +128,31 @@ when isMainModule:
|
||||
nodeHealthMonitor = WakuNodeHealthMonitor()
|
||||
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
|
||||
|
||||
let restServerRes = startRestServerEsentials(nodeHealthMonitor, conf)
|
||||
if restServerRes.isErr():
|
||||
error "Starting REST server failed.", error = $restServerRes.error()
|
||||
let restServer = rest_server_builder.startRestServerEsentials(
|
||||
nodeHealthMonitor, conf
|
||||
).valueOr:
|
||||
error "Starting esential REST server failed.", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
var wakunode2 = App.init(conf).valueOr:
|
||||
error "App initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
wakunode2.restServer = restServer
|
||||
|
||||
nodeHealthMonitor.setNode(wakunode2.node)
|
||||
|
||||
wakunode2.startApp().isOkOr:
|
||||
error "Starting app failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
if conf.rest and not restServerRes.isErr():
|
||||
wakunode2.restServer = restServerRes.value
|
||||
rest_server_builder.startRestServerProtocolSupport(
|
||||
restServer, wakunode2.node, wakunode2.wakuDiscv5, conf
|
||||
).isOkOr:
|
||||
error "Starting protocols support REST server failed.", error = $error
|
||||
quit(QuitFailure)
|
||||
|
||||
wakunode2.setupMonitoringAndExternalInterfaces().isOkOr:
|
||||
wakunode2.startMetricsServerAndLogging().isOkOr:
|
||||
error "Starting monitoring and external interfaces failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
|
||||
@ -51,7 +51,7 @@ suite "Wakunode2 - App initialization":
|
||||
wakunode2.startApp().isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
let mountRes = wakunode2.setupMonitoringAndExternalInterfaces()
|
||||
let mountRes = wakunode2.startMetricsServerAndLogging()
|
||||
assert mountRes.isOk(), mountRes.error
|
||||
|
||||
## Then
|
||||
|
||||
@ -58,12 +58,12 @@ type
|
||||
rng: ref HmacDrbgContext
|
||||
key: crypto.PrivateKey
|
||||
|
||||
wakuDiscv5: Option[WakuDiscoveryV5]
|
||||
wakuDiscv5*: Option[WakuDiscoveryV5]
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo]
|
||||
|
||||
node: WakuNode
|
||||
|
||||
restServer*: Option[WakuRestServerRef]
|
||||
restServer*: WakuRestServerRef
|
||||
metricsServer: Option[MetricsHttpServerRef]
|
||||
|
||||
AppResult*[T] = Result[T, string]
|
||||
@ -272,172 +272,6 @@ proc startApp*(app: var App): AppResult[void] =
|
||||
|
||||
return ok()
|
||||
|
||||
## Monitoring and external interfaces
|
||||
|
||||
# Used to register api endpoints that are not currently installed as keys,
|
||||
# values are holding error messages to be returned to the client
|
||||
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
||||
# It will always be called from main thread anyway.
|
||||
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
|
||||
var restServerNotInstalledTab {.threadvar.}: TableRef[string, string]
|
||||
restServerNotInstalledTab = newTable[string, string]()
|
||||
|
||||
proc startRestServerEsentials*(
|
||||
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
|
||||
): AppResult[Option[WakuRestServerRef]] =
|
||||
if not conf.rest:
|
||||
return ok(none(WakuRestServerRef))
|
||||
|
||||
let requestErrorHandler: RestRequestErrorHandler = proc(
|
||||
error: RestRequestError, request: HttpRequestRef
|
||||
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
case error
|
||||
of RestRequestError.Invalid:
|
||||
return await request.respond(Http400, "Invalid request", HttpTable.init())
|
||||
of RestRequestError.NotFound:
|
||||
let paths = request.rawPath.split("/")
|
||||
let rootPath =
|
||||
if len(paths) > 1:
|
||||
paths[1]
|
||||
else:
|
||||
""
|
||||
restServerNotInstalledTab[].withValue(rootPath, errMsg):
|
||||
return await request.respond(Http404, errMsg[], HttpTable.init())
|
||||
do:
|
||||
return await request.respond(
|
||||
Http400,
|
||||
"Bad request initiated. Invalid path or method used.",
|
||||
HttpTable.init(),
|
||||
)
|
||||
of RestRequestError.InvalidContentBody:
|
||||
return await request.respond(Http400, "Invalid content body", HttpTable.init())
|
||||
of RestRequestError.InvalidContentType:
|
||||
return await request.respond(Http400, "Invalid content type", HttpTable.init())
|
||||
of RestRequestError.Unexpected:
|
||||
return defaultResponse()
|
||||
except HttpWriteError:
|
||||
error "Failed to write response to client", error = getCurrentExceptionMsg()
|
||||
discard
|
||||
|
||||
return defaultResponse()
|
||||
|
||||
let allowedOrigin =
|
||||
if len(conf.restAllowOrigin) > 0:
|
||||
some(conf.restAllowOrigin.join(","))
|
||||
else:
|
||||
none(string)
|
||||
|
||||
let address = conf.restAddress
|
||||
let port = Port(conf.restPort + conf.portsShift)
|
||||
let server =
|
||||
?newRestHttpServer(
|
||||
address,
|
||||
port,
|
||||
allowedOrigin = allowedOrigin,
|
||||
requestErrorHandler = requestErrorHandler,
|
||||
)
|
||||
|
||||
## Health REST API
|
||||
installHealthApiHandler(server.router, nodeHealthMonitor)
|
||||
|
||||
restServerNotInstalledTab["admin"] =
|
||||
"/admin endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["debug"] =
|
||||
"/debug endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["relay"] =
|
||||
"/relay endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["filter"] =
|
||||
"/filter endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["lightpush"] =
|
||||
"/lightpush endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["store"] =
|
||||
"/store endpoints are not available while initializing."
|
||||
|
||||
server.start()
|
||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||
|
||||
ok(some(server))
|
||||
|
||||
proc startRestServerProtocolSupport(app: var App): AppResult[void] =
|
||||
if not app.conf.rest or app.restServer.isNone():
|
||||
## Maybe we don't need rest server at all, so it is ok.
|
||||
return ok()
|
||||
|
||||
var router = app.restServer.get().router
|
||||
## Admin REST API
|
||||
if app.conf.restAdmin:
|
||||
installAdminApiHandlers(router, app.node)
|
||||
else:
|
||||
restServerNotInstalledTab["admin"] =
|
||||
"/admin endpoints are not available. Please check your configuration: --rest-admin=true"
|
||||
|
||||
## Debug REST API
|
||||
installDebugApiHandlers(router, app.node)
|
||||
|
||||
## Relay REST API
|
||||
if app.conf.relay:
|
||||
let cache = MessageCache.init(int(app.conf.restRelayCacheCapacity))
|
||||
|
||||
let handler = messageCacheHandler(cache)
|
||||
|
||||
for pubsubTopic in app.conf.pubsubTopics:
|
||||
cache.pubsubSubscribe(pubsubTopic)
|
||||
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
|
||||
|
||||
for contentTopic in app.conf.contentTopics:
|
||||
cache.contentSubscribe(contentTopic)
|
||||
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
|
||||
|
||||
installRelayApiHandlers(router, app.node, cache)
|
||||
else:
|
||||
restServerNotInstalledTab["relay"] =
|
||||
"/relay endpoints are not available. Please check your configuration: --relay"
|
||||
|
||||
## Filter REST API
|
||||
if app.conf.filternode != "" and app.node.wakuFilterClient != nil:
|
||||
let filterCache = MessageCache.init()
|
||||
|
||||
let filterDiscoHandler =
|
||||
if app.wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_filter_api.installFilterRestApiHandlers(
|
||||
router, app.node, filterCache, filterDiscoHandler
|
||||
)
|
||||
else:
|
||||
restServerNotInstalledTab["filter"] =
|
||||
"/filter endpoints are not available. Please check your configuration: --filternode"
|
||||
|
||||
## Store REST API
|
||||
let storeDiscoHandler =
|
||||
if app.wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
installStoreApiHandlers(router, app.node, storeDiscoHandler)
|
||||
|
||||
## Light push API
|
||||
if app.conf.lightpushnode != "" and app.node.wakuLightpushClient != nil:
|
||||
let lightDiscoHandler =
|
||||
if app.wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_lightpush_api.installLightPushRequestHandler(
|
||||
router, app.node, lightDiscoHandler
|
||||
)
|
||||
else:
|
||||
restServerNotInstalledTab["lightpush"] =
|
||||
"/lightpush endpoints are not available. Please check your configuration: --lightpushnode"
|
||||
|
||||
info "REST services are installed"
|
||||
ok()
|
||||
|
||||
proc startMetricsServer(
|
||||
serverIp: IpAddress, serverPort: Port
|
||||
): AppResult[MetricsHttpServerRef] =
|
||||
@ -460,13 +294,7 @@ proc startMetricsLogging(): AppResult[void] =
|
||||
startMetricsLog()
|
||||
ok()
|
||||
|
||||
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
||||
if app.conf.rest and app.restServer.isSome():
|
||||
let restProtocolSupportRes = startRestServerProtocolSupport(app)
|
||||
if restProtocolSupportRes.isErr():
|
||||
error "Starting REST server protocol support failed. Continuing in current state.",
|
||||
error = restProtocolSupportRes.error
|
||||
|
||||
proc startMetricsServerAndLogging*(app: var App): AppResult[void] =
|
||||
if app.conf.metricsServer:
|
||||
let startMetricsServerRes = startMetricsServer(
|
||||
app.conf.metricsServerAddress,
|
||||
@ -489,8 +317,8 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
||||
# App shutdown
|
||||
|
||||
proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} =
|
||||
if app.restServer.isSome():
|
||||
await app.restServer.get().stop()
|
||||
if app.conf.rest:
|
||||
await app.restServer.stop()
|
||||
|
||||
if app.metricsServer.isSome():
|
||||
await app.metricsServer.get().stop()
|
||||
|
||||
191
waku/waku_api/rest/builder.nim
Normal file
191
waku/waku_api/rest/builder.nim
Normal file
@ -0,0 +1,191 @@
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import net, tables
|
||||
import presto
|
||||
import
|
||||
../../waku/common/utils/nat,
|
||||
../../waku/waku_node,
|
||||
../../waku/discovery/waku_discv5,
|
||||
../../waku/factory/external_config,
|
||||
../../waku/waku_api/message_cache,
|
||||
../../waku/waku_api/handlers,
|
||||
../../waku/waku_api/rest/server,
|
||||
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
|
||||
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
|
||||
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
|
||||
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
|
||||
../../waku/waku_api/rest/store/handlers as rest_store_api,
|
||||
../../waku/waku_api/rest/health/handlers as rest_health_api,
|
||||
../../waku/waku_api/rest/admin/handlers as rest_admin_api,
|
||||
../../waku/waku_core/topics
|
||||
|
||||
## Monitoring and external interfaces
|
||||
|
||||
# Used to register api endpoints that are not currently installed as keys,
|
||||
# values are holding error messages to be returned to the client
|
||||
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
||||
# It will always be called from main thread anyway.
|
||||
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
|
||||
var restServerNotInstalledTab {.threadvar.}: TableRef[string, string]
|
||||
restServerNotInstalledTab = newTable[string, string]()
|
||||
|
||||
proc startRestServerEsentials*(
|
||||
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
|
||||
): Result[WakuRestServerRef, string] =
|
||||
if not conf.rest:
|
||||
return
|
||||
|
||||
let requestErrorHandler: RestRequestErrorHandler = proc(
|
||||
error: RestRequestError, request: HttpRequestRef
|
||||
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
|
||||
try:
|
||||
case error
|
||||
of RestRequestError.Invalid:
|
||||
return await request.respond(Http400, "Invalid request", HttpTable.init())
|
||||
of RestRequestError.NotFound:
|
||||
let paths = request.rawPath.split("/")
|
||||
let rootPath =
|
||||
if len(paths) > 1:
|
||||
paths[1]
|
||||
else:
|
||||
""
|
||||
restServerNotInstalledTab[].withValue(rootPath, errMsg):
|
||||
return await request.respond(Http404, errMsg[], HttpTable.init())
|
||||
do:
|
||||
return await request.respond(
|
||||
Http400,
|
||||
"Bad request initiated. Invalid path or method used.",
|
||||
HttpTable.init(),
|
||||
)
|
||||
of RestRequestError.InvalidContentBody:
|
||||
return await request.respond(Http400, "Invalid content body", HttpTable.init())
|
||||
of RestRequestError.InvalidContentType:
|
||||
return await request.respond(Http400, "Invalid content type", HttpTable.init())
|
||||
of RestRequestError.Unexpected:
|
||||
return defaultResponse()
|
||||
except HttpWriteError:
|
||||
error "Failed to write response to client", error = getCurrentExceptionMsg()
|
||||
discard
|
||||
|
||||
return defaultResponse()
|
||||
|
||||
let allowedOrigin =
|
||||
if len(conf.restAllowOrigin) > 0:
|
||||
some(conf.restAllowOrigin.join(","))
|
||||
else:
|
||||
none(string)
|
||||
|
||||
let address = conf.restAddress
|
||||
let port = Port(conf.restPort + conf.portsShift)
|
||||
let server =
|
||||
?newRestHttpServer(
|
||||
address,
|
||||
port,
|
||||
allowedOrigin = allowedOrigin,
|
||||
requestErrorHandler = requestErrorHandler,
|
||||
)
|
||||
|
||||
## Health REST API
|
||||
installHealthApiHandler(server.router, nodeHealthMonitor)
|
||||
|
||||
restServerNotInstalledTab["admin"] =
|
||||
"/admin endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["debug"] =
|
||||
"/debug endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["relay"] =
|
||||
"/relay endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["filter"] =
|
||||
"/filter endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["lightpush"] =
|
||||
"/lightpush endpoints are not available while initializing."
|
||||
restServerNotInstalledTab["store"] =
|
||||
"/store endpoints are not available while initializing."
|
||||
|
||||
server.start()
|
||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||
|
||||
ok(server)
|
||||
|
||||
proc startRestServerProtocolSupport*(
|
||||
restServer: WakuRestServerRef,
|
||||
node: WakuNode,
|
||||
wakuDiscv5: Option[WakuDiscoveryV5],
|
||||
conf: WakuNodeConf,
|
||||
): Result[void, string] =
|
||||
if not conf.rest:
|
||||
return
|
||||
|
||||
var router = restServer.router
|
||||
## Admin REST API
|
||||
if conf.restAdmin:
|
||||
installAdminApiHandlers(router, node)
|
||||
else:
|
||||
restServerNotInstalledTab["admin"] =
|
||||
"/admin endpoints are not available. Please check your configuration: --rest-admin=true"
|
||||
|
||||
## Debug REST API
|
||||
installDebugApiHandlers(router, node)
|
||||
|
||||
## Relay REST API
|
||||
if conf.relay:
|
||||
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))
|
||||
|
||||
let handler = messageCacheHandler(cache)
|
||||
|
||||
for pubsubTopic in conf.pubsubTopics:
|
||||
cache.pubsubSubscribe(pubsubTopic)
|
||||
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
|
||||
|
||||
for contentTopic in conf.contentTopics:
|
||||
cache.contentSubscribe(contentTopic)
|
||||
node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
|
||||
|
||||
installRelayApiHandlers(router, node, cache)
|
||||
else:
|
||||
restServerNotInstalledTab["relay"] =
|
||||
"/relay endpoints are not available. Please check your configuration: --relay"
|
||||
|
||||
## Filter REST API
|
||||
if conf.filternode != "" and node.wakuFilterClient != nil:
|
||||
let filterCache = MessageCache.init()
|
||||
|
||||
let filterDiscoHandler =
|
||||
if wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Filter))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_filter_api.installFilterRestApiHandlers(
|
||||
router, node, filterCache, filterDiscoHandler
|
||||
)
|
||||
else:
|
||||
restServerNotInstalledTab["filter"] =
|
||||
"/filter endpoints are not available. Please check your configuration: --filternode"
|
||||
|
||||
## Store REST API
|
||||
let storeDiscoHandler =
|
||||
if wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Store))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
installStoreApiHandlers(router, node, storeDiscoHandler)
|
||||
|
||||
## Light push API
|
||||
if conf.lightpushnode != "" and node.wakuLightpushClient != nil:
|
||||
let lightDiscoHandler =
|
||||
if wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
rest_lightpush_api.installLightPushRequestHandler(router, node, lightDiscoHandler)
|
||||
else:
|
||||
restServerNotInstalledTab["lightpush"] =
|
||||
"/lightpush endpoints are not available. Please check your configuration: --lightpushnode"
|
||||
|
||||
info "REST services are installed"
|
||||
return ok()
|
||||
Loading…
x
Reference in New Issue
Block a user