diff --git a/Makefile b/Makefile index 4fafd6310..8f98e90bd 100644 --- a/Makefile +++ b/Makefile @@ -469,6 +469,7 @@ logosdelivery_example: | build liblogosdelivery ifeq ($(detected_OS),Darwin) gcc -o build/$@ \ liblogosdelivery/examples/logosdelivery_example.c \ + liblogosdelivery/examples/json_utils.c \ -I./liblogosdelivery \ -L./build \ -llogosdelivery \ @@ -476,6 +477,7 @@ ifeq ($(detected_OS),Darwin) else ifeq ($(detected_OS),Linux) gcc -o build/$@ \ liblogosdelivery/examples/logosdelivery_example.c \ + liblogosdelivery/examples/json_utils.c \ -I./liblogosdelivery \ -L./build \ -llogosdelivery \ @@ -483,6 +485,7 @@ else ifeq ($(detected_OS),Linux) else ifeq ($(detected_OS),Windows) gcc -o build/$@.exe \ liblogosdelivery/examples/logosdelivery_example.c \ + liblogosdelivery/examples/json_utils.c \ -I./liblogosdelivery \ -L./build \ -llogosdelivery \ diff --git a/examples/api_example/api_example.nim b/examples/api_example/api_example.nim index 37dd5d34b..4a7cde5db 100644 --- a/examples/api_example/api_example.nim +++ b/examples/api_example/api_example.nim @@ -59,19 +59,24 @@ when isMainModule: echo "Starting Waku node..." - let config = - if (args.ethRpcEndpoint == ""): - # Create a basic configuration for the Waku node - # No RLN as we don't have an ETH RPC Endpoint - NodeConfig.init( - protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 42) - ) - else: - # Connect to TWN, use ETH RPC Endpoint for RLN - NodeConfig.init(mode = WakuMode.Core, ethRpcEndpoints = @[args.ethRpcEndpoint]) + # Use WakuNodeConf (the CLI configuration type) for node setup + var conf = defaultWakuNodeConf().valueOr: + echo "Failed to create default config: ", error + quit(QuitFailure) + + if args.ethRpcEndpoint == "": + # Create a basic configuration for the Waku node + # No RLN as we don't have an ETH RPC Endpoint + conf.mode = Core + conf.preset = "logos.dev" + else: + # Connect to TWN, use ETH RPC Endpoint for RLN + conf.mode = Core + conf.preset = "twn" + conf.ethClientUrls = @[EthRpcUrl(args.ethRpcEndpoint)] # Create the node using the library API's createNode function - let node = (waitFor createNode(config)).valueOr: + let node = (waitFor createNode(conf)).valueOr: echo "Failed to create node: ", error quit(QuitFailure) diff --git a/liblogosdelivery/README.md b/liblogosdelivery/README.md index f9909dd3d..e8352c611 100644 --- a/liblogosdelivery/README.md +++ b/liblogosdelivery/README.md @@ -32,18 +32,17 @@ void *logosdelivery_create_node( ```json { "mode": "Core", - "clusterId": 1, - "entryNodes": [ - "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" - ], - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } + "preset": "logos.dev", + "listenAddress": "0.0.0.0", + "tcpPort": 60000, + "discv5UdpPort": 9000 } ``` +Configuration uses flat field names matching `WakuNodeConf` in `tools/confutils/cli_args.nim`. +Use `"preset"` to select a network preset (e.g., `"twn"`, `"logos.dev"`) which auto-configures +entry nodes, cluster ID, sharding, and other network-specific settings. + #### `logosdelivery_start_node` Starts the node. 
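For orientation, here is a minimal sketch of the create-and-start flow the sections above describe. It is illustrative only: the include path and the callback body are assumptions, while the function signatures and the flat JSON configuration follow `liblogosdelivery.h` and the usage example later in this README.

```c
#include "liblogosdelivery.h" /* adjust the include path to your layout */
#include <stdio.h>

/* Result callback, matching the signature used by the bundled example:
   ret carries RET_OK/RET_ERR, msg/len carry the response payload. */
static void on_result(int ret, const char *msg, size_t len, void *userData) {
    printf("[%s] ret=%d: %.*s\n", (const char *)userData, ret, (int)len, msg ? msg : "");
}

int main(void) {
    /* Flat WakuNodeConf-style configuration, as documented above. */
    const char *config = "{\"mode\": \"Core\", \"preset\": \"logos.dev\"}";

    void *ctx = logosdelivery_create_node(config, on_result, (void *)"create_node");
    if (ctx == NULL) return 1;

    logosdelivery_start_node(ctx, on_result, (void *)"start_node");
    /* ... subscribe, send, stop and destroy as shown in the full example ... */
    return 0;
}
```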
@@ -207,8 +206,9 @@ void callback(int ret, const char *msg, size_t len, void *userData) { int main() { const char *config = "{" + "\"logLevel\": \"INFO\"," "\"mode\": \"Core\"," - "\"clusterId\": 1" + "\"preset\": \"logos.dev\"" "}"; // Create node diff --git a/liblogosdelivery/declare_lib.nim b/liblogosdelivery/declare_lib.nim index 98209c649..5087a0dee 100644 --- a/liblogosdelivery/declare_lib.nim +++ b/liblogosdelivery/declare_lib.nim @@ -1,8 +1,12 @@ import ffi +import std/locks import waku/factory/waku declareLibrary("logosdelivery") +var eventCallbackLock: Lock +initLock(eventCallbackLock) + template requireInitializedNode*( ctx: ptr FFIContext[Waku], opName: string, onError: untyped ) = @@ -20,5 +24,10 @@ proc logosdelivery_set_event_callback( echo "error: invalid context in logosdelivery_set_event_callback" return + # Prevent race conditions that might happen due to incorrect usage. + eventCallbackLock.acquire() + defer: + eventCallbackLock.release() + ctx[].eventCallback = cast[pointer](callback) ctx[].eventUserData = userData diff --git a/liblogosdelivery/examples/json_utils.c b/liblogosdelivery/examples/json_utils.c new file mode 100644 index 000000000..8b33bb648 --- /dev/null +++ b/liblogosdelivery/examples/json_utils.c @@ -0,0 +1,96 @@ +#include "json_utils.h" +#include <stdio.h> +#include <string.h> + +const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) { + char searchStr[256]; + snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field); + + const char *start = strstr(json, searchStr); + if (!start) { + return NULL; + } + + start += strlen(searchStr); + const char *end = strchr(start, '"'); + if (!end) { + return NULL; + } + + size_t len = end - start; + if (len >= bufSize) { + len = bufSize - 1; + } + + memcpy(buffer, start, len); + buffer[len] = '\0'; + + return buffer; } + +const char* extract_json_object(const char *json, const char *field, size_t *outLen) { + char searchStr[256]; + snprintf(searchStr, sizeof(searchStr), "\"%s\":{", field); + + const char *start = strstr(json, searchStr); + if (!start) { + return NULL; + } + + // Advance to the opening brace + start = strchr(start, '{'); + if (!start) { + return NULL; + } + + // Find the matching closing brace (handles nested braces) + int depth = 0; + const char *p = start; + while (*p) { + if (*p == '{') depth++; + else if (*p == '}') { + depth--; + if (depth == 0) { + *outLen = (size_t)(p - start + 1); + return start; + } + } + p++; + } + return NULL; } + +int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize) { + char searchStr[256]; + snprintf(searchStr, sizeof(searchStr), "\"%s\":[", field); + + const char *start = strstr(json, searchStr); + if (!start) { + return -1; + } + + // Advance to the opening bracket + start = strchr(start, '['); + if (!start) { + return -1; + } + start++; // skip '[' + + size_t pos = 0; + const char *p = start; + while (*p && *p != ']' && pos < bufSize - 1) { + // Skip whitespace and commas + while (*p == ' ' || *p == ',' || *p == '\n' || *p == '\r' || *p == '\t') p++; + if (*p == ']') break; + + // Parse integer + int val = 0; + while (*p >= '0' && *p <= '9') { + val = val * 10 + (*p - '0'); + p++; + } + buffer[pos++] = (char)val; + } + buffer[pos] = '\0'; + return (int)pos; +} diff --git a/liblogosdelivery/examples/json_utils.h b/liblogosdelivery/examples/json_utils.h new file mode 100644 index 000000000..4039ca4f6 --- /dev/null +++ b/liblogosdelivery/examples/json_utils.h @@ -0,0 +1,21 @@ +#ifndef JSON_UTILS_H +#define JSON_UTILS_H + +#include <stddef.h> + +// Extract a JSON string field value into buffer. +// Returns pointer to buffer on success, NULL on failure. +// Very basic parser - for production use a proper JSON library. +const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize); + +// Extract a nested JSON object as a raw string. +// Returns a pointer into `json` at the start of the object, and sets `outLen`. +// Handles nested braces. +const char* extract_json_object(const char *json, const char *field, size_t *outLen); + +// Decode a JSON array of integers (byte values) into a buffer. +// Parses e.g. [72,101,108,108,111] into "Hello". +// Returns number of bytes decoded, or -1 on error. +int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize); + +#endif // JSON_UTILS_H diff --git a/liblogosdelivery/examples/logosdelivery_example.c b/liblogosdelivery/examples/logosdelivery_example.c index 5437be427..729f7f0dc 100644 --- a/liblogosdelivery/examples/logosdelivery_example.c +++ b/liblogosdelivery/examples/logosdelivery_example.c @@ -1,4 +1,5 @@ #include "../liblogosdelivery.h" +#include "json_utils.h" #include #include #include @@ -6,33 +7,10 @@ static int create_node_ok = -1; -// Helper function to extract a JSON string field value -// Very basic parser - for production use a proper JSON library -const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) { - char searchStr[256]; - snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field); - - const char *start = strstr(json, searchStr); - if (!start) { - return NULL; - } - - start += strlen(searchStr); - const char *end = strchr(start, '"'); - if (!end) { - return NULL; - } - - size_t len = end - start; - if (len >= bufSize) { - len = bufSize - 1; - } - - memcpy(buffer, start, len); - buffer[len] = '\0'; - - return buffer; -} +// Flags set by event callback, polled by main thread +static volatile int got_message_sent = 0; +static volatile int got_message_error = 0; +static volatile int got_message_received = 0; // Event callback that handles message events void event_callback(int ret, const char *msg, size_t len, void *userData) { @@ -61,7 +39,8 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) { char messageHash[128]; extract_json_field(eventJson, "requestId", requestId, sizeof(requestId)); extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash)); - printf("📤 [EVENT] Message sent - RequestID: %s, Hash: %s\n", requestId, messageHash); + printf("[EVENT] Message sent - RequestID: %s, Hash: %s\n", requestId, messageHash); + got_message_sent = 1; } else if (strcmp(eventType, "message_error") == 0) { char requestId[128]; @@ -70,18 +49,59 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) { extract_json_field(eventJson, "requestId", requestId, sizeof(requestId)); extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash)); extract_json_field(eventJson, "error", error, sizeof(error)); - printf("❌ [EVENT] Message error - RequestID: %s, Hash: %s, Error: %s\n", + printf("[EVENT] Message error - RequestID: %s, Hash: %s, Error: %s\n", requestId, messageHash, error); + got_message_error = 1; } else if (strcmp(eventType, "message_propagated") == 0) { char requestId[128]; char messageHash[128]; extract_json_field(eventJson, "requestId", requestId, sizeof(requestId)); extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash)); - printf("✅ 
[EVENT] Message propagated - RequestID: %s, Hash: %s\n", requestId, messageHash); + printf("[EVENT] Message propagated - RequestID: %s, Hash: %s\n", requestId, messageHash); + + } else if (strcmp(eventType, "connection_status_change") == 0) { + char connectionStatus[256]; + extract_json_field(eventJson, "connectionStatus", connectionStatus, sizeof(connectionStatus)); + printf("[EVENT] Connection status change - Status: %s\n", connectionStatus); + + } else if (strcmp(eventType, "message_received") == 0) { + char messageHash[128]; + extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash)); + + // Extract the nested "message" object + size_t msgObjLen = 0; + const char *msgObj = extract_json_object(eventJson, "message", &msgObjLen); + if (msgObj) { + // Make a null-terminated copy of the message object + char *msgJson = malloc(msgObjLen + 1); + if (msgJson) { + memcpy(msgJson, msgObj, msgObjLen); + msgJson[msgObjLen] = '\0'; + + char contentTopic[256]; + extract_json_field(msgJson, "contentTopic", contentTopic, sizeof(contentTopic)); + + // Decode payload from JSON byte array to string + char payload[4096]; + int payloadLen = decode_json_byte_array(msgJson, "payload", payload, sizeof(payload)); + + printf("[EVENT] Message received - Hash: %s, ContentTopic: %s\n", messageHash, contentTopic); + if (payloadLen > 0) { + printf(" Payload (%d bytes): %.*s\n", payloadLen, payloadLen, payload); + } else { + printf(" Payload: (empty or could not decode)\n"); + } + + free(msgJson); + } + } else { + printf("[EVENT] Message received - Hash: %s (could not parse message)\n", messageHash); + } + got_message_received = 1; } else { - printf("â„šī¸ [EVENT] Unknown event type: %s\n", eventType); + printf("[EVENT] Unknown event type: %s\n", eventType); } free(eventJson); @@ -109,23 +129,12 @@ void simple_callback(int ret, const char *msg, size_t len, void *userData) { int main() { printf("=== Logos Messaging API (LMAPI) Example ===\n\n"); - // Configuration JSON for creating a node + // Configuration JSON using WakuNodeConf field names (flat structure). + // Field names match Nim identifiers from WakuNodeConf in tools/confutils/cli_args.nim. const char *config = "{" - "\"logLevel\": \"DEBUG\"," - // "\"mode\": \"Edge\"," + "\"logLevel\": \"INFO\"," "\"mode\": \"Core\"," - "\"protocolsConfig\": {" - "\"entryNodes\": [\"/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78\"]," - "\"clusterId\": 42," - "\"autoShardingConfig\": {" - "\"numShardsInCluster\": 8" - "}" - "}," - "\"networkingConfig\": {" - "\"listenIpv4\": \"0.0.0.0\"," - "\"p2pTcpPort\": 60000," - "\"discv5UdpPort\": 9000" - "}" + "\"preset\": \"logos.dev\"" "}"; printf("1. Creating node...\n"); @@ -152,7 +161,7 @@ int main() { logosdelivery_start_node(ctx, simple_callback, (void *)"start_node"); // Wait for node to start - sleep(2); + sleep(5); printf("\n4. Subscribing to content topic...\n"); const char *contentTopic = "/example/1/chat/proto"; @@ -161,7 +170,23 @@ int main() { // Wait for subscription sleep(1); - printf("\n5. Sending a message...\n"); + printf("\n5. 
Sending a message...\n"); + printf("\n5. Retrieving all possible node info IDs...\n"); + logosdelivery_get_available_node_info_ids(ctx, simple_callback, (void *)"get_available_node_info_ids"); + + printf("\nRetrieving node info for a specific invalid ID...\n"); + logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "WrongNodeInfoId"); + + printf("\nRetrieving node info for several valid IDs...\n"); + logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "Version"); + // logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "Metrics"); + logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "MyMultiaddresses"); + logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "MyENR"); + logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "MyPeerId"); + + printf("\nRetrieving available configs...\n"); + logosdelivery_get_available_configs(ctx, simple_callback, (void *)"get_available_configs"); + + printf("\n6. Sending a message...\n"); printf("Watch for message events (sent, propagated, or error):\n"); // Create base64-encoded payload: "Hello, Logos Messaging!" const char *message = "{" @@ -171,21 +196,30 @@ "}"; logosdelivery_send(ctx, simple_callback, (void *)"send", message); - // Wait for message events to arrive + // Poll for terminal message events (sent, error, or received) with timeout printf("Waiting for message delivery events...\n"); - sleep(60); + int timeout_sec = 60; + int elapsed_ms = 0; + while (!(got_message_sent || got_message_error || got_message_received) + && elapsed_ms < timeout_sec * 1000) { + usleep(100000); // 100ms + elapsed_ms += 100; + } + if (elapsed_ms >= timeout_sec * 1000) { + printf("Timed out waiting for message events after %d seconds\n", timeout_sec); + } - printf("\n6. Unsubscribing from content topic...\n"); + printf("\n7. Unsubscribing from content topic...\n"); logosdelivery_unsubscribe(ctx, simple_callback, (void *)"unsubscribe", contentTopic); sleep(1); - printf("\n7. Stopping node...\n"); + printf("\n8. Stopping node...\n"); logosdelivery_stop_node(ctx, simple_callback, (void *)"stop_node"); sleep(1); - printf("\n8. Destroying context...\n"); + printf("\n9. Destroying context...\n"); logosdelivery_destroy(ctx, simple_callback, (void *)"destroy"); printf("\n=== Example completed ===\n"); diff --git a/liblogosdelivery/liblogosdelivery.h b/liblogosdelivery/liblogosdelivery.h index b014d6385..5092db9f2 100644 --- a/liblogosdelivery/liblogosdelivery.h +++ b/liblogosdelivery/liblogosdelivery.h @@ -22,7 +22,9 @@ extern "C" // Creates a new instance of the node from the given configuration JSON. // Returns a pointer to the Context needed by the rest of the API functions. - // Configuration should be in JSON format following the NodeConfig structure. + // Configuration should be in JSON format using WakuNodeConf field names. + // Field names match Nim identifiers from WakuNodeConf (camelCase). + // Example: {"mode": "Core", "clusterId": 42, "relay": true} void *logosdelivery_create_node( const char *configJson, FFICallBack callback, void *userData); @@ -75,6 +77,22 @@ extern "C" FFICallBack callback, void *userData); + // Retrieves the list of available node info IDs. + int logosdelivery_get_available_node_info_ids(void *ctx, + FFICallBack callback, + void *userData); + + // Given a node info ID, retrieves the corresponding info. + int logosdelivery_get_node_info(void *ctx, + FFICallBack callback, + void *userData, + const char *nodeInfoId); + + // Retrieves the list of available configurations. + int logosdelivery_get_available_configs(void *ctx, + FFICallBack callback, + void *userData); + #ifdef __cplusplus } #endif diff --git a/liblogosdelivery/liblogosdelivery.nim b/liblogosdelivery/liblogosdelivery.nim index 7d068b065..fc907498a 100644 --- a/liblogosdelivery/liblogosdelivery.nim +++ b/liblogosdelivery/liblogosdelivery.nim @@ -4,26 +4,8 @@ import waku/factory/waku, waku/node/waku_node, ./declare_lib ################################################################################ ## Include different APIs, i.e. all procs with {.ffi.} pragma -include ./logos_delivery_api/node_api, ./logos_delivery_api/messaging_api -################################################################################ -### Exported procs - -proc logosdelivery_destroy( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer -): cint {.dynlib, exportc, cdecl.} = - initializeLibrary() - checkParams(ctx, callback, userData) - - ffi.destroyFFIContext(ctx).isOkOr: - let msg = "liblogosdelivery error: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return RET_ERR - - ## always need to invoke the callback although we don't retrieve value to the caller - callback(RET_OK, nil, 0, userData) - - return RET_OK - -# ### End of exported procs -# ################################################################################ +include + ./logos_delivery_api/node_api, + ./logos_delivery_api/messaging_api, + ./logos_delivery_api/debug_api diff --git a/liblogosdelivery/logos_delivery_api/debug_api.nim b/liblogosdelivery/logos_delivery_api/debug_api.nim new file mode 100644 index 000000000..623b3b08f --- /dev/null +++ b/liblogosdelivery/logos_delivery_api/debug_api.nim @@ -0,0 +1,50 @@ +import std/[json, strutils] +import waku/factory/waku_state_info +import tools/confutils/[cli_args, config_option_meta] + +proc logosdelivery_get_available_node_info_ids( + ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer +) {.ffi.} = + ## Returns the list of all available node info item ids that + ## can be queried with `logosdelivery_get_node_info`. + requireInitializedNode(ctx, "GetNodeInfoIds"): + return err(errMsg) + + return ok($ctx.myLib[].stateInfo.getAllPossibleInfoItemIds()) + +proc logosdelivery_get_node_info( + ctx: ptr FFIContext[Waku], + callback: FFICallBack, + userData: pointer, + nodeInfoId: cstring, +) {.ffi.} = + ## Returns the content of the node info item with the given id if it exists. + requireInitializedNode(ctx, "GetNodeInfoItem"): + return err(errMsg) + + let infoItemIdEnum = + try: + parseEnum[NodeInfoId]($nodeInfoId) + except ValueError: + return err("Invalid node info id: " & $nodeInfoId) + + return ok(ctx.myLib[].stateInfo.getNodeInfoItem(infoItemIdEnum)) + +proc logosdelivery_get_available_configs( + ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer +) {.ffi.} = + ## Returns information about the accepted config items. + requireInitializedNode(ctx, "GetAvailableConfigs"): + return err(errMsg) + + let optionMetas: seq[ConfigOptionMeta] = extractConfigOptionMeta(WakuNodeConf) + var configOptionDetails = newJArray() + + for meta in optionMetas: + configOptionDetails.add( + %*{meta.fieldName: meta.typeName & "(" & meta.defaultValue & ")", "desc": meta.desc} + ) + + var jsonNode = newJObject() + jsonNode["configOptions"] = configOptionDetails + return ok(pretty(jsonNode)) diff --git a/liblogosdelivery/logos_delivery_api/node_api.nim b/liblogosdelivery/logos_delivery_api/node_api.nim index 6a0041857..cd644abd7 100644 --- a/liblogosdelivery/logos_delivery_api/node_api.nim +++ b/liblogosdelivery/logos_delivery_api/node_api.nim @@ -1,10 +1,11 @@ -import std/json -import chronos, results, ffi +import std/[json, strutils] +import chronos, chronicles, results, confutils, confutils/std/net, ffi import waku/factory/waku, waku/node/waku_node, - waku/api/[api, api_conf, types], - waku/events/message_events, + waku/api/[api, types], + waku/events/[message_events, health_events], + tools/confutils/cli_args, ../declare_lib, ../json_event @@ -14,21 +15,54 @@ proc `%`*(id: RequestId): JsonNode = registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]): proc(configJson: cstring): Future[Result[string, string]] {.async.} = - ## Parse the JSON configuration and create a node - let nodeConfig = - try: - decodeNodeConfigFromJson($configJson) - except SerializationError as e: - return err("Failed to parse config JSON: " & e.msg) + ## Parse the JSON configuration into a WakuNodeConf using the fieldPairs approach + var conf = defaultWakuNodeConf().valueOr: + return err("Failed creating default conf: " & error) + + var jsonNode: JsonNode + try: + jsonNode = parseJson($configJson) + except Exception: + return err( + "Failed to parse config JSON: " & getCurrentExceptionMsg() & + " configJson string: " & $configJson + ) + + for confField, confValue in fieldPairs(conf): + if jsonNode.contains(confField): + let formattedString = ($jsonNode[confField]).strip(chars = {'\"'}) + try: + confValue = parseCmdArg(typeof(confValue), formattedString) + except Exception: + return err( + "Failed to parse field '" & confField & "': " & getCurrentExceptionMsg() & + ". Value: " & formattedString + ) # Create the node - ctx.myLib[] = (await api.createNode(nodeConfig)).valueOr: + ctx.myLib[] = (await api.createNode(conf)).valueOr: let errMsg = $error chronicles.error "CreateNodeRequest failed", err = errMsg return err(errMsg) return ok("") +proc logosdelivery_destroy( + ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer +): cint {.dynlib, exportc, cdecl.} = + initializeLibrary() + checkParams(ctx, callback, userData) + + ffi.destroyFFIContext(ctx).isOkOr: + let msg = "liblogosdelivery error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + ## always invoke the callback even though no value is returned to the caller + callback(RET_OK, nil, 0, userData) + + return RET_OK + proc logosdelivery_create_node( configJson: cstring, callback: FFICallback, userData: pointer ): pointer {.dynlib, exportc, cdecl.} = @@ -50,6 +84,10 @@ proc logosdelivery_create_node( ).isOkOr: let msg = "error in sendRequestToFFIThread: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + # free allocated resources, as the caller receives nil and cannot free them + ffi.destroyFFIContext(ctx).isOkOr: + chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation", + err = $error return nil return ctx @@ -88,6 +126,24 @@ proc logosdelivery_start_node( chronicles.error "MessagePropagatedEvent.listen failed", err = $error return err("MessagePropagatedEvent.listen failed: " & $error) + let receivedListener = MessageReceivedEvent.listen( + ctx.myLib[].brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + callEventCallback(ctx, "onMessageReceived"): + $newJsonEvent("message_received", event), + ).valueOr: + chronicles.error "MessageReceivedEvent.listen failed", err = $error + return err("MessageReceivedEvent.listen failed: " & $error) + + let connectionStatusChangeListener = EventConnectionStatusChange.listen( + ctx.myLib[].brokerCtx, + proc(event: EventConnectionStatusChange) {.async: (raises: []).} = + callEventCallback(ctx, "onConnectionStatusChange"): + $newJsonEvent("connection_status_change", event), + ).valueOr: + chronicles.error "ConnectionStatusChange.listen failed", err = $error + return err("ConnectionStatusChange.listen failed: " & $error) + (await startWaku(addr ctx.myLib[])).isOkOr: let errMsg = $error chronicles.error "START_NODE failed", err = errMsg @@ -103,6 +159,8 @@ proc logosdelivery_stop_node( MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx) MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx) MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx) + MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx) + EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx) (await ctx.myLib[].stop()).isOkOr: let errMsg = $error diff --git a/nix/default.nix b/nix/default.nix index ca91d0e2f..7df58df60 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -61,6 +61,7 @@ in stdenv.mkDerivation { "QUICK_AND_DIRTY_NIMBLE=${if quickAndDirty then "1" else "0"}" "USE_SYSTEM_NIM=${if useSystemNim then "1" else "0"}" "LIBRLN_FILE=${zerokitRln}/lib/librln.${if abidir != null then "so" else "a"}" + "POSTGRES=1" ]; configurePhase = '' diff --git a/tests/api/test_all.nim b/tests/api/test_all.nim index 57f7f37f2..4617c8cdb 100644 --- a/tests/api/test_all.nim +++ b/tests/api/test_all.nim @@ -1,3 +1,8 @@ {.used.} -import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health +import + ./test_entry_nodes, 
./test_node_conf, + ./test_api_send, + ./test_api_subscription, + ./test_api_health diff --git a/tests/api/test_api_health.nim b/tests/api/test_api_health.nim index b7aab43f9..f3dd340af 100644 --- a/tests/api/test_api_health.nim +++ b/tests/api/test_api_health.nim @@ -13,9 +13,10 @@ import waku/events/health_events, waku/common/waku_protocol, waku/factory/waku_conf +import tools/confutils/cli_args const TestTimeout = chronos.seconds(10) -const DefaultShard = PubsubTopic("/waku/2/rs/1/0") +const DefaultShard = PubsubTopic("/waku/2/rs/3/0") const TestContentTopic = ContentTopic("/waku/2/default-content/proto") proc dummyHandler( @@ -80,7 +81,7 @@ suite "LM API health checking": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) (await serviceNode.mountRelay()).isOkOr: raiseAssert error - serviceNode.mountMetadata(1, @[0'u16]).isOkOr: + serviceNode.mountMetadata(3, @[0'u16]).isOkOr: raiseAssert error await serviceNode.mountLibp2pPing() await serviceNode.start() @@ -89,16 +90,15 @@ suite "LM API health checking": serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler) lockNewGlobalBrokerContext: - let conf = NodeConfig.init( - mode = WakuMode.Core, - networkingConfig = - NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0), - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - clusterId = 1'u16, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), - ), - ) + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = Core + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = 1 + conf.rest = false client = (await createNode(conf)).valueOr: raiseAssert error @@ -267,17 +267,15 @@ suite "LM API health checking": var edgeWaku: Waku lockNewGlobalBrokerContext: - let edgeConf = NodeConfig.init( - mode = WakuMode.Edge, - networkingConfig = - NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0), - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - clusterId = 1'u16, - messageValidation = - MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)), - ), - ) + var edgeConf = defaultWakuNodeConf().valueOr: + raiseAssert error + edgeConf.mode = Edge + edgeConf.listenAddress = parseIpAddress("0.0.0.0") + edgeConf.tcpPort = Port(0) + edgeConf.discv5UdpPort = Port(0) + edgeConf.clusterId = 3'u16 + edgeConf.maxMessageSize = "150 KiB" + edgeConf.rest = false edgeWaku = (await createNode(edgeConf)).valueOr: raiseAssert "Failed to create edge node: " & error diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index 7343fc655..30a176119 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -6,7 +6,8 @@ import ../testlib/[common, wakucore, wakunode, testasync] import ../waku_archive/archive_utils import waku, waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context] -import waku/api/api_conf, waku/factory/waku_conf +import waku/factory/waku_conf +import tools/confutils/cli_args type SendEventOutcome {.pure.} = enum Sent @@ -116,20 +117,18 @@ proc validate( for requestId in manager.errorRequestIds: check requestId == expectedRequestId -proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig = - # allocate random ports to avoid port-already-in-use errors - let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0) - - result = NodeConfig.init( - mode = mode, - protocolsConfig = 
ProtocolsConfig.init( - entryNodes = @[], - clusterId = 1, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), - ), - networkingConfig = netConf, - p2pReliability = true, - ) +proc createApiNodeConf(mode: cli_args.WakuMode = cli_args.WakuMode.Core): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = mode + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = 1 + conf.reliabilityEnabled = true + conf.rest = false + result = conf suite "Waku API - Send": var @@ -153,7 +152,7 @@ suite "Waku API - Send": lockNewGlobalBrokerContext: relayNode1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - relayNode1.mountMetadata(1, @[0'u16]).isOkOr: + relayNode1.mountMetadata(3, @[0'u16]).isOkOr: raiseAssert "Failed to mount metadata: " & error (await relayNode1.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" @@ -163,7 +162,7 @@ suite "Waku API - Send": lockNewGlobalBrokerContext: relayNode2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - relayNode2.mountMetadata(1, @[0'u16]).isOkOr: + relayNode2.mountMetadata(3, @[0'u16]).isOkOr: raiseAssert "Failed to mount metadata: " & error (await relayNode2.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" @@ -173,7 +172,7 @@ suite "Waku API - Send": lockNewGlobalBrokerContext: lightpushNode = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - lightpushNode.mountMetadata(1, @[0'u16]).isOkOr: + lightpushNode.mountMetadata(3, @[0'u16]).isOkOr: raiseAssert "Failed to mount metadata: " & error (await lightpushNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" @@ -185,7 +184,7 @@ suite "Waku API - Send": lockNewGlobalBrokerContext: storeNode = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - storeNode.mountMetadata(1, @[0'u16]).isOkOr: + storeNode.mountMetadata(3, @[0'u16]).isOkOr: raiseAssert "Failed to mount metadata: " & error (await storeNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" @@ -210,7 +209,7 @@ suite "Waku API - Send": storeNodePeerId = storeNode.peerInfo.peerId # Subscribe all relay nodes to the default shard topic - const testPubsubTopic = PubsubTopic("/waku/2/rs/1/0") + const testPubsubTopic = PubsubTopic("/waku/2/rs/3/0") proc dummyHandler( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = @@ -387,7 +386,7 @@ suite "Waku API - Send": lockNewGlobalBrokerContext: fakeLightpushNode = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - fakeLightpushNode.mountMetadata(1, @[0'u16]).isOkOr: + fakeLightpushNode.mountMetadata(3, @[0'u16]).isOkOr: raiseAssert "Failed to mount metadata: " & error (await fakeLightpushNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" @@ -402,13 +401,13 @@ suite "Waku API - Send": discard fakeLightpushNode.subscribe( - (kind: PubsubSub, topic: PubsubTopic("/waku/2/rs/1/0")), dummyHandler + (kind: PubsubSub, topic: PubsubTopic("/waku/2/rs/3/0")), dummyHandler ).isOkOr: raiseAssert "Failed to subscribe fakeLightpushNode: " & error var node: Waku lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf(WakuMode.Edge))).valueOr: + node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr: raiseAssert error (await startWaku(addr node)).isOkOr: raiseAssert "Failed to start Waku node: " & error diff --git 
a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim new file mode 100644 index 000000000..6639e3dea --- /dev/null +++ b/tests/api/test_api_subscription.nim @@ -0,0 +1,400 @@ +{.used.} + +import std/[strutils, net, options, sets] +import chronos, testutils/unittests, stew/byteutils +import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] +import ../testlib/[common, wakucore, wakunode, testasync] + +import + waku, + waku/[ + waku_node, + waku_core, + common/broker/broker_context, + events/message_events, + waku_relay/protocol, + ] +import waku/factory/waku_conf +import tools/confutils/cli_args + +# TODO: Edge testing (after MAPI edge support is completed) + +const TestTimeout = chronos.seconds(10) +const NegativeTestTimeout = chronos.seconds(2) + +type ReceiveEventListenerManager = ref object + brokerCtx: BrokerContext + receivedListener: MessageReceivedEventListener + receivedEvent: AsyncEvent + receivedMessages: seq[WakuMessage] + targetCount: int + +proc newReceiveEventListenerManager( + brokerCtx: BrokerContext, expectedCount: int = 1 +): ReceiveEventListenerManager = + let manager = ReceiveEventListenerManager( + brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount + ) + manager.receivedEvent = newAsyncEvent() + + manager.receivedListener = MessageReceivedEvent + .listen( + brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + manager.receivedMessages.add(event.message) + + if manager.receivedMessages.len >= manager.targetCount: + manager.receivedEvent.fire() + , + ) + .expect("Failed to listen to MessageReceivedEvent") + + return manager + +proc teardown(manager: ReceiveEventListenerManager) = + MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener) + +proc waitForEvents( + manager: ReceiveEventListenerManager, timeout: Duration +): Future[bool] {.async.} = + return await manager.receivedEvent.wait().withTimeout(timeout) + +type TestNetwork = ref object + publisher: WakuNode + subscriber: Waku + publisherPeerInfo: RemotePeerInfo + +proc createApiNodeConf( + mode: cli_args.WakuMode = cli_args.WakuMode.Core, numShards: uint16 = 1 +): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = mode + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = numShards + conf.reliabilityEnabled = true + conf.rest = false + result = conf + +proc setupSubscriberNode(conf: WakuNodeConf): Future[Waku] {.async.} = + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(conf)).expect("Failed to create subscriber node") + (await startWaku(addr node)).expect("Failed to start subscriber node") + return node + +proc setupNetwork( + numShards: uint16 = 1, mode: cli_args.WakuMode = cli_args.WakuMode.Core +): Future[TestNetwork] {.async.} = + var net = TestNetwork() + + lockNewGlobalBrokerContext: + net.publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + net.publisher.mountMetadata(3, @[0'u16]).expect("Failed to mount metadata") + (await net.publisher.mountRelay()).expect("Failed to mount relay") + await net.publisher.mountLibp2pPing() + await net.publisher.start() + + net.publisherPeerInfo = net.publisher.peerInfo.toRemotePeerInfo() + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + # Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber. 
+ # Currently, Core/Relay nodes auto-subscribe to all network shards on boot, but if + # that changes, this will be needed to cause the publisher to have shard interest + # for any shards the subscriber may want to use, which is required for waitForMesh to work. + for i in 0 ..< numShards.int: + let shard = PubsubTopic("/waku/2/rs/3/" & $i) + net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub publisher" + ) + + net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards)) + + await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo]) + + return net + +proc teardown(net: TestNetwork) {.async.} = + if not isNil(net.subscriber): + (await net.subscriber.stop()).expect("Failed to stop subscriber node") + net.subscriber = nil + + if not isNil(net.publisher): + await net.publisher.stop() + net.publisher = nil + +proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic = + let autoSharding = node.wakuAutoSharding.get() + let shardObj = autoSharding.getShard(contentTopic).expect("Failed to get shard") + return PubsubTopic($shardObj) + +proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} = + for _ in 0 ..< 50: + if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: + return + await sleepAsync(100.milliseconds) + raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard) + +proc publishToMesh( + net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] +): Future[Result[int, string]] {.async.} = + let shard = net.subscriber.node.getRelayShard(contentTopic) + + await waitForMesh(net.publisher, shard) + + let msg = WakuMessage( + payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() + ) + return await net.publisher.publish(some(shard), msg) + +suite "Messaging API, SubscriptionManager": + asyncTest "Subscription API, relay node auto subscribe and receive message": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/test-content/proto") + (await net.subscriber.subscribe(testTopic)).expect( + "subscriberNode failed to subscribe" + ) + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(testTopic, "Hello, world!".toBytes())).expect( + "Publish failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == testTopic + + asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") + let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") + (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, relay node unsubscribe stops message receipt": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/unsub-test/proto") + + (await net.subscriber.subscribe(testTopic)).expect("failed to 
subscribe") + net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let topicA = ContentTopic("/waku/2/topic-a/proto") + let topicB = ContentTopic("/waku/2/topic-b/proto") + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + net.subscriber.unsubscribe(topicA).expect("failed to unsub A") + + discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( + "Publish A failed" + ) + discard + (await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed") + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == topicB + + asyncTest "Subscription API, redundant subs tolerated and subs are removed": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let glitchTopic = ContentTopic("/waku/2/glitch/proto") + + (await net.subscriber.subscribe(glitchTopic)).expect("failed to sub") + (await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub") + net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(glitchTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, resubscribe to an unsubscribed topic": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/resub-test/proto") + + # Subscribe + (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed") + + var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + discard + (await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed") + + require await eventManager.waitForEvents(TestTimeout) + eventManager.teardown() + + # Unsubscribe and verify teardown + net.subscriber.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + + discard + (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + eventManager.teardown() + + # Resubscribe + (await net.subscriber.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + + discard + (await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "Msg 2".toBytes() + + asyncTest "Subscription API, two content topics in different shards": + let net = await 
setupNetwork(8) + defer: + await net.teardown() + + var topicA = ContentTopic("/appA/2/shard-test-a/proto") + var topicB = ContentTopic("/appB/2/shard-test-b/proto") + + # generate two content topics that land in two different shards + var i = 0 + while net.subscriber.node.getRelayShard(topicA) == + net.subscriber.node.getRelayShard(topicB): + topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto") + inc i + + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(topicA, "Msg on Shard A".toBytes())).expect( + "Publish A failed" + ) + discard (await net.publishToMesh(topicB, "Msg on Shard B".toBytes())).expect( + "Publish B failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 2 + + asyncTest "Subscription API, many content topics in many shards": + let net = await setupNetwork(8) + defer: + await net.teardown() + + var allTopics: seq[ContentTopic] + for i in 0 ..< 100: + allTopics.add(ContentTopic("/stress-app-" & $i & "/2/state-test/proto")) + + var activeSubs: seq[ContentTopic] + + proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} = + let eventManager = + newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len) + + for topic in allTopics: + discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect( + "publish failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + + # here we just give a chance for any messages that we don't expect to arrive + await sleepAsync(1.seconds) + eventManager.teardown() + + # weak check (but catches most bugs) + require eventManager.receivedMessages.len == expected.len + + # strict expected receipt test + var receivedTopics = initHashSet[ContentTopic]() + for msg in eventManager.receivedMessages: + receivedTopics.incl(msg.contentTopic) + var expectedTopics = initHashSet[ContentTopic]() + for t in expected: + expectedTopics.incl(t) + + check receivedTopics == expectedTopics + + # subscribe to all content topics we generated + for t in allTopics: + (await net.subscriber.subscribe(t)).expect("sub failed") + activeSubs.add(t) + + await verifyNetworkState(activeSubs) + + # unsubscribe from some content topics + for i in 0 ..< 50: + let t = allTopics[i] + net.subscriber.unsubscribe(t).expect("unsub failed") + + let idx = activeSubs.find(t) + if idx >= 0: + activeSubs.del(idx) + + await verifyNetworkState(activeSubs) + + # re-subscribe to some content topics + for i in 0 ..< 25: + let t = allTopics[i] + (await net.subscriber.subscribe(t)).expect("resub failed") + activeSubs.add(t) + + await verifyNetworkState(activeSubs) diff --git a/tests/api/test_entry_nodes.nim b/tests/api/test_entry_nodes.nim index 136a49b2b..38dc38ba4 100644 --- a/tests/api/test_entry_nodes.nim +++ b/tests/api/test_entry_nodes.nim @@ -2,7 +2,7 @@ import std/options, results, testutils/unittests -import waku/api/entry_nodes +import tools/confutils/entry_nodes # Since classifyEntryNode is internal, we test it indirectly through processEntryNodes behavior # The enum is exported so we can test against it diff --git a/tests/api/test_node_conf.nim b/tests/api/test_node_conf.nim index 84bbfead3..d0b3d433c 100644 --- a/tests/api/test_node_conf.nim +++ b/tests/api/test_node_conf.nim @@ -1,36 +1,64 @@ {.used.} -import std/options, results, stint, 
testutils/unittests +import std/[options, json, strutils], results, stint, testutils/unittests import json_serialization -import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config +import confutils, confutils/std/net +import tools/confutils/cli_args +import waku/factory/waku_conf, waku/factory/networks_config import waku/common/logging -suite "LibWaku Conf - toWakuConf": - test "Minimal configuration": +# Helper: parse JSON into WakuNodeConf using fieldPairs (same as liblogosdelivery) +proc parseWakuNodeConfFromJson(jsonStr: string): Result[WakuNodeConf, string] = + var conf = defaultWakuNodeConf().valueOr: + return err(error) + var jsonNode: JsonNode + try: + jsonNode = parseJson(jsonStr) + except Exception: + return err("JSON parse error: " & getCurrentExceptionMsg()) + for confField, confValue in fieldPairs(conf): + if jsonNode.contains(confField): + let formattedString = ($jsonNode[confField]).strip(chars = {'\"'}) + try: + confValue = parseCmdArg(typeof(confValue), formattedString) + except Exception: + return err( + "Field '" & confField & "' parse error: " & getCurrentExceptionMsg() & + ". Value: " & formattedString + ) + return ok(conf) + +suite "WakuNodeConf - mode-driven toWakuConf": + test "Core mode enables service protocols": ## Given - let nodeConfig = NodeConfig.init(ethRpcEndpoints = @["http://someaddress"]) + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = Core + conf.clusterId = 1 ## When - let wakuConfRes = toWakuConf(nodeConfig) + let wakuConfRes = conf.toWakuConf() ## Then - let wakuConf = wakuConfRes.valueOr: - raiseAssert error - wakuConf.validate().isOkOr: - raiseAssert error + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() check: + wakuConf.relay == true + wakuConf.lightPush == true + wakuConf.peerExchangeService == true + wakuConf.rendezvous == true wakuConf.clusterId == 1 - wakuConf.shardingConf.numShardsInCluster == 8 - wakuConf.staticNodes.len == 0 - test "Edge mode configuration": + test "Edge mode disables service protocols": ## Given - let protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) - - let nodeConfig = NodeConfig.init(mode = Edge, protocolsConfig = protocolsConfig) + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = Edge + conf.clusterId = 1 ## When - let wakuConfRes = toWakuConf(nodeConfig) + let wakuConfRes = conf.toWakuConf() ## Then require wakuConfRes.isOk() @@ -42,16 +70,175 @@ suite "LibWaku Conf - toWakuConf": wakuConf.filterServiceConf.isSome() == false wakuConf.storeServiceConf.isSome() == false wakuConf.peerExchangeService == true - wakuConf.clusterId == 1 - test "Core mode configuration": + test "noMode uses explicit CLI flags as-is": ## Given - let protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) - - let nodeConfig = NodeConfig.init(mode = Core, protocolsConfig = protocolsConfig) + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = WakuMode.noMode + conf.relay = true + conf.lightpush = false + conf.clusterId = 5 ## When - let wakuConfRes = toWakuConf(nodeConfig) + let wakuConfRes = conf.toWakuConf() + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.relay == true + wakuConf.lightPush == false + wakuConf.clusterId == 5 + + test "Core mode overrides individual protocol flags": + ## Given - user sets relay=false but mode=Core should override + var conf = 
defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = Core + conf.relay = false # will be overridden by Core mode + + ## When + let wakuConfRes = conf.toWakuConf() + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.relay == true # mode overrides + +suite "WakuNodeConf - JSON parsing with fieldPairs": + test "Empty JSON produces valid default conf": + ## Given / When + let confRes = parseWakuNodeConfFromJson("{}") + + ## Then + require confRes.isOk() + let conf = confRes.get() + check: + conf.mode == WakuMode.noMode + conf.clusterId == 0 + conf.logLevel == logging.LogLevel.INFO + + test "JSON with mode and clusterId": + ## Given / When + let confRes = parseWakuNodeConfFromJson("""{"mode": "Core", "clusterId": 42}""") + + ## Then + require confRes.isOk() + let conf = confRes.get() + check: + conf.mode == Core + conf.clusterId == 42 + + test "JSON with Edge mode": + ## Given / When + let confRes = parseWakuNodeConfFromJson("""{"mode": "Edge"}""") + + ## Then + require confRes.isOk() + let conf = confRes.get() + check: + conf.mode == Edge + + test "JSON with logLevel": + ## Given / When + let confRes = parseWakuNodeConfFromJson("""{"logLevel": "DEBUG"}""") + + ## Then + require confRes.isOk() + let conf = confRes.get() + check: + conf.logLevel == logging.LogLevel.DEBUG + + test "JSON with sharding config": + ## Given / When + let confRes = + parseWakuNodeConfFromJson("""{"clusterId": 99, "numShardsInNetwork": 16}""") + + ## Then + require confRes.isOk() + let conf = confRes.get() + check: + conf.clusterId == 99 + conf.numShardsInNetwork == 16 + + test "JSON with unknown fields is silently ignored": + ## Given / When + let confRes = + parseWakuNodeConfFromJson("""{"unknownField": true, "clusterId": 5}""") + + ## Then - unknown fields are just ignored (not in fieldPairs) + require confRes.isOk() + let conf = confRes.get() + check: + conf.clusterId == 5 + + test "Invalid JSON syntax returns error": + ## Given / When + let confRes = parseWakuNodeConfFromJson("{ not valid json }") + + ## Then + check confRes.isErr() + +suite "WakuNodeConf - preset integration": + test "TWN preset applies TheWakuNetworkConf": + ## Given + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.preset = "twn" + + ## When + let wakuConfRes = conf.toWakuConf() + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.clusterId == 1 + + test "LogosDev preset applies LogosDevConf": + ## Given + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.preset = "logosdev" + + ## When + let wakuConfRes = conf.toWakuConf() + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.clusterId == 2 + + test "Invalid preset returns error": + ## Given + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.preset = "nonexistent" + + ## When + let wakuConfRes = conf.toWakuConf() + + ## Then + check wakuConfRes.isErr() + +suite "WakuNodeConf JSON -> WakuConf integration": + test "Core mode JSON config produces valid WakuConf": + ## Given + let confRes = parseWakuNodeConfFromJson( + """{"mode": "Core", "clusterId": 55, "numShardsInNetwork": 6}""" + ) + require confRes.isOk() + let conf = confRes.get() + + ## When + let wakuConfRes = conf.toWakuConf() ## Then require wakuConfRes.isOk() @@ -61,93 +248,72 @@ suite "LibWaku Conf - toWakuConf": wakuConf.relay 
== true wakuConf.lightPush == true wakuConf.peerExchangeService == true - wakuConf.clusterId == 1 + wakuConf.clusterId == 55 + wakuConf.shardingConf.numShardsInCluster == 6 - test "Auto-sharding configuration": + test "Edge mode JSON config produces valid WakuConf": ## Given - let nodeConfig = NodeConfig.init( - mode = Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - staticStoreNodes = @[], - clusterId = 42, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 16), - ), - ) + let confRes = parseWakuNodeConfFromJson("""{"mode": "Edge", "clusterId": 1}""") + require confRes.isOk() + let conf = confRes.get() ## When - let wakuConfRes = toWakuConf(nodeConfig) + let wakuConfRes = conf.toWakuConf() ## Then require wakuConfRes.isOk() let wakuConf = wakuConfRes.get() require wakuConf.validate().isOk() check: - wakuConf.clusterId == 42 - wakuConf.shardingConf.numShardsInCluster == 16 + wakuConf.relay == false + wakuConf.lightPush == false + wakuConf.peerExchangeService == true - test "Bootstrap nodes configuration": + test "JSON with preset produces valid WakuConf": ## Given - let entryNodes = - @[ - "enr:-QESuEC1p_s3xJzAC_XlOuuNrhVUETmfhbm1wxRGis0f7DlqGSw2FM-p2Vn7gmfkTTnAe8Ys2cgGBN8ufJnvzKQFZqFMBgmlkgnY0iXNlY3AyNTZrMaEDS8-D878DrdbNwcuY-3p1qdDp5MOoCurhdsNPJTXZ3c5g3RjcIJ2X4N1ZHCCd2g", - "enr:-QEkuECnZ3IbVAgkOzv-QLnKC4dRKAPRY80m1-R7G8jZ7yfT3ipEfBrhKN7ARcQgQ-vg-h40AQzyvAkPYlHPaFKk6u9MBgmlkgnY0iXNlY3AyNTZrMaEDk49D8JjMSns4p1XVNBvJquOUzT4PENSJknkROspfAFGg3RjcIJ2X4N1ZHCCd2g", - ] - let libConf = NodeConfig.init( - mode = Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = entryNodes, staticStoreNodes = @[], clusterId = 1 - ), - ) + let confRes = + parseWakuNodeConfFromJson("""{"mode": "Core", "preset": "logosdev"}""") + require confRes.isOk() + let conf = confRes.get() ## When - let wakuConfRes = toWakuConf(libConf) - - ## Then - require wakuConfRes.isOk() - let wakuConf = wakuConfRes.get() - require wakuConf.validate().isOk() - require wakuConf.discv5Conf.isSome() - check: - wakuConf.discv5Conf.get().bootstrapNodes == entryNodes - - test "Static store nodes configuration": - ## Given - let staticStoreNodes = - @[ - "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc", - "/ip4/192.168.1.1/tcp/60001/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYd", - ] - let nodeConf = NodeConfig.init( - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], staticStoreNodes = staticStoreNodes, clusterId = 1 - ) - ) - - ## When - let wakuConfRes = toWakuConf(nodeConf) + let wakuConfRes = conf.toWakuConf() ## Then require wakuConfRes.isOk() let wakuConf = wakuConfRes.get() require wakuConf.validate().isOk() check: - wakuConf.staticNodes == staticStoreNodes + wakuConf.clusterId == 2 + wakuConf.relay == true - test "Message validation with max message size": + test "JSON with static nodes": ## Given - let nodeConfig = NodeConfig.init( - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - staticStoreNodes = @[], - clusterId = 1, - messageValidation = - MessageValidation(maxMessageSize: "100KiB", rlnConfig: none(RlnConfig)), - ) + let confRes = parseWakuNodeConfFromJson( + """{"mode": "Core", "clusterId": 42, "staticnodes": ["/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"]}""" ) + require confRes.isOk() + let conf = confRes.get() ## When - let wakuConfRes = toWakuConf(nodeConfig) + let wakuConfRes = conf.toWakuConf() + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + 
require wakuConf.validate().isOk() + check: + wakuConf.staticNodes.len == 1 + + test "JSON with max message size": + ## Given + let confRes = + parseWakuNodeConfFromJson("""{"clusterId": 42, "maxMessageSize": "100KiB"}""") + require confRes.isOk() + let conf = confRes.get() + + ## When + let wakuConfRes = conf.toWakuConf() ## Then require wakuConfRes.isOk() @@ -156,853 +322,49 @@ suite "LibWaku Conf - toWakuConf": check: wakuConf.maxMessageSizeBytes == 100'u64 * 1024'u64 - test "Message validation with RLN config": - ## Given - let nodeConfig = NodeConfig.init( - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - clusterId = 1, - messageValidation = MessageValidation( - maxMessageSize: "150 KiB", - rlnConfig: some( - RlnConfig( - contractAddress: "0x1234567890123456789012345678901234567890", - chainId: 1'u, - epochSizeSec: 600'u64, - ) - ), - ), - ), - ethRpcEndpoints = @["http://127.0.0.1:1111"], - ) +# ---- Deprecated NodeConfig tests (kept for backward compatibility) ---- - ## When - let wakuConf = toWakuConf(nodeConfig).valueOr: - raiseAssert error +{.push warning[Deprecated]: off.} - wakuConf.validate().isOkOr: - raiseAssert error +import waku/api/api_conf - check: - wakuConf.maxMessageSizeBytes == 150'u64 * 1024'u64 - - require wakuConf.rlnRelayConf.isSome() - let rlnConf = wakuConf.rlnRelayConf.get() - check: - rlnConf.dynamic == true - rlnConf.ethContractAddress == "0x1234567890123456789012345678901234567890" - rlnConf.chainId == 1'u256 - rlnConf.epochSizeSec == 600'u64 - - test "Full Core mode configuration with all fields": - ## Given - let nodeConfig = NodeConfig.init( - mode = Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = - @[ - "enr:-QESuEC1p_s3xJzAC_XlOuuNrhVUETmfhbm1wxRGis0f7DlqGSw2FM-p2Vn7gmfkTTnAe8Ys2cgGBN8ufJnvzKQFZqFMBgmlkgnY0iXNlY3AyNTZrMaEDS8-D878DrdbNwcuY-3p1qdDp5MOoCurhdsNPJTXZ3c5g3RjcIJ2X4N1ZHCCd2g" - ], - staticStoreNodes = - @[ - "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - ], - clusterId = 99, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 12), - messageValidation = MessageValidation( - maxMessageSize: "512KiB", - rlnConfig: some( - RlnConfig( - contractAddress: "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", - chainId: 5'u, # Goerli - epochSizeSec: 300'u64, - ) - ), - ), - ), - ethRpcEndpoints = @["https://127.0.0.1:8333"], - ) - - ## When - let wakuConfRes = toWakuConf(nodeConfig) - - ## Then +suite "NodeConfig (deprecated) - toWakuConf": + test "Minimal configuration": + let nodeConfig = NodeConfig.init(ethRpcEndpoints = @["http://someaddress"]) + let wakuConfRes = api_conf.toWakuConf(nodeConfig) let wakuConf = wakuConfRes.valueOr: raiseAssert error wakuConf.validate().isOkOr: raiseAssert error + check: + wakuConf.clusterId == 1 + wakuConf.shardingConf.numShardsInCluster == 8 + wakuConf.staticNodes.len == 0 - # Check basic settings + test "Edge mode configuration": + let protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) + let nodeConfig = + NodeConfig.init(mode = api_conf.WakuMode.Edge, protocolsConfig = protocolsConfig) + let wakuConfRes = api_conf.toWakuConf(nodeConfig) + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.relay == false + wakuConf.lightPush == false + wakuConf.peerExchangeService == true + + test "Core mode configuration": + let protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) + let nodeConfig = + NodeConfig.init(mode = api_conf.WakuMode.Core, 
protocolsConfig = protocolsConfig) + let wakuConfRes = api_conf.toWakuConf(nodeConfig) + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() check: wakuConf.relay == true wakuConf.lightPush == true wakuConf.peerExchangeService == true - wakuConf.rendezvous == true - wakuConf.clusterId == 99 - # Check sharding - check: - wakuConf.shardingConf.numShardsInCluster == 12 - - # Check bootstrap nodes - require wakuConf.discv5Conf.isSome() - check: - wakuConf.discv5Conf.get().bootstrapNodes.len == 1 - - # Check static nodes - check: - wakuConf.staticNodes.len == 1 - wakuConf.staticNodes[0] == - "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - - # Check message validation - check: - wakuConf.maxMessageSizeBytes == 512'u64 * 1024'u64 - - # Check RLN config - require wakuConf.rlnRelayConf.isSome() - let rlnConf = wakuConf.rlnRelayConf.get() - check: - rlnConf.dynamic == true - rlnConf.ethContractAddress == "0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - rlnConf.chainId == 5'u256 - rlnConf.epochSizeSec == 300'u64 - - test "NodeConfig with mixed entry nodes (integration test)": - ## Given - let entryNodes = - @[ - "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im", - "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc", - ] - - let nodeConfig = NodeConfig.init( - mode = Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = entryNodes, staticStoreNodes = @[], clusterId = 1 - ), - ) - - ## When - let wakuConfRes = toWakuConf(nodeConfig) - - ## Then - require wakuConfRes.isOk() - let wakuConf = wakuConfRes.get() - require wakuConf.validate().isOk() - - # Check that ENRTree went to DNS discovery - require wakuConf.dnsDiscoveryConf.isSome() - check: - wakuConf.dnsDiscoveryConf.get().enrTreeUrl == entryNodes[0] - - # Check that multiaddr went to static nodes - check: - wakuConf.staticNodes.len == 1 - wakuConf.staticNodes[0] == entryNodes[1] - -suite "NodeConfig JSON - complete format": - test "Full NodeConfig from complete JSON with field validation": - ## Given - let jsonStr = - """ - { - "mode": "Core", - "protocolsConfig": { - "entryNodes": ["enrtree://TREE@nodes.example.com"], - "staticStoreNodes": ["/ip4/1.2.3.4/tcp/80/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"], - "clusterId": 10, - "autoShardingConfig": { - "numShardsInCluster": 4 - }, - "messageValidation": { - "maxMessageSize": "100 KiB", - "rlnConfig": null - } - }, - "networkingConfig": { - "listenIpv4": "192.168.1.1", - "p2pTcpPort": 7000, - "discv5UdpPort": 7001 - }, - "ethRpcEndpoints": ["http://localhost:8545"], - "p2pReliability": true, - "logLevel": "WARN", - "logFormat": "TEXT" - } - """ - - ## When - let config = decodeNodeConfigFromJson(jsonStr) - - ## Then — check every field - check: - config.mode == WakuMode.Core - config.ethRpcEndpoints == @["http://localhost:8545"] - config.p2pReliability == true - config.logLevel == LogLevel.WARN - config.logFormat == LogFormat.TEXT - - check: - config.networkingConfig.listenIpv4 == "192.168.1.1" - config.networkingConfig.p2pTcpPort == 7000 - config.networkingConfig.discv5UdpPort == 7001 - - let pc = config.protocolsConfig - check: - pc.entryNodes == @["enrtree://TREE@nodes.example.com"] - pc.staticStoreNodes == - @[ - "/ip4/1.2.3.4/tcp/80/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - ] - pc.clusterId == 10 - pc.autoShardingConfig.numShardsInCluster == 4 - pc.messageValidation.maxMessageSize == "100 
KiB" - pc.messageValidation.rlnConfig.isNone() - - test "Full NodeConfig with RlnConfig present": - ## Given - let jsonStr = - """ - { - "mode": "Edge", - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "messageValidation": { - "maxMessageSize": "150 KiB", - "rlnConfig": { - "contractAddress": "0x1234567890ABCDEF1234567890ABCDEF12345678", - "chainId": 5, - "epochSizeSec": 600 - } - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - ## When - let config = decodeNodeConfigFromJson(jsonStr) - - ## Then - check config.mode == WakuMode.Edge - - let mv = config.protocolsConfig.messageValidation - check: - mv.maxMessageSize == "150 KiB" - mv.rlnConfig.isSome() - let rln = mv.rlnConfig.get() - check: - rln.contractAddress == "0x1234567890ABCDEF1234567890ABCDEF12345678" - rln.chainId == 5'u - rln.epochSizeSec == 600'u64 - - test "Round-trip encode/decode preserves all fields": - ## Given - let original = NodeConfig.init( - mode = Edge, - protocolsConfig = ProtocolsConfig.init( - entryNodes = @["enrtree://TREE@example.com"], - staticStoreNodes = - @[ - "/ip4/1.2.3.4/tcp/80/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - ], - clusterId = 42, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 16), - messageValidation = MessageValidation( - maxMessageSize: "256 KiB", - rlnConfig: some( - RlnConfig( - contractAddress: "0xAABBCCDDEEFF00112233445566778899AABBCCDD", - chainId: 137, - epochSizeSec: 300, - ) - ), - ), - ), - networkingConfig = - NetworkingConfig(listenIpv4: "10.0.0.1", p2pTcpPort: 9090, discv5UdpPort: 9091), - ethRpcEndpoints = @["https://rpc.example.com"], - p2pReliability = true, - logLevel = LogLevel.DEBUG, - logFormat = LogFormat.JSON, - ) - - ## When - let decoded = decodeNodeConfigFromJson(Json.encode(original)) - - ## Then — check field by field - check: - decoded.mode == original.mode - decoded.ethRpcEndpoints == original.ethRpcEndpoints - decoded.p2pReliability == original.p2pReliability - decoded.logLevel == original.logLevel - decoded.logFormat == original.logFormat - decoded.networkingConfig.listenIpv4 == original.networkingConfig.listenIpv4 - decoded.networkingConfig.p2pTcpPort == original.networkingConfig.p2pTcpPort - decoded.networkingConfig.discv5UdpPort == original.networkingConfig.discv5UdpPort - decoded.protocolsConfig.entryNodes == original.protocolsConfig.entryNodes - decoded.protocolsConfig.staticStoreNodes == - original.protocolsConfig.staticStoreNodes - decoded.protocolsConfig.clusterId == original.protocolsConfig.clusterId - decoded.protocolsConfig.autoShardingConfig.numShardsInCluster == - original.protocolsConfig.autoShardingConfig.numShardsInCluster - decoded.protocolsConfig.messageValidation.maxMessageSize == - original.protocolsConfig.messageValidation.maxMessageSize - decoded.protocolsConfig.messageValidation.rlnConfig.isSome() - - let decodedRln = decoded.protocolsConfig.messageValidation.rlnConfig.get() - let originalRln = original.protocolsConfig.messageValidation.rlnConfig.get() - check: - decodedRln.contractAddress == originalRln.contractAddress - decodedRln.chainId == originalRln.chainId - decodedRln.epochSizeSec == originalRln.epochSizeSec - -suite "NodeConfig JSON - partial format with defaults": - test "Minimal NodeConfig - empty object uses all defaults": - ## Given - let config = decodeNodeConfigFromJson("{}") - let defaultConfig = NodeConfig.init() - - ## Then — compare field by field against defaults - check: - config.mode == 
defaultConfig.mode - config.ethRpcEndpoints == defaultConfig.ethRpcEndpoints - config.p2pReliability == defaultConfig.p2pReliability - config.logLevel == defaultConfig.logLevel - config.logFormat == defaultConfig.logFormat - config.networkingConfig.listenIpv4 == defaultConfig.networkingConfig.listenIpv4 - config.networkingConfig.p2pTcpPort == defaultConfig.networkingConfig.p2pTcpPort - config.networkingConfig.discv5UdpPort == - defaultConfig.networkingConfig.discv5UdpPort - config.protocolsConfig.entryNodes == defaultConfig.protocolsConfig.entryNodes - config.protocolsConfig.staticStoreNodes == - defaultConfig.protocolsConfig.staticStoreNodes - config.protocolsConfig.clusterId == defaultConfig.protocolsConfig.clusterId - config.protocolsConfig.autoShardingConfig.numShardsInCluster == - defaultConfig.protocolsConfig.autoShardingConfig.numShardsInCluster - config.protocolsConfig.messageValidation.maxMessageSize == - defaultConfig.protocolsConfig.messageValidation.maxMessageSize - config.protocolsConfig.messageValidation.rlnConfig.isSome() == - defaultConfig.protocolsConfig.messageValidation.rlnConfig.isSome() - - test "Minimal NodeConfig keeps network preset defaults": - ## Given - let config = decodeNodeConfigFromJson("{}") - - ## Then - check: - config.protocolsConfig.entryNodes == TheWakuNetworkPreset.entryNodes - config.protocolsConfig.messageValidation.rlnConfig.isSome() - - test "NodeConfig with only mode specified": - ## Given - let config = decodeNodeConfigFromJson("""{"mode": "Edge"}""") - - ## Then - check: - config.mode == WakuMode.Edge - ## Remaining fields get defaults - config.logLevel == LogLevel.INFO - config.logFormat == LogFormat.TEXT - config.p2pReliability == false - config.ethRpcEndpoints == newSeq[string]() - - test "ProtocolsConfig partial - optional fields get defaults": - ## Given — only entryNodes and clusterId provided - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": ["enrtree://X@y.com"], - "clusterId": 5 - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - ## When - let config = decodeNodeConfigFromJson(jsonStr) - - ## Then — required fields are set, optionals get defaults - check: - config.protocolsConfig.entryNodes == @["enrtree://X@y.com"] - config.protocolsConfig.clusterId == 5 - config.protocolsConfig.staticStoreNodes == newSeq[string]() - config.protocolsConfig.autoShardingConfig.numShardsInCluster == - DefaultAutoShardingConfig.numShardsInCluster - config.protocolsConfig.messageValidation.maxMessageSize == - DefaultMessageValidation.maxMessageSize - config.protocolsConfig.messageValidation.rlnConfig.isNone() - - test "MessageValidation partial - rlnConfig omitted defaults to none": - ## Given - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "messageValidation": { - "maxMessageSize": "200 KiB" - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - ## When - let config = decodeNodeConfigFromJson(jsonStr) - - ## Then - check: - config.protocolsConfig.messageValidation.maxMessageSize == "200 KiB" - config.protocolsConfig.messageValidation.rlnConfig.isNone() - - test "logLevel and logFormat omitted use defaults": - ## Given - let jsonStr = - """ - { - "mode": "Core", - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1 - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - ## When - let config = 
decodeNodeConfigFromJson(jsonStr) - - ## Then - check: - config.logLevel == LogLevel.INFO - config.logFormat == LogFormat.TEXT - -suite "NodeConfig JSON - unsupported fields raise errors": - test "Unknown field at NodeConfig level raises": - let jsonStr = - """ - { - "mode": "Core", - "unknownTopLevel": true - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Typo in NodeConfig field name raises": - let jsonStr = - """ - { - "modes": "Core" - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Unknown field in ProtocolsConfig raises": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "futureField": "something" - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Unknown field in NetworkingConfig raises": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1 - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000, - "futureNetworkField": "value" - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Unknown field in MessageValidation raises": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "messageValidation": { - "maxMessageSize": "150 KiB", - "maxMesssageSize": "typo" - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Unknown field in RlnConfig raises": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "messageValidation": { - "maxMessageSize": "150 KiB", - "rlnConfig": { - "contractAddress": "0xABCDEF1234567890ABCDEF1234567890ABCDEF12", - "chainId": 1, - "epochSizeSec": 600, - "unknownRlnField": true - } - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Unknown field in AutoShardingConfig raises": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "autoShardingConfig": { - "numShardsInCluster": 8, - "shardPrefix": "extra" - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - -suite "NodeConfig JSON - missing required fields": - test "Missing 'entryNodes' in ProtocolsConfig": - let jsonStr = - """ - { - "protocolsConfig": { - "clusterId": 1 - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Missing 'clusterId' in 
ProtocolsConfig": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [] - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Missing required fields in NetworkingConfig": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1 - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0" - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Missing 'numShardsInCluster' in AutoShardingConfig": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "autoShardingConfig": {} - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Missing required fields in RlnConfig": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "messageValidation": { - "maxMessageSize": "150 KiB", - "rlnConfig": { - "contractAddress": "0xABCD" - } - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Missing 'maxMessageSize' in MessageValidation": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": 1, - "messageValidation": { - "rlnConfig": null - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - -suite "NodeConfig JSON - invalid values": - test "Invalid enum value for mode": - var raised = false - try: - discard decodeNodeConfigFromJson("""{"mode": "InvalidMode"}""") - except SerializationError: - raised = true - check raised - - test "Invalid enum value for logLevel": - var raised = false - try: - discard decodeNodeConfigFromJson("""{"logLevel": "SUPERVERBOSE"}""") - except SerializationError: - raised = true - check raised - - test "Wrong type for clusterId (string instead of number)": - let jsonStr = - """ - { - "protocolsConfig": { - "entryNodes": [], - "clusterId": "not-a-number" - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - } - } - """ - - var raised = false - try: - discard decodeNodeConfigFromJson(jsonStr) - except SerializationError: - raised = true - check raised - - test "Completely invalid JSON syntax": - var raised = false - try: - discard decodeNodeConfigFromJson("""{ not valid json at all }""") - except SerializationError: - raised = true - check raised - -suite "NodeConfig JSON -> WakuConf integration": - test "Decoded config translates to valid WakuConf": - ## Given - let jsonStr = - """ - { - "mode": "Core", - "protocolsConfig": { - "entryNodes": [ - "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" - ], - "staticStoreNodes": [ - "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - ], - "clusterId": 55, - 
"autoShardingConfig": { - "numShardsInCluster": 6 - }, - "messageValidation": { - "maxMessageSize": "256 KiB", - "rlnConfig": null - } - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000 - }, - "ethRpcEndpoints": ["http://localhost:8545"], - "p2pReliability": true, - "logLevel": "INFO", - "logFormat": "TEXT" - } - """ - - ## When - let nodeConfig = decodeNodeConfigFromJson(jsonStr) - let wakuConfRes = toWakuConf(nodeConfig) - - ## Then - require wakuConfRes.isOk() - let wakuConf = wakuConfRes.get() - require wakuConf.validate().isOk() - check: - wakuConf.clusterId == 55 - wakuConf.shardingConf.numShardsInCluster == 6 - wakuConf.maxMessageSizeBytes == 256'u64 * 1024'u64 - wakuConf.staticNodes.len == 1 - wakuConf.p2pReliability == true +{.pop.} diff --git a/tests/test_waku.nim b/tests/test_waku.nim index b8e2b26b1..dabd65af7 100644 --- a/tests/test_waku.nim +++ b/tests/test_waku.nim @@ -3,49 +3,49 @@ import chronos, testutils/unittests, std/options import waku +import tools/confutils/cli_args suite "Waku API - Create node": asyncTest "Create node with minimal configuration": ## Given - let nodeConfig = NodeConfig.init( - protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1) - ) + var nodeConf = defaultWakuNodeConf().valueOr: + raiseAssert error + nodeConf.mode = Core + nodeConf.clusterId = 3'u16 + nodeConf.rest = false # This is the actual minimal config but as the node auto-start, it is not suitable for tests - # NodeConfig.init(ethRpcEndpoints = @["http://someaddress"]) ## When - let node = (await createNode(nodeConfig)).valueOr: + let node = (await createNode(nodeConf)).valueOr: raiseAssert error ## Then check: not node.isNil() - node.conf.clusterId == 1 + node.conf.clusterId == 3 node.conf.relay == true asyncTest "Create node with full configuration": ## Given - let nodeConfig = NodeConfig.init( - mode = Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = - @[ - "enr:-QESuEC1p_s3xJzAC_XlOuuNrhVUETmfhbm1wxRGis0f7DlqGSw2FM-p2Vn7gmfkTTnAe8Ys2cgGBN8ufJnvzKQFZqFMBgmlkgnY0iXNlY3AyNTZrMaEDS8-D878DrdbNwcuY-3p1qdDp5MOoCurhdsNPJTXZ3c5g3RjcIJ2X4N1ZHCCd2g" - ], - staticStoreNodes = - @[ - "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" - ], - clusterId = 99, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 16), - messageValidation = - MessageValidation(maxMessageSize: "1024 KiB", rlnConfig: none(RlnConfig)), - ), - ) + var nodeConf = defaultWakuNodeConf().valueOr: + raiseAssert error + nodeConf.mode = Core + nodeConf.clusterId = 99'u16 + nodeConf.rest = false + nodeConf.numShardsInNetwork = 16 + nodeConf.maxMessageSize = "1024 KiB" + nodeConf.entryNodes = + @[ + "enr:-QESuEC1p_s3xJzAC_XlOuuNrhVUETmfhbm1wxRGis0f7DlqGSw2FM-p2Vn7gmfkTTnAe8Ys2cgGBN8ufJnvzKQFZqFMBgmlkgnY0iXNlY3AyNTZrMaEDS8-D878DrdbNwcuY-3p1qdDp5MOoCurhdsNPJTXZ3c5g3RjcIJ2X4N1ZHCCd2g" + ] + nodeConf.staticnodes = + @[ + "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" + ] ## When - let node = (await createNode(nodeConfig)).valueOr: + let node = (await createNode(nodeConf)).valueOr: raiseAssert error ## Then @@ -62,20 +62,19 @@ suite "Waku API - Create node": asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)": ## Given - let nodeConfig = NodeConfig.init( - mode = Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = - @[ - "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im", - 
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc", - ], - clusterId = 42, - ), - ) + var nodeConf = defaultWakuNodeConf().valueOr: + raiseAssert error + nodeConf.mode = Core + nodeConf.clusterId = 42'u16 + nodeConf.rest = false + nodeConf.entryNodes = + @[ + "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im", + "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc", + ] ## When - let node = (await createNode(nodeConfig)).valueOr: + let node = (await createNode(nodeConf)).valueOr: raiseAssert error ## Then diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index e30854906..9239435af 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -374,6 +374,12 @@ procSuite "WakuNode - Store": waitFor allFutures(client.stop(), server.stop()) test "Store protocol queries overrun request rate limitation": + when defined(macosx): + # on macos CI, this test is resulting a code 200 (OK) instead of a 429 error + # means the runner is somehow too slow to cause a request limit failure + skip() + return + ## Setup let serverKey = generateSecp256k1Key() diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index e94a3b21d..6ec6043fe 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -21,7 +21,7 @@ suite "Wakunode2 - Waku": raiseAssert error ## When - let version = waku.version + let version = waku.stateInfo.getNodeInfoItem(NodeInfoId.Version) ## Then check: diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 5e4adacb2..4a6e8c618 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -30,7 +30,8 @@ import waku_core/message/default_values, waku_mix, ], - ../../tools/rln_keystore_generator/rln_keystore_generator + ../../tools/rln_keystore_generator/rln_keystore_generator, + ./entry_nodes import ./envvar as confEnvvarDefs, ./envvar_net as confEnvvarNet @@ -52,6 +53,11 @@ type StartUpCommand* = enum noCommand # default, runs waku generateRlnKeystore # generates a new RLN keystore +type WakuMode* {.pure.} = enum + noMode # default - use explicit CLI flags as-is + Core # full service node + Edge # client-only node + type WakuNodeConf* = object configFile* {. desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)", @@ -150,9 +156,16 @@ type WakuNodeConf* = object .}: seq[ProtectedShard] ## General node config + mode* {. + desc: + "Node operation mode. 'Core' enables relay+service protocols. 'Edge' enables client-only protocols. Default: explicit CLI flags used.", + defaultValue: WakuMode.noMode, + name: "mode" + .}: WakuMode + preset* {. desc: - "Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1). Overrides other values.", + "Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1). 'logos.dev' is the Logos Dev Network (cluster 2). Overrides other values.", defaultValue: "", name: "preset" .}: string @@ -165,7 +178,7 @@ type WakuNodeConf* = object .}: uint16 agentString* {. - defaultValue: "nwaku-" & cli_args.git_version, + defaultValue: "logos-delivery-" & cli_args.git_version, desc: "Node agent string which is used as identifier in network", name: "agent-string" .}: string @@ -293,6 +306,14 @@ hence would have reachability issues.""", name: "rln-relay-dynamic" .}: bool + entryNodes* {. + desc: + "Entry node address (enrtree:, enr:, or multiaddr). 
" & + "Automatically classified and distributed to DNS discovery, discv5 bootstrap, " & + "and static nodes. Argument may be repeated.", + name: "entry-node" + .}: seq[string] + staticnodes* {. desc: "Peer multiaddr to directly connect with. Argument may be repeated.", name: "staticnode" @@ -453,13 +474,15 @@ hence would have reachability issues.""", desc: """Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests. with the drawback of consuming some more bandwidth.""", - defaultValue: false, + defaultValue: true, name: "reliability" .}: bool ## REST HTTP config rest* {. - desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest" + desc: "Enable Waku REST HTTP server: true|false", + defaultValue: false, + name: "rest" .}: bool restAddress* {. @@ -907,12 +930,19 @@ proc toNetworkConf( "TWN - The Waku Network configuration will not be applied when `--cluster-id=1` is passed in future releases. Use `--preset=twn` instead." ) lcPreset = "twn" + if clusterId.isSome() and clusterId.get() == 2: + warn( + "Logos.dev - Logos.dev configuration will not be applied when `--cluster-id=2` is passed in future releases. Use `--preset=logos.dev` instead." + ) + lcPreset = "logos.dev" case lcPreset of "": ok(none(NetworkConf)) of "twn": ok(some(NetworkConf.TheWakuNetworkConf())) + of "logos.dev", "logosdev": + ok(some(NetworkConf.LogosDevConf())) else: err("Invalid --preset value passed: " & lcPreset) @@ -982,6 +1012,26 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.withRelayShardedPeerManagement(n.relayShardedPeerManagement) b.withStaticNodes(n.staticNodes) + # Process entry nodes - supports enrtree:, enr:, and multiaddress formats + if n.entryNodes.len > 0: + let (enrTreeUrls, bootstrapEnrs, staticNodesFromEntry) = processEntryNodes( + n.entryNodes + ).valueOr: + return err("Failed to process entry nodes: " & error) + + # Set ENRTree URLs for DNS discovery + if enrTreeUrls.len > 0: + for url in enrTreeUrls: + b.dnsDiscoveryConf.withEnrTreeUrl(url) + + # Set ENR records as bootstrap nodes for discv5 + if bootstrapEnrs.len > 0: + b.discv5Conf.withBootstrapNodes(bootstrapEnrs) + + # Add static nodes (multiaddrs and those extracted from ENR entries) + if staticNodesFromEntry.len > 0: + b.withStaticNodes(staticNodesFromEntry) + if n.numShardsInNetwork != 0: b.withNumShardsInCluster(n.numShardsInNetwork) b.withShardingConf(AutoSharding) @@ -1069,9 +1119,31 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.webSocketConf.withKeyPath(n.websocketSecureKeyPath) b.webSocketConf.withCertPath(n.websocketSecureCertPath) - b.rateLimitConf.withRateLimits(n.rateLimits) + if n.rateLimits.len > 0: + b.rateLimitConf.withRateLimits(n.rateLimits) b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery) b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes) + # Mode-driven configuration overrides + case n.mode + of WakuMode.Core: + b.withRelay(true) + b.filterServiceConf.withEnabled(true) + b.withLightPush(true) + b.discv5Conf.withEnabled(true) + b.withPeerExchange(true) + b.withRendezvous(true) + b.rateLimitConf.withRateLimitsIfNotAssigned( + @["filter:100/1s", "lightpush:5/1s", "px:5/1s"] + ) + of WakuMode.Edge: + b.withPeerExchange(true) + b.withRelay(false) + b.filterServiceConf.withEnabled(false) + b.withLightPush(false) + b.storeServiceConf.withEnabled(false) + of WakuMode.noMode: + discard # use explicit CLI flags as-is + return b.build() diff --git a/tools/confutils/config_option_meta.nim 
b/tools/confutils/config_option_meta.nim new file mode 100644 index 000000000..1880fdef5 --- /dev/null +++ b/tools/confutils/config_option_meta.nim @@ -0,0 +1,143 @@ +import std/[macros] + +type ConfigOptionMeta* = object + fieldName*: string + typeName*: string + cliName*: string + desc*: string + defaultValue*: string + command*: string + +proc getPragmaValue(pragmaNode: NimNode, pragmaName: string): string {.compileTime.} = + if pragmaNode.kind != nnkPragma: + return "" + + for item in pragmaNode: + if item.kind == nnkExprColonExpr and item[0].eqIdent(pragmaName): + return item[1].repr + + return "" + +proc getFieldName(fieldNode: NimNode): string {.compileTime.} = + case fieldNode.kind + of nnkPragmaExpr: + if fieldNode.len >= 1: + return getFieldName(fieldNode[0]) + of nnkPostfix: + if fieldNode.len >= 2: + return getFieldName(fieldNode[1]) + of nnkIdent, nnkSym: + return fieldNode.strVal + else: + discard + + return fieldNode.repr + +proc getFieldAndPragma( + fieldDef: NimNode +): tuple[fieldName, typeName: string, pragmaNode: NimNode] {.compileTime.} = + if fieldDef.kind != nnkIdentDefs: + return ("", "", newNimNode(nnkEmpty)) + + let declaredField = fieldDef[0] + var typeNode = fieldDef[1] + var pragmaNode = newNimNode(nnkEmpty) + + if declaredField.kind == nnkPragmaExpr: + pragmaNode = declaredField[1] + elif typeNode.kind == nnkPragmaExpr: + pragmaNode = typeNode[1] + typeNode = typeNode[0] + + return (getFieldName(declaredField), typeNode.repr, pragmaNode) + +proc makeMetaNode( + fieldName, typeName, cliName, desc, defaultValue, command: string +): NimNode {.compileTime.} = + result = newTree( + nnkObjConstr, + ident("ConfigOptionMeta"), + newTree(nnkExprColonExpr, ident("fieldName"), newLit(fieldName)), + newTree(nnkExprColonExpr, ident("typeName"), newLit(typeName)), + newTree(nnkExprColonExpr, ident("cliName"), newLit(cliName)), + newTree(nnkExprColonExpr, ident("desc"), newLit(desc)), + newTree(nnkExprColonExpr, ident("defaultValue"), newLit(defaultValue)), + newTree(nnkExprColonExpr, ident("command"), newLit(command)), + ) + +macro extractConfigOptionMeta*(T: typedesc): untyped = + proc findFirstRecList(n: NimNode): NimNode {.compileTime.} = + if n.kind == nnkRecList: + return n + for child in n: + let found = findFirstRecList(child) + if not found.isNil: + return found + return nil + + proc collectRecList( + recList: NimNode, metas: var seq[NimNode], commandCtx: string + ) {.compileTime.} = + for child in recList: + case child.kind + of nnkIdentDefs: + let (fieldName, typeName, pragmaNode) = getFieldAndPragma(child) + if fieldName.len == 0: + continue + let cliName = block: + let n = getPragmaValue(pragmaNode, "name") + if n.len > 0: n else: fieldName + let desc = getPragmaValue(pragmaNode, "desc") + let defaultValue = getPragmaValue(pragmaNode, "defaultValue") + metas.add( + makeMetaNode(fieldName, typeName, cliName, desc, defaultValue, commandCtx) + ) + of nnkRecCase: + let discriminator = child[0] + if discriminator.kind == nnkIdentDefs: + let (fieldName, typeName, pragmaNode) = getFieldAndPragma(discriminator) + if fieldName.len > 0: + let cliName = block: + let n = getPragmaValue(pragmaNode, "name") + if n.len > 0: n else: fieldName + let desc = getPragmaValue(pragmaNode, "desc") + let defaultValue = getPragmaValue(pragmaNode, "defaultValue") + metas.add( + makeMetaNode(fieldName, typeName, cliName, desc, defaultValue, commandCtx) + ) + + for i in 1 ..< child.len: + let branch = child[i] + case branch.kind + of nnkOfBranch: + let branchCtx = branch[0].repr + for j in 
1 ..< branch.len: + if branch[j].kind == nnkRecList: + collectRecList(branch[j], metas, branchCtx) + of nnkElse: + for j in 0 ..< branch.len: + if branch[j].kind == nnkRecList: + collectRecList(branch[j], metas, commandCtx) + else: + discard + else: + discard + + let typeInst = getTypeInst(T) + var targetType = T + if typeInst.kind == nnkBracketExpr and typeInst.len >= 2: + targetType = typeInst[1] + + let typeImpl = getImpl(targetType) + let recList = findFirstRecList(typeImpl) + if recList.isNil: + return newTree(nnkPrefix, ident("@"), newNimNode(nnkBracket)) + + var metas: seq[NimNode] = @[] + collectRecList(recList, metas, "") + + let bracket = newNimNode(nnkBracket) + for node in metas: + bracket.add(node) + + result = newTree(nnkPrefix, ident("@"), bracket) diff --git a/waku/api/entry_nodes.nim b/tools/confutils/entry_nodes.nim similarity index 100% rename from waku/api/entry_nodes.nim rename to tools/confutils/entry_nodes.nim diff --git a/vendor/nim-metrics b/vendor/nim-metrics index 11d0cddfb..a1296caf3 160000 --- a/vendor/nim-metrics +++ b/vendor/nim-metrics @@ -1 +1 @@ -Subproject commit 11d0cddfb0e711aa2a8c75d1892ae24a64c299fc +Subproject commit a1296caf3ebb5f30f51a5feae7749a30df2824c2 diff --git a/waku/api.nim b/waku/api.nim index 110a8f431..a977a062a 100644 --- a/waku/api.nim +++ b/waku/api.nim @@ -1,4 +1,5 @@ -import ./api/[api, api_conf, entry_nodes] +import ./api/[api, api_conf] import ./events/message_events +import tools/confutils/entry_nodes export api, api_conf, entry_nodes, message_events diff --git a/waku/api/api.nim b/waku/api/api.nim index 3493513a3..1eee982fd 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -1,18 +1,20 @@ -import chronicles, chronos, results, std/strutils +import chronicles, chronos, results import waku/factory/waku import waku/[requests/health_requests, waku_core, waku_node] import waku/node/delivery_service/send_service -import waku/node/delivery_service/subscription_service +import waku/node/delivery_service/subscription_manager import libp2p/peerid +import ../../tools/confutils/cli_args import ./[api_conf, types] +export cli_args + logScope: topics = "api" -# TODO: Specs says it should return a `WakuNode`. As `send` and other APIs are defined, we can align. 
-proc createNode*(config: NodeConfig): Future[Result[Waku, string]] {.async.} = - let wakuConf = toWakuConf(config).valueOr: +proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} = + let wakuConf = conf.toWakuConf().valueOr: return err("Failed to handle the configuration: " & error) ## We are not defining app callbacks at node creation @@ -36,18 +38,27 @@ proc subscribe*( ): Future[Result[void, string]] {.async.} = ?checkApiAvailability(w) - return w.deliveryService.subscriptionService.subscribe(contentTopic) + return w.deliveryService.subscriptionManager.subscribe(contentTopic) proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = ?checkApiAvailability(w) - return w.deliveryService.subscriptionService.unsubscribe(contentTopic) + return w.deliveryService.subscriptionManager.unsubscribe(contentTopic) proc send*( w: Waku, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) + let isSubbed = w.deliveryService.subscriptionManager + .isSubscribed(envelope.contentTopic) + .valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + w.deliveryService.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + let requestId = RequestId.new(w.rng) let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr: diff --git a/waku/api/api_conf.nim b/waku/api/api_conf.nim index 7cac66426..70bb02af3 100644 --- a/waku/api/api_conf.nim +++ b/waku/api/api_conf.nim @@ -9,7 +9,7 @@ import waku/factory/waku_conf, waku/factory/conf_builder/conf_builder, waku/factory/networks_config, - ./entry_nodes + tools/confutils/entry_nodes export json_serialization, json_options @@ -85,7 +85,9 @@ type WakuMode* {.pure.} = enum Edge Core -type NodeConfig* {.requiresInit.} = object +type NodeConfig* {. + requiresInit, deprecated: "Use WakuNodeConf from tools/confutils/cli_args instead" +.} = object mode: WakuMode protocolsConfig: ProtocolsConfig networkingConfig: NetworkingConfig @@ -154,7 +156,9 @@ proc logLevel*(c: NodeConfig): LogLevel = proc logFormat*(c: NodeConfig): LogFormat = c.logFormat -proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] = +proc toWakuConf*( + nodeConfig: NodeConfig +): Result[WakuConf, string] {.deprecated: "Use WakuNodeConf.toWakuConf instead".} = var b = WakuConfBuilder.init() # Apply log configuration @@ -516,7 +520,10 @@ proc readValue*( proc decodeNodeConfigFromJson*( jsonStr: string -): NodeConfig {.raises: [SerializationError].} = +): NodeConfig {. 
+ raises: [SerializationError], + deprecated: "Use WakuNodeConf with fieldPairs-based JSON parsing instead" +.} = var val = NodeConfig.init() # default-initialized try: var stream = unsafeMemoryInput(jsonStr) diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim index cf3dac9b7..677a4a433 100644 --- a/waku/events/message_events.nim +++ b/waku/events/message_events.nim @@ -1,6 +1,4 @@ -import waku/common/broker/event_broker -import waku/api/types -import waku/waku_core/message +import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker] export types @@ -28,3 +26,9 @@ EventBroker: type MessageReceivedEvent* = object messageHash*: string message*: WakuMessage + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageSeenEvent* = object + topic*: PubsubTopic + message*: WakuMessage diff --git a/waku/factory/conf_builder/filter_service_conf_builder.nim b/waku/factory/conf_builder/filter_service_conf_builder.nim index a3f056b01..0a6617430 100644 --- a/waku/factory/conf_builder/filter_service_conf_builder.nim +++ b/waku/factory/conf_builder/filter_service_conf_builder.nim @@ -22,6 +22,12 @@ proc withEnabled*(b: var FilterServiceConfBuilder, enabled: bool) = proc withMaxPeersToServe*(b: var FilterServiceConfBuilder, maxPeersToServe: uint32) = b.maxPeersToServe = some(maxPeersToServe) +proc withMaxPeersToServeIfNotAssigned*( + b: var FilterServiceConfBuilder, maxPeersToServe: uint32 +) = + if b.maxPeersToServe.isNone(): + b.maxPeersToServe = some(maxPeersToServe) + proc withSubscriptionTimeout*( b: var FilterServiceConfBuilder, subscriptionTimeout: uint16 ) = diff --git a/waku/factory/conf_builder/rate_limit_conf_builder.nim b/waku/factory/conf_builder/rate_limit_conf_builder.nim index 0d466a132..b2edbef03 100644 --- a/waku/factory/conf_builder/rate_limit_conf_builder.nim +++ b/waku/factory/conf_builder/rate_limit_conf_builder.nim @@ -14,6 +14,12 @@ proc init*(T: type RateLimitConfBuilder): RateLimitConfBuilder = proc withRateLimits*(b: var RateLimitConfBuilder, rateLimits: seq[string]) = b.strValue = some(rateLimits) +proc withRateLimitsIfNotAssigned*( + b: var RateLimitConfBuilder, rateLimits: seq[string] +) = + if b.strValue.isNone() or b.strValue.get().len == 0: + b.strValue = some(rateLimits) + proc build*(b: RateLimitConfBuilder): Result[ProtocolRateLimitSettings, string] = if b.strValue.isSome() and b.objValue.isSome(): return err("Rate limits conf must only be set once on the builder") diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index e51f02dbd..2c427918d 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -12,7 +12,8 @@ import ../networks_config, ../../common/logging, ../../common/utils/parse_size_units, - ../../waku_enr/capabilities + ../../waku_enr/capabilities, + tools/confutils/entry_nodes import ./filter_service_conf_builder, @@ -393,6 +394,42 @@ proc applyNetworkConf(builder: var WakuConfBuilder) = discarded = builder.discv5Conf.bootstrapNodes builder.discv5Conf.withBootstrapNodes(networkConf.discv5BootstrapNodes) + if networkConf.enableKadDiscovery: + if not builder.kademliaDiscoveryConf.enabled: + builder.kademliaDiscoveryConf.withEnabled(networkConf.enableKadDiscovery) + + if builder.kademliaDiscoveryConf.bootstrapNodes.len == 0 and + networkConf.kadBootstrapNodes.len > 0: + 
builder.kademliaDiscoveryConf.withBootstrapNodes(networkConf.kadBootstrapNodes) + + if networkConf.mix: + if builder.mix.isNone: + builder.mix = some(networkConf.mix) + + if builder.p2pReliability.isNone: + builder.withP2pReliability(networkConf.p2pReliability) + + # Process entry nodes from network config - classify and distribute + if networkConf.entryNodes.len > 0: + let processed = processEntryNodes(networkConf.entryNodes) + if processed.isOk(): + let (enrTreeUrls, bootstrapEnrs, staticNodesFromEntry) = processed.get() + + # Set ENRTree URLs for DNS discovery + if enrTreeUrls.len > 0: + for url in enrTreeUrls: + builder.dnsDiscoveryConf.withEnrTreeUrl(url) + + # Set ENR records as bootstrap nodes for discv5 + if bootstrapEnrs.len > 0: + builder.discv5Conf.withBootstrapNodes(bootstrapEnrs) + + # Add static nodes (multiaddrs and those extracted from ENR entries) + if staticNodesFromEntry.len > 0: + builder.withStaticNodes(staticNodesFromEntry) + else: + warn "Failed to process entry nodes from network conf", error = processed.error() + proc build*( builder: var WakuConfBuilder, rng: ref HmacDrbgContext = crypto.newRng() ): Result[WakuConf, string] = @@ -606,7 +643,7 @@ proc build*( provided = maxConnections, recommended = DefaultMaxConnections # TODO: Do the git version thing here - let agentString = builder.agentString.get("nwaku") + let agentString = builder.agentString.get("logos-delivery") # TODO: use `DefaultColocationLimit`. the user of this value should # probably be defining a config object diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index c7193aa9c..94856fb21 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -29,6 +29,11 @@ type NetworkConf* = object shardingConf*: ShardingConf discv5Discovery*: bool discv5BootstrapNodes*: seq[string] + enableKadDiscovery*: bool + kadBootstrapNodes*: seq[string] + entryNodes*: seq[string] + mix*: bool + p2pReliability*: bool # cluster-id=1 (aka The Waku Network) # Cluster configuration corresponding to The Waku Network. Note that it @@ -45,6 +50,11 @@ proc TheWakuNetworkConf*(T: type NetworkConf): NetworkConf = rlnEpochSizeSec: 600, rlnRelayUserMessageLimit: 100, shardingConf: ShardingConf(kind: AutoSharding, numShardsInCluster: 8), + enableKadDiscovery: false, + kadBootstrapNodes: @[], + entryNodes: @[], + mix: false, + p2pReliability: false, discv5Discovery: true, discv5BootstrapNodes: @[ @@ -54,6 +64,36 @@ proc TheWakuNetworkConf*(T: type NetworkConf): NetworkConf = ], ) +# cluster-id=2 (Logos Dev Network) +# Cluster configuration for the Logos Dev Network. 
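+# Selected with `--preset=logos.dev` (or "logosdev"); the `--cluster-id=2` shortcut is +# deprecated. Unlike TWN it runs without RLN and bootstraps from the static entry nodes +# below plus Kademlia discovery, with mix and p2p reliability enabled by default.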
+proc LogosDevConf*(T: type NetworkConf): NetworkConf = + const ZeroChainId = 0'u256 + return NetworkConf( + maxMessageSize: "150KiB", + clusterId: 2, + rlnRelay: false, + rlnRelayEthContractAddress: "", + rlnRelayDynamic: false, + rlnRelayChainId: ZeroChainId, + rlnEpochSizeSec: 0, + rlnRelayUserMessageLimit: 0, + shardingConf: ShardingConf(kind: AutoSharding, numShardsInCluster: 8), + enableKadDiscovery: true, + mix: true, + p2pReliability: true, + discv5Discovery: true, + discv5BootstrapNodes: @[], + entryNodes: + @[ + "/dns4/delivery-01.do-ams3.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAmTUbnxLGT9JvV6mu9oPyDjqHK4Phs1VDJNUgESgNSkuby", + "/dns4/delivery-02.do-ams3.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAmMK7PYygBtKUQ8EHp7EfaD3bCEsJrkFooK8RQ2PVpJprH", + "/dns4/delivery-01.gc-us-central1-a.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAm4S1JYkuzDKLKQvwgAhZKs9otxXqt8SCGtB4hoJP1S397", + "/dns4/delivery-02.gc-us-central1-a.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAm8Y9kgBNtjxvCnf1X6gnZJW5EGE4UwwCL3CCm55TwqBiH", + "/dns4/delivery-01.ac-cn-hongkong-c.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAm8YokiNun9BkeA1ZRmhLbtNUvcwRr64F69tYj9fkGyuEP", + "/dns4/delivery-02.ac-cn-hongkong-c.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAkvwhGHKNry6LACrB8TmEFoCJKEX29XR5dDUzk3UT3UNSE", + ], + ) + proc validateShards*( shardingConf: ShardingConf, shards: seq[uint16] ): Result[void, string] = diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9803a53a9..dbee8d093 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -35,6 +35,7 @@ import node/health_monitor, node/waku_metrics, node/delivery_service/delivery_service, + node/delivery_service/subscription_manager, rest_api/message_cache, rest_api/endpoint/server, rest_api/endpoint/builder as rest_server_builder, @@ -46,7 +47,8 @@ import factory/internal_config, factory/app_callbacks, ], - ./waku_conf + ./waku_conf, + ./waku_state_info logScope: topics = "wakunode waku" @@ -55,7 +57,7 @@ logScope: const git_version* {.strdefine.} = "n/a" type Waku* = ref object - version: string + stateInfo*: WakuStateInfo conf*: WakuConf rng*: ref HmacDrbgContext @@ -78,9 +80,6 @@ type Waku* = ref object brokerCtx*: BrokerContext -func version*(waku: Waku): string = - waku.version - proc setupSwitchServices( waku: Waku, conf: WakuConf, circuitRelay: Relay, rng: ref HmacDrbgContext ) = @@ -215,7 +214,7 @@ proc new*( return err("could not create delivery service: " & $error) var waku = Waku( - version: git_version, + stateInfo: WakuStateInfo.init(node), conf: wakuConf, rng: rng, key: wakuConf.nodeKey, @@ -453,7 +452,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ).isOkOr: error "Failed to set RequestProtocolHealth provider", error = error - ## Setup RequestHealthReport provider (The lost child) + ## Setup RequestHealthReport provider RequestHealthReport.setProvider( globalBrokerContext(), @@ -514,6 +513,10 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() + if not waku.deliveryService.isNil(): + await waku.deliveryService.stopDeliveryService() + waku.deliveryService = nil + if not waku.node.isNil(): await waku.node.stop() diff --git a/waku/factory/waku_state_info.nim b/waku/factory/waku_state_info.nim new file mode 100644 index 000000000..5dc72a693 --- /dev/null +++ b/waku/factory/waku_state_info.nim @@ -0,0 +1,50 @@ +## This module is aimed to collect and provide information about the state of the node, +## such as its version, metrics 
values, etc. +## It was originally designed for the debug API, which consumes this information, but +## any other module can populate it with information that should be accessible through +## the debug API. + +import std/[tables, sequtils, strutils] +import metrics, eth/p2p/discoveryv5/enr, libp2p/peerid +import waku/waku_node + +type + NodeInfoId* {.pure.} = enum + Version + Metrics + MyMultiaddresses + MyENR + MyPeerId + + WakuStateInfo* {.requiresInit.} = object + node: WakuNode + +proc getAllPossibleInfoItemIds*(self: WakuStateInfo): seq[NodeInfoId] = + ## Returns all options that can be queried to learn about the node's state. + var ret = newSeq[NodeInfoId](0) + for item in NodeInfoId: + ret.add(item) + return ret + +proc getMetrics(): string = + {.gcsafe.}: + return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module + +proc getNodeInfoItem*(self: WakuStateInfo, infoItemId: NodeInfoId): string = + ## Returns the content of the info item with the given id. + case infoItemId + of NodeInfoId.Version: + return git_version + of NodeInfoId.Metrics: + return getMetrics() + of NodeInfoId.MyMultiaddresses: + return self.node.info().listenAddresses.join(",") + of NodeInfoId.MyENR: + return self.node.enr.toURI() + of NodeInfoId.MyPeerId: + return $PeerId(self.node.peerId()) + +proc init*(T: typedesc[WakuStateInfo], node: WakuNode): T = + return WakuStateInfo(node: node) diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index 8106cba9f..258c01e95 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -5,7 +5,7 @@ import chronos import ./recv_service, ./send_service, - ./subscription_service, + ./subscription_manager, waku/[ waku_core, waku_node, @@ -18,29 +18,31 @@ import type DeliveryService* = ref object sendService*: SendService recvService: RecvService - subscriptionService*: SubscriptionService + subscriptionManager*: SubscriptionManager proc new*( T: type DeliveryService, useP2PReliability: bool, w: WakuNode ): Result[T, string] = ## storeClient is needed to give store visibility to DeliveryService ## wakuRelay and wakuLightpushClient are needed to give SendService a mechanism to re-publish - let subscriptionService = SubscriptionService.new(w) - let sendService = ?SendService.new(useP2PReliability, w, subscriptionService) - let recvService = RecvService.new(w, subscriptionService) + let subscriptionManager = SubscriptionManager.new(w) + let sendService = ?SendService.new(useP2PReliability, w, subscriptionManager) + let recvService = RecvService.new(w, subscriptionManager) return ok( DeliveryService( sendService: sendService, recvService: recvService, - subscriptionService: subscriptionService, + subscriptionManager: subscriptionManager, ) ) proc startDeliveryService*(self: DeliveryService) = - self.sendService.startSendService() + self.subscriptionManager.startSubscriptionManager() self.recvService.startRecvService() + self.sendService.startSendService() proc stopDeliveryService*(self: DeliveryService) {.async.} = - self.sendService.stopSendService() + await self.sendService.stopSendService() await self.recvService.stopRecvService() + await self.subscriptionManager.stopSubscriptionManager() diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 12780033a..0eba2c450 100644 
--- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -2,9 +2,9 @@ ## receive and is backed by store-v3 requests to get an additional degree of certainty ## -import std/[tables, sequtils, options] +import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility -import ../[subscription_service] +import ../[subscription_manager] import waku/[ waku_core, @@ -13,6 +13,7 @@ import waku_filter_v2/client, waku_core/topics, events/delivery_events, + events/message_events, waku_node, common/broker/broker_context, ] @@ -35,14 +36,9 @@ type RecvMessage = object type RecvService* = ref object of RootObj brokerCtx: BrokerContext - topicsInterest: Table[PubsubTopic, seq[ContentTopic]] - ## Tracks message verification requests and when was the last time a - ## pubsub topic was verified for missing messages - ## The key contains pubsub-topics node: WakuNode - onSubscribeListener: OnFilterSubscribeEventListener - onUnsubscribeListener: OnFilterUnsubscribeEventListener - subscriptionService: SubscriptionService + seenMsgListener: MessageSeenEventListener + subscriptionManager: SubscriptionManager recentReceivedMsgs: seq[RecvMessage] @@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} = self.endTimeToCheck = getNowInNanosecondTime() var msgHashesInStore = newSeq[WakuMessageHash](0) - for pubsubTopic, cTopics in self.topicsInterest.pairs: + for sub in self.subscriptionManager.getActiveSubscriptions(): let storeResp: StoreQueryResponse = ( await self.node.wakuStoreClient.queryToAny( StoreQueryRequest( includeData: false, - pubsubTopic: some(PubsubTopic(pubsubTopic)), - contentTopics: cTopics, + pubsubTopic: some(PubsubTopic(sub.pubsubTopic)), + contentTopics: sub.contentTopics, startTime: some(self.startTimeToCheck - DelayExtra.nanos), endTime: some(self.endTimeToCheck + DelayExtra.nanos), ) ) ).valueOr: error "msgChecker failed to get remote msgHashes", - pubsubTopic, cTopics, error = $error + pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error continue msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) @@ -133,31 +129,20 @@ proc msgChecker(self: RecvService) {.async.} = ## update next check times self.startTimeToCheck = self.endTimeToCheck -proc onSubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onSubscribe", pubsubTopic, contentTopics - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - contentTopicsOfInterest[].add(contentTopics) - do: - self.topicsInterest[pubsubTopic] = contentTopics +proc processIncomingMessageOfInterest( + self: RecvService, pubsubTopic: string, message: WakuMessage +) = + ## Resolve an incoming network message that was already filtered by topic. + ## Deduplicate (by hash), store (saves in recently-seen messages) and emit + ## the MAPI MessageReceivedEvent for every unique incoming message. 
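+  ## Deduplication is a linear scan of recentReceivedMsgs, which stays small +  ## because the pruning loop (loopPruneOldMessages) periodically drops old entries.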
-proc onUnsubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onUnsubscribe", pubsubTopic, contentTopics + let msgHash = computeMessageHash(pubsubTopic, message) + if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - let remainingCTopics = - contentTopicsOfInterest[].filterIt(not contentTopics.contains(it)) - contentTopicsOfInterest[] = remainingCTopics - - if remainingCTopics.len == 0: - self.topicsInterest.del(pubsubTopic) - do: - error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics - -proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = +proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = ## The storeClient will help to acquire any possible missed messages let now = getNowInNanosecondTime() @@ -165,22 +150,13 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = node: node, startTimeToCheck: now, brokerCtx: node.brokerCtx, - subscriptionService: s, - topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](), + subscriptionManager: s, recentReceivedMsgs: @[], ) - if not node.wakuFilterClient.isNil(): - let filterPushHandler = proc( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, closure.} = - ## Captures all the messages received through filter - - let msgHash = computeMessageHash(pubSubTopic, message) - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - recvService.recentReceivedMsgs.add(rxMsg) - - node.wakuFilterClient.registerPushHandler(filterPushHandler) + # TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler + # so that the RecvService listens to incoming filter messages, + # or have the filter client emit MessageSeenEvent. 
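+  # A possible shape for that handler (illustrative sketch only; the exact +  # MessageSeenEvent.emit signature is generated by the EventBroker macro): +  #   if not node.wakuFilterClient.isNil(): +  #     node.wakuFilterClient.registerPushHandler( +  #       proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, closure.} = +  #         MessageSeenEvent.emit(node.brokerCtx, pubsubTopic, message) +  #     )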
return recvService @@ -194,26 +170,26 @@ proc startRecvService*(self: RecvService) = self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() - self.onSubscribeListener = OnFilterSubscribeEvent.listen( + self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, - proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} = - self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics), - ).valueOr: - error "Failed to set OnFilterSubscribeEvent listener", error = error - quit(QuitFailure) + proc(event: MessageSeenEvent) {.async: (raises: []).} = + if not self.subscriptionManager.isSubscribed( + event.topic, event.message.contentTopic + ): + trace "skipping message as I am not subscribed", + shard = event.topic, contenttopic = event.message.contentTopic + return - self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( - self.brokerCtx, - proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} = - self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics), + self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: - error "Failed to set OnFilterUnsubscribeEvent listener", error = error + error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure) proc stopRecvService*(self: RecvService) {.async.} = - OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener) - OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener) + MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() + self.msgCheckerHandler = nil if not self.msgPrunerHandler.isNil(): await self.msgPrunerHandler.cancelAndWait() + self.msgPrunerHandler = nil diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index a41d07786..a3c44bc0c 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -5,7 +5,7 @@ import std/[sequtils, tables, options] import chronos, chronicles, libp2p/utility import ./[send_processor, relay_processor, lightpush_processor, delivery_task], - ../[subscription_service], + ../[subscription_manager], waku/[ waku_core, node/waku_node, @@ -58,7 +58,7 @@ type SendService* = ref object of RootObj node: WakuNode checkStoreForMessages: bool - subscriptionService: SubscriptionService + subscriptionManager: SubscriptionManager proc setupSendProcessorChain( peerManager: PeerManager, @@ -99,7 +99,7 @@ proc new*( T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode, - s: SubscriptionService, + s: SubscriptionManager, ): Result[T, string] = if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): return err( @@ -120,7 +120,7 @@ proc new*( sendProcessor: sendProcessorChain, node: w, checkStoreForMessages: checkStoreForMessages, - subscriptionService: s, + subscriptionManager: s, ) return ok(sendService) @@ -250,9 +250,9 @@ proc serviceLoop(self: SendService) {.async.} = proc startSendService*(self: SendService) = self.serviceLoopHandle = self.serviceLoop() -proc stopSendService*(self: SendService) = +proc stopSendService*(self: SendService) {.async.} = if not self.serviceLoopHandle.isNil(): - discard self.serviceLoopHandle.cancelAndWait() + await self.serviceLoopHandle.cancelAndWait() proc send*(self: SendService, task: DeliveryTask) {.async.} = assert(not task.isNil(), "task for send must not be nil") @@ -260,7 +260,7 @@ proc 
send*(self: SendService, task: DeliveryTask) {.async.} = info "SendService.send: processing delivery task", requestId = task.requestId, msgHash = task.msgHash.to0xHex() - self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr: + self.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr: error "SendService.send: failed to subscribe to content topic", contentTopic = task.msg.contentTopic, error = error diff --git a/waku/node/delivery_service/subscription_manager.nim b/waku/node/delivery_service/subscription_manager.nim new file mode 100644 index 000000000..22df47413 --- /dev/null +++ b/waku/node/delivery_service/subscription_manager.nim @@ -0,0 +1,164 @@ +import std/[sets, tables, options, strutils], chronos, chronicles, results +import + waku/[ + waku_core, + waku_core/topics, + waku_core/topics/sharding, + waku_node, + waku_relay, + common/broker/broker_context, + events/delivery_events, + ] + +type SubscriptionManager* = ref object of RootObj + node: WakuNode + contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]] + ## Maps each shard to its content topics of interest; needed because e.g. WakuRelay subscribes per PubsubTopic only. + ## A present key with an empty HashSet value means the pubsub topic is already subscribed + ## (via subscribePubsubTopics()) but there's no specific content topic interest yet. + +proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T = + SubscriptionManager( + node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]() + ) + +proc addContentTopicInterest( + self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + if not self.contentTopicSubs.hasKey(shard): + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + self.contentTopicSubs.withValue(shard, cTopics): + if not cTopics[].contains(topic): + cTopics[].incl(topic) + + # TODO: Call a "subscribe(shard, topic)" on the filter client here, + # so the filter client knows that subscriptions changed. + + return ok() + +proc removeContentTopicInterest( + self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + self.contentTopicSubs.withValue(shard, cTopics): + if cTopics[].contains(topic): + cTopics[].excl(topic) + + if cTopics[].len == 0 and isNil(self.node.wakuRelay): + self.contentTopicSubs.del(shard) # We're done with cTopics here + + # TODO: Call an "unsubscribe(shard, topic)" on the filter client here, + # so the filter client knows that subscriptions changed. + + return ok() + +proc subscribePubsubTopics( + self: SubscriptionManager, shards: seq[PubsubTopic] +): Result[void, string] = + if isNil(self.node.wakuRelay): + return err("subscribePubsubTopics requires a Relay") + + var errors: seq[string] = @[] + + for shard in shards: + if not self.contentTopicSubs.hasKey(shard): + self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr: + errors.add("shard " & shard & ": " & error) + continue + + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + if errors.len > 0: + return err("subscribePubsubTopics errors: " & errors.join("; ")) + + return ok() + +proc startSubscriptionManager*(self: SubscriptionManager) = + if isNil(self.node.wakuRelay): + return + + if self.node.wakuAutoSharding.isSome(): + # Subscribe relay to all shards in autosharding.
+ let autoSharding = self.node.wakuAutoSharding.get() + let clusterId = autoSharding.clusterId + let numShards = autoSharding.shardCountGenZero + + if numShards > 0: + var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) + + for i in 0 ..< numShards: + let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) + clusterPubsubTopics.add(PubsubTopic($shardObj)) + + self.subscribePubsubTopics(clusterPubsubTopics).isOkOr: + error "Failed to auto-subscribe Relay to cluster shards: ", error = error + else: + info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe." + +proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} = + discard + +proc getActiveSubscriptions*( + self: SubscriptionManager +): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + @[] + + for pubsub, cTopicSet in self.contentTopicSubs.pairs: + if cTopicSet.len > 0: + var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len) + for t in cTopicSet: + cTopicSeq.add(t) + activeSubs.add((pubsub, cTopicSeq)) + + return activeSubs + +proc getShardForContentTopic( + self: SubscriptionManager, topic: ContentTopic +): Result[PubsubTopic, string] = + if self.node.wakuAutoSharding.isSome(): + let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic) + return ok($shardObj) + + return err("SubscriptionManager requires AutoSharding") + +proc isSubscribed*( + self: SubscriptionManager, topic: ContentTopic +): Result[bool, string] = + let shard = ?self.getShardForContentTopic(topic) + return ok( + self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic) + ) + +proc isSubscribed*( + self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic +): bool {.raises: [].} = + self.contentTopicSubs.withValue(shard, cTopics): + return cTopics[].contains(contentTopic) + return false + +proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] = + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionManager requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard): + ?self.subscribePubsubTopics(@[shard]) + + ?self.addContentTopicInterest(shard, topic) + + return ok() + +proc unsubscribe*( + self: SubscriptionManager, topic: ContentTopic +): Result[void, string] = + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionManager requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if self.isSubscribed(shard, topic): + ?self.removeContentTopicInterest(shard, topic) + + return ok() diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim deleted file mode 100644 index 78763161b..000000000 --- a/waku/node/delivery_service/subscription_service.nim +++ /dev/null @@ -1,64 +0,0 @@ -import chronos, chronicles -import - waku/[ - waku_core, - waku_core/topics, - events/message_events, - waku_node, - common/broker/broker_context, - ] - -type SubscriptionService* = ref object of RootObj - brokerCtx: BrokerContext - node: WakuNode - -proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = - ## The storeClient will help to acquire any possible missed messages - - return SubscriptionService(brokerCtx: node.brokerCtx, node: node) - 
-proc isSubscribed*( - self: SubscriptionService, topic: ContentTopic -): Result[bool, string] = - var isSubscribed = false - if self.node.wakuRelay.isNil() == false: - return self.node.isSubscribed((kind: ContentSub, topic: topic)) - - # TODO: Add support for edge mode with Filter subscription management - return ok(isSubscribed) - -#TODO: later PR may consider to refactor or place this function elsewhere -# The only important part is that it emits MessageReceivedEvent -proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler = - return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - let msgHash = computeMessageHash(topic, msg).to0xHex() - info "API received message", - pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash - - MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg) - -proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = - let isSubscribed = self.isSubscribed(topic).valueOr: - error "Failed to check subscription status: ", error = error - return err("Failed to check subscription status: " & error) - - if isSubscribed == false: - if self.node.wakuRelay.isNil() == false: - self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr: - error "Failed to subscribe: ", error = error - return err("Failed to subscribe: " & error) - - # TODO: Add support for edge mode with Filter subscription management - - return ok() - -proc unsubscribe*( - self: SubscriptionService, topic: ContentTopic -): Result[void, string] = - if self.node.wakuRelay.isNil() == false: - self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr: - error "Failed to unsubscribe: ", error = error - return err("Failed to unsubscribe: " & error) - - # TODO: Add support for edge mode with Filter subscription management - return ok() diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim index ba0518e61..ddba47ccb 100644 --- a/waku/node/health_monitor/node_health_monitor.nim +++ b/waku/node/health_monitor/node_health_monitor.nim @@ -405,13 +405,8 @@ proc calculateConnectionState*( elif kind in FilterClientProtocols: filterCount = max(filterCount, strength) - debug "calculateConnectionState", - protocol = kind, - strength = strength, - relayCount = relayCount, - storeClientCount = storeClientCount, - lightpushCount = lightpushCount, - filterCount = filterCount + debug "calculateConnectionState", + relayCount, storeClientCount, lightpushCount, filterCount # Relay connectivity should be a sufficient check in Core mode. 
# "Store peers" are relay peers because incoming messages in @@ -528,6 +523,9 @@ proc healthLoop(hm: NodeHealthMonitor) {.async.} = let newConnectionStatus = hm.calculateConnectionState() if newConnectionStatus != hm.connectionStatus: + debug "connectionStatus change", + oldstatus = hm.connectionStatus, newstatus = newConnectionStatus + hm.connectionStatus = newConnectionStatus EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus) diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index a0a128449..ec4d05ddd 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -19,16 +19,20 @@ import libp2p/utility import - ../waku_node, - ../../waku_relay, - ../../waku_core, - ../../waku_core/topics/sharding, - ../../waku_filter_v2, - ../../waku_archive_legacy, - ../../waku_archive, - ../../waku_store_sync, - ../peer_manager, - ../../waku_rln_relay + waku/[ + waku_relay, + waku_core, + waku_core/topics/sharding, + waku_filter_v2, + waku_archive_legacy, + waku_archive, + waku_store_sync, + waku_rln_relay, + node/waku_node, + node/peer_manager, + common/broker/broker_context, + events/message_events, + ] export waku_relay.WakuRelayHandler @@ -44,14 +48,25 @@ logScope: ## Waku relay proc registerRelayHandler( - node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler -) = + node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil +): bool = ## Registers the only handler for the given topic. ## Notice that this handler internally calls other handlers, such as filter, ## archive, etc, plus the handler provided by the application. + ## Returns `true` if a mesh subscription was created or `false` if the relay + ## was already subscribed to the topic. - if node.wakuRelay.isSubscribed(topic): - return + let alreadySubscribed = node.wakuRelay.isSubscribed(topic) + + if not appHandler.isNil(): + if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic): + node.legacyAppHandlers[topic] = appHandler + else: + debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler", + topic = topic + + if alreadySubscribed: + return false proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = let msgSizeKB = msg.payload.len / 1000 @@ -82,6 +97,9 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) + proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + MessageSeenEvent.emit(node.brokerCtx, topic, msg) + let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = @@ -89,7 +107,15 @@ proc registerRelayHandler( await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) - await appHandler(topic, msg) + await internalHandler(topic, msg) + + # Call the legacy (kernel API) app handler if it exists. + # Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead. + # But we need to support legacy behavior (kernel API use), hence this. + # NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple + # PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...) + if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil(): + await node.legacyAppHandlers[topic](topic, msg) node.wakuRelay.subscribe(topic, uniqueTopicHandler) @@ -115,8 +141,11 @@ proc subscribe*( ): Result[void, string] = ## Subscribes to a PubSub or Content topic. 
Triggers handler when receiving messages on ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. + ## If `handler` is nil, the API call will subscribe to the topic in the relay mesh + ## but no app handler will be registered at this time (it can be registered later with + ## another call to this proc for the same gossipsub topic). - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `subscribe`. WakuRelay not mounted." return err("Invalid API call to `subscribe`. WakuRelay not mounted.") @@ -124,13 +153,15 @@ proc subscribe*( error "Failed to decode subscription event", error = error return err("Failed to decode subscription event: " & error) - if node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic - return ok() - - info "subscribe", pubsubTopic, contentTopicOp - node.registerRelayHandler(pubsubTopic, handler) - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + if node.registerRelayHandler(pubsubTopic, handler): + info "subscribe", pubsubTopic, contentTopicOp + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + else: + if isNil(handler): + warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic + else: + info "subscribe (was already subscribed in the mesh; appHandler set)", + pubsubTopic = pubsubTopic return ok() @@ -138,8 +169,10 @@ proc unsubscribe*( node: WakuNode, subscription: SubscriptionEvent ): Result[void, string] = ## Unsubscribes from a specific PubSub or Content topic. + ## This will both unsubscribe from the relay mesh and remove the app handler, if any. + ## NOTE: This works because using MAPI and Kernel API at the same time is unsupported. - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") @@ -147,13 +180,20 @@ proc unsubscribe*( error "Failed to decode unsubscribe event", error = error return err("Failed to decode unsubscribe event: " & error) - if not node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic - return ok() + let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic) + if hadHandler: + node.legacyAppHandlers.del(pubsubTopic) - info "unsubscribe", pubsubTopic, contentTopicOp - node.wakuRelay.unsubscribe(pubsubTopic) - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + if node.wakuRelay.isSubscribed(pubsubTopic): + info "unsubscribe", pubsubTopic, contentTopicOp + node.wakuRelay.unsubscribe(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + else: + if not hadHandler: + warn "No-effect API call to `unsubscribe`. 
Was not subscribed", pubsubTopic + else: + info "unsubscribe (was not subscribed in the mesh; appHandler removed)", + pubsubTopic = pubsubTopic return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 254387c32..0cef4cc5d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -146,6 +146,8 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings + legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler] + ## Kernel API Relay appHandlers (if any) wakuMix*: WakuMix edgeTopicsHealth*: Table[PubsubTopic, TopicHealth] edgeHealthEvent*: AsyncEvent diff --git a/waku/rest_api/endpoint/builder.nim b/waku/rest_api/endpoint/builder.nim index bbd8de422..41ab7e06b 100644 --- a/waku/rest_api/endpoint/builder.nim +++ b/waku/rest_api/endpoint/builder.nim @@ -28,7 +28,6 @@ import # It will always be called from main thread anyway. # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety var restServerNotInstalledTab {.threadvar.}: TableRef[string, string] -restServerNotInstalledTab = newTable[string, string]() export WakuRestServerRef @@ -42,6 +41,9 @@ type RestServerConf* = object proc startRestServerEssentials*( nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf, portsShift: uint16 ): Result[WakuRestServerRef, string] = + if restServerNotInstalledTab.isNil: + restServerNotInstalledTab = newTable[string, string]() + let requestErrorHandler: RestRequestErrorHandler = proc( error: RestRequestError, request: HttpRequestRef ): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = diff --git a/waku/waku_core/subscription/subscription_manager.nim b/waku/waku_core/subscription/subscription_manager.nim index 1b950b3b4..ccade763b 100644 --- a/waku/waku_core/subscription/subscription_manager.nim +++ b/waku/waku_core/subscription/subscription_manager.nim @@ -5,19 +5,19 @@ import std/tables, results, chronicles, chronos import ./push_handler, ../topics, ../message ## Subscription manager -type SubscriptionManager* = object +type LegacySubscriptionManager* = object subscriptions: TableRef[(string, ContentTopic), FilterPushHandler] -proc init*(T: type SubscriptionManager): T = - SubscriptionManager( +proc init*(T: type LegacySubscriptionManager): T = + LegacySubscriptionManager( subscriptions: newTable[(string, ContentTopic), FilterPushHandler]() ) -proc clear*(m: var SubscriptionManager) = +proc clear*(m: var LegacySubscriptionManager) = m.subscriptions.clear() proc registerSubscription*( - m: SubscriptionManager, + m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler, @@ -29,12 +29,12 @@ proc registerSubscription*( error "failed to register filter subscription", error = getCurrentExceptionMsg() proc removeSubscription*( - m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic + m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic ) = m.subscriptions.del((pubsubTopic, contentTopic)) proc notifySubscriptionHandler*( - m: SubscriptionManager, + m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage, @@ -48,5 +48,5 @@ proc notifySubscriptionHandler*( except CatchableError: discard -proc getSubscriptionsCount*(m: SubscriptionManager): int = +proc getSubscriptionsCount*(m: LegacySubscriptionManager): int = m.subscriptions.len()