Refactor waku/api into messaging_client, waku/api/types and api_conf into logos_delivery/api

This commit is contained in:
NagyZoltanPeter 2026-06-23 13:55:23 +02:00
parent 95f31edf11
commit dd231c4c9f
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
24 changed files with 1130 additions and 166 deletions

View File

@ -2,7 +2,7 @@
import system, std/json
import ./json_base_event
import ../../logos_delivery/waku/api/types
import ../../logos_delivery/api/types
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
status*: ConnectionStatus

View File

@ -21,7 +21,7 @@ import bearssl/rand
import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto
import logos_delivery/waku/api/types
import logos_delivery/api/types
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
@ -135,7 +135,7 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
## and the total number of confirmed + failed segments equals the total expected segments.
## Therefore, the channel-level request is removed from `self.channelReqs`
## and the appropriate final event is emitted.
##
##
let state = self.channelReqs.getOrDefault(channelReqId)
if state.totalExpectedSegments == 0:
## Either already finalized (and removed) or never inserted.

View File

@ -15,7 +15,7 @@ import brokers/broker_context
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/api/types
import logos_delivery/api/types
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency

View File

@ -1,7 +1,7 @@
## Core identifier types for the Reliable Channel API.
import std/hashes
import logos_delivery/waku/api/types as api_types
import logos_delivery/api/types as api_types
import ./scalable_data_sync/scalable_data_sync

View File

@ -22,7 +22,6 @@ export reliable_channel_manager
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/factory/app_callbacks
import logos_delivery/waku/api/[api_conf, types]
logScope:
topics = "logosdelivery"
@ -82,6 +81,13 @@ proc new*(
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Starts each layer bottom-up: transport first, then messaging, then channels.
if self.waku.isNil():
return err("Waku node is not initialized")
if self.messagingClient.isNil():
return err("MessagingClient is not initialized")
if self.reliableChannelManager.isNil():
return err("ReliableChannelManager is not initialized")
(await self.waku.start()).isOkOr:
return err("failed to start Waku: " & error)
@ -102,3 +108,8 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return err("failed to stop Waku: " & error)
return ok()
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
if self.waku.isNil():
return err("Waku node is not initialized")
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())

View File

@ -3,7 +3,7 @@ import std/[options, times], chronos
import brokers/broker_context
import
logos_delivery/waku/waku_core,
logos_delivery/waku/api/types,
logos_delivery/api/types,
logos_delivery/waku/requests/node_requests
type DeliveryState* {.pure.} = enum

View File

@ -4,7 +4,7 @@ import chronos, chronicles
import brokers/broker_context
import logos_delivery/waku/[waku_core], logos_delivery/waku/waku_lightpush/[common, rpc]
import logos_delivery/waku/requests/health_requests
import logos_delivery/waku/api/types
import logos_delivery/api/types
import ./[delivery_task, send_processor]
logScope:

View File

@ -1,7 +1,7 @@
import results, chronos
import chronicles
import
logos_delivery/waku/api/types,
logos_delivery/api/types,
logos_delivery/waku/node/[waku_node, subscription_manager],
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
@ -43,6 +43,26 @@ proc stop*(self: MessagingClient) {.async.} =
await self.recvService.stopRecvService()
self.started = false
proc checkApiAvailability(mc: MessagingClient): Result[void, string] =
if mc.isNil():
return err("MessagingClient is not initialized")
return ok()
proc subscribe*(
mc: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(mc)
return mc.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(
mc: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
?checkApiAvailability(mc)
return mc.node.subscriptionManager.unsubscribe(contentTopic)
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =

View File

@ -1,5 +1,4 @@
import ./api/[api, api_conf]
import ./events/message_events
import tools/confutils/entry_nodes
export api, api_conf, entry_nodes, message_events
export entry_nodes, message_events

View File

@ -1,48 +0,0 @@
import logos_delivery/waku/compat/option_valueor
import std/[net, options]
import chronicles, chronos, libp2p/peerid, results
import logos_delivery/waku/waku
import logos_delivery/waku/[requests/health_requests, waku_core, waku_node]
import logos_delivery/waku/node/subscription_manager
import libp2p/peerid
import tools/confutils/cli_args
import ./[api_conf, types]
export cli_args
logScope:
topics = "api"
proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} =
let wakuConf = conf.toWakuConf().valueOr:
return err("Failed to handle the configuration: " & error)
## We are not defining app callbacks at node creation
let wakuRes = (await Waku.new(wakuConf)).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)
return ok(wakuRes)
proc checkApiAvailability(w: Waku): Result[void, string] =
if w.isNil():
return err("Waku node is not initialized")
# TODO: Conciliate request-bouncing health checks here with unit testing.
# (For now, better to just allow all sends and rely on retries.)
return ok()
proc subscribe*(
w: Waku, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(w)
return w.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
?checkApiAvailability(w)
return w.node.subscriptionManager.unsubscribe(contentTopic)

View File

@ -1,46 +0,0 @@
# SEND API
**THIS IS TO BE REMOVED BEFORE PR MERGE**
This document collects logic and todo's around the Send API.
## Overview
Send api hides the complex logic of using raw protocols for reliable message delivery.
The delivery method is chosen based on the node configuration and actual availabilities of peers.
## Delivery task
Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery.
## Delivery methods
Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes:
- P2PReliability validation - checking network store node whether the message is reached at least a store node.
- Simple retry until message is propagated to the network
- Relay says >0 peers as publish result
- LightpushClient returns with success
Depending on node config:
- Relay
- Lightpush
These methods are used in combination to achieve the best reliability.
Fallback mechanism is used to switch between methods if the current one fails.
Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error
Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity.
Actual connectivity is checked:
- Relay's topic health check - at least dLow peers in the mesh for the topic
- Store nodes availability - at least one store service node is available in peer manager
- Lightpush client availability - at least one lightpush service node is available in peer manager
## Delivery processing
At every send request, each task is tried to be delivered right away.
Any further retries and store check is done as a background task in a loop with predefined intervals.
Each task is set for a maximum number of retries and/or maximum time to live.
In each round of store check and retry send tasks are selected based on their state.
The state is updated based on the result of the delivery method.

View File

@ -1,6 +1,6 @@
import brokers/event_broker
import logos_delivery/waku/api/types
import logos_delivery/api/types
import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health]
import logos_delivery/waku/waku_core/topics

