Add events to interface level

This commit is contained in:
NagyZoltanPeter 2026-06-24 14:24:49 +02:00
parent 019937a79b
commit 2322dafd70
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
27 changed files with 171 additions and 170 deletions

View File

@ -3,9 +3,9 @@ import chronos, chronicles, results, ffi
import
logos_delivery,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/events/message_events,
logos_delivery/api/messaging_client_api,
logos_delivery/api/types,
logos_delivery/waku/events/[message_events, health_events],
logos_delivery/waku/events/health_events,
tools/confutils/conf_from_json,
../declare_lib,
../json_event
@ -116,7 +116,7 @@ proc logosdelivery_start_node(
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
discard EventConnectionStatusChange.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
callEventCallback(ctx, "onConnectionStatusChange"):

View File

@ -1,18 +1,25 @@
import std/options
import chronos, results
import logos_delivery/api/types
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/waku/waku_core/topics/pubsub_topic
import logos_delivery/waku/waku_store/common
import logos_delivery/waku/waku_store/common as store_types
export event_broker
export api_types, pubsub_topic, store_types
type IKernel* = ref object of RootObj
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage
# --- topic construction ---
method buildContentTopic*(
self: IKernel,
appName: string,
appVersion: uint32,
name: string,
encoding: string,
self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildContentTopic not implemented")

View File

@ -0,0 +1,33 @@
## `LogosDelivery` is the project entry point. It is a pure concentrator: it
## owns exactly one instance of each API layer
##
## Waku <- MessagingClient <- ReliableChannelManager
##
## and chains them together (each layer drives the one below it). Every layer
## keeps its own, separate public API — `LogosDelivery` only wires them up and
## drives the shared `new` / `start` / `stop` lifecycle.
{.push raises: [].}
import results, chronos
import brokers/event_broker
import types as api_types
export api_types, event_broker
type
## Entry point. Holds one instance of each API layer.
ILogosDelivery* = ref object of RootObj
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
method start*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.start not implemented")
method stop*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.stop not implemented")
method isOnline*(self: ILogosDelivery): Future[Result[bool, string]] {.async, base.} =
return err("ILogosDelivery.isOnline not implemented")

View File

@ -1,8 +1,37 @@
import chronos, results
import logos_delivery/api/types
import brokers/event_broker
import logos_delivery/api/types as api_types
export event_broker, api_types
type IMessagingClient* = ref object of RootObj
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
method subscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async: (raises: []), base.} =

View File

@ -1,9 +1,43 @@
import chronos, results
import logos_delivery/api/types
import logos_delivery/channels/types
import logos_delivery/channels/reliable_channel
type IReliableChannelManager* = ref object of RootObj
import brokers/event_broker
import logos_delivery/api/types as api_types
export event_broker, api_types
type
IReliableChannelManager* = ref object of RootObj
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string
method createReliableChannel*(
self: IReliableChannelManager,

View File

@ -1,14 +1,19 @@
import logos_delivery/waku/compat/option_valueor
import libp2p/crypto/crypto
{.push raises: [].}
import std/hashes
import bearssl/rand, std/times, chronos
import stew/byteutils
import logos_delivery/waku/utils/requests as request_utils
import libp2p/crypto/crypto
import logos_delivery/waku/compat/option_valueor
import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time]
export content_topic, message
import types/sds_message_id
export sds_message_id
type
MessageEnvelope* = object
contentTopic*: ContentTopic
@ -26,9 +31,16 @@ type
PartiallyConnected
Connected
ChannelId* = SdsChannelID
proc generateRequestId*(rng: crypto.Rng): string =
var bytes: array[10, byte]
rng.generate(bytes)
return byteutils.toHex(bytes)
proc new*(T: typedesc[RequestId], rng: crypto.Rng): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
RequestId(generateRequestId(rng))
proc `$`*(r: RequestId): string {.inline.} =
string(r)
@ -36,6 +48,11 @@ proc `$`*(r: RequestId): string {.inline.} =
proc `==`*(a, b: RequestId): bool {.inline.} =
string(a) == string(b)
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))
proc init*(
T: type MessageEnvelope,
contentTopic: ContentTopic,

View File

@ -1,39 +0,0 @@
## Reliable Channel event types emitted to API consumers.
##
## Lifecycle events for individual segments (sent / propagated / errored)
## are the same as the network-level ones the MessagingClient already
## emits — `requestId` is shared across layers — so we just re-export
## `waku/events/message_events` and avoid declaring duplicates.
##
## Only the channel-level `MessageReceivedEvent` carries data that has
## no analogue in the lower layer (reassembled application payload,
## senderId, channelId), so it lives here.
import logos_delivery/waku/events/message_events as waku_message_events
import brokers/event_broker
import ./types as channel_types
export waku_message_events, channel_types, event_broker
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string

View File

@ -24,16 +24,17 @@ import libp2p/crypto/crypto as libp2p_crypto
import logos_delivery/api/types
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
import logos_delivery/api/reliable_channel_manager_api
import logos_delivery/api/messaging_client_api
import ./events
import ./segmentation/segmentation
import ./scalable_data_sync/scalable_data_sync
import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager,
encryption
types, send_service, reliable_channel_manager_api, segmentation, scalable_data_sync,
rate_limit_manager, encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## Wire-format spec marker for the Reliable Channel layer, as defined
@ -44,14 +45,6 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## on breaking on-the-wire changes; implementations pin one version.
type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
MessagePersistence {.pure.} = enum
Persistent
Ephemeral

View File

@ -15,7 +15,6 @@ import brokers/broker_context
import logos_delivery/api/types
import logos_delivery/api/reliable_channel_manager_api
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency

View File

@ -1,15 +0,0 @@
## Core identifier types for the Reliable Channel API.
import std/hashes
import logos_delivery/api/types as api_types
import ./scalable_data_sync/scalable_data_sync
export scalable_data_sync
export api_types
type ChannelId* = SdsChannelID
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))

