mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-29 13:00:06 +00:00
Merge branch 'master' into test/pr-job-update
This commit is contained in:
commit
a4e7ecc8b6
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -136,7 +136,7 @@ jobs:
|
||||
matrix:
|
||||
os: [ubuntu-22.04, macos-15]
|
||||
runs-on: ${{ matrix.os }}
|
||||
timeout-minutes: 45
|
||||
timeout-minutes: 60
|
||||
|
||||
name: test-${{ matrix.os }}
|
||||
steps:
|
||||
|
||||
@ -11,7 +11,7 @@ private communications.
|
||||
- Examples.
|
||||
- Various tests of above.
|
||||
|
||||
For more details see the [source code](waku/README.md)
|
||||
For more details see the [source code](logos_delivery/waku/README.md)
|
||||
|
||||
## How to Build & Run ( Linux, MacOS & WSL )
|
||||
|
||||
@ -48,11 +48,14 @@ make wakunode2 NIMFLAGS="-d:chronicles_colors:none -d:disableMarchNative"
|
||||
# Run with DNS bootstrapping
|
||||
./build/wakunode2 --dns-discovery --dns-discovery-url=DNS_BOOTSTRAP_NODE_URL
|
||||
|
||||
# Run with the QUIC transport enabled
|
||||
./build/wakunode2 --quic-support=true
|
||||
|
||||
# See available command line options
|
||||
./build/wakunode2 --help
|
||||
```
|
||||
To join the network, you need to know the address of at least one bootstrap node.
|
||||
Please refer to the [Waku README](https://github.com/logos-messaging/logos-delivery/blob/master/waku/README.md) for more information.
|
||||
Please refer to the [Waku README](https://github.com/logos-messaging/logos-delivery/blob/master/logos_delivery/waku/README.md) for more information.
|
||||
|
||||
For more on how to run `wakunode2`, refer to:
|
||||
- [Run using binaries](https://docs.waku.org/run-node/build-source)
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
logos_delivery/waku/[
|
||||
common/enr,
|
||||
common/logging,
|
||||
factory/waku as waku_factory,
|
||||
waku as waku_factory,
|
||||
waku_node,
|
||||
node/waku_metrics,
|
||||
node/peer_manager,
|
||||
|
||||
@ -11,7 +11,7 @@ import
|
||||
../../tools/[rln_keystore_generator/rln_keystore_generator, confutils/cli_args],
|
||||
logos_delivery/waku/[
|
||||
common/logging,
|
||||
factory/waku,
|
||||
waku,
|
||||
node/health_monitor,
|
||||
rest_api/endpoint/builder as rest_server_builder,
|
||||
waku_core/message/default_values,
|
||||
|
||||
20
docs/operators/how-to/configure-quic.md
Normal file
20
docs/operators/how-to/configure-quic.md
Normal file
@ -0,0 +1,20 @@
|
||||
# Configure QUIC transport
|
||||
|
||||
QUIC is a UDP-based transport. Enabling it allows peers to connect to your node over QUIC, in addition to the default TCP transport.
|
||||
|
||||
To enable QUIC, use the `--quic-support` option.
|
||||
Note, the default port for QUIC is 60000.
|
||||
|
||||
```shell
|
||||
wakunode2 --quic-support=true
|
||||
```
|
||||
|
||||
To listen on a different UDP port, use `--quic-port`:
|
||||
|
||||
```shell
|
||||
wakunode2 --quic-support=true --quic-port=<port>
|
||||
```
|
||||
|
||||
QUIC runs alongside the existing TCP transport. The node keeps listening on TCP and announces a `/udp/<port>/quic-v1` address in its ENR, so peers that support QUIC can connect over it while others continue to use TCP.
|
||||
|
||||
If you restrict the node's announced addresses with `--ext-multiaddr-only`, the QUIC address is no longer announced automatically. In that case, include the QUIC multiaddr in `--ext-multiaddr` yourself, for example `/ip4/<ip>/udp/<port>/quic-v1`.
|
||||
@ -132,5 +132,6 @@ This is an index of tutorials explaining how to configure your nwaku node for di
|
||||
4. [Configure store protocol and message store](./configure-store.md)
|
||||
5. [Generate and configure a node key](./configure-key.md)
|
||||
6. [Configure websocket transport](./configure-websocket.md)
|
||||
7. [Run nwaku with rate limiting enabled](./run-with-rln.md)
|
||||
8. [Configure a REST API node](./configure-rest-api.md)
|
||||
7. [Configure QUIC transport](./configure-quic.md)
|
||||
8. [Run nwaku with rate limiting enabled](./run-with-rln.md)
|
||||
9. [Configure a REST API node](./configure-rest-api.md)
|
||||
|
||||
@ -56,6 +56,12 @@ and websocket address
|
||||
/ip4/0.0.0.0/tcp/8000/ws/p2p/16Uiu2HAkzjwwgEAXfeGNMKFPSpc6vGBRqCdTLG5q3Gmk2v4pQw7H
|
||||
```
|
||||
|
||||
If your node is running with QUIC enabled, the log also includes a QUIC address
|
||||
|
||||
```
|
||||
/ip4/0.0.0.0/udp/60000/quic-v1/p2p/16Uiu2HAkzjwwgEAXfeGNMKFPSpc6vGBRqCdTLG5q3Gmk2v4pQw7H
|
||||
```
|
||||
|
||||
You can also query a running node for its listening addresses
|
||||
using the REST API.
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <argp.h>
|
||||
#include <getopt.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdint.h>
|
||||
@ -64,51 +64,35 @@ void *ctx;
|
||||
// For the case of C language we don't need to store a particular userData
|
||||
void *userData = NULL;
|
||||
|
||||
// Arguments parsing
|
||||
static char doc[] = "\nC example that shows how to use the waku library.";
|
||||
static char args_doc[] = "";
|
||||
|
||||
static struct argp_option options[] = {
|
||||
{"host", 'h', "HOST", 0, "IP to listen for for LibP2P traffic. (default: \"0.0.0.0\")"},
|
||||
{"port", 'p', "PORT", 0, "TCP listening port. (default: \"60000\")"},
|
||||
{"key", 'k', "KEY", 0, "P2P node private key as 64 char hex string."},
|
||||
{"relay", 'r', "RELAY", 0, "Enable relay protocol: 1 or 0. (default: 1)"},
|
||||
{"peers", 'a', "PEERS", 0, "Comma-separated list of peer-multiaddress to connect\
|
||||
to. (default: \"\") e.g. \"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\""},
|
||||
{0}};
|
||||
|
||||
static error_t parse_opt(int key, char *arg, struct argp_state *state)
|
||||
// Arguments parsing. Uses POSIX getopt so the example builds on glibc and on
|
||||
// macOS/BSD alike (argp is a GNU libc extension not available everywhere).
|
||||
static void parse_args(int argc, char **argv, struct ConfigNode *cfgNode)
|
||||
{
|
||||
|
||||
struct ConfigNode *cfgNode = state->input;
|
||||
switch (key)
|
||||
int opt;
|
||||
while ((opt = getopt(argc, argv, "h:p:k:r:a:")) != -1)
|
||||
{
|
||||
case 'h':
|
||||
snprintf(cfgNode->host, 128, "%s", arg);
|
||||
break;
|
||||
case 'p':
|
||||
cfgNode->port = atoi(arg);
|
||||
break;
|
||||
case 'k':
|
||||
snprintf(cfgNode->key, 128, "%s", arg);
|
||||
break;
|
||||
case 'r':
|
||||
cfgNode->relay = atoi(arg);
|
||||
break;
|
||||
case 'a':
|
||||
snprintf(cfgNode->peers, 2048, "%s", arg);
|
||||
break;
|
||||
case ARGP_KEY_ARG:
|
||||
if (state->arg_num >= 1) /* Too many arguments. */
|
||||
argp_usage(state);
|
||||
break;
|
||||
case ARGP_KEY_END:
|
||||
break;
|
||||
default:
|
||||
return ARGP_ERR_UNKNOWN;
|
||||
switch (opt)
|
||||
{
|
||||
case 'h':
|
||||
snprintf(cfgNode->host, 128, "%s", optarg);
|
||||
break;
|
||||
case 'p':
|
||||
cfgNode->port = atoi(optarg);
|
||||
break;
|
||||
case 'k':
|
||||
snprintf(cfgNode->key, 128, "%s", optarg);
|
||||
break;
|
||||
case 'r':
|
||||
cfgNode->relay = atoi(optarg);
|
||||
break;
|
||||
case 'a':
|
||||
snprintf(cfgNode->peers, 2048, "%s", optarg);
|
||||
break;
|
||||
default:
|
||||
printf("Wrong parameters\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void signal_cond()
|
||||
@ -119,8 +103,6 @@ void signal_cond()
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
|
||||
static struct argp argp = {options, parse_opt, args_doc, doc, 0, 0, 0};
|
||||
|
||||
void event_handler(int callerRet, const char *msg, size_t len, void *userData)
|
||||
{
|
||||
if (callerRet == RET_ERR)
|
||||
@ -315,10 +297,7 @@ int main(int argc, char **argv)
|
||||
cfgNode.storeDbMigration = 0;
|
||||
cfgNode.storeMaxNumDbConnections = 30;
|
||||
|
||||
if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) == ARGP_ERR_UNKNOWN)
|
||||
{
|
||||
show_help_and_exit();
|
||||
}
|
||||
parse_args(argc, argv, &cfgNode);
|
||||
|
||||
char jsonConfig[5000];
|
||||
snprintf(jsonConfig, 5000, "{ \
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <argp.h>
|
||||
#include <getopt.h>
|
||||
#include <signal.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
@ -60,51 +60,35 @@ struct ConfigNode
|
||||
char peers[2048];
|
||||
};
|
||||
|
||||
// Arguments parsing
|
||||
static char doc[] = "\nC example that shows how to use the waku library.";
|
||||
static char args_doc[] = "";
|
||||
|
||||
static struct argp_option options[] = {
|
||||
{"host", 'h', "HOST", 0, "IP to listen for for LibP2P traffic. (default: \"0.0.0.0\")"},
|
||||
{"port", 'p', "PORT", 0, "TCP listening port. (default: \"60000\")"},
|
||||
{"key", 'k', "KEY", 0, "P2P node private key as 64 char hex string."},
|
||||
{"relay", 'r', "RELAY", 0, "Enable relay protocol: 1 or 0. (default: 1)"},
|
||||
{"peers", 'a', "PEERS", 0, "Comma-separated list of peer-multiaddress to connect\
|
||||
to. (default: \"\") e.g. \"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\""},
|
||||
{0}};
|
||||
|
||||
static error_t parse_opt(int key, char *arg, struct argp_state *state)
|
||||
// Arguments parsing. Uses POSIX getopt so the example builds on glibc and on
|
||||
// macOS/BSD alike (argp is a GNU libc extension not available everywhere).
|
||||
static void parse_args(int argc, char **argv, struct ConfigNode *cfgNode)
|
||||
{
|
||||
|
||||
struct ConfigNode *cfgNode = (ConfigNode *)state->input;
|
||||
switch (key)
|
||||
int opt;
|
||||
while ((opt = getopt(argc, argv, "h:p:k:r:a:")) != -1)
|
||||
{
|
||||
case 'h':
|
||||
snprintf(cfgNode->host, 128, "%s", arg);
|
||||
break;
|
||||
case 'p':
|
||||
cfgNode->port = atoi(arg);
|
||||
break;
|
||||
case 'k':
|
||||
snprintf(cfgNode->key, 128, "%s", arg);
|
||||
break;
|
||||
case 'r':
|
||||
cfgNode->relay = atoi(arg);
|
||||
break;
|
||||
case 'a':
|
||||
snprintf(cfgNode->peers, 2048, "%s", arg);
|
||||
break;
|
||||
case ARGP_KEY_ARG:
|
||||
if (state->arg_num >= 1) /* Too many arguments. */
|
||||
argp_usage(state);
|
||||
break;
|
||||
case ARGP_KEY_END:
|
||||
break;
|
||||
default:
|
||||
return ARGP_ERR_UNKNOWN;
|
||||
switch (opt)
|
||||
{
|
||||
case 'h':
|
||||
snprintf(cfgNode->host, 128, "%s", optarg);
|
||||
break;
|
||||
case 'p':
|
||||
cfgNode->port = atoi(optarg);
|
||||
break;
|
||||
case 'k':
|
||||
snprintf(cfgNode->key, 128, "%s", optarg);
|
||||
break;
|
||||
case 'r':
|
||||
cfgNode->relay = atoi(optarg);
|
||||
break;
|
||||
case 'a':
|
||||
snprintf(cfgNode->peers, 2048, "%s", optarg);
|
||||
break;
|
||||
default:
|
||||
printf("Wrong parameters\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void event_handler(const char *msg, size_t len)
|
||||
@ -129,8 +113,6 @@ auto cify(F &&f)
|
||||
};
|
||||
}
|
||||
|
||||
static struct argp argp = {options, parse_opt, args_doc, doc, 0, 0, 0};
|
||||
|
||||
// Beginning of UI program logic
|
||||
|
||||
enum PROGRAM_STATE
|
||||
@ -254,10 +236,7 @@ int main(int argc, char **argv)
|
||||
cfgNode.port = 60000;
|
||||
cfgNode.relay = 1;
|
||||
|
||||
if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) == ARGP_ERR_UNKNOWN)
|
||||
{
|
||||
show_help_and_exit();
|
||||
}
|
||||
parse_args(argc, argv, &cfgNode);
|
||||
|
||||
char jsonConfig[2048];
|
||||
snprintf(jsonConfig, 2048, "{ \
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import tools/confutils/cli_args
|
||||
import logos_delivery/waku/[common/logging, factory/[waku, networks_config]]
|
||||
import logos_delivery/waku/[common/logging, waku, factory/networks_config]
|
||||
import
|
||||
std/[options, strutils, os, sequtils],
|
||||
chronicles,
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
import system, std/json
|
||||
import ./json_base_event
|
||||
import ../../logos_delivery/waku/api/types
|
||||
import ../../logos_delivery/api/types
|
||||
|
||||
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
|
||||
status*: ConnectionStatus
|
||||
|
||||
@ -9,7 +9,7 @@ import
|
||||
metrics,
|
||||
ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/health_monitor,
|
||||
library/declare_lib
|
||||
|
||||
@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import std/json
|
||||
import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/discovery/waku_dnsdisc,
|
||||
logos_delivery/waku/discovery/waku_discv5,
|
||||
logos_delivery/waku/waku_core/peers,
|
||||
|
||||
@ -5,7 +5,7 @@ import chronos, chronicles, results, confutils, confutils/std/net, ffi
|
||||
import
|
||||
logos_delivery/waku/node/peer_manager/peer_manager,
|
||||
tools/confutils/cli_args,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/factory/node_factory,
|
||||
logos_delivery/waku/factory/app_callbacks,
|
||||
logos_delivery/waku/rest_api/endpoint/builder,
|
||||
|
||||
@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import std/[sequtils, strutils, tables]
|
||||
import chronicles, chronos, results, options, json, ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/peer_manager,
|
||||
library/declare_lib
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
import std/[json, strutils]
|
||||
import chronos, results, ffi
|
||||
import libp2p/[protocols/ping, switch, multiaddress, multicodec]
|
||||
import
|
||||
logos_delivery/waku/[factory/waku, waku_core/peers, node/waku_node],
|
||||
library/declare_lib
|
||||
import logos_delivery/waku/[waku, waku_core/peers, node/waku_node], library/declare_lib
|
||||
|
||||
proc waku_ping_peer(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
|
||||
@ -4,7 +4,7 @@ import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_filter_v2/client,
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_relay,
|
||||
logos_delivery/waku/waku_filter_v2/common,
|
||||
logos_delivery/waku/waku_core/subscription/push_handler,
|
||||
|
||||
@ -4,7 +4,7 @@ import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/waku_core/codecs,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_lightpush_legacy/client,
|
||||
|
||||
@ -3,7 +3,8 @@ import std/[net, sequtils, strutils, json], strformat
|
||||
import chronicles, chronos, stew/byteutils, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/factory/[validator_signed, waku],
|
||||
logos_delivery/waku/factory/validator_signed,
|
||||
logos_delivery/waku/waku,
|
||||
tools/confutils/cli_args,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
|
||||
@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import std/[json, sugar, strutils, options]
|
||||
import chronos, chronicles, results, stew/byteutils, ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
library/utils,
|
||||
logos_delivery/waku/waku_core/peers,
|
||||
logos_delivery/waku/waku_core/message/digest,
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_relay,
|
||||
logos_delivery,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/health_monitor/health_status,
|
||||
../logos_delivery/waku/factory/app_callbacks,
|
||||
|
||||
@ -3,9 +3,9 @@ import chronos, results, ffi
|
||||
import stew/byteutils
|
||||
import
|
||||
logos_delivery/waku/common/base64,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_core/topics/content_topic,
|
||||
logos_delivery/waku/api/[api, types],
|
||||
logos_delivery/api/types,
|
||||
../declare_lib
|
||||
|
||||
proc logosdelivery_subscribe(
|
||||
@ -20,7 +20,7 @@ proc logosdelivery_subscribe(
|
||||
# ContentTopic is just a string type alias
|
||||
let contentTopic = ContentTopic($contentTopicStr)
|
||||
|
||||
(await api.subscribe(ctx.myLib[].waku, contentTopic)).isOkOr:
|
||||
(await ctx.myLib[].messagingClient.subscribe(contentTopic)).isOkOr:
|
||||
let errMsg = $error
|
||||
return err("Subscribe failed: " & errMsg)
|
||||
|
||||
@ -38,7 +38,7 @@ proc logosdelivery_unsubscribe(
|
||||
# ContentTopic is just a string type alias
|
||||
let contentTopic = ContentTopic($contentTopicStr)
|
||||
|
||||
api.unsubscribe(ctx.myLib[].waku, contentTopic).isOkOr:
|
||||
ctx.myLib[].messagingClient.unsubscribe(contentTopic).isOkOr:
|
||||
let errMsg = $error
|
||||
return err("Unsubscribe failed: " & errMsg)
|
||||
|
||||
|
||||
@ -3,7 +3,8 @@ import chronos, chronicles, results, ffi
|
||||
import
|
||||
logos_delivery,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/api/[api, types],
|
||||
logos_delivery/waku/events/message_events,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/events/[message_events, health_events],
|
||||
tools/confutils/conf_from_json,
|
||||
../declare_lib,
|
||||
|
||||
@ -6,7 +6,8 @@ import bearssl/rand, std/times, chronos
|
||||
import stew/byteutils
|
||||
import logos_delivery/waku/utils/requests as request_utils
|
||||
import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time]
|
||||
import logos_delivery/waku/requests/requests
|
||||
|
||||
export content_topic, message
|
||||
|
||||
type
|
||||
MessageEnvelope* = object
|
||||
@ -21,7 +21,7 @@ import bearssl/rand
|
||||
import stew/byteutils
|
||||
import libp2p/crypto/crypto as libp2p_crypto
|
||||
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/messaging/delivery_service/send_service
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
@ -135,7 +135,7 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
|
||||
## and the total number of confirmed + failed segments equals the total expected segments.
|
||||
## Therefore, the channel-level request is removed from `self.channelReqs`
|
||||
## and the appropriate final event is emitted.
|
||||
##
|
||||
##
|
||||
let state = self.channelReqs.getOrDefault(channelReqId)
|
||||
if state.totalExpectedSegments == 0:
|
||||
## Either already finalized (and removed) or never inserted.
|
||||
|
||||
@ -15,7 +15,7 @@ import brokers/broker_context
|
||||
|
||||
import logos_delivery/waku/events/message_events as waku_message_events
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
import logos_delivery/waku/persistency/sds_persistency
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
## Core identifier types for the Reliable Channel API.
|
||||
|
||||
import std/hashes
|
||||
import logos_delivery/waku/api/types as api_types
|
||||
import logos_delivery/api/types as api_types
|
||||
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
|
||||
|
||||
@ -11,9 +11,7 @@
|
||||
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/api
|
||||
export api
|
||||
import logos_delivery/waku/factory/waku
|
||||
import logos_delivery/waku/waku
|
||||
export waku
|
||||
import logos_delivery/messaging/messaging_client
|
||||
export messaging_client
|
||||
@ -22,7 +20,8 @@ export reliable_channel_manager
|
||||
|
||||
import logos_delivery/waku/factory/waku_conf
|
||||
import logos_delivery/waku/factory/app_callbacks
|
||||
import logos_delivery/waku/api/[api_conf, types]
|
||||
import tools/confutils/cli_args
|
||||
import logos_delivery/waku/node/health_monitor/online_monitor
|
||||
|
||||
logScope:
|
||||
topics = "logosdelivery"
|
||||
@ -82,6 +81,13 @@ proc new*(
|
||||
|
||||
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
|
||||
## Starts each layer bottom-up: transport first, then messaging, then channels.
|
||||
if self.waku.isNil():
|
||||
return err("Waku node is not initialized")
|
||||
if self.messagingClient.isNil():
|
||||
return err("MessagingClient is not initialized")
|
||||
if self.reliableChannelManager.isNil():
|
||||
return err("ReliableChannelManager is not initialized")
|
||||
|
||||
(await self.waku.start()).isOkOr:
|
||||
return err("failed to start Waku: " & error)
|
||||
|
||||
@ -102,3 +108,8 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
|
||||
return err("failed to stop Waku: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
|
||||
if self.waku.isNil():
|
||||
return err("Waku node is not initialized")
|
||||
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())
|
||||
|
||||
@ -3,7 +3,7 @@ import std/[options, times], chronos
|
||||
import brokers/broker_context
|
||||
import
|
||||
logos_delivery/waku/waku_core,
|
||||
logos_delivery/waku/api/types,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/requests/node_requests
|
||||
|
||||
type DeliveryState* {.pure.} = enum
|
||||
|
||||
@ -4,7 +4,7 @@ import chronos, chronicles
|
||||
import brokers/broker_context
|
||||
import logos_delivery/waku/[waku_core], logos_delivery/waku/waku_lightpush/[common, rpc]
|
||||
import logos_delivery/waku/requests/health_requests
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import ./[delivery_task, send_processor]
|
||||
|
||||
logScope:
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import results, chronos
|
||||
import chronicles
|
||||
import
|
||||
logos_delivery/waku/api/types,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/node/[waku_node, subscription_manager],
|
||||
logos_delivery/messaging/delivery_service/[recv_service, send_service],
|
||||
logos_delivery/messaging/delivery_service/send_service/delivery_task
|
||||
@ -43,6 +43,26 @@ proc stop*(self: MessagingClient) {.async.} =
|
||||
await self.recvService.stopRecvService()
|
||||
self.started = false
|
||||
|
||||
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
|
||||
if self.isNil():
|
||||
return err("MessagingClient is not initialized")
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return self.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return self.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
|
||||
proc send*(
|
||||
self: MessagingClient, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
|
||||
@ -232,4 +232,20 @@ However, they can be used for local testing purposes:
|
||||
mkdir -p ./ssl_dir/
|
||||
openssl req -x509 -newkey rsa:4096 -keyout ./ssl_dir/key.pem -out ./ssl_dir/cert.pem -sha256 -nodes
|
||||
wakunode2 --websocket-secure-support=true --websocket-secure-key-path="./ssl_dir/key.pem" --websocket-secure-cert-path="./ssl_dir/cert.pem"
|
||||
```
|
||||
|
||||
## Enabling QUIC
|
||||
|
||||
QUIC is a UDP-based transport that peers can use to connect to your node.
|
||||
|
||||
The default port for QUIC is 60000.
|
||||
|
||||
```shell
|
||||
wakunode2 --quic-support=true
|
||||
```
|
||||
|
||||
To listen on a different UDP port, use `--quic-port`:
|
||||
|
||||
```shell
|
||||
wakunode2 --quic-support=true --quic-port=<port>
|
||||
```
|
||||
@ -1,5 +0,0 @@
|
||||
import ./api/[api, api_conf]
|
||||
import ./events/message_events
|
||||
import tools/confutils/entry_nodes
|
||||
|
||||
export api, api_conf, entry_nodes, message_events
|
||||
@ -1,48 +0,0 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import std/[net, options]
|
||||
|
||||
import chronicles, chronos, libp2p/peerid, results
|
||||
|
||||
import logos_delivery/waku/factory/waku
|
||||
import logos_delivery/waku/[requests/health_requests, waku_core, waku_node]
|
||||
import logos_delivery/waku/node/subscription_manager
|
||||
import libp2p/peerid
|
||||
import tools/confutils/cli_args
|
||||
import ./[api_conf, types]
|
||||
|
||||
export cli_args
|
||||
|
||||
logScope:
|
||||
topics = "api"
|
||||
|
||||
proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} =
|
||||
let wakuConf = conf.toWakuConf().valueOr:
|
||||
return err("Failed to handle the configuration: " & error)
|
||||
|
||||
## We are not defining app callbacks at node creation
|
||||
let wakuRes = (await Waku.new(wakuConf)).valueOr:
|
||||
error "waku initialization failed", error = error
|
||||
return err("Failed setting up Waku: " & $error)
|
||||
|
||||
return ok(wakuRes)
|
||||
|
||||
proc checkApiAvailability(w: Waku): Result[void, string] =
|
||||
if w.isNil():
|
||||
return err("Waku node is not initialized")
|
||||
|
||||
# TODO: Conciliate request-bouncing health checks here with unit testing.
|
||||
# (For now, better to just allow all sends and rely on retries.)
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
w: Waku, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
return w.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
return w.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
@ -1,46 +0,0 @@
|
||||
# SEND API
|
||||
|
||||
**THIS IS TO BE REMOVED BEFORE PR MERGE**
|
||||
|
||||
This document collects logic and todo's around the Send API.
|
||||
|
||||
## Overview
|
||||
|
||||
Send api hides the complex logic of using raw protocols for reliable message delivery.
|
||||
The delivery method is chosen based on the node configuration and actual availabilities of peers.
|
||||
|
||||
## Delivery task
|
||||
|
||||
Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery.
|
||||
|
||||
## Delivery methods
|
||||
|
||||
Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes:
|
||||
- P2PReliability validation - checking network store node whether the message is reached at least a store node.
|
||||
- Simple retry until message is propagated to the network
|
||||
- Relay says >0 peers as publish result
|
||||
- LightpushClient returns with success
|
||||
|
||||
Depending on node config:
|
||||
- Relay
|
||||
- Lightpush
|
||||
|
||||
These methods are used in combination to achieve the best reliability.
|
||||
Fallback mechanism is used to switch between methods if the current one fails.
|
||||
|
||||
Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error
|
||||
|
||||
Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity.
|
||||
Actual connectivity is checked:
|
||||
- Relay's topic health check - at least dLow peers in the mesh for the topic
|
||||
- Store nodes availability - at least one store service node is available in peer manager
|
||||
- Lightpush client availability - at least one lightpush service node is available in peer manager
|
||||
|
||||
## Delivery processing
|
||||
|
||||
At every send request, each task is tried to be delivered right away.
|
||||
Any further retries and store check is done as a background task in a loop with predefined intervals.
|
||||
Each task is set for a maximum number of retries and/or maximum time to live.
|
||||
|
||||
In each round of store check and retry send tasks are selected based on their state.
|
||||
The state is updated based on the result of the delivery method.
|
||||
@ -1,6 +1,6 @@
|
||||
import brokers/event_broker
|
||||
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health]
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import brokers/event_broker
|
||||
import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics]
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/[waku_core/message, waku_core/topics]
|
||||
export types
|
||||
|
||||
EventBroker:
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import chronos, results, std/strutils, ../../api/types
|
||||
import chronos, results, std/strutils
|
||||
from logos_delivery/api/types import ConnectionStatus
|
||||
|
||||
export ConnectionStatus
|
||||
|
||||
|
||||
@ -8,10 +8,10 @@ import
|
||||
libp2p/protocols/rendezvous,
|
||||
libp2p/protocols/pubsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/[
|
||||
waku_relay,
|
||||
waku_rln_relay,
|
||||
api/types,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
node/waku_node,
|
||||
|
||||
@ -29,6 +29,7 @@ import
|
||||
waku_archive,
|
||||
waku_store_sync,
|
||||
waku_rln_relay,
|
||||
waku_rln_relay/adapters/relay as waku_rln_relay_adapter,
|
||||
node/waku_node,
|
||||
node/subscription_manager,
|
||||
node/peer_manager,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import brokers/request_broker
|
||||
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import
|
||||
logos_delivery/waku/node/health_monitor/[protocol_health, topic_health, health_report]
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
@ -4,7 +4,8 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import results
|
||||
import chronicles, json_serialization, json_serialization/std/options
|
||||
import ../serdes
|
||||
import logos_delivery/waku/[waku_node, api/types, node/health_monitor]
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/[waku_node, node/health_monitor]
|
||||
|
||||
#### Serialization and deserialization
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import
|
||||
libp2p/wire,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/services/autorelayservice,
|
||||
libp2p/services/hpservice,
|
||||
libp2p/peerid,
|
||||
@ -20,6 +21,7 @@ import
|
||||
metrics,
|
||||
metrics/chronos_httpserver,
|
||||
brokers/broker_context,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
waku_node,
|
||||
@ -30,7 +32,6 @@ import
|
||||
waku_relay/protocol,
|
||||
waku_enr/sharding,
|
||||
waku_enr/multiaddr,
|
||||
api/types,
|
||||
common/logging,
|
||||
node/peer_manager,
|
||||
node/health_monitor,
|
||||
@ -48,9 +49,13 @@ import
|
||||
factory/internal_config,
|
||||
factory/app_callbacks,
|
||||
persistency/persistency,
|
||||
factory/validator_signed,
|
||||
waku_lightpush/client,
|
||||
waku_lightpush_legacy/client,
|
||||
waku_store/client,
|
||||
],
|
||||
./waku_conf,
|
||||
./waku_state_info
|
||||
./factory/waku_conf,
|
||||
./factory/waku_state_info
|
||||
|
||||
logScope:
|
||||
topics = "wakunode waku"
|
||||
@ -58,6 +63,8 @@ logScope:
|
||||
# Git version in git describe format (defined at compile time)
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
type Waku* = ref object
|
||||
stateInfo*: WakuStateInfo
|
||||
conf*: WakuConf
|
||||
@ -567,12 +574,418 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
|
||||
return ok()
|
||||
|
||||
proc isModeCoreAvailable*(waku: Waku): bool =
|
||||
return not waku.node.wakuRelay.isNil()
|
||||
## Kernel API realization
|
||||
##
|
||||
# --- topic construction ---
|
||||
proc buildContentTopic*(
|
||||
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
|
||||
): Future[Result[ContentTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc isModeEdgeAvailable*(waku: Waku): bool =
|
||||
return
|
||||
waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and
|
||||
not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil()
|
||||
proc buildPubsubTopic*(
|
||||
self: Waku, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
return ok(DefaultPubsubTopic)
|
||||
|
||||
# --- relay ---
|
||||
proc relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
|
||||
let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(numPeers)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil)
|
||||
).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
|
||||
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
|
||||
return err("relayAddProtectedShard: invalid public key: " & $error)
|
||||
|
||||
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
|
||||
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
|
||||
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(connPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
|
||||
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(meshPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- filter ---
|
||||
proc filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter subscription timed out")
|
||||
subFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(
|
||||
self: Waku, peer: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribeAll(peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription all timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- lightpush ---
|
||||
proc lightpushPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuLegacyLightpushClient.isNil():
|
||||
return err("wakuLegacyLightpushClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("lightpushPublish failed to parse peer addr: " & $error)
|
||||
|
||||
let msgHashHex = (
|
||||
await self.node.wakuLegacyLightpushClient.publish(
|
||||
pubsubTopic, message, remotePeer
|
||||
)
|
||||
).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(msgHashHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- store ---
|
||||
proc storeQuery*(
|
||||
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuStoreClient.isNil():
|
||||
return err("wakuStoreClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("storeQuery failed to parse peer addr: " & $error)
|
||||
|
||||
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
|
||||
if not await queryFut.withTimeout(timeoutMs.milliseconds):
|
||||
return err("storeQuery timed out")
|
||||
|
||||
let queryResponse = queryFut.read().valueOr:
|
||||
return err("storeQuery failed: " & $error)
|
||||
|
||||
return ok(queryResponse)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- peer management ---
|
||||
proc connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
await self.node.peerManager.disconnectNode(pId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(pId, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers()
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers(protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- discovery ---
|
||||
proc dnsDiscovery*(
|
||||
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let dnsNameServers = @[parseIpAddress(nameServer)]
|
||||
let discoveredPeers = (
|
||||
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
|
||||
).valueOr:
|
||||
return err("failed discovering peers from DNS: " & $error)
|
||||
|
||||
var multiAddresses = newSeq[string]()
|
||||
for discPeer in discoveredPeers:
|
||||
for address in discPeer.addrs:
|
||||
multiAddresses.add($address & "/p2p/" & $discPeer)
|
||||
|
||||
return ok(multiAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc discv5UpdateBootnodes*(
|
||||
self: Waku, bootnodes: seq[string]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]"
|
||||
self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr:
|
||||
return err("error in discv5UpdateBootnodes: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
(await self.wakuDiscv5.start()).isOkOr:
|
||||
return err("error starting discv5: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
await self.wakuDiscv5.stop()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerExchangeRequest*(
|
||||
self: Waku, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err("failed peer exchange: " & $error)
|
||||
return ok(numPeersRecv)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- debug / info ---
|
||||
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
return ok(WakuNodeVersionString)
|
||||
|
||||
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.info().listenAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.enr.toURI())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok($self.node.peerId())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
return ok(defaultRegistry.toText())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc pingPeer*(
|
||||
self: Waku, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async.} =
|
||||
try:
|
||||
let peerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err("pingPeer failed to parse peer addr: " & $error)
|
||||
|
||||
let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
let pingRTT = await self.node.libp2pPing.ping(conn)
|
||||
|
||||
if pingRTT == 0.nanos:
|
||||
return err("could not ping peer: rtt-0")
|
||||
|
||||
return ok(pingRTT.nanos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
{.pop.}
|
||||
78
logos_delivery/waku/waku_rln_relay/adapters/relay.nim
Normal file
78
logos_delivery/waku/waku_rln_relay/adapters/relay.nim
Normal file
@ -0,0 +1,78 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
chronicles,
|
||||
chronos,
|
||||
results,
|
||||
stew/byteutils,
|
||||
libp2p/protocols/pubsub/pubsub
|
||||
|
||||
import ../rln_relay, ../protocol_types, ../protocol_metrics, ../conversion_utils
|
||||
|
||||
import logos_delivery/waku/[waku_relay, waku_core]
|
||||
|
||||
logScope:
|
||||
topics = "waku rln_relay adapter"
|
||||
|
||||
proc generateRlnValidator*(
|
||||
wakuRlnRelay: WakuRLNRelay, spamHandler = none(SpamHandler)
|
||||
): WakuValidatorHandler =
|
||||
## Bridges RLN's protocol-agnostic message validation into a relay
|
||||
## (gossipsub) validator. The core decision is made by
|
||||
## `validateMessageAndUpdateLog`; this adapter maps the result to
|
||||
## `pubsub.ValidationResult` so the validator can be installed on
|
||||
## WakuRelay's validator chain.
|
||||
## Validation logic follows https://rfc.vac.dev/spec/17/
|
||||
proc validator(
|
||||
topic: string, message: WakuMessage
|
||||
): Future[pubsub.ValidationResult] {.async.} =
|
||||
trace "rln-relay topic validator is called"
|
||||
wakuRlnRelay.clearNullifierLog()
|
||||
|
||||
let msgProof = RateLimitProof.init(message.proof).valueOr:
|
||||
trace "generateRlnValidator reject", error = error
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
# validate the message and update log
|
||||
let validationRes = await wakuRlnRelay.validateMessageAndUpdateLog(message)
|
||||
|
||||
let
|
||||
proof = byteutils.toHex(msgProof.proof)
|
||||
epoch = fromEpoch(msgProof.epoch)
|
||||
root = inHex(msgProof.merkleRoot)
|
||||
shareX = inHex(msgProof.shareX)
|
||||
shareY = inHex(msgProof.shareY)
|
||||
nullifier = inHex(msgProof.nullifier)
|
||||
payload = string.fromBytes(message.payload)
|
||||
case validationRes
|
||||
of Valid:
|
||||
trace "message validity is verified, relaying",
|
||||
proof = proof,
|
||||
root = root,
|
||||
shareX = shareX,
|
||||
shareY = shareY,
|
||||
nullifier = nullifier
|
||||
waku_rln_valid_messages_total.inc(labelValues = [topic])
|
||||
return pubsub.ValidationResult.Accept
|
||||
of Invalid:
|
||||
trace "message validity could not be verified, discarding",
|
||||
proof = proof,
|
||||
root = root,
|
||||
shareX = shareX,
|
||||
shareY = shareY,
|
||||
nullifier = nullifier
|
||||
return pubsub.ValidationResult.Reject
|
||||
of Spam:
|
||||
trace "A spam message is found! yay! discarding:",
|
||||
proof = proof,
|
||||
root = root,
|
||||
shareX = shareX,
|
||||
shareY = shareY,
|
||||
nullifier = nullifier
|
||||
if spamHandler.isSome():
|
||||
let handler = spamHandler.get()
|
||||
handler(message)
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
return validator
|
||||
@ -12,7 +12,6 @@ import
|
||||
web3/eth_api_types,
|
||||
eth/keys,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
results,
|
||||
stew/[byteutils, arrayops],
|
||||
brokers/broker_context
|
||||
@ -27,13 +26,8 @@ import
|
||||
./nonce_manager
|
||||
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
common/error_handling,
|
||||
waku_relay, # for WakuRelayHandler
|
||||
waku_core,
|
||||
requests/rln_requests,
|
||||
waku_keystore,
|
||||
]
|
||||
logos_delivery/waku/
|
||||
[common/error_handling, waku_core, requests/rln_requests, waku_keystore]
|
||||
|
||||
logScope:
|
||||
topics = "waku rln_relay"
|
||||
@ -321,65 +315,6 @@ proc clearNullifierLog*(rlnPeer: WakuRlnRelay) =
|
||||
currentEpoch = currentEpoch, cleanedEpoch = fromEpoch(epochRemove)
|
||||
rlnPeer.nullifierLog.del(epochRemove)
|
||||
|
||||
proc generateRlnValidator*(
|
||||
wakuRlnRelay: WakuRLNRelay, spamHandler = none(SpamHandler)
|
||||
): WakuValidatorHandler =
|
||||
## this procedure is a thin wrapper for the pubsub addValidator method
|
||||
## it sets a validator for waku messages, acting in the registered pubsub topic
|
||||
## the message validation logic is according to https://rfc.vac.dev/spec/17/
|
||||
proc validator(
|
||||
topic: string, message: WakuMessage
|
||||
): Future[pubsub.ValidationResult] {.async.} =
|
||||
trace "rln-relay topic validator is called"
|
||||
wakuRlnRelay.clearNullifierLog()
|
||||
|
||||
let msgProof = RateLimitProof.init(message.proof).valueOr:
|
||||
trace "generateRlnValidator reject", error = error
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
# validate the message and update log
|
||||
let validationRes = await wakuRlnRelay.validateMessageAndUpdateLog(message)
|
||||
|
||||
let
|
||||
proof = byteutils.toHex(msgProof.proof)
|
||||
epoch = fromEpoch(msgProof.epoch)
|
||||
root = inHex(msgProof.merkleRoot)
|
||||
shareX = inHex(msgProof.shareX)
|
||||
shareY = inHex(msgProof.shareY)
|
||||
nullifier = inHex(msgProof.nullifier)
|
||||
payload = string.fromBytes(message.payload)
|
||||
case validationRes
|
||||
of Valid:
|
||||
trace "message validity is verified, relaying",
|
||||
proof = proof,
|
||||
root = root,
|
||||
shareX = shareX,
|
||||
shareY = shareY,
|
||||
nullifier = nullifier
|
||||
waku_rln_valid_messages_total.inc(labelValues = [topic])
|
||||
return pubsub.ValidationResult.Accept
|
||||
of Invalid:
|
||||
trace "message validity could not be verified, discarding",
|
||||
proof = proof,
|
||||
root = root,
|
||||
shareX = shareX,
|
||||
shareY = shareY,
|
||||
nullifier = nullifier
|
||||
return pubsub.ValidationResult.Reject
|
||||
of Spam:
|
||||
trace "A spam message is found! yay! discarding:",
|
||||
proof = proof,
|
||||
root = root,
|
||||
shareX = shareX,
|
||||
shareY = shareY,
|
||||
nullifier = nullifier
|
||||
if spamHandler.isSome():
|
||||
let handler = spamHandler.get()
|
||||
handler(message)
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
return validator
|
||||
|
||||
proc monitorEpochs(wakuRlnRelay: WakuRLNRelay) {.async.} =
|
||||
while true:
|
||||
try:
|
||||
|
||||
@ -187,7 +187,7 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} =
|
||||
raiseAssert "Message was not archived in time"
|
||||
|
||||
# subscribe to the content topic; with no peers yet the subscriber stays offline
|
||||
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
(await subscriber.messagingClient.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
|
||||
return TestNetwork(
|
||||
storeNode: storeNode,
|
||||
|
||||
@ -355,22 +355,20 @@ suite "Waku API - Send":
|
||||
asyncTest "Edge sender delivers via lightpush (no relay)":
|
||||
## Reproduces issue #3847: an Edge node (no relay mounted) that is only
|
||||
## connected to a lightpush-capable peer must deliver through lightpush.
|
||||
var node: Waku
|
||||
var node: LogosDelivery
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
|
||||
node = (await LogosDelivery.new(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
|
||||
raiseAssert error
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
# Edge node has no relay; its only path to the network is the
|
||||
# lightpush peer it is connected to.
|
||||
await node.node.connectToNodes(@[lightpushNodePeerInfo])
|
||||
await node.waku.node.connectToNodes(@[lightpushNodePeerInfo])
|
||||
|
||||
check node.node.wakuRelay.isNil()
|
||||
check node.waku.node.wakuRelay.isNil()
|
||||
|
||||
let eventManager = newSendEventListenerManager(node.brokerCtx)
|
||||
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
@ -378,7 +376,7 @@ suite "Waku API - Send":
|
||||
ContentTopic("/waku/2/default-content/proto"), "test payload"
|
||||
)
|
||||
|
||||
let requestId = (await node.send(envelope)).valueOr:
|
||||
let requestId = (await node.messagingClient.send(envelope)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
const eventTimeout = 10.seconds
|
||||
@ -389,6 +387,46 @@ suite "Waku API - Send":
|
||||
(await node.stop()).isOkOr:
|
||||
raiseAssert "Failed to stop node: " & error
|
||||
|
||||
asyncTest "S16 - isolated sender recovers when lightpush peer appears later":
|
||||
## Edge sender starts with no peer; a lightpush peer appears mid-send and a
|
||||
## later retry must deliver the queued message.
|
||||
var node: LogosDelivery
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await LogosDelivery.new(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
|
||||
raiseAssert error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
# No connectToNodes: the sender has no reachable peer at T0.
|
||||
|
||||
check node.waku.node.wakuRelay.isNil()
|
||||
|
||||
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
let envelope = MessageEnvelope.init(
|
||||
ContentTopic("/waku/2/default-content/proto"), "test payload"
|
||||
)
|
||||
|
||||
# send() with no peer must still succeed; the message is queued and retried.
|
||||
let requestId = (await node.messagingClient.send(envelope)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
# Nothing should propagate while isolated (retry interval is 1s).
|
||||
await sleepAsync(5.seconds)
|
||||
check eventManager.propagatedCount == 0
|
||||
|
||||
# The lightpush peer appears; a later retry must now deliver.
|
||||
await node.waku.node.connectToNodes(@[lightpushNodePeerInfo])
|
||||
|
||||
const eventTimeout = 10.seconds
|
||||
discard await eventManager.waitForEvents(eventTimeout)
|
||||
|
||||
eventManager.validate({SendEventOutcome.Propagated}, requestId)
|
||||
|
||||
(await node.stop()).isOkOr:
|
||||
raiseAssert "Failed to stop node: " & error
|
||||
|
||||
asyncTest "Send fully validates fallback to lightpush":
|
||||
var node: LogosDelivery
|
||||
lockNewGlobalBrokerContext:
|
||||
@ -497,19 +535,19 @@ suite "Waku API - Send":
|
||||
# it can answer store queries but never holds the published message.
|
||||
let isolatedStoreNodePeerInfo = isolatedStoreNode.peerInfo.toRemotePeerInfo()
|
||||
|
||||
var node: Waku
|
||||
var node: LogosDelivery
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
node = (await LogosDelivery.new(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
# Propagate via relayNode1; store queries can only reach the isolated store node.
|
||||
await node.node.connectToNodes(@[relayNode1PeerInfo, isolatedStoreNodePeerInfo])
|
||||
await node.waku.node.connectToNodes(
|
||||
@[relayNode1PeerInfo, isolatedStoreNodePeerInfo]
|
||||
)
|
||||
|
||||
let eventManager = newSendEventListenerManager(node.brokerCtx)
|
||||
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
@ -517,7 +555,7 @@ suite "Waku API - Send":
|
||||
ContentTopic("/waku/2/default-content/proto"), "test payload"
|
||||
)
|
||||
|
||||
let requestId = (await node.send(envelope)).valueOr:
|
||||
let requestId = (await node.messagingClient.send(envelope)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
# Must outlive MaxTimeInCache (1 min) so the store-validation timeout drop fires.
|
||||
|
||||
@ -225,7 +225,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
await net.teardown()
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/test-content/proto")
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect(
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"subscriberNode failed to subscribe"
|
||||
)
|
||||
|
||||
@ -248,7 +248,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
|
||||
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
|
||||
(await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(subbedTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -268,8 +270,12 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect(
|
||||
"failed to unsubscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -289,14 +295,14 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let topicA = ContentTopic("/waku/2/topic-a/proto")
|
||||
let topicB = ContentTopic("/waku/2/topic-b/proto")
|
||||
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
|
||||
(await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A")
|
||||
net.subscriber.messagingClient.unsubscribe(topicA).expect("failed to unsub A")
|
||||
|
||||
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
|
||||
"Publish A failed"
|
||||
@ -315,9 +321,13 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let glitchTopic = ContentTopic("/waku/2/glitch/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to sub")
|
||||
(await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to double sub")
|
||||
net.subscriber.waku.unsubscribe(glitchTopic).expect("failed to unsub")
|
||||
(await net.subscriber.messagingClient.subscribe(glitchTopic)).expect(
|
||||
"failed to sub"
|
||||
)
|
||||
(await net.subscriber.messagingClient.subscribe(glitchTopic)).expect(
|
||||
"failed to double sub"
|
||||
)
|
||||
net.subscriber.messagingClient.unsubscribe(glitchTopic).expect("failed to unsub")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -338,7 +348,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
let testTopic = ContentTopic("/waku/2/resub-test/proto")
|
||||
|
||||
# Subscribe
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Initial sub failed"
|
||||
)
|
||||
|
||||
var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
discard
|
||||
@ -348,7 +360,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
await eventManager.teardown()
|
||||
|
||||
# Unsubscribe and verify teardown
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed")
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect("Unsub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard
|
||||
@ -358,7 +370,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
await eventManager.teardown()
|
||||
|
||||
# Resubscribe
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect("Resub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard
|
||||
@ -382,8 +394,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto")
|
||||
inc i
|
||||
|
||||
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
|
||||
(await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 2)
|
||||
defer:
|
||||
@ -440,7 +452,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
# subscribe to all content topics we generated
|
||||
for t in allTopics:
|
||||
(await net.subscriber.waku.subscribe(t)).expect("sub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(t)).expect("sub failed")
|
||||
activeSubs.add(t)
|
||||
|
||||
await verifyNetworkState(activeSubs)
|
||||
@ -448,7 +460,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
# unsubscribe from some content topics
|
||||
for i in 0 ..< 50:
|
||||
let t = allTopics[i]
|
||||
net.subscriber.waku.unsubscribe(t).expect("unsub failed")
|
||||
net.subscriber.messagingClient.unsubscribe(t).expect("unsub failed")
|
||||
|
||||
let idx = activeSubs.find(t)
|
||||
if idx >= 0:
|
||||
@ -459,7 +471,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
# re-subscribe to some content topics
|
||||
for i in 0 ..< 25:
|
||||
let t = allTopics[i]
|
||||
(await net.subscriber.waku.subscribe(t)).expect("resub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(t)).expect("resub failed")
|
||||
activeSubs.add(t)
|
||||
|
||||
await verifyNetworkState(activeSubs)
|
||||
@ -470,7 +482,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
await net.teardown()
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/test-content/proto")
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -491,7 +505,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
|
||||
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
|
||||
(await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(subbedTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -511,8 +527,12 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect(
|
||||
"failed to unsubscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -532,8 +552,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let topicA = ContentTopic("/waku/2/topic-a/proto")
|
||||
let topicB = ContentTopic("/waku/2/topic-b/proto")
|
||||
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
|
||||
(await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let shard = net.subscriber.waku.node.getRelayShard(topicA)
|
||||
await waitForEdgeSubs(net.subscriber, shard)
|
||||
@ -542,7 +562,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A")
|
||||
net.subscriber.messagingClient.unsubscribe(topicA).expect("failed to unsub A")
|
||||
|
||||
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
|
||||
"Publish A failed"
|
||||
@ -561,7 +581,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/resub-test/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Initial sub failed"
|
||||
)
|
||||
|
||||
var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect(
|
||||
@ -571,7 +593,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
require await eventManager.waitForEvents(TestTimeout)
|
||||
await eventManager.teardown()
|
||||
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed")
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect("Unsub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard
|
||||
@ -580,7 +602,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
check not await eventManager.waitForEvents(NegativeTestTimeout)
|
||||
await eventManager.teardown()
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect("Resub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect(
|
||||
@ -653,7 +675,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
let testTopic = ContentTopic("/waku/2/failover-test/proto")
|
||||
let shard = subscriber.waku.node.getRelayShard(testTopic)
|
||||
|
||||
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
(await subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Failed to subscribe"
|
||||
)
|
||||
|
||||
# Wait for dialing both filter servers (HealthyThreshold = 2)
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
@ -783,7 +807,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
let testTopic = ContentTopic("/waku/2/replacement-test/proto")
|
||||
let shard = subscriber.waku.node.getRelayShard(testTopic)
|
||||
|
||||
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
(await subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Failed to subscribe"
|
||||
)
|
||||
|
||||
# Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed.
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
|
||||
@ -5,7 +5,7 @@ import json_serialization, confutils, confutils/std/net
|
||||
import
|
||||
tools/confutils/cli_args,
|
||||
tools/confutils/conf_from_json,
|
||||
logos_delivery/waku/api/api_conf,
|
||||
logos_delivery/api/api_conf,
|
||||
logos_delivery/waku/factory/waku_conf,
|
||||
logos_delivery/waku/factory/networks_config,
|
||||
logos_delivery/waku/factory/conf_builder/conf_builder,
|
||||
@ -350,7 +350,7 @@ suite "WakuNodeConf JSON -> WakuConf integration":
|
||||
|
||||
{.push warning[Deprecated]: off.}
|
||||
|
||||
import logos_delivery/waku/api/api_conf
|
||||
import logos_delivery/api/api_conf
|
||||
|
||||
suite "NodeConfig (deprecated) - toWakuConf":
|
||||
test "Minimal configuration":
|
||||
|
||||
@ -58,7 +58,7 @@ suite "Reliable Channel - ingress":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
## Noop encryption providers so the Encrypt/Decrypt brokers have
|
||||
@ -124,7 +124,7 @@ suite "Reliable Channel - ingress":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -181,7 +181,7 @@ suite "Reliable Channel - send state machine":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -246,7 +246,7 @@ suite "Reliable Channel - send state machine":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -347,7 +347,7 @@ suite "Reliable Channel - send state machine":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -452,7 +452,7 @@ suite "Reliable Channel - SDS persistence":
|
||||
var waku: LogosDelivery
|
||||
var manager: ReliableChannelManager
|
||||
lockNewGlobalBrokerContext:
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -522,7 +522,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -593,7 +593,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -650,7 +650,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -710,7 +710,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -792,7 +792,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -860,7 +860,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -948,7 +948,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -1023,7 +1023,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -1096,7 +1096,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -1162,7 +1162,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var waku: LogosDelivery
|
||||
var manager: ReliableChannelManager
|
||||
lockNewGlobalBrokerContext:
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr()
|
||||
|
||||
@ -9,7 +9,7 @@ import tools/confutils/cli_args
|
||||
import logos_delivery/waku/factory/networks_config
|
||||
import logos_delivery/waku/factory/conf_builder/conf_builder
|
||||
|
||||
suite "Waku API - Create node":
|
||||
suite "LogosDelivery API - Create node":
|
||||
asyncTest "Create node with minimal configuration":
|
||||
## Given
|
||||
var nodeConf = defaultWakuNodeConf().valueOr:
|
||||
@ -21,14 +21,14 @@ suite "Waku API - Create node":
|
||||
# This is the actual minimal config but as the node auto-start, it is not suitable for tests
|
||||
|
||||
## When
|
||||
let node = (await createNode(nodeConf)).valueOr:
|
||||
raiseAssert "createNode (minimal config) failed: " & error
|
||||
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
|
||||
raiseAssert "LogosDelivery.new (minimal config) failed: " & error
|
||||
|
||||
## Then
|
||||
check:
|
||||
not node.isNil()
|
||||
node.conf.clusterId == 3
|
||||
node.conf.relay == true
|
||||
not ld.isNil()
|
||||
ld.waku.conf.clusterId == 3
|
||||
ld.waku.conf.relay == true
|
||||
|
||||
asyncTest "Create node with full configuration":
|
||||
## Given
|
||||
@ -47,20 +47,20 @@ suite "Waku API - Create node":
|
||||
]
|
||||
|
||||
## When
|
||||
let node = (await createNode(nodeConf)).valueOr:
|
||||
raiseAssert "createNode (full config) failed: " & error
|
||||
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
|
||||
raiseAssert "LogosDelivery.new (full config) failed: " & error
|
||||
|
||||
## Then
|
||||
check:
|
||||
not node.isNil()
|
||||
node.conf.clusterId == 99
|
||||
node.conf.shardingConf.numShardsInCluster == 16
|
||||
node.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64
|
||||
node.conf.staticNodes.len == 1
|
||||
node.conf.relay == true
|
||||
node.conf.lightPush == true
|
||||
node.conf.peerExchangeService == true
|
||||
node.conf.rendezvous == true
|
||||
not ld.isNil()
|
||||
ld.waku.conf.clusterId == 99
|
||||
ld.waku.conf.shardingConf.numShardsInCluster == 16
|
||||
ld.waku.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64
|
||||
ld.waku.conf.staticNodes.len == 1
|
||||
ld.waku.conf.relay == true
|
||||
ld.waku.conf.lightPush == true
|
||||
ld.waku.conf.peerExchangeService == true
|
||||
ld.waku.conf.rendezvous == true
|
||||
|
||||
asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)":
|
||||
## Given
|
||||
@ -75,18 +75,18 @@ suite "Waku API - Create node":
|
||||
]
|
||||
|
||||
## When
|
||||
let node = (await createNode(nodeConf)).valueOr:
|
||||
raiseAssert "createNode (mixed entry nodes) failed: " & error
|
||||
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
|
||||
raiseAssert "LogosDelivery.new (mixed entry nodes) failed: " & error
|
||||
|
||||
## Then
|
||||
check:
|
||||
not node.isNil()
|
||||
node.conf.clusterId == 42
|
||||
not ld.isNil()
|
||||
ld.waku.conf.clusterId == 42
|
||||
# ENRTree should go to DNS discovery
|
||||
node.conf.dnsDiscoveryConf.isSome()
|
||||
node.conf.dnsDiscoveryConf.get().enrTreeUrl ==
|
||||
ld.waku.conf.dnsDiscoveryConf.isSome()
|
||||
ld.waku.conf.dnsDiscoveryConf.get().enrTreeUrl ==
|
||||
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
|
||||
# Multiaddr should go to static nodes
|
||||
node.conf.staticNodes.len == 1
|
||||
node.conf.staticNodes[0] ==
|
||||
ld.waku.conf.staticNodes.len == 1
|
||||
ld.waku.conf.staticNodes[0] ==
|
||||
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||
|
||||
@ -21,7 +21,7 @@ import
|
||||
discovery/waku_discv5,
|
||||
waku_enr/capabilities,
|
||||
factory/conf_builder/conf_builder,
|
||||
factory/waku,
|
||||
waku,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
],
|
||||
|
||||
@ -10,7 +10,7 @@ import
|
||||
tests/testlib/[wakucore, wakunode],
|
||||
logos_delivery/waku/factory/conf_builder/conf_builder
|
||||
|
||||
include logos_delivery/waku/factory/waku, logos_delivery/waku/common/enr/typed_record
|
||||
include logos_delivery/waku/waku, logos_delivery/waku/common/enr/typed_record
|
||||
|
||||
suite "Wakunode2 - Waku":
|
||||
test "compilation version should be reported":
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user