View File

@ -1,5 +1,6 @@
import brokers/event_broker
import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics]
import logos_delivery/api/types
import logos_delivery/waku/[waku_core/message, waku_core/topics]
export types
EventBroker:

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
import chronos, results, std/strutils, ../../api/types
import chronos, results, std/strutils
import logos_delivery/api/types
export ConnectionStatus

View File

@ -8,10 +8,10 @@ import
libp2p/protocols/rendezvous,
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
logos_delivery/api/types,
logos_delivery/waku/[
waku_relay,
waku_rln_relay,
api/types,
events/health_events,
events/peer_events,
node/waku_node,

View File

@ -1,6 +1,6 @@
import brokers/request_broker
import logos_delivery/waku/api/types
import logos_delivery/api/types
import
logos_delivery/waku/node/health_monitor/[protocol_health, topic_health, health_report]
import logos_delivery/waku/waku_core/topics

View File

@ -4,7 +4,8 @@ import logos_delivery/waku/compat/option_valueor
import results
import chronicles, json_serialization, json_serialization/std/options
import ../serdes
import logos_delivery/waku/[waku_node, api/types, node/health_monitor]
import logos_delivery/api/types
import logos_delivery/waku/[waku_node, node/health_monitor]
#### Serialization and deserialization

View File

@ -20,6 +20,7 @@ import
metrics,
metrics/chronos_httpserver,
brokers/broker_context,
logos_delivery/api/types,
logos_delivery/waku/[
waku_core,
waku_node,
@ -30,7 +31,6 @@ import
waku_relay/protocol,
waku_enr/sharding,
waku_enr/multiaddr,
api/types,
common/logging,
node/peer_manager,
node/health_monitor,
@ -48,6 +48,7 @@ import
factory/internal_config,
factory/app_callbacks,
persistency/persistency,
factory/validator_signed,
],
./factory/waku_conf,
./factory/waku_state_info
@ -611,15 +612,8 @@ proc relaySubscribe*(
if self.node.wakuRelay.isNil():
return err("relaySubscribe: WakuRelay not mounted")
let handler = proc(topic: PubsubTopic, msg: WakuMessage) {.async.} =
## Bridge inbound relay traffic to the `ReceivedMessage` kernel event
## (replaces libwaku's set_event_callback message path).
ReceivedMessage.emit(
self.brokerCtx, ReceivedMessage(pubsubTopic: topic, message: msg)
)
self.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil)
).isOkOr:
return err($error)
@ -969,9 +963,6 @@ proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
except CatchableError as e:
return err(e.msg)
proc isOnline*(self: Waku): Future[Result[bool, string]] {.async.} =
return ok(self.healthMonitor.onlineMonitor.amIOnline())
proc pingPeer*(
self: Waku, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async.} =

View File

@ -5,7 +5,7 @@ import json_serialization, confutils, confutils/std/net
import
tools/confutils/cli_args,
tools/confutils/conf_from_json,
logos_delivery/waku/api/api_conf,
logos_delivery/api/api_conf,
logos_delivery/waku/factory/waku_conf,
logos_delivery/waku/factory/networks_config,
logos_delivery/waku/factory/conf_builder/conf_builder,
@ -350,7 +350,7 @@ suite "WakuNodeConf JSON -> WakuConf integration":
{.push warning[Deprecated]: off.}
import logos_delivery/waku/api/api_conf
import logos_delivery/api/api_conf
suite "NodeConfig (deprecated) - toWakuConf":
test "Minimal configuration":