View File

@ -11,6 +11,8 @@
import results, chronos, chronicles
import logos_delivery/api/logos_delivery_api
export logos_delivery_api
import logos_delivery/waku/waku
export waku
import logos_delivery/messaging/messaging_client
@ -35,7 +37,8 @@ type
messaging*: MessagingClientConf
reliableChannel*: ReliableChannelManagerConf
LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer.
LogosDelivery* = ref object of ILogosDelivery
## Entry point. Holds one instance of each API layer.
waku*: Waku
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
@ -79,7 +82,7 @@ proc new*(
)
)
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Starts each layer bottom-up: transport first, then messaging, then channels.
if self.waku.isNil():
return err("Waku node is not initialized")
@ -99,7 +102,7 @@ proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Stops in reverse order so higher layers drain before their dependencies.
await self.reliableChannelManager.stop()
await self.messagingClient.stop()
@ -109,7 +112,7 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
method isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
if self.waku.isNil():
return err("Waku node is not initialized")
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())

View File

@ -6,6 +6,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api]
import
logos_delivery/waku/[
waku_core,
@ -13,7 +14,6 @@ import
waku_store/client,
waku_store/common,
waku_filter_v2/client,
events/message_events,
events/health_events,
waku_node,
node/subscription_manager,

View File

@ -5,6 +5,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[sequtils, tables, options, typetraits]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import logos_delivery/api/messaging_client_api
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
logos_delivery/waku/[
@ -18,7 +19,6 @@ import
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
events/message_events,
]
logScope:

View File

@ -1,9 +1,5 @@
import
./[
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events,
]
./[delivery_events, health_events, peer_events, discovery_events]
export
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events
delivery_events, health_events, peer_events, discovery_events

View File

@ -1,15 +1,10 @@
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health]
import logos_delivery/waku/waku_core/topics
from logos_delivery/api/logos_delivery_api import EventConnectionStatusChange
import logos_delivery/waku/node/health_monitor/topic_health
from logos_delivery/waku/waku_core/topics import ContentTopic, PubsubTopic
export protocol_health, topic_health
# Notify health changes to node connectivity
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
export topic_health, EventConnectionStatusChange
# Notify health changes to a subscribed topic
# TODO: emit content topic health change events when subscribe/unsubscribe

View File

@ -1,35 +0,0 @@
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/[waku_core/message, waku_core/topics]
export types
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -9,6 +9,7 @@ import
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
logos_delivery/api/types,
logos_delivery/api/logos_delivery_api,
logos_delivery/waku/[
waku_relay,
waku_rln_relay,

View File

@ -1,6 +1,6 @@
import chronos
import logos_delivery/waku/waku_core
from logos_delivery/waku/waku_core/topics import PubsubTopic
type TopicHealth* = enum
UNHEALTHY

View File

@ -2,6 +2,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results
import libp2p/[peerid, peerinfo]
import brokers/broker_context
import logos_delivery/api/kernel_api
import
logos_delivery/waku/[
@ -16,7 +17,6 @@ import
waku_filter_v2/client as filter_client,
waku_filter_v2/protocol as filter_protocol,
events/health_events,
events/message_events,
events/peer_events,
requests/health_requests,
node/peer_manager,

View File

@ -60,9 +60,9 @@ import
requests/node_requests,
requests/health_requests,
events/health_events,
events/message_events,
events/peer_events,
],
logos_delivery/api/kernel_api,
logos_delivery/waku/discovery/waku_kademlia,
logos_delivery/waku/net/[bound_ports, net_config],
./peer_manager,

View File

@ -32,7 +32,6 @@ import
node/waku_node,
node/subscription_manager,
node/peer_manager,
events/message_events,
]
export waku_relay.WakuRelayHandler

View File

@ -1,10 +0,0 @@
# Request utils.
{.push raises: [].}
import libp2p/crypto/crypto, stew/byteutils
proc generateRequestId*(rng: crypto.Rng): string =
var bytes: array[10, byte]
rng.generate(bytes)
return byteutils.toHex(bytes)

View File

@ -4,10 +4,10 @@ import logos_delivery/waku/compat/option_valueor
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid, libp2p/stream/connection
import logos_delivery/api/types
import
../waku_core/peers,
../node/peer_manager,
../utils/requests,
../waku_core,
./common,
./protocol_metrics,

View File

@ -10,8 +10,8 @@
## that could be used also as a lightpush client, helping testing and development.
import results, chronos, std/options, metrics
import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec, ../utils/requests
import logos_delivery/api/types
import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec
proc handleSelfLightPushRequest*(
self: WakuLightPush, pubSubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult] {.async.} =

View File

@ -4,10 +4,10 @@ import logos_delivery/waku/compat/option_valueor
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import logos_delivery/api/types
import
../waku_core/peers,
../node/peer_manager,
../utils/requests,
../waku_core,
./common,
./protocol_metrics,

View File

@ -10,14 +10,8 @@
## that could be used also as a lightpush client, helping testing and development.
import results, chronos, chronicles, std/options, metrics, stew/byteutils
import
../waku_core,
./protocol,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../utils/requests
import logos_delivery/api/types
import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec, ./protocol_metrics
proc handleSelfLightPushRequest*(
self: WakuLegacyLightPush, pubSubTopic: PubsubTopic, message: WakuMessage

View File

@ -9,8 +9,8 @@ import
chronos,
metrics,
bearssl/rand
import
../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
import logos_delivery/api/types
import ../node/peer_manager, ./protocol_metrics, ./common, ./rpc_codec
logScope:
topics = "waku store client"