mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-26 11:29:28 +00:00
Merge branch 'master' into chore/separate-rln-from-relay-phase1
This commit is contained in:
commit
731dd43e74
@ -14,7 +14,7 @@ import
|
||||
logos_delivery/waku/[
|
||||
common/enr,
|
||||
common/logging,
|
||||
factory/waku as waku_factory,
|
||||
waku as waku_factory,
|
||||
waku_node,
|
||||
node/waku_metrics,
|
||||
node/peer_manager,
|
||||
|
||||
@ -11,7 +11,7 @@ import
|
||||
../../tools/[rln_keystore_generator/rln_keystore_generator, confutils/cli_args],
|
||||
logos_delivery/waku/[
|
||||
common/logging,
|
||||
factory/waku,
|
||||
waku,
|
||||
node/health_monitor,
|
||||
rest_api/endpoint/builder as rest_server_builder,
|
||||
waku_core/message/default_values,
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import tools/confutils/cli_args
|
||||
import logos_delivery/waku/[common/logging, factory/[waku, networks_config]]
|
||||
import logos_delivery/waku/[common/logging, waku, factory/networks_config]
|
||||
import
|
||||
std/[options, strutils, os, sequtils],
|
||||
chronicles,
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
import system, std/json
|
||||
import ./json_base_event
|
||||
import ../../logos_delivery/waku/api/types
|
||||
import ../../logos_delivery/api/types
|
||||
|
||||
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
|
||||
status*: ConnectionStatus
|
||||
|
||||
@ -9,7 +9,7 @@ import
|
||||
metrics,
|
||||
ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/health_monitor,
|
||||
library/declare_lib
|
||||
|
||||
@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import std/json
|
||||
import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/discovery/waku_dnsdisc,
|
||||
logos_delivery/waku/discovery/waku_discv5,
|
||||
logos_delivery/waku/waku_core/peers,
|
||||
|
||||
@ -5,7 +5,7 @@ import chronos, chronicles, results, confutils, confutils/std/net, ffi
|
||||
import
|
||||
logos_delivery/waku/node/peer_manager/peer_manager,
|
||||
tools/confutils/cli_args,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/factory/node_factory,
|
||||
logos_delivery/waku/factory/app_callbacks,
|
||||
logos_delivery/waku/rest_api/endpoint/builder,
|
||||
|
||||
@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import std/[sequtils, strutils, tables]
|
||||
import chronicles, chronos, results, options, json, ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/peer_manager,
|
||||
library/declare_lib
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
import std/[json, strutils]
|
||||
import chronos, results, ffi
|
||||
import libp2p/[protocols/ping, switch, multiaddress, multicodec]
|
||||
import
|
||||
logos_delivery/waku/[factory/waku, waku_core/peers, node/waku_node],
|
||||
library/declare_lib
|
||||
import logos_delivery/waku/[waku, waku_core/peers, node/waku_node], library/declare_lib
|
||||
|
||||
proc waku_ping_peer(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
|
||||
@ -4,7 +4,7 @@ import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_filter_v2/client,
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_relay,
|
||||
logos_delivery/waku/waku_filter_v2/common,
|
||||
logos_delivery/waku/waku_core/subscription/push_handler,
|
||||
|
||||
@ -4,7 +4,7 @@ import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/waku_core/codecs,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_lightpush_legacy/client,
|
||||
|
||||
@ -3,7 +3,8 @@ import std/[net, sequtils, strutils, json], strformat
|
||||
import chronicles, chronos, stew/byteutils, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/factory/[validator_signed, waku],
|
||||
logos_delivery/waku/factory/validator_signed,
|
||||
logos_delivery/waku/waku,
|
||||
tools/confutils/cli_args,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
|
||||
@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import std/[json, sugar, strutils, options]
|
||||
import chronos, chronicles, results, stew/byteutils, ffi
|
||||
import
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
library/utils,
|
||||
logos_delivery/waku/waku_core/peers,
|
||||
logos_delivery/waku/waku_core/message/digest,
|
||||
|
||||
@ -6,7 +6,7 @@ import
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_relay,
|
||||
logos_delivery,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/health_monitor/health_status,
|
||||
../logos_delivery/waku/factory/app_callbacks,
|
||||
|
||||
@ -3,9 +3,9 @@ import chronos, results, ffi
|
||||
import stew/byteutils
|
||||
import
|
||||
logos_delivery/waku/common/base64,
|
||||
logos_delivery/waku/factory/waku,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_core/topics/content_topic,
|
||||
logos_delivery/waku/api/[api, types],
|
||||
logos_delivery/api/types,
|
||||
../declare_lib
|
||||
|
||||
proc logosdelivery_subscribe(
|
||||
@ -20,7 +20,7 @@ proc logosdelivery_subscribe(
|
||||
# ContentTopic is just a string type alias
|
||||
let contentTopic = ContentTopic($contentTopicStr)
|
||||
|
||||
(await api.subscribe(ctx.myLib[].waku, contentTopic)).isOkOr:
|
||||
(await ctx.myLib[].messagingClient.subscribe(contentTopic)).isOkOr:
|
||||
let errMsg = $error
|
||||
return err("Subscribe failed: " & errMsg)
|
||||
|
||||
@ -38,7 +38,7 @@ proc logosdelivery_unsubscribe(
|
||||
# ContentTopic is just a string type alias
|
||||
let contentTopic = ContentTopic($contentTopicStr)
|
||||
|
||||
api.unsubscribe(ctx.myLib[].waku, contentTopic).isOkOr:
|
||||
ctx.myLib[].messagingClient.unsubscribe(contentTopic).isOkOr:
|
||||
let errMsg = $error
|
||||
return err("Unsubscribe failed: " & errMsg)
|
||||
|
||||
|
||||
@ -3,7 +3,8 @@ import chronos, chronicles, results, ffi
|
||||
import
|
||||
logos_delivery,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/api/[api, types],
|
||||
logos_delivery/waku/events/message_events,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/events/[message_events, health_events],
|
||||
tools/confutils/conf_from_json,
|
||||
../declare_lib,
|
||||
|
||||
@ -6,7 +6,8 @@ import bearssl/rand, std/times, chronos
|
||||
import stew/byteutils
|
||||
import logos_delivery/waku/utils/requests as request_utils
|
||||
import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time]
|
||||
import logos_delivery/waku/requests/requests
|
||||
|
||||
export content_topic, message
|
||||
|
||||
type
|
||||
MessageEnvelope* = object
|
||||
@ -21,7 +21,7 @@ import bearssl/rand
|
||||
import stew/byteutils
|
||||
import libp2p/crypto/crypto as libp2p_crypto
|
||||
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/messaging/delivery_service/send_service
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
@ -135,7 +135,7 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
|
||||
## and the total number of confirmed + failed segments equals the total expected segments.
|
||||
## Therefore, the channel-level request is removed from `self.channelReqs`
|
||||
## and the appropriate final event is emitted.
|
||||
##
|
||||
##
|
||||
let state = self.channelReqs.getOrDefault(channelReqId)
|
||||
if state.totalExpectedSegments == 0:
|
||||
## Either already finalized (and removed) or never inserted.
|
||||
|
||||
@ -15,7 +15,7 @@ import brokers/broker_context
|
||||
|
||||
import logos_delivery/waku/events/message_events as waku_message_events
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
import logos_delivery/waku/persistency/sds_persistency
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
## Core identifier types for the Reliable Channel API.
|
||||
|
||||
import std/hashes
|
||||
import logos_delivery/waku/api/types as api_types
|
||||
import logos_delivery/api/types as api_types
|
||||
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
|
||||
|
||||
@ -11,9 +11,7 @@
|
||||
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/api
|
||||
export api
|
||||
import logos_delivery/waku/factory/waku
|
||||
import logos_delivery/waku/waku
|
||||
export waku
|
||||
import logos_delivery/messaging/messaging_client
|
||||
export messaging_client
|
||||
@ -22,7 +20,8 @@ export reliable_channel_manager
|
||||
|
||||
import logos_delivery/waku/factory/waku_conf
|
||||
import logos_delivery/waku/factory/app_callbacks
|
||||
import logos_delivery/waku/api/[api_conf, types]
|
||||
import tools/confutils/cli_args
|
||||
import logos_delivery/waku/node/health_monitor/online_monitor
|
||||
|
||||
logScope:
|
||||
topics = "logosdelivery"
|
||||
@ -82,6 +81,13 @@ proc new*(
|
||||
|
||||
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
|
||||
## Starts each layer bottom-up: transport first, then messaging, then channels.
|
||||
if self.waku.isNil():
|
||||
return err("Waku node is not initialized")
|
||||
if self.messagingClient.isNil():
|
||||
return err("MessagingClient is not initialized")
|
||||
if self.reliableChannelManager.isNil():
|
||||
return err("ReliableChannelManager is not initialized")
|
||||
|
||||
(await self.waku.start()).isOkOr:
|
||||
return err("failed to start Waku: " & error)
|
||||
|
||||
@ -102,3 +108,8 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
|
||||
return err("failed to stop Waku: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
|
||||
if self.waku.isNil():
|
||||
return err("Waku node is not initialized")
|
||||
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())
|
||||
|
||||
@ -3,7 +3,7 @@ import std/[options, times], chronos
|
||||
import brokers/broker_context
|
||||
import
|
||||
logos_delivery/waku/waku_core,
|
||||
logos_delivery/waku/api/types,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/requests/node_requests
|
||||
|
||||
type DeliveryState* {.pure.} = enum
|
||||
|
||||
@ -4,7 +4,7 @@ import chronos, chronicles
|
||||
import brokers/broker_context
|
||||
import logos_delivery/waku/[waku_core], logos_delivery/waku/waku_lightpush/[common, rpc]
|
||||
import logos_delivery/waku/requests/health_requests
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import ./[delivery_task, send_processor]
|
||||
|
||||
logScope:
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import results, chronos
|
||||
import chronicles
|
||||
import
|
||||
logos_delivery/waku/api/types,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/node/[waku_node, subscription_manager],
|
||||
logos_delivery/messaging/delivery_service/[recv_service, send_service],
|
||||
logos_delivery/messaging/delivery_service/send_service/delivery_task
|
||||
@ -43,6 +43,26 @@ proc stop*(self: MessagingClient) {.async.} =
|
||||
await self.recvService.stopRecvService()
|
||||
self.started = false
|
||||
|
||||
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
|
||||
if self.isNil():
|
||||
return err("MessagingClient is not initialized")
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return self.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return self.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
|
||||
proc send*(
|
||||
self: MessagingClient, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
|
||||
@ -1,5 +0,0 @@
|
||||
import ./api/[api, api_conf]
|
||||
import ./events/message_events
|
||||
import tools/confutils/entry_nodes
|
||||
|
||||
export api, api_conf, entry_nodes, message_events
|
||||
@ -1,48 +0,0 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import std/[net, options]
|
||||
|
||||
import chronicles, chronos, libp2p/peerid, results
|
||||
|
||||
import logos_delivery/waku/factory/waku
|
||||
import logos_delivery/waku/[requests/health_requests, waku_core, waku_node]
|
||||
import logos_delivery/waku/node/subscription_manager
|
||||
import libp2p/peerid
|
||||
import tools/confutils/cli_args
|
||||
import ./[api_conf, types]
|
||||
|
||||
export cli_args
|
||||
|
||||
logScope:
|
||||
topics = "api"
|
||||
|
||||
proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} =
|
||||
let wakuConf = conf.toWakuConf().valueOr:
|
||||
return err("Failed to handle the configuration: " & error)
|
||||
|
||||
## We are not defining app callbacks at node creation
|
||||
let wakuRes = (await Waku.new(wakuConf)).valueOr:
|
||||
error "waku initialization failed", error = error
|
||||
return err("Failed setting up Waku: " & $error)
|
||||
|
||||
return ok(wakuRes)
|
||||
|
||||
proc checkApiAvailability(w: Waku): Result[void, string] =
|
||||
if w.isNil():
|
||||
return err("Waku node is not initialized")
|
||||
|
||||
# TODO: Conciliate request-bouncing health checks here with unit testing.
|
||||
# (For now, better to just allow all sends and rely on retries.)
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
w: Waku, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
return w.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
return w.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
@ -1,46 +0,0 @@
|
||||
# SEND API
|
||||
|
||||
**THIS IS TO BE REMOVED BEFORE PR MERGE**
|
||||
|
||||
This document collects logic and todo's around the Send API.
|
||||
|
||||
## Overview
|
||||
|
||||
Send api hides the complex logic of using raw protocols for reliable message delivery.
|
||||
The delivery method is chosen based on the node configuration and actual availabilities of peers.
|
||||
|
||||
## Delivery task
|
||||
|
||||
Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery.
|
||||
|
||||
## Delivery methods
|
||||
|
||||
Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes:
|
||||
- P2PReliability validation - checking network store node whether the message is reached at least a store node.
|
||||
- Simple retry until message is propagated to the network
|
||||
- Relay says >0 peers as publish result
|
||||
- LightpushClient returns with success
|
||||
|
||||
Depending on node config:
|
||||
- Relay
|
||||
- Lightpush
|
||||
|
||||
These methods are used in combination to achieve the best reliability.
|
||||
Fallback mechanism is used to switch between methods if the current one fails.
|
||||
|
||||
Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error
|
||||
|
||||
Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity.
|
||||
Actual connectivity is checked:
|
||||
- Relay's topic health check - at least dLow peers in the mesh for the topic
|
||||
- Store nodes availability - at least one store service node is available in peer manager
|
||||
- Lightpush client availability - at least one lightpush service node is available in peer manager
|
||||
|
||||
## Delivery processing
|
||||
|
||||
At every send request, each task is tried to be delivered right away.
|
||||
Any further retries and store check is done as a background task in a loop with predefined intervals.
|
||||
Each task is set for a maximum number of retries and/or maximum time to live.
|
||||
|
||||
In each round of store check and retry send tasks are selected based on their state.
|
||||
The state is updated based on the result of the delivery method.
|
||||
@ -1,6 +1,6 @@
|
||||
import brokers/event_broker
|
||||
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health]
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import brokers/event_broker
|
||||
import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics]
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/[waku_core/message, waku_core/topics]
|
||||
export types
|
||||
|
||||
EventBroker:
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import chronos, results, std/strutils, ../../api/types
|
||||
import chronos, results, std/strutils
|
||||
from logos_delivery/api/types import ConnectionStatus
|
||||
|
||||
export ConnectionStatus
|
||||
|
||||
|
||||
@ -8,10 +8,10 @@ import
|
||||
libp2p/protocols/rendezvous,
|
||||
libp2p/protocols/pubsub,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/[
|
||||
waku_relay,
|
||||
waku_rln_relay,
|
||||
api/types,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
node/waku_node,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import brokers/request_broker
|
||||
|
||||
import logos_delivery/waku/api/types
|
||||
import logos_delivery/api/types
|
||||
import
|
||||
logos_delivery/waku/node/health_monitor/[protocol_health, topic_health, health_report]
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
@ -4,7 +4,8 @@ import logos_delivery/waku/compat/option_valueor
|
||||
import results
|
||||
import chronicles, json_serialization, json_serialization/std/options
|
||||
import ../serdes
|
||||
import logos_delivery/waku/[waku_node, api/types, node/health_monitor]
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/[waku_node, node/health_monitor]
|
||||
|
||||
#### Serialization and deserialization
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import
|
||||
libp2p/wire,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/services/autorelayservice,
|
||||
libp2p/services/hpservice,
|
||||
libp2p/peerid,
|
||||
@ -20,6 +21,7 @@ import
|
||||
metrics,
|
||||
metrics/chronos_httpserver,
|
||||
brokers/broker_context,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
waku_node,
|
||||
@ -30,7 +32,6 @@ import
|
||||
waku_relay/protocol,
|
||||
waku_enr/sharding,
|
||||
waku_enr/multiaddr,
|
||||
api/types,
|
||||
common/logging,
|
||||
node/peer_manager,
|
||||
node/health_monitor,
|
||||
@ -48,9 +49,13 @@ import
|
||||
factory/internal_config,
|
||||
factory/app_callbacks,
|
||||
persistency/persistency,
|
||||
factory/validator_signed,
|
||||
waku_lightpush/client,
|
||||
waku_lightpush_legacy/client,
|
||||
waku_store/client,
|
||||
],
|
||||
./waku_conf,
|
||||
./waku_state_info
|
||||
./factory/waku_conf,
|
||||
./factory/waku_state_info
|
||||
|
||||
logScope:
|
||||
topics = "wakunode waku"
|
||||
@ -58,6 +63,8 @@ logScope:
|
||||
# Git version in git describe format (defined at compile time)
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
type Waku* = ref object
|
||||
stateInfo*: WakuStateInfo
|
||||
conf*: WakuConf
|
||||
@ -567,12 +574,418 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
|
||||
return ok()
|
||||
|
||||
proc isModeCoreAvailable*(waku: Waku): bool =
|
||||
return not waku.node.wakuRelay.isNil()
|
||||
## Kernel API realization
|
||||
##
|
||||
# --- topic construction ---
|
||||
proc buildContentTopic*(
|
||||
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
|
||||
): Future[Result[ContentTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc isModeEdgeAvailable*(waku: Waku): bool =
|
||||
return
|
||||
waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and
|
||||
not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil()
|
||||
proc buildPubsubTopic*(
|
||||
self: Waku, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
return ok(DefaultPubsubTopic)
|
||||
|
||||
# --- relay ---
|
||||
proc relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
|
||||
let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(numPeers)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil)
|
||||
).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
|
||||
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
|
||||
return err("relayAddProtectedShard: invalid public key: " & $error)
|
||||
|
||||
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
|
||||
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
|
||||
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(connPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
|
||||
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(meshPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- filter ---
|
||||
proc filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter subscription timed out")
|
||||
subFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(
|
||||
self: Waku, peer: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribeAll(peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription all timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- lightpush ---
|
||||
proc lightpushPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuLegacyLightpushClient.isNil():
|
||||
return err("wakuLegacyLightpushClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("lightpushPublish failed to parse peer addr: " & $error)
|
||||
|
||||
let msgHashHex = (
|
||||
await self.node.wakuLegacyLightpushClient.publish(
|
||||
pubsubTopic, message, remotePeer
|
||||
)
|
||||
).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(msgHashHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- store ---
|
||||
proc storeQuery*(
|
||||
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuStoreClient.isNil():
|
||||
return err("wakuStoreClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("storeQuery failed to parse peer addr: " & $error)
|
||||
|
||||
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
|
||||
if not await queryFut.withTimeout(timeoutMs.milliseconds):
|
||||
return err("storeQuery timed out")
|
||||
|
||||
let queryResponse = queryFut.read().valueOr:
|
||||
return err("storeQuery failed: " & $error)
|
||||
|
||||
return ok(queryResponse)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- peer management ---
|
||||
proc connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
await self.node.peerManager.disconnectNode(pId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(pId, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers()
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers(protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- discovery ---
|
||||
proc dnsDiscovery*(
|
||||
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let dnsNameServers = @[parseIpAddress(nameServer)]
|
||||
let discoveredPeers = (
|
||||
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
|
||||
).valueOr:
|
||||
return err("failed discovering peers from DNS: " & $error)
|
||||
|
||||
var multiAddresses = newSeq[string]()
|
||||
for discPeer in discoveredPeers:
|
||||
for address in discPeer.addrs:
|
||||
multiAddresses.add($address & "/p2p/" & $discPeer)
|
||||
|
||||
return ok(multiAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc discv5UpdateBootnodes*(
|
||||
self: Waku, bootnodes: seq[string]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]"
|
||||
self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr:
|
||||
return err("error in discv5UpdateBootnodes: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
(await self.wakuDiscv5.start()).isOkOr:
|
||||
return err("error starting discv5: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
await self.wakuDiscv5.stop()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerExchangeRequest*(
|
||||
self: Waku, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err("failed peer exchange: " & $error)
|
||||
return ok(numPeersRecv)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- debug / info ---
|
||||
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
return ok(WakuNodeVersionString)
|
||||
|
||||
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.info().listenAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.enr.toURI())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok($self.node.peerId())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
return ok(defaultRegistry.toText())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc pingPeer*(
|
||||
self: Waku, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async.} =
|
||||
try:
|
||||
let peerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err("pingPeer failed to parse peer addr: " & $error)
|
||||
|
||||
let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
let pingRTT = await self.node.libp2pPing.ping(conn)
|
||||
|
||||
if pingRTT == 0.nanos:
|
||||
return err("could not ping peer: rtt-0")
|
||||
|
||||
return ok(pingRTT.nanos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
{.pop.}
|
||||
@ -187,7 +187,7 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} =
|
||||
raiseAssert "Message was not archived in time"
|
||||
|
||||
# subscribe to the content topic; with no peers yet the subscriber stays offline
|
||||
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
(await subscriber.messagingClient.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
|
||||
return TestNetwork(
|
||||
storeNode: storeNode,
|
||||
|
||||
@ -225,7 +225,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
await net.teardown()
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/test-content/proto")
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect(
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"subscriberNode failed to subscribe"
|
||||
)
|
||||
|
||||
@ -248,7 +248,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
|
||||
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
|
||||
(await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(subbedTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -268,8 +270,12 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect(
|
||||
"failed to unsubscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -289,14 +295,14 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let topicA = ContentTopic("/waku/2/topic-a/proto")
|
||||
let topicB = ContentTopic("/waku/2/topic-b/proto")
|
||||
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
|
||||
(await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A")
|
||||
net.subscriber.messagingClient.unsubscribe(topicA).expect("failed to unsub A")
|
||||
|
||||
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
|
||||
"Publish A failed"
|
||||
@ -315,9 +321,13 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let glitchTopic = ContentTopic("/waku/2/glitch/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to sub")
|
||||
(await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to double sub")
|
||||
net.subscriber.waku.unsubscribe(glitchTopic).expect("failed to unsub")
|
||||
(await net.subscriber.messagingClient.subscribe(glitchTopic)).expect(
|
||||
"failed to sub"
|
||||
)
|
||||
(await net.subscriber.messagingClient.subscribe(glitchTopic)).expect(
|
||||
"failed to double sub"
|
||||
)
|
||||
net.subscriber.messagingClient.unsubscribe(glitchTopic).expect("failed to unsub")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -338,7 +348,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
let testTopic = ContentTopic("/waku/2/resub-test/proto")
|
||||
|
||||
# Subscribe
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Initial sub failed"
|
||||
)
|
||||
|
||||
var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
discard
|
||||
@ -348,7 +360,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
await eventManager.teardown()
|
||||
|
||||
# Unsubscribe and verify teardown
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed")
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect("Unsub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard
|
||||
@ -358,7 +370,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
await eventManager.teardown()
|
||||
|
||||
# Resubscribe
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect("Resub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard
|
||||
@ -382,8 +394,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto")
|
||||
inc i
|
||||
|
||||
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
|
||||
(await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 2)
|
||||
defer:
|
||||
@ -440,7 +452,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
# subscribe to all content topics we generated
|
||||
for t in allTopics:
|
||||
(await net.subscriber.waku.subscribe(t)).expect("sub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(t)).expect("sub failed")
|
||||
activeSubs.add(t)
|
||||
|
||||
await verifyNetworkState(activeSubs)
|
||||
@ -448,7 +460,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
# unsubscribe from some content topics
|
||||
for i in 0 ..< 50:
|
||||
let t = allTopics[i]
|
||||
net.subscriber.waku.unsubscribe(t).expect("unsub failed")
|
||||
net.subscriber.messagingClient.unsubscribe(t).expect("unsub failed")
|
||||
|
||||
let idx = activeSubs.find(t)
|
||||
if idx >= 0:
|
||||
@ -459,7 +471,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
# re-subscribe to some content topics
|
||||
for i in 0 ..< 25:
|
||||
let t = allTopics[i]
|
||||
(await net.subscriber.waku.subscribe(t)).expect("resub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(t)).expect("resub failed")
|
||||
activeSubs.add(t)
|
||||
|
||||
await verifyNetworkState(activeSubs)
|
||||
@ -470,7 +482,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
await net.teardown()
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/test-content/proto")
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -491,7 +505,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
|
||||
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
|
||||
(await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(subbedTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -511,8 +527,12 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"failed to subscribe"
|
||||
)
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect(
|
||||
"failed to unsubscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
defer:
|
||||
@ -532,8 +552,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let topicA = ContentTopic("/waku/2/topic-a/proto")
|
||||
let topicB = ContentTopic("/waku/2/topic-b/proto")
|
||||
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
|
||||
(await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A")
|
||||
(await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let shard = net.subscriber.waku.node.getRelayShard(topicA)
|
||||
await waitForEdgeSubs(net.subscriber, shard)
|
||||
@ -542,7 +562,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A")
|
||||
net.subscriber.messagingClient.unsubscribe(topicA).expect("failed to unsub A")
|
||||
|
||||
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
|
||||
"Publish A failed"
|
||||
@ -561,7 +581,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
|
||||
let testTopic = ContentTopic("/waku/2/resub-test/proto")
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Initial sub failed"
|
||||
)
|
||||
|
||||
var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect(
|
||||
@ -571,7 +593,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
require await eventManager.waitForEvents(TestTimeout)
|
||||
await eventManager.teardown()
|
||||
|
||||
net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed")
|
||||
net.subscriber.messagingClient.unsubscribe(testTopic).expect("Unsub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard
|
||||
@ -580,7 +602,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
check not await eventManager.waitForEvents(NegativeTestTimeout)
|
||||
await eventManager.teardown()
|
||||
|
||||
(await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed")
|
||||
(await net.subscriber.messagingClient.subscribe(testTopic)).expect("Resub failed")
|
||||
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
||||
|
||||
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect(
|
||||
@ -653,7 +675,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
let testTopic = ContentTopic("/waku/2/failover-test/proto")
|
||||
let shard = subscriber.waku.node.getRelayShard(testTopic)
|
||||
|
||||
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
(await subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Failed to subscribe"
|
||||
)
|
||||
|
||||
# Wait for dialing both filter servers (HealthyThreshold = 2)
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
@ -783,7 +807,9 @@ suite "Messaging API, SubscriptionManager":
|
||||
let testTopic = ContentTopic("/waku/2/replacement-test/proto")
|
||||
let shard = subscriber.waku.node.getRelayShard(testTopic)
|
||||
|
||||
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
(await subscriber.messagingClient.subscribe(testTopic)).expect(
|
||||
"Failed to subscribe"
|
||||
)
|
||||
|
||||
# Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed.
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
|
||||
@ -5,7 +5,7 @@ import json_serialization, confutils, confutils/std/net
|
||||
import
|
||||
tools/confutils/cli_args,
|
||||
tools/confutils/conf_from_json,
|
||||
logos_delivery/waku/api/api_conf,
|
||||
logos_delivery/api/api_conf,
|
||||
logos_delivery/waku/factory/waku_conf,
|
||||
logos_delivery/waku/factory/networks_config,
|
||||
logos_delivery/waku/factory/conf_builder/conf_builder,
|
||||
@ -350,7 +350,7 @@ suite "WakuNodeConf JSON -> WakuConf integration":
|
||||
|
||||
{.push warning[Deprecated]: off.}
|
||||
|
||||
import logos_delivery/waku/api/api_conf
|
||||
import logos_delivery/api/api_conf
|
||||
|
||||
suite "NodeConfig (deprecated) - toWakuConf":
|
||||
test "Minimal configuration":
|
||||
|
||||
@ -58,7 +58,7 @@ suite "Reliable Channel - ingress":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
## Noop encryption providers so the Encrypt/Decrypt brokers have
|
||||
@ -124,7 +124,7 @@ suite "Reliable Channel - ingress":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -181,7 +181,7 @@ suite "Reliable Channel - send state machine":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -246,7 +246,7 @@ suite "Reliable Channel - send state machine":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -347,7 +347,7 @@ suite "Reliable Channel - send state machine":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -452,7 +452,7 @@ suite "Reliable Channel - SDS persistence":
|
||||
var waku: LogosDelivery
|
||||
var manager: ReliableChannelManager
|
||||
lockNewGlobalBrokerContext:
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -522,7 +522,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -593,7 +593,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -650,7 +650,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -710,7 +710,7 @@ suite "Reliable Channel - SDS lifecycle":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -792,7 +792,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -860,7 +860,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -948,7 +948,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -1023,7 +1023,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -1096,7 +1096,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
@ -1162,7 +1162,7 @@ suite "Reliable Channel - SDS protocol semantics":
|
||||
var waku: LogosDelivery
|
||||
var manager: ReliableChannelManager
|
||||
lockNewGlobalBrokerContext:
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
|
||||
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr()
|
||||
|
||||
@ -9,7 +9,7 @@ import tools/confutils/cli_args
|
||||
import logos_delivery/waku/factory/networks_config
|
||||
import logos_delivery/waku/factory/conf_builder/conf_builder
|
||||
|
||||
suite "Waku API - Create node":
|
||||
suite "LogosDelivery API - Create node":
|
||||
asyncTest "Create node with minimal configuration":
|
||||
## Given
|
||||
var nodeConf = defaultWakuNodeConf().valueOr:
|
||||
@ -21,14 +21,14 @@ suite "Waku API - Create node":
|
||||
# This is the actual minimal config but as the node auto-start, it is not suitable for tests
|
||||
|
||||
## When
|
||||
let node = (await createNode(nodeConf)).valueOr:
|
||||
raiseAssert "createNode (minimal config) failed: " & error
|
||||
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
|
||||
raiseAssert "LogosDelivery.new (minimal config) failed: " & error
|
||||
|
||||
## Then
|
||||
check:
|
||||
not node.isNil()
|
||||
node.conf.clusterId == 3
|
||||
node.conf.relay == true
|
||||
not ld.isNil()
|
||||
ld.waku.conf.clusterId == 3
|
||||
ld.waku.conf.relay == true
|
||||
|
||||
asyncTest "Create node with full configuration":
|
||||
## Given
|
||||
@ -47,20 +47,20 @@ suite "Waku API - Create node":
|
||||
]
|
||||
|
||||
## When
|
||||
let node = (await createNode(nodeConf)).valueOr:
|
||||
raiseAssert "createNode (full config) failed: " & error
|
||||
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
|
||||
raiseAssert "LogosDelivery.new (full config) failed: " & error
|
||||
|
||||
## Then
|
||||
check:
|
||||
not node.isNil()
|
||||
node.conf.clusterId == 99
|
||||
node.conf.shardingConf.numShardsInCluster == 16
|
||||
node.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64
|
||||
node.conf.staticNodes.len == 1
|
||||
node.conf.relay == true
|
||||
node.conf.lightPush == true
|
||||
node.conf.peerExchangeService == true
|
||||
node.conf.rendezvous == true
|
||||
not ld.isNil()
|
||||
ld.waku.conf.clusterId == 99
|
||||
ld.waku.conf.shardingConf.numShardsInCluster == 16
|
||||
ld.waku.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64
|
||||
ld.waku.conf.staticNodes.len == 1
|
||||
ld.waku.conf.relay == true
|
||||
ld.waku.conf.lightPush == true
|
||||
ld.waku.conf.peerExchangeService == true
|
||||
ld.waku.conf.rendezvous == true
|
||||
|
||||
asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)":
|
||||
## Given
|
||||
@ -75,18 +75,18 @@ suite "Waku API - Create node":
|
||||
]
|
||||
|
||||
## When
|
||||
let node = (await createNode(nodeConf)).valueOr:
|
||||
raiseAssert "createNode (mixed entry nodes) failed: " & error
|
||||
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
|
||||
raiseAssert "LogosDelivery.new (mixed entry nodes) failed: " & error
|
||||
|
||||
## Then
|
||||
check:
|
||||
not node.isNil()
|
||||
node.conf.clusterId == 42
|
||||
not ld.isNil()
|
||||
ld.waku.conf.clusterId == 42
|
||||
# ENRTree should go to DNS discovery
|
||||
node.conf.dnsDiscoveryConf.isSome()
|
||||
node.conf.dnsDiscoveryConf.get().enrTreeUrl ==
|
||||
ld.waku.conf.dnsDiscoveryConf.isSome()
|
||||
ld.waku.conf.dnsDiscoveryConf.get().enrTreeUrl ==
|
||||
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
|
||||
# Multiaddr should go to static nodes
|
||||
node.conf.staticNodes.len == 1
|
||||
node.conf.staticNodes[0] ==
|
||||
ld.waku.conf.staticNodes.len == 1
|
||||
ld.waku.conf.staticNodes[0] ==
|
||||
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"
|
||||
|
||||
@ -21,7 +21,7 @@ import
|
||||
discovery/waku_discv5,
|
||||
waku_enr/capabilities,
|
||||
factory/conf_builder/conf_builder,
|
||||
factory/waku,
|
||||
waku,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
],
|
||||
|
||||
@ -10,7 +10,7 @@ import
|
||||
tests/testlib/[wakucore, wakunode],
|
||||
logos_delivery/waku/factory/conf_builder/conf_builder
|
||||
|
||||
include logos_delivery/waku/factory/waku, logos_delivery/waku/common/enr/typed_record
|
||||
include logos_delivery/waku/waku, logos_delivery/waku/common/enr/typed_record
|
||||
|
||||
suite "Wakunode2 - Waku":
|
||||
test "compilation version should be reported":
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user