Merge branch 'master' into release/v0.38

This commit is contained in:
Darshan 2026-03-11 01:58:02 +05:30 committed by GitHub
commit 3bd62f55be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
45 changed files with 1883 additions and 1333 deletions

View File

@ -469,6 +469,7 @@ logosdelivery_example: | build liblogosdelivery
ifeq ($(detected_OS),Darwin)
gcc -o build/$@ \
liblogosdelivery/examples/logosdelivery_example.c \
liblogosdelivery/examples/json_utils.c \
-I./liblogosdelivery \
-L./build \
-llogosdelivery \
@ -476,6 +477,7 @@ ifeq ($(detected_OS),Darwin)
else ifeq ($(detected_OS),Linux)
gcc -o build/$@ \
liblogosdelivery/examples/logosdelivery_example.c \
liblogosdelivery/examples/json_utils.c \
-I./liblogosdelivery \
-L./build \
-llogosdelivery \
@ -483,6 +485,7 @@ else ifeq ($(detected_OS),Linux)
else ifeq ($(detected_OS),Windows)
gcc -o build/$@.exe \
liblogosdelivery/examples/logosdelivery_example.c \
liblogosdelivery/examples/json_utils.c \
-I./liblogosdelivery \
-L./build \
-llogosdelivery \

View File

@ -59,19 +59,24 @@ when isMainModule:
echo "Starting Waku node..."
let config =
if (args.ethRpcEndpoint == ""):
# Create a basic configuration for the Waku node
# No RLN as we don't have an ETH RPC Endpoint
NodeConfig.init(
protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 42)
)
else:
# Connect to TWN, use ETH RPC Endpoint for RLN
NodeConfig.init(mode = WakuMode.Core, ethRpcEndpoints = @[args.ethRpcEndpoint])
# Use WakuNodeConf (the CLI configuration type) for node setup
var conf = defaultWakuNodeConf().valueOr:
echo "Failed to create default config: ", error
quit(QuitFailure)
if args.ethRpcEndpoint == "":
# Create a basic configuration for the Waku node
# No RLN as we don't have an ETH RPC Endpoint
conf.mode = Core
conf.preset = "logos.dev"
else:
# Connect to TWN, use ETH RPC Endpoint for RLN
conf.mode = Core
conf.preset = "twn"
conf.ethClientUrls = @[EthRpcUrl(args.ethRpcEndpoint)]
# Create the node using the library API's createNode function
let node = (waitFor createNode(config)).valueOr:
let node = (waitFor createNode(conf)).valueOr:
echo "Failed to create node: ", error
quit(QuitFailure)

View File

@ -32,18 +32,17 @@ void *logosdelivery_create_node(
```json
{
"mode": "Core",
"clusterId": 1,
"entryNodes": [
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
],
"networkingConfig": {
"listenIpv4": "0.0.0.0",
"p2pTcpPort": 60000,
"discv5UdpPort": 9000
}
"preset": "logos.dev",
"listenAddress": "0.0.0.0",
"tcpPort": 60000,
"discv5UdpPort": 9000
}
```
Configuration uses flat field names matching `WakuNodeConf` in `tools/confutils/cli_args.nim`.
Use `"preset"` to select a network preset (e.g., `"twn"`, `"logos.dev"`) which auto-configures
entry nodes, cluster ID, sharding, and other network-specific settings.
#### `logosdelivery_start_node`
Starts the node.
@ -207,8 +206,9 @@ void callback(int ret, const char *msg, size_t len, void *userData) {
int main() {
const char *config = "{"
"\"logLevel\": \"INFO\","
"\"mode\": \"Core\","
"\"clusterId\": 1"
"\"preset\": \"logos.dev\""
"}";
// Create node

View File

@ -1,8 +1,12 @@
import ffi
import std/locks
import waku/factory/waku
declareLibrary("logosdelivery")
var eventCallbackLock: Lock
initLock(eventCallbackLock)
template requireInitializedNode*(
ctx: ptr FFIContext[Waku], opName: string, onError: untyped
) =
@ -20,5 +24,10 @@ proc logosdelivery_set_event_callback(
echo "error: invalid context in logosdelivery_set_event_callback"
return
# Prevent race conditions that might happen due to incorrect usage.
eventCallbackLock.acquire()
defer:
eventCallbackLock.release()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData

View File

@ -0,0 +1,96 @@
#include "json_utils.h"
#include <stdio.h>
#include <string.h>
// Extract a JSON string field value into `buffer` (always NUL-terminated on
// success). Scans for `"field":` and tolerates optional whitespace between
// the colon and the opening quote. Values longer than bufSize-1 bytes are
// silently truncated to fit.
// This is a minimal scanner, not a conforming JSON parser: it does not handle
// escaped quotes (\") inside the value and may match a same-named field in a
// nested object. For production use a proper JSON library.
// Returns `buffer` on success, NULL if any argument is invalid, bufSize is 0,
// the field is missing, or its value is not a string.
const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) {
    if (!json || !field || !buffer || bufSize == 0) {
        return NULL; // bufSize == 0 would underflow bufSize - 1 below
    }
    char searchStr[256];
    snprintf(searchStr, sizeof(searchStr), "\"%s\":", field);
    const char *start = strstr(json, searchStr);
    if (!start) {
        return NULL;
    }
    start += strlen(searchStr);
    // Allow whitespace between the colon and the opening quote,
    // e.g. both {"a":"x"} and {"a": "x"}.
    while (*start == ' ' || *start == '\t' || *start == '\n' || *start == '\r') {
        start++;
    }
    if (*start != '"') {
        return NULL; // field exists but is not a string value
    }
    start++; // skip opening quote
    const char *end = strchr(start, '"');
    if (!end) {
        return NULL; // unterminated string
    }
    size_t len = (size_t)(end - start);
    if (len >= bufSize) {
        len = bufSize - 1; // truncate, keeping room for the NUL terminator
    }
    memcpy(buffer, start, len);
    buffer[len] = '\0';
    return buffer;
}
// Extract a nested JSON object as a raw (NOT NUL-terminated) slice.
// Scans for `"field":` and tolerates optional whitespace before the opening
// brace. On success returns a pointer into `json` at the opening '{' and sets
// `*outLen` to the byte length including both braces; callers must copy
// `*outLen` bytes before treating it as a C string.
// Nested braces are balanced by depth counting. Known limitation: braces
// inside string values are still counted, so such inputs can yield a wrong
// span — for production use a proper JSON library.
// Returns NULL if any argument is invalid, the field is missing, the value
// is not an object, or the braces are unbalanced.
const char* extract_json_object(const char *json, const char *field, size_t *outLen) {
    if (!json || !field || !outLen) {
        return NULL;
    }
    char searchStr[256];
    snprintf(searchStr, sizeof(searchStr), "\"%s\":", field);
    const char *start = strstr(json, searchStr);
    if (!start) {
        return NULL;
    }
    start += strlen(searchStr);
    // Allow whitespace between the colon and the opening brace.
    while (*start == ' ' || *start == '\t' || *start == '\n' || *start == '\r') {
        start++;
    }
    if (*start != '{') {
        return NULL; // field exists but its value is not an object
    }
    // Find the matching closing brace (handles nested braces).
    int depth = 0;
    for (const char *p = start; *p; p++) {
        if (*p == '{') {
            depth++;
        } else if (*p == '}') {
            depth--;
            if (depth == 0) {
                *outLen = (size_t)(p - start + 1);
                return start;
            }
        }
    }
    return NULL; // ran off the end without closing the object
}
// Decode a JSON array of non-negative integers (byte values) into `buffer`,
// e.g. [72,101,108,108,111] -> "Hello". The buffer is NUL-terminated and
// decoding stops after bufSize-1 bytes. Whitespace is tolerated after the
// colon and between elements.
// Bug fix vs. previous version: any unexpected character (e.g. the '-' of a
// negative number) now terminates parsing instead of looping forever, because
// the old code never advanced past characters it could not classify.
// Returns the number of bytes decoded, or -1 if any argument is invalid,
// bufSize is 0, the field is missing, or its value is not an array.
int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize) {
    if (!json || !field || !buffer || bufSize == 0) {
        return -1; // bufSize == 0 would underflow bufSize - 1 below
    }
    char searchStr[256];
    snprintf(searchStr, sizeof(searchStr), "\"%s\":", field);
    const char *start = strstr(json, searchStr);
    if (!start) {
        return -1;
    }
    start += strlen(searchStr);
    // Allow whitespace between the colon and the opening bracket.
    while (*start == ' ' || *start == '\t' || *start == '\n' || *start == '\r') {
        start++;
    }
    if (*start != '[') {
        return -1; // field exists but its value is not an array
    }
    start++; // skip '['
    size_t pos = 0;
    const char *p = start;
    while (*p && *p != ']' && pos < bufSize - 1) {
        // Skip whitespace and commas between elements
        while (*p == ' ' || *p == ',' || *p == '\n' || *p == '\r' || *p == '\t') p++;
        if (*p == ']') break;
        if (*p < '0' || *p > '9') {
            break; // unexpected token: stop decoding (prevents infinite loop)
        }
        // Parse one unsigned decimal integer
        int val = 0;
        while (*p >= '0' && *p <= '9') {
            val = val * 10 + (*p - '0');
            p++;
        }
        buffer[pos++] = (char)val;
    }
    buffer[pos] = '\0';
    return (int)pos;
}

View File

@ -0,0 +1,21 @@
#ifndef JSON_UTILS_H
#define JSON_UTILS_H

#include <stddef.h>

// Minimal JSON field-extraction helpers shared by the example programs.
// These are naive substring scanners, not a conforming JSON parser — for
// production use a proper JSON library.

// Extract a JSON string field value into buffer.
// Returns pointer to buffer on success, NULL on failure.
// Very basic parser - for production use a proper JSON library.
const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize);

// Extract a nested JSON object as a raw string.
// Returns a pointer into `json` at the start of the object, and sets `outLen`.
// Handles nested braces.
// NOTE: the returned pointer is NOT NUL-terminated — callers must copy
// `outLen` bytes into their own buffer before treating it as a C string.
const char* extract_json_object(const char *json, const char *field, size_t *outLen);

// Decode a JSON array of integers (byte values) into a buffer.
// Parses e.g. [72,101,108,108,111] into "Hello".
// Returns number of bytes decoded, or -1 on error.
int decode_json_byte_array(const char *json, const char *field, char *buffer, size_t bufSize);

#endif // JSON_UTILS_H

View File

@ -1,4 +1,5 @@
#include "../liblogosdelivery.h"
#include "json_utils.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
@ -6,33 +7,10 @@
static int create_node_ok = -1;
// Helper function to extract a JSON string field value
// Very basic parser - for production use a proper JSON library
const char* extract_json_field(const char *json, const char *field, char *buffer, size_t bufSize) {
char searchStr[256];
snprintf(searchStr, sizeof(searchStr), "\"%s\":\"", field);
const char *start = strstr(json, searchStr);
if (!start) {
return NULL;
}
start += strlen(searchStr);
const char *end = strchr(start, '"');
if (!end) {
return NULL;
}
size_t len = end - start;
if (len >= bufSize) {
len = bufSize - 1;
}
memcpy(buffer, start, len);
buffer[len] = '\0';
return buffer;
}
// Flags set by event callback, polled by main thread
static volatile int got_message_sent = 0;
static volatile int got_message_error = 0;
static volatile int got_message_received = 0;
// Event callback that handles message events
void event_callback(int ret, const char *msg, size_t len, void *userData) {
@ -61,7 +39,8 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) {
char messageHash[128];
extract_json_field(eventJson, "requestId", requestId, sizeof(requestId));
extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash));
printf("📤 [EVENT] Message sent - RequestID: %s, Hash: %s\n", requestId, messageHash);
printf("[EVENT] Message sent - RequestID: %s, Hash: %s\n", requestId, messageHash);
got_message_sent = 1;
} else if (strcmp(eventType, "message_error") == 0) {
char requestId[128];
@ -70,18 +49,59 @@ void event_callback(int ret, const char *msg, size_t len, void *userData) {
extract_json_field(eventJson, "requestId", requestId, sizeof(requestId));
extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash));
extract_json_field(eventJson, "error", error, sizeof(error));
printf("[EVENT] Message error - RequestID: %s, Hash: %s, Error: %s\n",
printf("[EVENT] Message error - RequestID: %s, Hash: %s, Error: %s\n",
requestId, messageHash, error);
got_message_error = 1;
} else if (strcmp(eventType, "message_propagated") == 0) {
char requestId[128];
char messageHash[128];
extract_json_field(eventJson, "requestId", requestId, sizeof(requestId));
extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash));
printf("✅ [EVENT] Message propagated - RequestID: %s, Hash: %s\n", requestId, messageHash);
printf("[EVENT] Message propagated - RequestID: %s, Hash: %s\n", requestId, messageHash);
} else if (strcmp(eventType, "connection_status_change") == 0) {
char connectionStatus[256];
extract_json_field(eventJson, "connectionStatus", connectionStatus, sizeof(connectionStatus));
printf("[EVENT] Connection status change - Status: %s\n", connectionStatus);
} else if (strcmp(eventType, "message_received") == 0) {
char messageHash[128];
extract_json_field(eventJson, "messageHash", messageHash, sizeof(messageHash));
// Extract the nested "message" object
size_t msgObjLen = 0;
const char *msgObj = extract_json_object(eventJson, "message", &msgObjLen);
if (msgObj) {
// Make a null-terminated copy of the message object
char *msgJson = malloc(msgObjLen + 1);
if (msgJson) {
memcpy(msgJson, msgObj, msgObjLen);
msgJson[msgObjLen] = '\0';
char contentTopic[256];
extract_json_field(msgJson, "contentTopic", contentTopic, sizeof(contentTopic));
// Decode payload from JSON byte array to string
char payload[4096];
int payloadLen = decode_json_byte_array(msgJson, "payload", payload, sizeof(payload));
printf("[EVENT] Message received - Hash: %s, ContentTopic: %s\n", messageHash, contentTopic);
if (payloadLen > 0) {
printf(" Payload (%d bytes): %.*s\n", payloadLen, payloadLen, payload);
} else {
printf(" Payload: (empty or could not decode)\n");
}
free(msgJson);
}
} else {
printf("[EVENT] Message received - Hash: %s (could not parse message)\n", messageHash);
}
got_message_received = 1;
} else {
printf(" [EVENT] Unknown event type: %s\n", eventType);
printf("[EVENT] Unknown event type: %s\n", eventType);
}
free(eventJson);
@ -109,23 +129,12 @@ void simple_callback(int ret, const char *msg, size_t len, void *userData) {
int main() {
printf("=== Logos Messaging API (LMAPI) Example ===\n\n");
// Configuration JSON for creating a node
// Configuration JSON using WakuNodeConf field names (flat structure).
// Field names match Nim identifiers from WakuNodeConf in tools/confutils/cli_args.nim.
const char *config = "{"
"\"logLevel\": \"DEBUG\","
// "\"mode\": \"Edge\","
"\"logLevel\": \"INFO\","
"\"mode\": \"Core\","
"\"protocolsConfig\": {"
"\"entryNodes\": [\"/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78\"],"
"\"clusterId\": 42,"
"\"autoShardingConfig\": {"
"\"numShardsInCluster\": 8"
"}"
"},"
"\"networkingConfig\": {"
"\"listenIpv4\": \"0.0.0.0\","
"\"p2pTcpPort\": 60000,"
"\"discv5UdpPort\": 9000"
"}"
"\"preset\": \"logos.dev\""
"}";
printf("1. Creating node...\n");
@ -152,7 +161,7 @@ int main() {
logosdelivery_start_node(ctx, simple_callback, (void *)"start_node");
// Wait for node to start
sleep(2);
sleep(5);
printf("\n4. Subscribing to content topic...\n");
const char *contentTopic = "/example/1/chat/proto";
@ -161,7 +170,23 @@ int main() {
// Wait for subscription
sleep(1);
printf("\n5. Sending a message...\n");
printf("\n5. Retrieving all possibl node info ids...\n");
logosdelivery_get_available_node_info_ids(ctx, simple_callback, (void *)"get_available_node_info_ids");
printf("\nRetrieving node info for a specific invalid ID...\n");
logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "WrongNodeInfoId");
printf("\nRetrieving several node info for specific correct IDs...\n");
logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "Version");
// logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "Metrics");
logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "MyMultiaddresses");
logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "MyENR");
logosdelivery_get_node_info(ctx, simple_callback, (void *)"get_node_info", "MyPeerId");
printf("\nRetrieving available configs...\n");
logosdelivery_get_available_configs(ctx, simple_callback, (void *)"get_available_configs");
printf("\n6. Sending a message...\n");
printf("Watch for message events (sent, propagated, or error):\n");
// Create base64-encoded payload: "Hello, Logos Messaging!"
const char *message = "{"
@ -171,21 +196,30 @@ int main() {
"}";
logosdelivery_send(ctx, simple_callback, (void *)"send", message);
// Wait for message events to arrive
// Poll for terminal message events (sent, error, or received) with timeout
printf("Waiting for message delivery events...\n");
sleep(60);
int timeout_sec = 60;
int elapsed = 0;
while (!(got_message_sent || got_message_error || got_message_received)
&& elapsed < timeout_sec) {
usleep(100000); // 100ms
elapsed++;
}
if (elapsed >= timeout_sec) {
printf("Timed out waiting for message events after %d seconds\n", timeout_sec);
}
printf("\n6. Unsubscribing from content topic...\n");
printf("\n7. Unsubscribing from content topic...\n");
logosdelivery_unsubscribe(ctx, simple_callback, (void *)"unsubscribe", contentTopic);
sleep(1);
printf("\n7. Stopping node...\n");
printf("\n8. Stopping node...\n");
logosdelivery_stop_node(ctx, simple_callback, (void *)"stop_node");
sleep(1);
printf("\n8. Destroying context...\n");
printf("\n9. Destroying context...\n");
logosdelivery_destroy(ctx, simple_callback, (void *)"destroy");
printf("\n=== Example completed ===\n");

View File

@ -22,7 +22,9 @@ extern "C"
// Creates a new instance of the node from the given configuration JSON.
// Returns a pointer to the Context needed by the rest of the API functions.
// Configuration should be in JSON format following the NodeConfig structure.
// Configuration should be in JSON format using WakuNodeConf field names.
// Field names match Nim identifiers from WakuNodeConf (camelCase).
// Example: {"mode": "Core", "clusterId": 42, "relay": true}
void *logosdelivery_create_node(
const char *configJson,
FFICallBack callback,
@ -75,6 +77,22 @@ extern "C"
FFICallBack callback,
void *userData);
// Retrieves the list of available node info IDs.
int logosdelivery_get_available_node_info_ids(void *ctx,
FFICallBack callback,
void *userData);
// Given a node info ID, retrieves the corresponding info.
int logosdelivery_get_node_info(void *ctx,
FFICallBack callback,
void *userData,
const char *nodeInfoId);
// Retrieves the list of available configurations.
int logosdelivery_get_available_configs(void *ctx,
FFICallBack callback,
void *userData);
#ifdef __cplusplus
}
#endif

