From 963d79aee705a215b66a592937008aade45115de Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 24 Apr 2024 15:59:50 +0200 Subject: [PATCH] refactor: addition of waku_api/rest/builder.nim and reduce app.nim (#2623) --- apps/wakunode2/wakunode2.nim | 21 ++-- tests/wakunode2/test_app.nim | 2 +- waku/factory/app.nim | 182 +------------------------------ waku/waku_api/rest/builder.nim | 191 +++++++++++++++++++++++++++++++++ 4 files changed, 211 insertions(+), 185 deletions(-) create mode 100644 waku/waku_api/rest/builder.nim diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index b11943a1d..f14a1f97f 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -19,7 +19,8 @@ import ../../waku/factory/external_config, ../../waku/factory/networks_config, ../../waku/factory/app, - ../../waku/node/health_monitor + ../../waku/node/health_monitor, + ../../waku/waku_api/rest/builder as rest_server_builder logScope: topics = "wakunode main" @@ -127,25 +128,31 @@ when isMainModule: nodeHealthMonitor = WakuNodeHealthMonitor() nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING) - let restServerRes = startRestServerEsentials(nodeHealthMonitor, conf) - if restServerRes.isErr(): - error "Starting REST server failed.", error = $restServerRes.error() + let restServer = rest_server_builder.startRestServerEsentials( + nodeHealthMonitor, conf + ).valueOr: + error "Starting esential REST server failed.", error = $error quit(QuitFailure) var wakunode2 = App.init(conf).valueOr: error "App initialization failed", error = error quit(QuitFailure) + wakunode2.restServer = restServer + nodeHealthMonitor.setNode(wakunode2.node) wakunode2.startApp().isOkOr: error "Starting app failed", error = error quit(QuitFailure) - if conf.rest and not restServerRes.isErr(): - wakunode2.restServer = restServerRes.value + rest_server_builder.startRestServerProtocolSupport( + restServer, wakunode2.node, wakunode2.wakuDiscv5, conf + ).isOkOr: + error "Starting protocols support REST server failed.", error = $error + quit(QuitFailure) - wakunode2.setupMonitoringAndExternalInterfaces().isOkOr: + wakunode2.startMetricsServerAndLogging().isOkOr: error "Starting monitoring and external interfaces failed", error = error quit(QuitFailure) diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 64d8f7a06..14ae0d74a 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -51,7 +51,7 @@ suite "Wakunode2 - App initialization": wakunode2.startApp().isOkOr: raiseAssert error - let mountRes = wakunode2.setupMonitoringAndExternalInterfaces() + let mountRes = wakunode2.startMetricsServerAndLogging() assert mountRes.isOk(), mountRes.error ## Then diff --git a/waku/factory/app.nim b/waku/factory/app.nim index 6a5a17f4d..e9d0e281b 100644 --- a/waku/factory/app.nim +++ b/waku/factory/app.nim @@ -58,12 +58,12 @@ type rng: ref HmacDrbgContext key: crypto.PrivateKey - wakuDiscv5: Option[WakuDiscoveryV5] + wakuDiscv5*: Option[WakuDiscoveryV5] dynamicBootstrapNodes: seq[RemotePeerInfo] node: WakuNode - restServer*: Option[WakuRestServerRef] + restServer*: WakuRestServerRef metricsServer: Option[MetricsHttpServerRef] AppResult*[T] = Result[T, string] @@ -272,172 +272,6 @@ proc startApp*(app: var App): AppResult[void] = return ok() -## Monitoring and external interfaces - -# Used to register api endpoints that are not currently installed as keys, -# values are holding error messages to be returned to the client -# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it -# It will always be called from main thread anyway. -# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety -var restServerNotInstalledTab {.threadvar.}: TableRef[string, string] -restServerNotInstalledTab = newTable[string, string]() - -proc startRestServerEsentials*( - nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf -): AppResult[Option[WakuRestServerRef]] = - if not conf.rest: - return ok(none(WakuRestServerRef)) - - let requestErrorHandler: RestRequestErrorHandler = proc( - error: RestRequestError, request: HttpRequestRef - ): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = - try: - case error - of RestRequestError.Invalid: - return await request.respond(Http400, "Invalid request", HttpTable.init()) - of RestRequestError.NotFound: - let paths = request.rawPath.split("/") - let rootPath = - if len(paths) > 1: - paths[1] - else: - "" - restServerNotInstalledTab[].withValue(rootPath, errMsg): - return await request.respond(Http404, errMsg[], HttpTable.init()) - do: - return await request.respond( - Http400, - "Bad request initiated. Invalid path or method used.", - HttpTable.init(), - ) - of RestRequestError.InvalidContentBody: - return await request.respond(Http400, "Invalid content body", HttpTable.init()) - of RestRequestError.InvalidContentType: - return await request.respond(Http400, "Invalid content type", HttpTable.init()) - of RestRequestError.Unexpected: - return defaultResponse() - except HttpWriteError: - error "Failed to write response to client", error = getCurrentExceptionMsg() - discard - - return defaultResponse() - - let allowedOrigin = - if len(conf.restAllowOrigin) > 0: - some(conf.restAllowOrigin.join(",")) - else: - none(string) - - let address = conf.restAddress - let port = Port(conf.restPort + conf.portsShift) - let server = - ?newRestHttpServer( - address, - port, - allowedOrigin = allowedOrigin, - requestErrorHandler = requestErrorHandler, - ) - - ## Health REST API - installHealthApiHandler(server.router, nodeHealthMonitor) - - restServerNotInstalledTab["admin"] = - "/admin endpoints are not available while initializing." - restServerNotInstalledTab["debug"] = - "/debug endpoints are not available while initializing." - restServerNotInstalledTab["relay"] = - "/relay endpoints are not available while initializing." - restServerNotInstalledTab["filter"] = - "/filter endpoints are not available while initializing." - restServerNotInstalledTab["lightpush"] = - "/lightpush endpoints are not available while initializing." - restServerNotInstalledTab["store"] = - "/store endpoints are not available while initializing." - - server.start() - info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" - - ok(some(server)) - -proc startRestServerProtocolSupport(app: var App): AppResult[void] = - if not app.conf.rest or app.restServer.isNone(): - ## Maybe we don't need rest server at all, so it is ok. - return ok() - - var router = app.restServer.get().router - ## Admin REST API - if app.conf.restAdmin: - installAdminApiHandlers(router, app.node) - else: - restServerNotInstalledTab["admin"] = - "/admin endpoints are not available. Please check your configuration: --rest-admin=true" - - ## Debug REST API - installDebugApiHandlers(router, app.node) - - ## Relay REST API - if app.conf.relay: - let cache = MessageCache.init(int(app.conf.restRelayCacheCapacity)) - - let handler = messageCacheHandler(cache) - - for pubsubTopic in app.conf.pubsubTopics: - cache.pubsubSubscribe(pubsubTopic) - app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) - - for contentTopic in app.conf.contentTopics: - cache.contentSubscribe(contentTopic) - app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) - - installRelayApiHandlers(router, app.node, cache) - else: - restServerNotInstalledTab["relay"] = - "/relay endpoints are not available. Please check your configuration: --relay" - - ## Filter REST API - if app.conf.filternode != "" and app.node.wakuFilterClient != nil: - let filterCache = MessageCache.init() - - let filterDiscoHandler = - if app.wakuDiscv5.isSome(): - some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter)) - else: - none(DiscoveryHandler) - - rest_filter_api.installFilterRestApiHandlers( - router, app.node, filterCache, filterDiscoHandler - ) - else: - restServerNotInstalledTab["filter"] = - "/filter endpoints are not available. Please check your configuration: --filternode" - - ## Store REST API - let storeDiscoHandler = - if app.wakuDiscv5.isSome(): - some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store)) - else: - none(DiscoveryHandler) - - installStoreApiHandlers(router, app.node, storeDiscoHandler) - - ## Light push API - if app.conf.lightpushnode != "" and app.node.wakuLightpushClient != nil: - let lightDiscoHandler = - if app.wakuDiscv5.isSome(): - some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush)) - else: - none(DiscoveryHandler) - - rest_lightpush_api.installLightPushRequestHandler( - router, app.node, lightDiscoHandler - ) - else: - restServerNotInstalledTab["lightpush"] = - "/lightpush endpoints are not available. Please check your configuration: --lightpushnode" - - info "REST services are installed" - ok() - proc startMetricsServer( serverIp: IpAddress, serverPort: Port ): AppResult[MetricsHttpServerRef] = @@ -460,13 +294,7 @@ proc startMetricsLogging(): AppResult[void] = startMetricsLog() ok() -proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = - if app.conf.rest and app.restServer.isSome(): - let restProtocolSupportRes = startRestServerProtocolSupport(app) - if restProtocolSupportRes.isErr(): - error "Starting REST server protocol support failed. Continuing in current state.", - error = restProtocolSupportRes.error - +proc startMetricsServerAndLogging*(app: var App): AppResult[void] = if app.conf.metricsServer: let startMetricsServerRes = startMetricsServer( app.conf.metricsServerAddress, @@ -489,8 +317,8 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = # App shutdown proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} = - if app.restServer.isSome(): - await app.restServer.get().stop() + if app.conf.rest: + await app.restServer.stop() if app.metricsServer.isSome(): await app.metricsServer.get().stop() diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim new file mode 100644 index 000000000..e5b489334 --- /dev/null +++ b/waku/waku_api/rest/builder.nim @@ -0,0 +1,191 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import net, tables +import presto +import + ../../waku/common/utils/nat, + ../../waku/waku_node, + ../../waku/discovery/waku_discv5, + ../../waku/factory/external_config, + ../../waku/waku_api/message_cache, + ../../waku/waku_api/handlers, + ../../waku/waku_api/rest/server, + ../../waku/waku_api/rest/debug/handlers as rest_debug_api, + ../../waku/waku_api/rest/relay/handlers as rest_relay_api, + ../../waku/waku_api/rest/filter/handlers as rest_filter_api, + ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, + ../../waku/waku_api/rest/store/handlers as rest_store_api, + ../../waku/waku_api/rest/health/handlers as rest_health_api, + ../../waku/waku_api/rest/admin/handlers as rest_admin_api, + ../../waku/waku_core/topics + +## Monitoring and external interfaces + +# Used to register api endpoints that are not currently installed as keys, +# values are holding error messages to be returned to the client +# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it +# It will always be called from main thread anyway. +# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety +var restServerNotInstalledTab {.threadvar.}: TableRef[string, string] +restServerNotInstalledTab = newTable[string, string]() + +proc startRestServerEsentials*( + nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf +): Result[WakuRestServerRef, string] = + if not conf.rest: + return + + let requestErrorHandler: RestRequestErrorHandler = proc( + error: RestRequestError, request: HttpRequestRef + ): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = + try: + case error + of RestRequestError.Invalid: + return await request.respond(Http400, "Invalid request", HttpTable.init()) + of RestRequestError.NotFound: + let paths = request.rawPath.split("/") + let rootPath = + if len(paths) > 1: + paths[1] + else: + "" + restServerNotInstalledTab[].withValue(rootPath, errMsg): + return await request.respond(Http404, errMsg[], HttpTable.init()) + do: + return await request.respond( + Http400, + "Bad request initiated. Invalid path or method used.", + HttpTable.init(), + ) + of RestRequestError.InvalidContentBody: + return await request.respond(Http400, "Invalid content body", HttpTable.init()) + of RestRequestError.InvalidContentType: + return await request.respond(Http400, "Invalid content type", HttpTable.init()) + of RestRequestError.Unexpected: + return defaultResponse() + except HttpWriteError: + error "Failed to write response to client", error = getCurrentExceptionMsg() + discard + + return defaultResponse() + + let allowedOrigin = + if len(conf.restAllowOrigin) > 0: + some(conf.restAllowOrigin.join(",")) + else: + none(string) + + let address = conf.restAddress + let port = Port(conf.restPort + conf.portsShift) + let server = + ?newRestHttpServer( + address, + port, + allowedOrigin = allowedOrigin, + requestErrorHandler = requestErrorHandler, + ) + + ## Health REST API + installHealthApiHandler(server.router, nodeHealthMonitor) + + restServerNotInstalledTab["admin"] = + "/admin endpoints are not available while initializing." + restServerNotInstalledTab["debug"] = + "/debug endpoints are not available while initializing." + restServerNotInstalledTab["relay"] = + "/relay endpoints are not available while initializing." + restServerNotInstalledTab["filter"] = + "/filter endpoints are not available while initializing." + restServerNotInstalledTab["lightpush"] = + "/lightpush endpoints are not available while initializing." + restServerNotInstalledTab["store"] = + "/store endpoints are not available while initializing." + + server.start() + info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" + + ok(server) + +proc startRestServerProtocolSupport*( + restServer: WakuRestServerRef, + node: WakuNode, + wakuDiscv5: Option[WakuDiscoveryV5], + conf: WakuNodeConf, +): Result[void, string] = + if not conf.rest: + return + + var router = restServer.router + ## Admin REST API + if conf.restAdmin: + installAdminApiHandlers(router, node) + else: + restServerNotInstalledTab["admin"] = + "/admin endpoints are not available. Please check your configuration: --rest-admin=true" + + ## Debug REST API + installDebugApiHandlers(router, node) + + ## Relay REST API + if conf.relay: + let cache = MessageCache.init(int(conf.restRelayCacheCapacity)) + + let handler = messageCacheHandler(cache) + + for pubsubTopic in conf.pubsubTopics: + cache.pubsubSubscribe(pubsubTopic) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) + + for contentTopic in conf.contentTopics: + cache.contentSubscribe(contentTopic) + node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) + + installRelayApiHandlers(router, node, cache) + else: + restServerNotInstalledTab["relay"] = + "/relay endpoints are not available. Please check your configuration: --relay" + + ## Filter REST API + if conf.filternode != "" and node.wakuFilterClient != nil: + let filterCache = MessageCache.init() + + let filterDiscoHandler = + if wakuDiscv5.isSome(): + some(defaultDiscoveryHandler(wakuDiscv5.get(), Filter)) + else: + none(DiscoveryHandler) + + rest_filter_api.installFilterRestApiHandlers( + router, node, filterCache, filterDiscoHandler + ) + else: + restServerNotInstalledTab["filter"] = + "/filter endpoints are not available. Please check your configuration: --filternode" + + ## Store REST API + let storeDiscoHandler = + if wakuDiscv5.isSome(): + some(defaultDiscoveryHandler(wakuDiscv5.get(), Store)) + else: + none(DiscoveryHandler) + + installStoreApiHandlers(router, node, storeDiscoHandler) + + ## Light push API + if conf.lightpushnode != "" and node.wakuLightpushClient != nil: + let lightDiscoHandler = + if wakuDiscv5.isSome(): + some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush)) + else: + none(DiscoveryHandler) + + rest_lightpush_api.installLightPushRequestHandler(router, node, lightDiscoHandler) + else: + restServerNotInstalledTab["lightpush"] = + "/lightpush endpoints are not available. Please check your configuration: --lightpushnode" + + info "REST services are installed" + return ok()