View File

@ -58,7 +58,7 @@ suite "Reliable Channel - ingress":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
## Noop encryption providers so the Encrypt/Decrypt brokers have
@ -124,7 +124,7 @@ suite "Reliable Channel - ingress":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -181,7 +181,7 @@ suite "Reliable Channel - send state machine":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -246,7 +246,7 @@ suite "Reliable Channel - send state machine":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -347,7 +347,7 @@ suite "Reliable Channel - send state machine":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -452,7 +452,7 @@ suite "Reliable Channel - SDS persistence":
var waku: LogosDelivery
var manager: ReliableChannelManager
lockNewGlobalBrokerContext:
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -522,7 +522,7 @@ suite "Reliable Channel - SDS lifecycle":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -593,7 +593,7 @@ suite "Reliable Channel - SDS lifecycle":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -650,7 +650,7 @@ suite "Reliable Channel - SDS lifecycle":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -710,7 +710,7 @@ suite "Reliable Channel - SDS lifecycle":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -792,7 +792,7 @@ suite "Reliable Channel - SDS protocol semantics":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -860,7 +860,7 @@ suite "Reliable Channel - SDS protocol semantics":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -948,7 +948,7 @@ suite "Reliable Channel - SDS protocol semantics":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -1023,7 +1023,7 @@ suite "Reliable Channel - SDS protocol semantics":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -1096,7 +1096,7 @@ suite "Reliable Channel - SDS protocol semantics":
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -1162,7 +1162,7 @@ suite "Reliable Channel - SDS protocol semantics":
var waku: LogosDelivery
var manager: ReliableChannelManager
lockNewGlobalBrokerContext:
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new")
manager = waku.reliableChannelManager
check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr()

View File

@ -9,7 +9,7 @@ import tools/confutils/cli_args
import logos_delivery/waku/factory/networks_config
import logos_delivery/waku/factory/conf_builder/conf_builder
suite "Waku API - Create node":
suite "LogosDelivery API - Create node":
asyncTest "Create node with minimal configuration":
## Given
var nodeConf = defaultWakuNodeConf().valueOr:
@ -21,14 +21,14 @@ suite "Waku API - Create node":
# This is the actual minimal config but as the node auto-start, it is not suitable for tests
## When
let node = (await createNode(nodeConf)).valueOr:
raiseAssert "createNode (minimal config) failed: " & error
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
raiseAssert "LogosDelivery.new (minimal config) failed: " & error
## Then
check:
not node.isNil()
node.conf.clusterId == 3
node.conf.relay == true
not ld.isNil()
ld.waku.conf.clusterId == 3
ld.waku.conf.relay == true
asyncTest "Create node with full configuration":
## Given
@ -47,20 +47,20 @@ suite "Waku API - Create node":
]
## When
let node = (await createNode(nodeConf)).valueOr:
raiseAssert "createNode (full config) failed: " & error
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
raiseAssert "LogosDelivery.new (full config) failed: " & error
## Then
check:
not node.isNil()
node.conf.clusterId == 99
node.conf.shardingConf.numShardsInCluster == 16
node.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64
node.conf.staticNodes.len == 1
node.conf.relay == true
node.conf.lightPush == true
node.conf.peerExchangeService == true
node.conf.rendezvous == true
not ld.isNil()
ld.waku.conf.clusterId == 99
ld.waku.conf.shardingConf.numShardsInCluster == 16
ld.waku.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64
ld.waku.conf.staticNodes.len == 1
ld.waku.conf.relay == true
ld.waku.conf.lightPush == true
ld.waku.conf.peerExchangeService == true
ld.waku.conf.rendezvous == true
asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)":
## Given
@ -75,18 +75,18 @@ suite "Waku API - Create node":
]
## When
let node = (await createNode(nodeConf)).valueOr:
raiseAssert "createNode (mixed entry nodes) failed: " & error
let ld = (await LogosDelivery.new(nodeConf)).valueOr:
raiseAssert "LogosDelivery.new (mixed entry nodes) failed: " & error
## Then
check:
not node.isNil()
node.conf.clusterId == 42
not ld.isNil()
ld.waku.conf.clusterId == 42
# ENRTree should go to DNS discovery
node.conf.dnsDiscoveryConf.isSome()
node.conf.dnsDiscoveryConf.get().enrTreeUrl ==
ld.waku.conf.dnsDiscoveryConf.isSome()
ld.waku.conf.dnsDiscoveryConf.get().enrTreeUrl ==
"enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im"
# Multiaddr should go to static nodes
node.conf.staticNodes.len == 1
node.conf.staticNodes[0] ==
ld.waku.conf.staticNodes.len == 1
ld.waku.conf.staticNodes[0] ==
"/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"