View File

@ -4,26 +4,8 @@ import waku/factory/waku, waku/node/waku_node, ./declare_lib
################################################################################
## Include different APIs, i.e. all procs with {.ffi.} pragma
include ./logos_delivery_api/node_api, ./logos_delivery_api/messaging_api
################################################################################
### Exported procs
proc logosdelivery_destroy(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkParams(ctx, callback, userData)
ffi.destroyFFIContext(ctx).isOkOr:
let msg = "liblogosdelivery error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
## always need to invoke the callback although we don't retrieve value to the caller
callback(RET_OK, nil, 0, userData)
return RET_OK
# ### End of exported procs
# ################################################################################
include
./logos_delivery_api/node_api,
./logos_delivery_api/messaging_api,
./logos_delivery_api/debug_api

View File

@ -0,0 +1,54 @@
import std/[json, strutils]
import waku/factory/waku_state_info
import tools/confutils/[cli_args, config_option_meta]
proc logosdelivery_get_available_node_info_ids(
    ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
  ## Returns the list of all available node info item ids that
  ## can be queried with `logosdelivery_get_node_info`.
  ## Fails with an error result when the node has not been initialized yet.
  requireInitializedNode(ctx, "GetNodeInfoIds"):
    return err(errMsg)
  return ok($ctx.myLib[].stateInfo.getAllPossibleInfoItemIds())
proc logosdelivery_get_node_info(
    ctx: ptr FFIContext[Waku],
    callback: FFICallBack,
    userData: pointer,
    nodeInfoId: cstring,
) {.ffi.} =
  ## Returns the content of the node info item with the given id if it exists.
  ## `nodeInfoId` must be the string name of a `NodeInfoId` enum member
  ## (e.g. "Version", "MyENR"); unknown names yield an error result.
  requireInitializedNode(ctx, "GetNodeInfoItem"):
    return err(errMsg)
  # parseEnum raises ValueError on unknown names; map that to an err result
  # instead of letting the exception cross the FFI boundary.
  let infoItemIdEnum =
    try:
      parseEnum[NodeInfoId]($nodeInfoId)
    except ValueError:
      return err("Invalid node info id: " & $nodeInfoId)
  return ok(ctx.myLib[].stateInfo.getNodeInfoItem(infoItemIdEnum))
proc logosdelivery_get_available_configs(
    ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
  ## Returns a pretty-printed JSON description of the accepted config items:
  ## for every `WakuNodeConf` option, its type, default value and description,
  ## under a top-level "configOptions" array.
  ## Fails with an error result when the node has not been initialized yet.
  requireInitializedNode(ctx, "GetAvailableConfigs"):
    return err(errMsg)
  let optionMetas: seq[ConfigOptionMeta] = extractConfigOptionMeta(WakuNodeConf)
  var configOptionDetails = newJArray()
  for meta in optionMetas:
    # Each entry maps the option name to "TypeName(default)" plus its description.
    configOptionDetails.add(
      %*{meta.fieldName: meta.typeName & "(" & meta.defaultValue & ")", "desc": meta.desc}
    )
  var jsonNode = newJObject()
  jsonNode["configOptions"] = configOptionDetails
  return ok(pretty(jsonNode))

View File

@ -1,10 +1,11 @@
import std/json
import chronos, results, ffi
import std/[json, strutils]
import chronos, chronicles, results, confutils, confutils/std/net, ffi
import
waku/factory/waku,
waku/node/waku_node,
waku/api/[api, api_conf, types],
waku/events/message_events,
waku/api/[api, types],
waku/events/[message_events, health_events],
tools/confutils/cli_args,
../declare_lib,
../json_event
@ -14,21 +15,54 @@ proc `%`*(id: RequestId): JsonNode =
registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
proc(configJson: cstring): Future[Result[string, string]] {.async.} =
## Parse the JSON configuration and create a node
let nodeConfig =
try:
decodeNodeConfigFromJson($configJson)
except SerializationError as e:
return err("Failed to parse config JSON: " & e.msg)
## Parse the JSON configuration using fieldPairs approach (WakuNodeConf)
var conf = defaultWakuNodeConf().valueOr:
return err("Failed creating default conf: " & error)
var jsonNode: JsonNode
try:
jsonNode = parseJson($configJson)
except Exception:
return err(
"Failed to parse config JSON: " & getCurrentExceptionMsg() &
" configJson string: " & $configJson
)
for confField, confValue in fieldPairs(conf):
if jsonNode.contains(confField):
let formattedString = ($jsonNode[confField]).strip(chars = {'\"'})
try:
confValue = parseCmdArg(typeof(confValue), formattedString)
except Exception:
return err(
"Failed to parse field '" & confField & "': " & getCurrentExceptionMsg() &
". Value: " & formattedString
)
# Create the node
ctx.myLib[] = (await api.createNode(nodeConfig)).valueOr:
ctx.myLib[] = (await api.createNode(conf)).valueOr:
let errMsg = $error
chronicles.error "CreateNodeRequest failed", err = errMsg
return err(errMsg)
return ok("")
proc logosdelivery_destroy(
    ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
  ## Exported C symbol: tears down the FFI context and frees the library's
  ## resources. Reports the outcome through `callback` and also returns
  ## RET_OK/RET_ERR for callers using the synchronous return value.
  initializeLibrary()
  checkParams(ctx, callback, userData)
  ffi.destroyFFIContext(ctx).isOkOr:
    let msg = "liblogosdelivery error: " & $error
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR
  ## The callback must always be invoked even though no value is returned
  ## to the caller.
  callback(RET_OK, nil, 0, userData)
  return RET_OK
proc logosdelivery_create_node(
configJson: cstring, callback: FFICallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
@ -50,6 +84,10 @@ proc logosdelivery_create_node(
).isOkOr:
let msg = "error in sendRequestToFFIThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
# free allocated resources as they won't be available
ffi.destroyFFIContext(ctx).isOkOr:
chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation",
err = $error
return nil
return ctx
@ -88,6 +126,24 @@ proc logosdelivery_start_node(
chronicles.error "MessagePropagatedEvent.listen failed", err = $error
return err("MessagePropagatedEvent.listen failed: " & $error)
let receivedListener = MessageReceivedEvent.listen(
ctx.myLib[].brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageReceived"):
$newJsonEvent("message_received", event),
).valueOr:
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
ctx.myLib[].brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
callEventCallback(ctx, "onConnectionStatusChange"):
$newJsonEvent("connection_status_change", event),
).valueOr:
chronicles.error "ConnectionStatusChange.listen failed", err = $error
return err("ConnectionStatusChange.listen failed: " & $error)
(await startWaku(addr ctx.myLib[])).isOkOr:
let errMsg = $error
chronicles.error "START_NODE failed", err = errMsg
@ -103,6 +159,8 @@ proc logosdelivery_stop_node(
MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx)
(await ctx.myLib[].stop()).isOkOr:
let errMsg = $error

View File

@ -61,6 +61,7 @@ in stdenv.mkDerivation {
"QUICK_AND_DIRTY_NIMBLE=${if quickAndDirty then "1" else "0"}"
"USE_SYSTEM_NIM=${if useSystemNim then "1" else "0"}"
"LIBRLN_FILE=${zerokitRln}/lib/librln.${if abidir != null then "so" else "a"}"
"POSTGRES=1"
];
configurePhase = ''

View File

@ -1,3 +1,8 @@
{.used.}
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health
import
./test_entry_nodes,
./test_node_conf,
./test_api_send,
./test_api_subscription,
./test_api_health

View File

@ -13,9 +13,10 @@ import
waku/events/health_events,
waku/common/waku_protocol,
waku/factory/waku_conf
import tools/confutils/cli_args
const TestTimeout = chronos.seconds(10)
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
const DefaultShard = PubsubTopic("/waku/2/rs/3/0")
const TestContentTopic = ContentTopic("/waku/2/default-content/proto")
proc dummyHandler(
@ -80,7 +81,7 @@ suite "LM API health checking":
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
(await serviceNode.mountRelay()).isOkOr:
raiseAssert error
serviceNode.mountMetadata(1, @[0'u16]).isOkOr:
serviceNode.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert error
await serviceNode.mountLibp2pPing()
await serviceNode.start()
@ -89,16 +90,15 @@ suite "LM API health checking":
serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler)
lockNewGlobalBrokerContext:
let conf = NodeConfig.init(
mode = WakuMode.Core,
networkingConfig =
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1'u16,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
)
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = Core
conf.listenAddress = parseIpAddress("0.0.0.0")
conf.tcpPort = Port(0)
conf.discv5UdpPort = Port(0)
conf.clusterId = 3'u16
conf.numShardsInNetwork = 1
conf.rest = false
client = (await createNode(conf)).valueOr:
raiseAssert error
@ -267,17 +267,15 @@ suite "LM API health checking":
var edgeWaku: Waku
lockNewGlobalBrokerContext:
let edgeConf = NodeConfig.init(
mode = WakuMode.Edge,
networkingConfig =
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1'u16,
messageValidation =
MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)),
),
)
var edgeConf = defaultWakuNodeConf().valueOr:
raiseAssert error
edgeConf.mode = Edge
edgeConf.listenAddress = parseIpAddress("0.0.0.0")
edgeConf.tcpPort = Port(0)
edgeConf.discv5UdpPort = Port(0)
edgeConf.clusterId = 3'u16
edgeConf.maxMessageSize = "150 KiB"
edgeConf.rest = false
edgeWaku = (await createNode(edgeConf)).valueOr:
raiseAssert "Failed to create edge node: " & error

View File

@ -6,7 +6,8 @@ import ../testlib/[common, wakucore, wakunode, testasync]
import ../waku_archive/archive_utils
import
waku, waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context]
import waku/api/api_conf, waku/factory/waku_conf
import waku/factory/waku_conf
import tools/confutils/cli_args
type SendEventOutcome {.pure.} = enum
Sent
@ -116,20 +117,18 @@ proc validate(
for requestId in manager.errorRequestIds:
check requestId == expectedRequestId
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
# allocate random ports to avoid port-already-in-use errors
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
result = NodeConfig.init(
mode = mode,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
networkingConfig = netConf,
p2pReliability = true,
)
proc createApiNodeConf(mode: cli_args.WakuMode = cli_args.WakuMode.Core): WakuNodeConf =
  ## Builds a WakuNodeConf for the send-API tests: cluster 3 with a single
  ## shard, reliability enabled, REST disabled, and port 0 everywhere so the
  ## OS assigns free ports (avoids port-already-in-use errors between tests).
  result = defaultWakuNodeConf().valueOr:
    raiseAssert error
  result.mode = mode
  result.listenAddress = parseIpAddress("0.0.0.0")
  result.tcpPort = Port(0)
  result.discv5UdpPort = Port(0)
  result.clusterId = 3'u16
  result.numShardsInNetwork = 1
  result.reliabilityEnabled = true
  result.rest = false
suite "Waku API - Send":
var
@ -153,7 +152,7 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext:
relayNode1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
relayNode1.mountMetadata(1, @[0'u16]).isOkOr:
relayNode1.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await relayNode1.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
@ -163,7 +162,7 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext:
relayNode2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
relayNode2.mountMetadata(1, @[0'u16]).isOkOr:
relayNode2.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await relayNode2.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
@ -173,7 +172,7 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext:
lightpushNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
lightpushNode.mountMetadata(1, @[0'u16]).isOkOr:
lightpushNode.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await lightpushNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
@ -185,7 +184,7 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext:
storeNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
storeNode.mountMetadata(1, @[0'u16]).isOkOr:
storeNode.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await storeNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
@ -210,7 +209,7 @@ suite "Waku API - Send":
storeNodePeerId = storeNode.peerInfo.peerId
# Subscribe all relay nodes to the default shard topic
const testPubsubTopic = PubsubTopic("/waku/2/rs/1/0")
const testPubsubTopic = PubsubTopic("/waku/2/rs/3/0")
proc dummyHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
@ -387,7 +386,7 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext:
fakeLightpushNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
fakeLightpushNode.mountMetadata(1, @[0'u16]).isOkOr:
fakeLightpushNode.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await fakeLightpushNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
@ -402,13 +401,13 @@ suite "Waku API - Send":
discard
fakeLightpushNode.subscribe(
(kind: PubsubSub, topic: PubsubTopic("/waku/2/rs/1/0")), dummyHandler
(kind: PubsubSub, topic: PubsubTopic("/waku/2/rs/3/0")), dummyHandler
).isOkOr:
raiseAssert "Failed to subscribe fakeLightpushNode: " & error
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf(WakuMode.Edge))).valueOr:
node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error

View File

@ -0,0 +1,400 @@
{.used.}
import std/[strutils, net, options, sets]
import chronos, testutils/unittests, stew/byteutils
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
import ../testlib/[common, wakucore, wakunode, testasync]
import
waku,
waku/[
waku_node,
waku_core,
common/broker/broker_context,
events/message_events,
waku_relay/protocol,
]
import waku/factory/waku_conf
import tools/confutils/cli_args
# TODO: Edge testing (after MAPI edge support is completed)
const TestTimeout = chronos.seconds(10)
const NegativeTestTimeout = chronos.seconds(2)
type ReceiveEventListenerManager = ref object
brokerCtx: BrokerContext
receivedListener: MessageReceivedEventListener
receivedEvent: AsyncEvent
receivedMessages: seq[WakuMessage]
targetCount: int
proc newReceiveEventListenerManager(
    brokerCtx: BrokerContext, expectedCount: int = 1
): ReceiveEventListenerManager =
  ## Registers a MessageReceivedEvent listener on `brokerCtx` that collects
  ## incoming messages into `receivedMessages` and fires `receivedEvent`
  ## once at least `expectedCount` messages have arrived.
  ## Pair with `teardown` to drop the listener again.
  let manager = ReceiveEventListenerManager(
    brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount
  )
  manager.receivedEvent = newAsyncEvent()
  manager.receivedListener = MessageReceivedEvent
    .listen(
      brokerCtx,
      proc(event: MessageReceivedEvent) {.async: (raises: []).} =
        manager.receivedMessages.add(event.message)
        if manager.receivedMessages.len >= manager.targetCount:
          manager.receivedEvent.fire()
      ,
    )
    .expect("Failed to listen to MessageReceivedEvent")
  return manager
proc teardown(manager: ReceiveEventListenerManager) =
  ## Unregisters the listener installed by `newReceiveEventListenerManager`.
  MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
proc waitForEvents(
    manager: ReceiveEventListenerManager, timeout: Duration
): Future[bool] {.async.} =
  ## Waits until the manager's target message count has been reached.
  ## Returns false if `timeout` elapses first.
  return await manager.receivedEvent.wait().withTimeout(timeout)
type TestNetwork = ref object
publisher: WakuNode
subscriber: Waku
publisherPeerInfo: RemotePeerInfo
proc createApiNodeConf(
    mode: cli_args.WakuMode = cli_args.WakuMode.Core, numShards: uint16 = 1
): WakuNodeConf =
  ## Builds a WakuNodeConf for the subscription tests: cluster 3 with
  ## `numShards` shards, reliability enabled, REST disabled, and port 0 so
  ## the OS assigns free ports.
  var conf = defaultWakuNodeConf().valueOr:
    raiseAssert error
  conf.mode = mode
  conf.listenAddress = parseIpAddress("0.0.0.0")
  conf.tcpPort = Port(0)
  conf.discv5UdpPort = Port(0)
  conf.clusterId = 3'u16
  conf.numShardsInNetwork = numShards
  conf.reliabilityEnabled = true
  conf.rest = false
  result = conf
proc setupSubscriberNode(conf: WakuNodeConf): Future[Waku] {.async.} =
  ## Creates and starts the API-driven subscriber node under a fresh
  ## global broker context, failing the test on any error.
  var node: Waku
  lockNewGlobalBrokerContext:
    node = (await createNode(conf)).expect("Failed to create subscriber node")
  (await startWaku(addr node)).expect("Failed to start subscriber node")
  return node
proc setupNetwork(
    numShards: uint16 = 1, mode: cli_args.WakuMode = cli_args.WakuMode.Core
): Future[TestNetwork] {.async.} =
  ## Spins up a publisher relay node plus an API subscriber node and
  ## connects them. The publisher subscribes to every shard up front so a
  ## GossipSub mesh can form on any shard the subscriber may use.
  var net = TestNetwork()
  lockNewGlobalBrokerContext:
    net.publisher =
      newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
    net.publisher.mountMetadata(3, @[0'u16]).expect("Failed to mount metadata")
    (await net.publisher.mountRelay()).expect("Failed to mount relay")
    await net.publisher.mountLibp2pPing()
    await net.publisher.start()
    net.publisherPeerInfo = net.publisher.peerInfo.toRemotePeerInfo()

    proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
      discard

    # Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber.
    # Currently, Core/Relay nodes auto-subscribe to all network shards on boot, but if
    # that changes, this will be needed to cause the publisher to have shard interest
    # for any shards the subscriber may want to use, which is required for waitForMesh to work.
    for i in 0 ..< numShards.int:
      let shard = PubsubTopic("/waku/2/rs/3/" & $i)
      net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
        "Failed to sub publisher"
      )

  # Subscriber acquires its own broker context inside setupSubscriberNode.
  net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards))
  await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo])
  return net
proc teardown(net: TestNetwork) {.async.} =
  ## Stops both fixture nodes, subscriber (API node) first, then the raw
  ## publisher; each field is nil-ed out so a double teardown is a no-op.
  if net.subscriber != nil:
    (await net.subscriber.stop()).expect("Failed to stop subscriber node")
    net.subscriber = nil
  if net.publisher != nil:
    await net.publisher.stop()
    net.publisher = nil
proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic =
  ## Resolves the auto-sharded pubsub topic that `contentTopic` maps to on
  ## `node`; asserts that auto-sharding is mounted and sharding succeeds.
  let shard = node.wakuAutoSharding
    .get()
    .getShard(contentTopic)
    .expect("Failed to get shard")
  result = PubsubTopic($shard)
proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} =
  ## Polls until `node` has at least one GossipSub mesh peer on `shard`,
  ## checking every 100 ms and giving up after 50 attempts (~5 s).
  var attempts = 0
  while attempts < 50:
    if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
      return
    await sleepAsync(100.milliseconds)
    inc attempts
  raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard)
proc publishToMesh(
    net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
): Future[Result[int, string]] {.async.} =
  ## Publishes `payload` on the shard that `contentTopic` auto-shards to,
  ## first waiting for the publisher to have mesh peers on that shard.
  let pubsubTopic = net.subscriber.node.getRelayShard(contentTopic)
  await net.publisher.waitForMesh(pubsubTopic)
  return await net.publisher.publish(
    some(pubsubTopic),
    WakuMessage(
      payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
    ),
  )
suite "Messaging API, SubscriptionManager":
  # End-to-end coverage of the content-topic subscription API: a raw relay
  # publisher injects messages and the API-level subscriber must receive
  # (or ignore) them according to its current subscription state.

  asyncTest "Subscription API, relay node auto subscribe and receive message":
    let net = await setupNetwork(1)
    defer:
      await net.teardown()
    let testTopic = ContentTopic("/waku/2/test-content/proto")
    (await net.subscriber.subscribe(testTopic)).expect(
      "subscriberNode failed to subscribe"
    )
    let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    defer:
      eventManager.teardown()
    discard (await net.publishToMesh(testTopic, "Hello, world!".toBytes())).expect(
      "Publish failed"
    )
    # A subscribed topic must be delivered through the event broker.
    require await eventManager.waitForEvents(TestTimeout)
    require eventManager.receivedMessages.len == 1
    check eventManager.receivedMessages[0].contentTopic == testTopic

  asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard":
    let net = await setupNetwork(1)
    defer:
      await net.teardown()
    let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
    let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
    (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe")
    let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    defer:
      eventManager.teardown()
    # Publish on a topic sharing the shard but never subscribed to: it must
    # not surface as a MessageReceivedEvent.
    discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect(
      "Publish failed"
    )
    check not await eventManager.waitForEvents(NegativeTestTimeout)
    check eventManager.receivedMessages.len == 0

  asyncTest "Subscription API, relay node unsubscribe stops message receipt":
    let net = await setupNetwork(1)
    defer:
      await net.teardown()
    let testTopic = ContentTopic("/waku/2/unsub-test/proto")
    (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
    net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe")
    let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    defer:
      eventManager.teardown()
    # After subscribe + unsubscribe the topic must behave as never-subscribed.
    discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect(
      "Publish failed"
    )
    check not await eventManager.waitForEvents(NegativeTestTimeout)
    check eventManager.receivedMessages.len == 0

  asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation":
    let net = await setupNetwork(1)
    defer:
      await net.teardown()
    let topicA = ContentTopic("/waku/2/topic-a/proto")
    let topicB = ContentTopic("/waku/2/topic-b/proto")
    (await net.subscriber.subscribe(topicA)).expect("failed to sub A")
    (await net.subscriber.subscribe(topicB)).expect("failed to sub B")
    let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    defer:
      eventManager.teardown()
    # Dropping A must not affect B even though both live on the same shard.
    net.subscriber.unsubscribe(topicA).expect("failed to unsub A")
    discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
      "Publish A failed"
    )
    discard
      (await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed")
    require await eventManager.waitForEvents(TestTimeout)
    require eventManager.receivedMessages.len == 1
    check eventManager.receivedMessages[0].contentTopic == topicB

  asyncTest "Subscription API, redundant subs tolerated and subs are removed":
    let net = await setupNetwork(1)
    defer:
      await net.teardown()
    let glitchTopic = ContentTopic("/waku/2/glitch/proto")
    # Double-subscribe then a single unsubscribe: subscription state must not
    # be reference-counted into a lingering sub.
    (await net.subscriber.subscribe(glitchTopic)).expect("failed to sub")
    (await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub")
    net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub")
    let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    defer:
      eventManager.teardown()
    discard (await net.publishToMesh(glitchTopic, "Ghost Msg".toBytes())).expect(
      "Publish failed"
    )
    check not await eventManager.waitForEvents(NegativeTestTimeout)
    check eventManager.receivedMessages.len == 0

  asyncTest "Subscription API, resubscribe to an unsubscribed topic":
    let net = await setupNetwork(1)
    defer:
      await net.teardown()
    let testTopic = ContentTopic("/waku/2/resub-test/proto")
    # Subscribe
    (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed")
    var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    discard
      (await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed")
    require await eventManager.waitForEvents(TestTimeout)
    eventManager.teardown()
    # Unsubscribe and verify teardown
    net.subscriber.unsubscribe(testTopic).expect("Unsub failed")
    eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    discard
      (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed")
    check not await eventManager.waitForEvents(NegativeTestTimeout)
    eventManager.teardown()
    # Resubscribe
    (await net.subscriber.subscribe(testTopic)).expect("Resub failed")
    eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
    discard
      (await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed")
    require await eventManager.waitForEvents(TestTimeout)
    check eventManager.receivedMessages[0].payload == "Msg 2".toBytes()

  asyncTest "Subscription API, two content topics in different shards":
    let net = await setupNetwork(8)
    defer:
      await net.teardown()
    var topicA = ContentTopic("/appA/2/shard-test-a/proto")
    var topicB = ContentTopic("/appB/2/shard-test-b/proto")
    # generate two content topics that land in two different shards
    var i = 0
    while net.subscriber.node.getRelayShard(topicA) ==
        net.subscriber.node.getRelayShard(topicB):
      topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto")
      inc i
    (await net.subscriber.subscribe(topicA)).expect("failed to sub A")
    (await net.subscriber.subscribe(topicB)).expect("failed to sub B")
    let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2)
    defer:
      eventManager.teardown()
    discard (await net.publishToMesh(topicA, "Msg on Shard A".toBytes())).expect(
      "Publish A failed"
    )
    discard (await net.publishToMesh(topicB, "Msg on Shard B".toBytes())).expect(
      "Publish B failed"
    )
    require await eventManager.waitForEvents(TestTimeout)
    require eventManager.receivedMessages.len == 2

  asyncTest "Subscription API, many content topics in many shards":
    let net = await setupNetwork(8)
    defer:
      await net.teardown()
    # 100 topics spread over 8 shards; activeSubs mirrors the expected
    # subscription state across the churn below.
    var allTopics: seq[ContentTopic]
    for i in 0 ..< 100:
      allTopics.add(ContentTopic("/stress-app-" & $i & "/2/state-test/proto"))
    var activeSubs: seq[ContentTopic]

    # Publishes on EVERY topic and checks that exactly `expected` arrive.
    proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} =
      let eventManager =
        newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len)
      for topic in allTopics:
        discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect(
          "publish failed"
        )
      require await eventManager.waitForEvents(TestTimeout)
      # here we just give a chance for any messages that we don't expect to arrive
      await sleepAsync(1.seconds)
      eventManager.teardown()
      # weak check (but catches most bugs)
      require eventManager.receivedMessages.len == expected.len
      # strict expected receipt test
      var receivedTopics = initHashSet[ContentTopic]()
      for msg in eventManager.receivedMessages:
        receivedTopics.incl(msg.contentTopic)
      var expectedTopics = initHashSet[ContentTopic]()
      for t in expected:
        expectedTopics.incl(t)
      check receivedTopics == expectedTopics

    # subscribe to all content topics we generated
    for t in allTopics:
      (await net.subscriber.subscribe(t)).expect("sub failed")
      activeSubs.add(t)
    await verifyNetworkState(activeSubs)
    # unsubscribe from some content topics
    for i in 0 ..< 50:
      let t = allTopics[i]
      net.subscriber.unsubscribe(t).expect("unsub failed")
      let idx = activeSubs.find(t)
      if idx >= 0:
        activeSubs.del(idx)
    await verifyNetworkState(activeSubs)
    # re-subscribe to some content topics
    for i in 0 ..< 25:
      let t = allTopics[i]
      (await net.subscriber.subscribe(t)).expect("resub failed")
      activeSubs.add(t)
    await verifyNetworkState(activeSubs)

View File

@ -2,7 +2,7 @@
import std/options, results, testutils/unittests
import waku/api/entry_nodes
import tools/confutils/entry_nodes
# Since classifyEntryNode is internal, we test it indirectly through processEntryNodes behavior
# The enum is exported so we can test against it

File diff suppressed because it is too large Load Diff

View File

@ -3,49 +3,49 @@
import chronos, testutils/unittests, std/options
import waku
import tools/confutils/cli_args
suite "Waku API - Create node":
asyncTest "Create node with minimal configuration":
## Given
let nodeConfig = NodeConfig.init(
protocolsConfig = ProtocolsConfig.init(entryNodes = @[], clusterId = 1)
)
var nodeConf = defaultWakuNodeConf().valueOr:
raiseAssert error
nodeConf.mode = Core
nodeConf.clusterId = 3'u16
nodeConf.rest = false
# This is the actual minimal config but as the node auto-start, it is not suitable for tests
# NodeConfig.init(ethRpcEndpoints = @["http://someaddress"])
## When
let node = (await createNode(nodeConfig)).valueOr:
let node = (await createNode(nodeConf)).valueOr:
raiseAssert error
## Then
check:
not node.isNil()
node.conf.clusterId == 1
node.conf.clusterId == 3
node.conf.relay == true
asyncTest "Create node with full configuration":
## Given
let nodeConfig = NodeConfig.init(
mode = Core,
protocolsConfig = ProtocolsConfig.init(
entryNodes =
@[
"enr:-QESuEC1p_s3xJzAC_XlOuuNrhVUETmfhbm1wxRGis0f7DlqGSw2FM-p2Vn7gmfkTTnAe8Ys2cgGBN8ufJnvzKQFZqFMBgmlkgnY0iXNlY3AyNTZrMaEDS8-D878DrdbNwcuY-3p1qdDp5MOoCurhdsNPJTXZ3c5g3RjcIJ2X4N1ZHCCd2g"
],
staticStoreNodes =
@[
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
],
clusterId = 99,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 16),
messageValidation =
MessageValidation(maxMessageSize: "1024 KiB", rlnConfig: none(RlnConfig)),
),
)
var nodeConf = defaultWakuNodeConf().valueOr:
raiseAssert error
nodeConf.mode = Core
nodeConf.clusterId = 99'u16
nodeConf.rest = false
nodeConf.numShardsInNetwork = 16
nodeConf.maxMessageSize = "1024 KiB"
nodeConf.entryNodes =
@[
"enr:-QESuEC1p_s3xJzAC_XlOuuNrhVUETmfhbm1wxRGis0f7DlqGSw2FM-p2Vn7gmfkTTnAe8Ys2cgGBN8ufJnvzKQFZqFMBgmlkgnY0iXNlY3AyNTZrMaEDS8-D878DrdbNwcuY-3p1qdDp5MOoCurhdsNPJTXZ3c5g3RjcIJ2X4N1ZHCCd2g"
]
nodeConf.staticnodes =
@[
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
]
## When
let node = (await createNode(nodeConfig)).valueOr:
let node = (await createNode(nodeConf)).valueOr:
raiseAssert error
## Then
@ -62,20 +62,19 @@ suite "Waku API - Create node":
asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)":
## Given
let nodeConfig = NodeConfig.init(
mode = Core,
protocolsConfig = ProtocolsConfig.init(
entryNodes =
@[
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im",
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc",
],
clusterId = 42,
),
)
var nodeConf = defaultWakuNodeConf().valueOr:
raiseAssert error
nodeConf.mode = Core
nodeConf.clusterId = 42'u16
nodeConf.rest = false
nodeConf.entryNodes =
@[
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im",
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc",
]
## When
let node = (await createNode(nodeConfig)).valueOr:
let node = (await createNode(nodeConf)).valueOr:
raiseAssert error
## Then

View File

@ -374,6 +374,12 @@ procSuite "WakuNode - Store":
waitFor allFutures(client.stop(), server.stop())
test "Store protocol queries overrun request rate limitation":
when defined(macosx):
# on macos CI, this test is resulting a code 200 (OK) instead of a 429 error
# means the runner is somehow too slow to cause a request limit failure
skip()
return
## Setup
let
serverKey = generateSecp256k1Key()

View File

@ -21,7 +21,7 @@ suite "Wakunode2 - Waku":
raiseAssert error
## When
let version = waku.version
let version = waku.stateInfo.getNodeInfoItem(NodeInfoId.Version)
## Then
check:

View File

@ -30,7 +30,8 @@ import
waku_core/message/default_values,
waku_mix,
],
../../tools/rln_keystore_generator/rln_keystore_generator
../../tools/rln_keystore_generator/rln_keystore_generator,
./entry_nodes
import ./envvar as confEnvvarDefs, ./envvar_net as confEnvvarNet
@ -52,6 +53,11 @@ type StartUpCommand* = enum
noCommand # default, runs waku
generateRlnKeystore # generates a new RLN keystore
type WakuMode* {.pure.} = enum
noMode # default - use explicit CLI flags as-is
Core # full service node
Edge # client-only node
type WakuNodeConf* = object
configFile* {.
desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)",
@ -150,9 +156,16 @@ type WakuNodeConf* = object
.}: seq[ProtectedShard]
## General node config
mode* {.
desc:
"Node operation mode. 'Core' enables relay+service protocols. 'Edge' enables client-only protocols. Default: explicit CLI flags used.",
defaultValue: WakuMode.noMode,
name: "mode"
.}: WakuMode
preset* {.
desc:
"Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1). Overrides other values.",
"Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1). 'logos.dev' is the Logos Dev Network (cluster 2). Overrides other values.",
defaultValue: "",
name: "preset"
.}: string
@ -165,7 +178,7 @@ type WakuNodeConf* = object
.}: uint16
agentString* {.
defaultValue: "nwaku-" & cli_args.git_version,
defaultValue: "logos-delivery-" & cli_args.git_version,
desc: "Node agent string which is used as identifier in network",
name: "agent-string"
.}: string
@ -293,6 +306,14 @@ hence would have reachability issues.""",
name: "rln-relay-dynamic"
.}: bool
entryNodes* {.
desc:
"Entry node address (enrtree:, enr:, or multiaddr). " &
"Automatically classified and distributed to DNS discovery, discv5 bootstrap, " &
"and static nodes. Argument may be repeated.",
name: "entry-node"
.}: seq[string]
staticnodes* {.
desc: "Peer multiaddr to directly connect with. Argument may be repeated.",
name: "staticnode"
@ -453,13 +474,15 @@ hence would have reachability issues.""",
desc:
"""Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests.
with the drawback of consuming some more bandwidth.""",
defaultValue: false,
defaultValue: true,
name: "reliability"
.}: bool
## REST HTTP config
rest* {.
desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest"
desc: "Enable Waku REST HTTP server: true|false",
defaultValue: false,
name: "rest"
.}: bool
restAddress* {.
@ -907,12 +930,19 @@ proc toNetworkConf(
"TWN - The Waku Network configuration will not be applied when `--cluster-id=1` is passed in future releases. Use `--preset=twn` instead."
)
lcPreset = "twn"
if clusterId.isSome() and clusterId.get() == 2:
warn(
"Logos.dev - Logos.dev configuration will not be applied when `--cluster-id=2` is passed in future releases. Use `--preset=logos.dev` instead."
)
lcPreset = "logos.dev"
case lcPreset
of "":
ok(none(NetworkConf))
of "twn":
ok(some(NetworkConf.TheWakuNetworkConf()))
of "logos.dev", "logosdev":
ok(some(NetworkConf.LogosDevConf()))
else:
err("Invalid --preset value passed: " & lcPreset)
@ -982,6 +1012,26 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.withRelayShardedPeerManagement(n.relayShardedPeerManagement)
b.withStaticNodes(n.staticNodes)
# Process entry nodes - supports enrtree:, enr:, and multiaddress formats
if n.entryNodes.len > 0:
let (enrTreeUrls, bootstrapEnrs, staticNodesFromEntry) = processEntryNodes(
n.entryNodes
).valueOr:
return err("Failed to process entry nodes: " & error)
# Set ENRTree URLs for DNS discovery
if enrTreeUrls.len > 0:
for url in enrTreeUrls:
b.dnsDiscoveryConf.withEnrTreeUrl(url)
# Set ENR records as bootstrap nodes for discv5
if bootstrapEnrs.len > 0:
b.discv5Conf.withBootstrapNodes(bootstrapEnrs)
# Add static nodes (multiaddrs and those extracted from ENR entries)
if staticNodesFromEntry.len > 0:
b.withStaticNodes(staticNodesFromEntry)
if n.numShardsInNetwork != 0:
b.withNumShardsInCluster(n.numShardsInNetwork)
b.withShardingConf(AutoSharding)
@ -1069,9 +1119,31 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.webSocketConf.withKeyPath(n.websocketSecureKeyPath)
b.webSocketConf.withCertPath(n.websocketSecureCertPath)
b.rateLimitConf.withRateLimits(n.rateLimits)
if n.rateLimits.len > 0:
b.rateLimitConf.withRateLimits(n.rateLimits)
b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery)
b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes)
# Mode-driven configuration overrides
case n.mode
of WakuMode.Core:
b.withRelay(true)
b.filterServiceConf.withEnabled(true)
b.withLightPush(true)
b.discv5Conf.withEnabled(true)
b.withPeerExchange(true)
b.withRendezvous(true)
b.rateLimitConf.withRateLimitsIfNotAssigned(
@["filter:100/1s", "lightpush:5/1s", "px:5/1s"]
)
of WakuMode.Edge:
b.withPeerExchange(true)
b.withRelay(false)
b.filterServiceConf.withEnabled(false)
b.withLightPush(false)
b.storeServiceConf.withEnabled(false)
of WakuMode.noMode:
discard # use explicit CLI flags as-is
return b.build()

View File

@ -0,0 +1,143 @@
import std/[macros]
type ConfigOptionMeta* = object
  ## Compile-time description of one configuration option, extracted from
  ## the confutils-style pragmas attached to a config object field.
  fieldName*: string # Nim field identifier
  typeName*: string # declared field type, as repr'd source text
  cliName*: string # `name` pragma value (repr'd), or fieldName when absent
  desc*: string # `desc` pragma value (repr'd), "" when absent
  defaultValue*: string # `defaultValue` pragma expression (repr'd), "" when absent
  command*: string # enclosing case-branch value (subcommand), "" at top level
proc getPragmaValue(pragmaNode: NimNode, pragmaName: string): string {.compileTime.} =
  ## Returns the repr of the expression bound to `pragmaName` inside a
  ## pragma list (e.g. `name: "mode"`), or "" when `pragmaNode` is not a
  ## pragma list or carries no such entry.
  if pragmaNode.kind == nnkPragma:
    for entry in pragmaNode:
      if entry.kind == nnkExprColonExpr and entry[0].eqIdent(pragmaName):
        return entry[1].repr
  ""
proc getFieldName(fieldNode: NimNode): string {.compileTime.} =
  ## Strips pragma/postfix (export-marker) wrappers off a field declaration
  ## and returns the bare identifier name; falls back to the node's repr
  ## for any unexpected shape.
  if fieldNode.kind == nnkPragmaExpr and fieldNode.len >= 1:
    return getFieldName(fieldNode[0])
  if fieldNode.kind == nnkPostfix and fieldNode.len >= 2:
    return getFieldName(fieldNode[1])
  if fieldNode.kind in {nnkIdent, nnkSym}:
    return fieldNode.strVal
  fieldNode.repr
proc getFieldAndPragma(
    fieldDef: NimNode
): tuple[fieldName, typeName: string, pragmaNode: NimNode] {.compileTime.} =
  ## Splits an nnkIdentDefs field definition into its name, the repr of its
  ## type, and the attached pragma list (nnkEmpty when the field has none).
  ## Returns empty strings for any node that is not an nnkIdentDefs.
  if fieldDef.kind != nnkIdentDefs:
    return ("", "", newNimNode(nnkEmpty))
  let declaredField = fieldDef[0]
  var typeNode = fieldDef[1]
  var pragmaNode = newNimNode(nnkEmpty)
  # Pragmas may sit on the name (`x {.p.}: T`) or wrap the type node
  # depending on the AST shape; handle both.
  if declaredField.kind == nnkPragmaExpr:
    pragmaNode = declaredField[1]
  elif typeNode.kind == nnkPragmaExpr:
    pragmaNode = typeNode[1]
    typeNode = typeNode[0]
  return (getFieldName(declaredField), typeNode.repr, pragmaNode)
proc makeMetaNode(
    fieldName, typeName, cliName, desc, defaultValue, command: string
): NimNode {.compileTime.} =
  ## Builds the AST of a `ConfigOptionMeta(...)` object construction with
  ## every field populated from the given strings.
  result = newTree(nnkObjConstr, ident("ConfigOptionMeta"))
  for (field, value) in [
    ("fieldName", fieldName),
    ("typeName", typeName),
    ("cliName", cliName),
    ("desc", desc),
    ("defaultValue", defaultValue),
    ("command", command),
  ]:
    result.add newTree(nnkExprColonExpr, ident(field), newLit(value))
macro extractConfigOptionMeta*(T: typedesc): untyped =
  ## Walks the object definition of `T` at compile time and produces a
  ## `seq[ConfigOptionMeta]` literal describing every field — including
  ## fields nested inside `case` (object-variant) branches, which are tagged
  ## with the branch value as their `command`.
  proc findFirstRecList(n: NimNode): NimNode {.compileTime.} =
    # Depth-first search for the record list holding the object's fields.
    if n.kind == nnkRecList:
      return n
    for child in n:
      let found = findFirstRecList(child)
      if not found.isNil:
        return found
    return nil

  proc collectRecList(
      recList: NimNode, metas: var seq[NimNode], commandCtx: string
  ) {.compileTime.} =
    # Appends one ConfigOptionMeta constructor per field; commandCtx is the
    # repr of the enclosing case-branch value ("" outside any branch).
    for child in recList:
      case child.kind
      of nnkIdentDefs:
        let (fieldName, typeName, pragmaNode) = getFieldAndPragma(child)
        if fieldName.len == 0:
          continue
        # CLI name falls back to the field name when no `name` pragma is set.
        let cliName = block:
          let n = getPragmaValue(pragmaNode, "name")
          if n.len > 0: n else: fieldName
        let desc = getPragmaValue(pragmaNode, "desc")
        let defaultValue = getPragmaValue(pragmaNode, "defaultValue")
        metas.add(
          makeMetaNode(fieldName, typeName, cliName, desc, defaultValue, commandCtx)
        )
      of nnkRecCase:
        # The discriminator itself is an ordinary field at this level.
        let discriminator = child[0]
        if discriminator.kind == nnkIdentDefs:
          let (fieldName, typeName, pragmaNode) = getFieldAndPragma(discriminator)
          if fieldName.len > 0:
            let cliName = block:
              let n = getPragmaValue(pragmaNode, "name")
              if n.len > 0: n else: fieldName
            let desc = getPragmaValue(pragmaNode, "desc")
            let defaultValue = getPragmaValue(pragmaNode, "defaultValue")
            metas.add(
              makeMetaNode(fieldName, typeName, cliName, desc, defaultValue, commandCtx)
            )
        # Recurse into each branch, carrying the branch value as context.
        for i in 1 ..< child.len:
          let branch = child[i]
          case branch.kind
          of nnkOfBranch:
            let branchCtx = branch[0].repr
            for j in 1 ..< branch.len:
              if branch[j].kind == nnkRecList:
                collectRecList(branch[j], metas, branchCtx)
          of nnkElse:
            # `else` branches inherit the surrounding command context.
            for j in 0 ..< branch.len:
              if branch[j].kind == nnkRecList:
                collectRecList(branch[j], metas, commandCtx)
          else:
            discard
      else:
        discard

  # `typedesc` arrives as `typedesc[Foo]`; unwrap to get the bare type.
  let typeInst = getTypeInst(T)
  var targetType = T
  if typeInst.kind == nnkBracketExpr and typeInst.len >= 2:
    targetType = typeInst[1]
  let typeImpl = getImpl(targetType)
  let recList = findFirstRecList(typeImpl)
  if recList.isNil:
    # No fields found: emit an empty seq literal.
    return newTree(nnkPrefix, ident("@"), newNimNode(nnkBracket))
  var metas: seq[NimNode] = @[]
  collectRecList(recList, metas, "")
  let bracket = newNimNode(nnkBracket)
  for node in metas:
    bracket.add(node)
  # `@[ConfigOptionMeta(...), ...]`
  result = newTree(nnkPrefix, ident("@"), bracket)

2
vendor/nim-metrics vendored

@ -1 +1 @@
Subproject commit 11d0cddfb0e711aa2a8c75d1892ae24a64c299fc
Subproject commit a1296caf3ebb5f30f51a5feae7749a30df2824c2

View File

@ -1,4 +1,5 @@
import ./api/[api, api_conf, entry_nodes]
import ./api/[api, api_conf]
import ./events/message_events
import tools/confutils/entry_nodes
export api, api_conf, entry_nodes, message_events

View File

@ -1,18 +1,20 @@
import chronicles, chronos, results, std/strutils
import chronicles, chronos, results
import waku/factory/waku
import waku/[requests/health_requests, waku_core, waku_node]
import waku/node/delivery_service/send_service
import waku/node/delivery_service/subscription_service
import waku/node/delivery_service/subscription_manager
import libp2p/peerid
import ../../tools/confutils/cli_args
import ./[api_conf, types]
export cli_args
logScope:
topics = "api"
# TODO: Specs says it should return a `WakuNode`. As `send` and other APIs are defined, we can align.
proc createNode*(config: NodeConfig): Future[Result[Waku, string]] {.async.} =
let wakuConf = toWakuConf(config).valueOr:
proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} =
let wakuConf = conf.toWakuConf().valueOr:
return err("Failed to handle the configuration: " & error)
## We are not defining app callbacks at node creation
@ -36,18 +38,27 @@ proc subscribe*(
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(w)
return w.deliveryService.subscriptionService.subscribe(contentTopic)
return w.deliveryService.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
?checkApiAvailability(w)
return w.deliveryService.subscriptionService.unsubscribe(contentTopic)
return w.deliveryService.subscriptionManager.unsubscribe(contentTopic)
proc send*(
w: Waku, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let isSubbed = w.deliveryService.subscriptionManager
.isSubscribed(envelope.contentTopic)
.valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
w.deliveryService.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(w.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:

View File

@ -9,7 +9,7 @@ import
waku/factory/waku_conf,
waku/factory/conf_builder/conf_builder,
waku/factory/networks_config,
./entry_nodes
tools/confutils/entry_nodes
export json_serialization, json_options
@ -85,7 +85,9 @@ type WakuMode* {.pure.} = enum
Edge
Core
type NodeConfig* {.requiresInit.} = object
type NodeConfig* {.
requiresInit, deprecated: "Use WakuNodeConf from tools/confutils/cli_args instead"
.} = object
mode: WakuMode
protocolsConfig: ProtocolsConfig
networkingConfig: NetworkingConfig
@ -154,7 +156,9 @@ proc logLevel*(c: NodeConfig): LogLevel =
proc logFormat*(c: NodeConfig): LogFormat =
c.logFormat
proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] =
proc toWakuConf*(
nodeConfig: NodeConfig
): Result[WakuConf, string] {.deprecated: "Use WakuNodeConf.toWakuConf instead".} =
var b = WakuConfBuilder.init()
# Apply log configuration
@ -516,7 +520,10 @@ proc readValue*(
proc decodeNodeConfigFromJson*(
jsonStr: string
): NodeConfig {.raises: [SerializationError].} =
): NodeConfig {.
raises: [SerializationError],
deprecated: "Use WakuNodeConf with fieldPairs-based JSON parsing instead"
.} =
var val = NodeConfig.init() # default-initialized
try:
var stream = unsafeMemoryInput(jsonStr)

View File

@ -1,6 +1,4 @@
import waku/common/broker/event_broker
import waku/api/types
import waku/waku_core/message
import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker]
export types
@ -28,3 +26,9 @@ EventBroker:
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -22,6 +22,12 @@ proc withEnabled*(b: var FilterServiceConfBuilder, enabled: bool) =
proc withMaxPeersToServe*(b: var FilterServiceConfBuilder, maxPeersToServe: uint32) =
b.maxPeersToServe = some(maxPeersToServe)
proc withMaxPeersToServeIfNotAssigned*(
b: var FilterServiceConfBuilder, maxPeersToServe: uint32
) =
if b.maxPeersToServe.isNone():
b.maxPeersToServe = some(maxPeersToServe)
proc withSubscriptionTimeout*(
b: var FilterServiceConfBuilder, subscriptionTimeout: uint16
) =

View File

@ -14,6 +14,12 @@ proc init*(T: type RateLimitConfBuilder): RateLimitConfBuilder =
proc withRateLimits*(b: var RateLimitConfBuilder, rateLimits: seq[string]) =
b.strValue = some(rateLimits)
proc withRateLimitsIfNotAssigned*(
b: var RateLimitConfBuilder, rateLimits: seq[string]
) =
if b.strValue.isNone() or b.strValue.get().len == 0:
b.strValue = some(rateLimits)
proc build*(b: RateLimitConfBuilder): Result[ProtocolRateLimitSettings, string] =
if b.strValue.isSome() and b.objValue.isSome():
return err("Rate limits conf must only be set once on the builder")

View File

@ -12,7 +12,8 @@ import
../networks_config,
../../common/logging,
../../common/utils/parse_size_units,
../../waku_enr/capabilities
../../waku_enr/capabilities,
tools/confutils/entry_nodes
import
./filter_service_conf_builder,
@ -393,6 +394,42 @@ proc applyNetworkConf(builder: var WakuConfBuilder) =
discarded = builder.discv5Conf.bootstrapNodes
builder.discv5Conf.withBootstrapNodes(networkConf.discv5BootstrapNodes)
if networkConf.enableKadDiscovery:
if not builder.kademliaDiscoveryConf.enabled:
builder.kademliaDiscoveryConf.withEnabled(networkConf.enableKadDiscovery)
if builder.kademliaDiscoveryConf.bootstrapNodes.len == 0 and
networkConf.kadBootstrapNodes.len > 0:
builder.kademliaDiscoveryConf.withBootstrapNodes(networkConf.kadBootstrapNodes)
if networkConf.mix:
if builder.mix.isNone:
builder.mix = some(networkConf.mix)
if builder.p2pReliability.isNone:
builder.withP2pReliability(networkConf.p2pReliability)
# Process entry nodes from network config - classify and distribute
if networkConf.entryNodes.len > 0:
let processed = processEntryNodes(networkConf.entryNodes)
if processed.isOk():
let (enrTreeUrls, bootstrapEnrs, staticNodesFromEntry) = processed.get()
# Set ENRTree URLs for DNS discovery
if enrTreeUrls.len > 0:
for url in enrTreeUrls:
builder.dnsDiscoveryConf.withEnrTreeUrl(url)
# Set ENR records as bootstrap nodes for discv5
if bootstrapEnrs.len > 0:
builder.discv5Conf.withBootstrapNodes(bootstrapEnrs)
# Add static nodes (multiaddrs and those extracted from ENR entries)
if staticNodesFromEntry.len > 0:
builder.withStaticNodes(staticNodesFromEntry)
else:
warn "Failed to process entry nodes from network conf", error = processed.error()
proc build*(
builder: var WakuConfBuilder, rng: ref HmacDrbgContext = crypto.newRng()
): Result[WakuConf, string] =
@ -606,7 +643,7 @@ proc build*(
provided = maxConnections, recommended = DefaultMaxConnections
# TODO: Do the git version thing here
let agentString = builder.agentString.get("nwaku")
let agentString = builder.agentString.get("logos-delivery")
# TODO: use `DefaultColocationLimit`. the user of this value should
# probably be defining a config object

View File

@ -29,6 +29,11 @@ type NetworkConf* = object
shardingConf*: ShardingConf
discv5Discovery*: bool
discv5BootstrapNodes*: seq[string]
enableKadDiscovery*: bool
kadBootstrapNodes*: seq[string]
entryNodes*: seq[string]
mix*: bool
p2pReliability*: bool
# cluster-id=1 (aka The Waku Network)
# Cluster configuration corresponding to The Waku Network. Note that it
@ -45,6 +50,11 @@ proc TheWakuNetworkConf*(T: type NetworkConf): NetworkConf =
rlnEpochSizeSec: 600,
rlnRelayUserMessageLimit: 100,
shardingConf: ShardingConf(kind: AutoSharding, numShardsInCluster: 8),
enableKadDiscovery: false,
kadBootstrapNodes: @[],
entryNodes: @[],
mix: false,
p2pReliability: false,
discv5Discovery: true,
discv5BootstrapNodes:
@[
@ -54,6 +64,36 @@ proc TheWakuNetworkConf*(T: type NetworkConf): NetworkConf =
],
)
# cluster-id=2 (Logos Dev Network)
# Cluster configuration for the Logos Dev Network.
proc LogosDevConf*(T: type NetworkConf): NetworkConf =
  ## Static network configuration for the Logos Dev Network (cluster-id 2):
  ## RLN disabled, Kademlia discovery / mix / p2p-reliability enabled,
  ## autosharding with 8 shards, and a fixed fleet of entry nodes.
  const ZeroChainId = 0'u256
  # Entry-node fleet for the Logos Dev Network (AMS3, us-central1, HK).
  const devEntryNodes =
    @[
      "/dns4/delivery-01.do-ams3.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAmTUbnxLGT9JvV6mu9oPyDjqHK4Phs1VDJNUgESgNSkuby",
      "/dns4/delivery-02.do-ams3.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAmMK7PYygBtKUQ8EHp7EfaD3bCEsJrkFooK8RQ2PVpJprH",
      "/dns4/delivery-01.gc-us-central1-a.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAm4S1JYkuzDKLKQvwgAhZKs9otxXqt8SCGtB4hoJP1S397",
      "/dns4/delivery-02.gc-us-central1-a.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAm8Y9kgBNtjxvCnf1X6gnZJW5EGE4UwwCL3CCm55TwqBiH",
      "/dns4/delivery-01.ac-cn-hongkong-c.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAm8YokiNun9BkeA1ZRmhLbtNUvcwRr64F69tYj9fkGyuEP",
      "/dns4/delivery-02.ac-cn-hongkong-c.logos.dev.status.im/tcp/30303/p2p/16Uiu2HAkvwhGHKNry6LACrB8TmEFoCJKEX29XR5dDUzk3UT3UNSE",
    ]
  result = NetworkConf(
    maxMessageSize: "150KiB",
    clusterId: 2,
    # All RLN fields are zeroed/disabled on this network.
    rlnRelay: false,
    rlnRelayEthContractAddress: "",
    rlnRelayDynamic: false,
    rlnRelayChainId: ZeroChainId,
    rlnEpochSizeSec: 0,
    rlnRelayUserMessageLimit: 0,
    shardingConf: ShardingConf(kind: AutoSharding, numShardsInCluster: 8),
    enableKadDiscovery: true,
    mix: true,
    p2pReliability: true,
    discv5Discovery: true,
    discv5BootstrapNodes: @[],
    entryNodes: devEntryNodes,
  )
proc validateShards*(
shardingConf: ShardingConf, shards: seq[uint16]
): Result[void, string] =

View File

@ -35,6 +35,7 @@ import
node/health_monitor,
node/waku_metrics,
node/delivery_service/delivery_service,
node/delivery_service/subscription_manager,
rest_api/message_cache,
rest_api/endpoint/server,
rest_api/endpoint/builder as rest_server_builder,
@ -46,7 +47,8 @@ import
factory/internal_config,
factory/app_callbacks,
],
./waku_conf
./waku_conf,
./waku_state_info
logScope:
topics = "wakunode waku"
@ -55,7 +57,7 @@ logScope:
const git_version* {.strdefine.} = "n/a"
type Waku* = ref object
version: string
stateInfo*: WakuStateInfo
conf*: WakuConf
rng*: ref HmacDrbgContext
@ -78,9 +80,6 @@ type Waku* = ref object
brokerCtx*: BrokerContext
func version*(waku: Waku): string =
waku.version
proc setupSwitchServices(
waku: Waku, conf: WakuConf, circuitRelay: Relay, rng: ref HmacDrbgContext
) =
@ -215,7 +214,7 @@ proc new*(
return err("could not create delivery service: " & $error)
var waku = Waku(
version: git_version,
stateInfo: WakuStateInfo.init(node),
conf: wakuConf,
rng: rng,
key: wakuConf.nodeKey,
@ -453,7 +452,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
).isOkOr:
error "Failed to set RequestProtocolHealth provider", error = error
## Setup RequestHealthReport provider (The lost child)
## Setup RequestHealthReport provider
RequestHealthReport.setProvider(
globalBrokerContext(),
@ -514,6 +513,10 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()
if not waku.deliveryService.isNil():
await waku.deliveryService.stopDeliveryService()
waku.deliveryService = nil
if not waku.node.isNil():
await waku.node.stop()

View File

@ -0,0 +1,50 @@
## This module is aimed to collect and provide information about the state of the node,
## such as its version, metrics values, etc.
## It has been originally designed to be used by the debug API, which acts as a consumer of
## this information, but any other module can populate the information it needs to be
## accessible through the debug API.
import std/[tables, sequtils, strutils]
import metrics, eth/p2p/discoveryv5/enr, libp2p/peerid
import waku/waku_node
type
  NodeInfoId* {.pure.} = enum
    ## Identifiers for the pieces of node state that can be queried
    ## through `getNodeInfoItem`.
    Version ## node's build version string
    Metrics ## Prometheus text dump of the default metrics registry
    MyMultiaddresses ## comma-separated listen addresses
    MyENR ## node's ENR in URI form
    MyPeerId ## node's libp2p peer id

  WakuStateInfo* {.requiresInit.} = object
    ## Read-only facade over a running `WakuNode`, designed to feed the
    ## debug API (or any other consumer) with node state information.
    node: WakuNode ## the node being introspected; set via `init`
proc getAllPossibleInfoItemIds*(self: WakuStateInfo): seq[NodeInfoId] =
  ## Returns all possible options that can be queried to learn about the
  ## node's information, i.e. every `NodeInfoId` member in declaration order.
  ## `self` is unused; it is kept so call sites read uniformly.
  # sequtils.toSeq over the enum type replaces the manual accumulation loop.
  return toSeq(NodeInfoId)
proc getMetrics(): string =
  ## Renders all metrics registered in the process-wide default registry
  ## as Prometheus text format.
  {.gcsafe.}:
    # The gcsafe block is required to read the global registry from here.
    return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module
proc getNodeInfoItem*(self: WakuStateInfo, infoItemId: NodeInfoId): string =
  ## Returns the content of the info item with the given id.
  ##
  ## The `case` covers every `NodeInfoId` member, so the previous
  ## `else: "unknown info item id"` branch was unreachable; dropping it
  ## restores exhaustiveness checking — adding a new enum member becomes
  ## a compile-time error here until it is handled.
  case infoItemId
  of NodeInfoId.Version:
    return git_version
  of NodeInfoId.Metrics:
    return getMetrics()
  of NodeInfoId.MyMultiaddresses:
    return self.node.info().listenAddresses.join(",")
  of NodeInfoId.MyENR:
    return self.node.enr.toURI()
  of NodeInfoId.MyPeerId:
    return $PeerId(self.node.peerId())
proc init*(T: typedesc[WakuStateInfo], node: WakuNode): T =
  ## Wraps `node` in a `WakuStateInfo` facade; the only way to satisfy
  ## the type's {.requiresInit.} constraint.
  T(node: node)

View File

@ -5,7 +5,7 @@ import chronos
import
./recv_service,
./send_service,
./subscription_service,
./subscription_manager,
waku/[
waku_core,
waku_node,
@ -18,29 +18,31 @@ import
type DeliveryService* = ref object
sendService*: SendService
recvService: RecvService
subscriptionService*: SubscriptionService
subscriptionManager*: SubscriptionManager
proc new*(
T: type DeliveryService, useP2PReliability: bool, w: WakuNode
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryService
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish
let subscriptionService = SubscriptionService.new(w)
let sendService = ?SendService.new(useP2PReliability, w, subscriptionService)
let recvService = RecvService.new(w, subscriptionService)
let subscriptionManager = SubscriptionManager.new(w)
let sendService = ?SendService.new(useP2PReliability, w, subscriptionManager)
let recvService = RecvService.new(w, subscriptionManager)
return ok(
DeliveryService(
sendService: sendService,
recvService: recvService,
subscriptionService: subscriptionService,
subscriptionManager: subscriptionManager,
)
)
proc startDeliveryService*(self: DeliveryService) =
self.sendService.startSendService()
self.subscriptionManager.startSubscriptionManager()
self.recvService.startRecvService()
self.sendService.startSendService()
proc stopDeliveryService*(self: DeliveryService) {.async.} =
self.sendService.stopSendService()
await self.sendService.stopSendService()
await self.recvService.stopRecvService()
await self.subscriptionManager.stopSubscriptionManager()

View File

@ -2,9 +2,9 @@
## receive and is backed by store-v3 requests to get an additional degree of certainty
##
import std/[tables, sequtils, options]
import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import ../[subscription_service]
import ../[subscription_manager]
import
waku/[
waku_core,
@ -13,6 +13,7 @@ import
waku_filter_v2/client,
waku_core/topics,
events/delivery_events,
events/message_events,
waku_node,
common/broker/broker_context,
]
@ -35,14 +36,9 @@ type RecvMessage = object
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Tracks message verification requests and when was the last time a
## pubsub topic was verified for missing messages
## The key contains pubsub-topics
node: WakuNode
onSubscribeListener: OnFilterSubscribeEventListener
onUnsubscribeListener: OnFilterUnsubscribeEventListener
subscriptionService: SubscriptionService
seenMsgListener: MessageSeenEventListener
subscriptionManager: SubscriptionManager
recentReceivedMsgs: seq[RecvMessage]
@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} =
self.endTimeToCheck = getNowInNanosecondTime()
var msgHashesInStore = newSeq[WakuMessageHash](0)
for pubsubTopic, cTopics in self.topicsInterest.pairs:
for sub in self.subscriptionManager.getActiveSubscriptions():
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(pubsubTopic)),
contentTopics: cTopics,
pubsubTopic: some(PubsubTopic(sub.pubsubTopic)),
contentTopics: sub.contentTopics,
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
)
)
).valueOr:
error "msgChecker failed to get remote msgHashes",
pubsubTopic, cTopics, error = $error
pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error
continue
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
@ -133,31 +129,20 @@ proc msgChecker(self: RecvService) {.async.} =
## update next check times
self.startTimeToCheck = self.endTimeToCheck
proc onSubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onSubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
contentTopicsOfInterest[].add(contentTopics)
do:
self.topicsInterest[pubsubTopic] = contentTopics
proc processIncomingMessageOfInterest(
self: RecvService, pubsubTopic: string, message: WakuMessage
) =
## Resolve an incoming network message that was already filtered by topic.
## Deduplicate (by hash), store (saves in recently-seen messages) and emit
## the MAPI MessageReceivedEvent for every unique incoming message.
proc onUnsubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onUnsubscribe", pubsubTopic, contentTopics
let msgHash = computeMessageHash(pubsubTopic, message)
if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
self.recentReceivedMsgs.add(rxMsg)
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
let remainingCTopics =
contentTopicsOfInterest[].filterIt(not contentTopics.contains(it))
contentTopicsOfInterest[] = remainingCTopics
if remainingCTopics.len == 0:
self.topicsInterest.del(pubsubTopic)
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
@ -165,22 +150,13 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
node: node,
startTimeToCheck: now,
brokerCtx: node.brokerCtx,
subscriptionService: s,
topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](),
subscriptionManager: s,
recentReceivedMsgs: @[],
)
if not node.wakuFilterClient.isNil():
let filterPushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
## Captures all the messages received through filter
let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
recvService.recentReceivedMsgs.add(rxMsg)
node.wakuFilterClient.registerPushHandler(filterPushHandler)
# TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler
# so that the RecvService listens to incoming filter messages,
# or have the filter client emit MessageSeenEvent.
return recvService
@ -194,26 +170,26 @@ proc startRecvService*(self: RecvService) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
self.seenMsgListener = MessageSeenEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} =
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterSubscribeEvent listener", error = error
quit(QuitFailure)
proc(event: MessageSeenEvent) {.async: (raises: []).} =
if not self.subscriptionManager.isSubscribed(
event.topic, event.message.contentTopic
):
trace "skipping message as I am not subscribed",
shard = event.topic, contenttopic = event.message.contentTopic
return
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} =
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
self.processIncomingMessageOfInterest(event.topic, event.message),
).valueOr:
error "Failed to set OnFilterUnsubscribeEvent listener", error = error
error "Failed to set MessageSeenEvent listener", error = error
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener)
OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener)
MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener)
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
self.msgCheckerHandler = nil
if not self.msgPrunerHandler.isNil():
await self.msgPrunerHandler.cancelAndWait()
self.msgPrunerHandler = nil

View File

@ -5,7 +5,7 @@ import std/[sequtils, tables, options]
import chronos, chronicles, libp2p/utility
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
../[subscription_service],
../[subscription_manager],
waku/[
waku_core,
node/waku_node,
@ -58,7 +58,7 @@ type SendService* = ref object of RootObj
node: WakuNode
checkStoreForMessages: bool
subscriptionService: SubscriptionService
subscriptionManager: SubscriptionManager
proc setupSendProcessorChain(
peerManager: PeerManager,
@ -99,7 +99,7 @@ proc new*(
T: typedesc[SendService],
preferP2PReliability: bool,
w: WakuNode,
s: SubscriptionService,
s: SubscriptionManager,
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err(
@ -120,7 +120,7 @@ proc new*(
sendProcessor: sendProcessorChain,
node: w,
checkStoreForMessages: checkStoreForMessages,
subscriptionService: s,
subscriptionManager: s,
)
return ok(sendService)
@ -250,9 +250,9 @@ proc serviceLoop(self: SendService) {.async.} =
proc startSendService*(self: SendService) =
self.serviceLoopHandle = self.serviceLoop()
proc stopSendService*(self: SendService) =
proc stopSendService*(self: SendService) {.async.} =
if not self.serviceLoopHandle.isNil():
discard self.serviceLoopHandle.cancelAndWait()
await self.serviceLoopHandle.cancelAndWait()
proc send*(self: SendService, task: DeliveryTask) {.async.} =
assert(not task.isNil(), "task for send must not be nil")
@ -260,7 +260,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
info "SendService.send: processing delivery task",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr:
self.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr:
error "SendService.send: failed to subscribe to content topic",
contentTopic = task.msg.contentTopic, error = error

View File

@ -0,0 +1,164 @@
import std/[sets, tables, options, strutils], chronos, chronicles, results
import
waku/[
waku_core,
waku_core/topics,
waku_core/topics/sharding,
waku_node,
waku_relay,
common/broker/broker_context,
events/delivery_events,
]
type SubscriptionManager* = ref object of RootObj
  ## Tracks this node's shard (pubsub topic) and content-topic interest,
  ## and keeps the Relay mesh subscriptions in sync with that interest.
  node: WakuNode
  contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
    ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
    ## A present key with an empty HashSet value means pubsubtopic already subscribed
    ## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
  ## Creates a manager bound to `node` with no tracked subscriptions yet.
  result = SubscriptionManager(
    node: node,
    contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]](),
  )
proc addContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  ## Records interest in `topic` under `shard`, creating the shard entry
  ## on first use. Idempotent: re-adding an existing topic is a no-op.
  # mgetOrPut inserts an empty set on first sight of the shard and returns
  # a mutable view; incl is a no-op when the topic is already present, so
  # the previous hasKey + withValue + contains dance is unnecessary.
  self.contentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic)
  # TODO: Call a "subscribe(shard, topic)" on filter client here,
  # so the filter client can know that subscriptions changed.
  return ok()
proc removeContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  ## Drops interest in `topic` under `shard`. Idempotent: removing an
  ## unknown topic or shard is a no-op and still returns ok().
  self.contentTopicSubs.withValue(shard, cTopics):
    if cTopics[].contains(topic):
      cTopics[].excl(topic)
      # Only drop the shard key when Relay is absent: with Relay mounted,
      # key presence also records the mesh subscription created by
      # subscribePubsubTopics and must outlive an empty interest set.
      if cTopics[].len == 0 and isNil(self.node.wakuRelay):
        self.contentTopicSubs.del(shard) # We're done with cTopics here
  # TODO: Call a "unsubscribe(shard, topic)" on filter client here,
  # so the filter client can know that subscriptions changed.
  return ok()
proc subscribePubsubTopics(
    self: SubscriptionManager, shards: seq[PubsubTopic]
): Result[void, string] =
  ## Subscribes the Relay mesh to every shard not yet tracked in
  ## `contentTopicSubs`, recording each newly-joined shard with an empty
  ## interest set. Per-shard failures are collected and reported together.
  if isNil(self.node.wakuRelay):
    return err("subscribePubsubTopics requires a Relay")

  var failures: seq[string] = @[]
  for shard in shards:
    if self.contentTopicSubs.hasKey(shard):
      # Key presence means the mesh is already subscribed to this shard.
      continue
    self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
      failures.add("shard " & shard & ": " & error)
      continue
    self.contentTopicSubs[shard] = initHashSet[ContentTopic]()

  if failures.len == 0:
    return ok()
  err("subscribeShard errors: " & failures.join("; "))
proc startSubscriptionManager*(self: SubscriptionManager) =
  ## If Relay is mounted and autosharding is configured, eagerly subscribes
  ## the Relay mesh to every generation-zero shard of the cluster.
  ## No-op when Relay is absent (e.g. filter-client-only nodes).
  if isNil(self.node.wakuRelay):
    return
  if self.node.wakuAutoSharding.isSome():
    # Subscribe relay to all shards in autosharding.
    let autoSharding = self.node.wakuAutoSharding.get()
    let clusterId = autoSharding.clusterId
    let numShards = autoSharding.shardCountGenZero
    if numShards > 0:
      var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
      for i in 0 ..< numShards:
        # Build the canonical pubsub-topic string for each shard id.
        let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
        clusterPubsubTopics.add(PubsubTopic($shardObj))
      self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
        error "Failed to auto-subscribe Relay to cluster shards: ", error = error
  else:
    info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} =
  ## Placeholder for symmetry with startSubscriptionManager; there is
  ## currently no state to tear down.
  discard
proc getActiveSubscriptions*(
    self: SubscriptionManager
): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
  ## Snapshot of every shard that has at least one content-topic interest,
  ## paired with its interests. Mesh-only shards (empty sets) are omitted.
  for shard, interests in self.contentTopicSubs.pairs:
    if interests.len == 0:
      continue # subscribed in the mesh but no content interest yet
    var topics = newSeqOfCap[ContentTopic](interests.len)
    for t in interests:
      topics.add(t)
    result.add((pubsubTopic: shard, contentTopics: topics))
proc getShardForContentTopic(
    self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
  ## Resolves `topic` to its shard via autosharding; errors when the node
  ## has no autosharding configured.
  if self.node.wakuAutoSharding.isNone():
    return err("SubscriptionManager requires AutoSharding")
  let shard = ?self.node.wakuAutoSharding.get().getShard(topic)
  ok($shard)
proc isSubscribed*(
    self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
  ## Whether `topic` is a tracked interest; resolves the shard first, so
  ## it errors when autosharding is not configured.
  let shard = ?self.getShardForContentTopic(topic)
  # `and` short-circuits: the raw `[]` access is only reached when the
  # key exists, so it cannot raise KeyError.
  return ok(
    self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
  )
proc isSubscribed*(
    self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
  ## True iff `contentTopic` is a tracked interest under `shard`.
  # withValue only runs its body when the key exists, so no KeyError can
  # escape — required to satisfy the raises: [] annotation.
  self.contentTopicSubs.withValue(shard, cTopics):
    return cTopics[].contains(contentTopic)
  return false
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
  ## Subscribes this node to `topic`: resolves its shard via autosharding,
  ## joins the Relay mesh for that shard if needed, and records the
  ## content-topic interest. Requires Relay or a Filter client.
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")
  let shard = ?self.getShardForContentTopic(topic)
  # With Relay mounted, an untracked shard also needs a mesh subscription;
  # key presence in contentTopicSubs marks the mesh as already joined.
  if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
    ?self.subscribePubsubTopics(@[shard])
  ?self.addContentTopicInterest(shard, topic)
  return ok()
proc unsubscribe*(
    self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
  ## Removes the content-topic interest for `topic` if present; returns
  ## ok() as a no-op when the topic was not subscribed.
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")
  let shard = ?self.getShardForContentTopic(topic)
  if self.isSubscribed(shard, topic):
    ?self.removeContentTopicInterest(shard, topic)
  return ok()

View File

@ -1,64 +0,0 @@
import chronos, chronicles
import
waku/[
waku_core,
waku_core/topics,
events/message_events,
waku_node,
common/broker/broker_context,
]
type SubscriptionService* = ref object of RootObj
brokerCtx: BrokerContext
node: WakuNode
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages
return SubscriptionService(brokerCtx: node.brokerCtx, node: node)
proc isSubscribed*(
self: SubscriptionService, topic: ContentTopic
): Result[bool, string] =
var isSubscribed = false
if self.node.wakuRelay.isNil() == false:
return self.node.isSubscribed((kind: ContentSub, topic: topic))
# TODO: Add support for edge mode with Filter subscription management
return ok(isSubscribed)
#TODO: later PR may consider to refactor or place this function elsewhere
# The only important part is that it emits MessageReceivedEvent
proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler =
return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let msgHash = computeMessageHash(topic, msg).to0xHex()
info "API received message",
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg)
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
let isSubscribed = self.isSubscribed(topic).valueOr:
error "Failed to check subscription status: ", error = error
return err("Failed to check subscription status: " & error)
if isSubscribed == false:
if self.node.wakuRelay.isNil() == false:
self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr:
error "Failed to subscribe: ", error = error
return err("Failed to subscribe: " & error)
# TODO: Add support for edge mode with Filter subscription management
return ok()
proc unsubscribe*(
self: SubscriptionService, topic: ContentTopic
): Result[void, string] =
if self.node.wakuRelay.isNil() == false:
self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr:
error "Failed to unsubscribe: ", error = error
return err("Failed to unsubscribe: " & error)
# TODO: Add support for edge mode with Filter subscription management
return ok()

View File

@ -405,13 +405,8 @@ proc calculateConnectionState*(
elif kind in FilterClientProtocols:
filterCount = max(filterCount, strength)
debug "calculateConnectionState",
protocol = kind,
strength = strength,
relayCount = relayCount,
storeClientCount = storeClientCount,
lightpushCount = lightpushCount,
filterCount = filterCount
debug "calculateConnectionState",
relayCount, storeClientCount, lightpushCount, filterCount
# Relay connectivity should be a sufficient check in Core mode.
# "Store peers" are relay peers because incoming messages in
@ -528,6 +523,9 @@ proc healthLoop(hm: NodeHealthMonitor) {.async.} =
let newConnectionStatus = hm.calculateConnectionState()
if newConnectionStatus != hm.connectionStatus:
debug "connectionStatus change",
oldstatus = hm.connectionStatus, newstatus = newConnectionStatus
hm.connectionStatus = newConnectionStatus
EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus)

View File

@ -19,16 +19,20 @@ import
libp2p/utility
import
../waku_node,
../../waku_relay,
../../waku_core,
../../waku_core/topics/sharding,
../../waku_filter_v2,
../../waku_archive_legacy,
../../waku_archive,
../../waku_store_sync,
../peer_manager,
../../waku_rln_relay
waku/[
waku_relay,
waku_core,
waku_core/topics/sharding,
waku_filter_v2,
waku_archive_legacy,
waku_archive,
waku_store_sync,
waku_rln_relay,
node/waku_node,
node/peer_manager,
common/broker/broker_context,
events/message_events,
]
export waku_relay.WakuRelayHandler
@ -44,14 +48,25 @@ logScope:
## Waku relay
proc registerRelayHandler(
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler
) =
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
## Registers the only handler for the given topic.
## Notice that this handler internally calls other handlers, such as filter,
## archive, etc, plus the handler provided by the application.
## Returns `true` if a mesh subscription was created or `false` if the relay
## was already subscribed to the topic.
if node.wakuRelay.isSubscribed(topic):
return
let alreadySubscribed = node.wakuRelay.isSubscribed(topic)
if not appHandler.isNil():
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic):
node.legacyAppHandlers[topic] = appHandler
else:
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
topic = topic
if alreadySubscribed:
return false
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msgSizeKB = msg.payload.len / 1000
@ -82,6 +97,9 @@ proc registerRelayHandler(
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
@ -89,7 +107,15 @@ proc registerRelayHandler(
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await appHandler(topic, msg)
await internalHandler(topic, msg)
# Call the legacy (kernel API) app handler if it exists.
# Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead.
# But we need to support legacy behavior (kernel API use), hence this.
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
await node.legacyAppHandlers[topic](topic, msg)
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
@ -115,8 +141,11 @@ proc subscribe*(
): Result[void, string] =
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
## If `handler` is nil, the API call will subscribe to the topic in the relay mesh
## but no app handler will be registered at this time (it can be registered later with
## another call to this proc for the same gossipsub topic).
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
@ -124,13 +153,15 @@ proc subscribe*(
error "Failed to decode subscription event", error = error
return err("Failed to decode subscription event: " & error)
if node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
return ok()
info "subscribe", pubsubTopic, contentTopicOp
node.registerRelayHandler(pubsubTopic, handler)
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
if node.registerRelayHandler(pubsubTopic, handler):
info "subscribe", pubsubTopic, contentTopicOp
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
else:
if isNil(handler):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
else:
info "subscribe (was already subscribed in the mesh; appHandler set)",
pubsubTopic = pubsubTopic
return ok()
@ -138,8 +169,10 @@ proc unsubscribe*(
node: WakuNode, subscription: SubscriptionEvent
): Result[void, string] =
## Unsubscribes from a specific PubSub or Content topic.
## This will both unsubscribe from the relay mesh and remove the app handler, if any.
## NOTE: This works because using MAPI and Kernel API at the same time is unsupported.
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
@ -147,13 +180,20 @@ proc unsubscribe*(
error "Failed to decode unsubscribe event", error = error
return err("Failed to decode unsubscribe event: " & error)
if not node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
return ok()
let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic)
if hadHandler:
node.legacyAppHandlers.del(pubsubTopic)
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
if node.wakuRelay.isSubscribed(pubsubTopic):
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
else:
if not hadHandler:
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
else:
info "unsubscribe (was not subscribed in the mesh; appHandler removed)",
pubsubTopic = pubsubTopic
return ok()

View File

@ -146,6 +146,8 @@ type
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
wakuMix*: WakuMix
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
edgeHealthEvent*: AsyncEvent

View File

@ -28,7 +28,6 @@ import
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
var restServerNotInstalledTab {.threadvar.}: TableRef[string, string]
restServerNotInstalledTab = newTable[string, string]()
export WakuRestServerRef
@ -42,6 +41,9 @@ type RestServerConf* = object
proc startRestServerEssentials*(
nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf, portsShift: uint16
): Result[WakuRestServerRef, string] =
if restServerNotInstalledTab.isNil:
restServerNotInstalledTab = newTable[string, string]()
let requestErrorHandler: RestRequestErrorHandler = proc(
error: RestRequestError, request: HttpRequestRef
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =

View File

@ -5,19 +5,19 @@ import std/tables, results, chronicles, chronos
import ./push_handler, ../topics, ../message
## Subscription manager
type SubscriptionManager* = object
type LegacySubscriptionManager* = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
proc init*(T: type SubscriptionManager): T =
SubscriptionManager(
proc init*(T: type LegacySubscriptionManager): T =
LegacySubscriptionManager(
subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()
)
proc clear*(m: var SubscriptionManager) =
proc clear*(m: var LegacySubscriptionManager) =
m.subscriptions.clear()
proc registerSubscription*(
m: SubscriptionManager,
m: LegacySubscriptionManager,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
handler: FilterPushHandler,
@ -29,12 +29,12 @@ proc registerSubscription*(
error "failed to register filter subscription", error = getCurrentExceptionMsg()
proc removeSubscription*(
m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
) =
m.subscriptions.del((pubsubTopic, contentTopic))
proc notifySubscriptionHandler*(
m: SubscriptionManager,
m: LegacySubscriptionManager,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
message: WakuMessage,
@ -48,5 +48,5 @@ proc notifySubscriptionHandler*(
except CatchableError:
discard
proc getSubscriptionsCount*(m: SubscriptionManager): int =
proc getSubscriptionsCount*(m: LegacySubscriptionManager): int =
m.subscriptions.len()