Merge 8f9ddc80cec1719de64fffd245df6e17612a41f6 into 57ff24760fee77c711acaaea56ff9b9e150f6a27

This commit is contained in:
NagyZoltanPeter 2026-06-27 12:31:32 +02:00 committed by GitHub
commit 0bd2101a1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 533 additions and 284 deletions

View File

@ -10,6 +10,7 @@ import
from std/sugar import `=>`
import logos_delivery/waku/compat/option_valueor
import ./tester_message, ./lpt_metrics
type

View File

@ -3,9 +3,9 @@ import chronos, chronicles, results, ffi
import
logos_delivery,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/events/message_events,
logos_delivery/api/messaging_client_api,
logos_delivery/api/types,
logos_delivery/waku/events/[message_events, health_events],
logos_delivery/waku/events/health_events,
tools/confutils/conf_from_json,
../declare_lib,
../json_event
@ -116,7 +116,7 @@ proc logosdelivery_start_node(
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
discard EventConnectionStatusChange.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
callEventCallback(ctx, "onConnectionStatusChange"):

View File

@ -0,0 +1,202 @@
import std/options
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/waku/waku_core/topics/pubsub_topic
import logos_delivery/waku/waku_store/common as store_types
export event_broker
export api_types, pubsub_topic, store_types
type IKernel* = ref object of RootObj
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage
# --- topic construction ---
method buildContentTopic*(
self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildContentTopic not implemented")
method buildPubsubTopic*(
self: IKernel, topicName: string
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildPubsubTopic not implemented")
method defaultPubsubTopic*(
self: IKernel
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.defaultPubsubTopic not implemented")
# --- relay ---
method relayPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPublish not implemented")
method relaySubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relaySubscribe not implemented")
method relayUnsubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayUnsubscribe not implemented")
method relayAddProtectedShard*(
self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayAddProtectedShard not implemented")
method relayConnectedPeers*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayConnectedPeers not implemented")
method relayPeersInMesh*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPeersInMesh not implemented")
# --- filter ---
method filterSubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterSubscribe not implemented")
method filterUnsubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribe not implemented")
method filterUnsubscribeAll*(
self: IKernel, peer: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribeAll not implemented")
# --- lightpush ---
method lightpushPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.lightpushPublish not implemented")
# --- store ---
method storeQuery*(
self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.storeQuery not implemented")
# --- peer management ---
method connect*(
self: IKernel, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connect not implemented")
method disconnectPeerById*(
self: IKernel, peerId: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectPeerById not implemented")
method disconnectAllPeers*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectAllPeers not implemented")
method dialPeer*(
self: IKernel, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeer not implemented")
method dialPeerById*(
self: IKernel, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeerById not implemented")
method peerIdsFromPeerstore*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsFromPeerstore not implemented")
method connectedPeersInfo*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeersInfo not implemented")
method connectedPeers*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeers not implemented")
method peerIdsByProtocol*(
self: IKernel, protocol: string
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsByProtocol not implemented")
# --- discovery ---
method dnsDiscovery*(
self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dnsDiscovery not implemented")
method discv5UpdateBootnodes*(
self: IKernel, bootnodes: seq[string]
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.discv5UpdateBootnodes not implemented")
method startDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.startDiscv5 not implemented")
method stopDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.stopDiscv5 not implemented")
method peerExchangeRequest*(
self: IKernel, numPeers: uint64
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerExchangeRequest not implemented")
# --- debug / info ---
method version*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.version not implemented")
method listenAddresses*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.listenAddresses not implemented")
method myEnr*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myEnr not implemented")
method myPeerId*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myPeerId not implemented")
method metrics*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.metrics not implemented")
method pingPeer*(
self: IKernel, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.pingPeer not implemented")

View File

@ -0,0 +1,33 @@
## `LogosDelivery` is the project entry point. It is a pure concentrator: it
## owns exactly one instance of each API layer
##
## Waku <- MessagingClient <- ReliableChannelManager
##
## and chains them together (each layer drives the one below it). Every layer
## keeps its own, separate public API — `LogosDelivery` only wires them up and
## drives the shared `new` / `start` / `stop` lifecycle.
{.push raises: [].}
import results, chronos
import brokers/event_broker
import types as api_types
export api_types, event_broker
type
## Entry point. Holds one instance of each API layer.
ILogosDelivery* = ref object of RootObj
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
method start*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.start not implemented")
method stop*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.stop not implemented")
method isOnline*(self: ILogosDelivery): Future[Result[bool, string]] {.async, base.} =
return err("ILogosDelivery.isOnline not implemented")

View File

@ -0,0 +1,48 @@
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
export event_broker, api_types
type IMessagingClient* = ref object of RootObj
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
method subscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.subscribe not implemented")
method unsubscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Result[void, string] {.base, raises: [].} =
return err("Interface IMessagingClient.unsubscribe not implemented")
method send*(
self: IMessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.send not implemented")

View File

@ -0,0 +1,62 @@
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
export event_broker, api_types
type
IReliableChannelManager* = ref object of RootObj
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string
method createReliableChannel*(
self: IReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] {.base.} =
return err("Interface IReliableChannelManager.createReliableChannel not implemented")
method closeChannel*(
self: IReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.closeChannel not implemented")
method send*(
self: IReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.send not implemented")

View File

@ -1,14 +1,19 @@
import logos_delivery/waku/compat/option_valueor
import libp2p/crypto/crypto
{.push raises: [].}
import std/hashes
import bearssl/rand, std/times, chronos
import stew/byteutils
import logos_delivery/waku/utils/requests as request_utils
import libp2p/crypto/crypto
import logos_delivery/waku/compat/option_valueor
import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time]
export content_topic, message
import types/sds_message_id
export sds_message_id
type
MessageEnvelope* = object
contentTopic*: ContentTopic
@ -26,9 +31,16 @@ type
PartiallyConnected
Connected
ChannelId* = SdsChannelID
proc generateRequestId*(rng: crypto.Rng): string =
var bytes: array[10, byte]
rng.generate(bytes)
return byteutils.toHex(bytes)
proc new*(T: typedesc[RequestId], rng: crypto.Rng): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
RequestId(generateRequestId(rng))
proc `$`*(r: RequestId): string {.inline.} =
string(r)
@ -36,6 +48,10 @@ proc `$`*(r: RequestId): string {.inline.} =
proc `==`*(a, b: RequestId): bool {.inline.} =
string(a) == string(b)
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))
proc init*(
T: type MessageEnvelope,
contentTopic: ContentTopic,

View File

@ -1,39 +0,0 @@
## Reliable Channel event types emitted to API consumers.
##
## Lifecycle events for individual segments (sent / propagated / errored)
## are the same as the network-level ones the MessagingClient already
## emits — `requestId` is shared across layers — so we just re-export
## `waku/events/message_events` and avoid declaring duplicates.
##
## Only the channel-level `MessageReceivedEvent` carries data that has
## no analogue in the lower layer (reassembled application payload,
## senderId, channelId), so it lives here.
import logos_delivery/waku/events/message_events as waku_message_events
import brokers/event_broker
import ./types as channel_types
export waku_message_events, channel_types, event_broker
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string

View File

@ -24,16 +24,17 @@ import libp2p/crypto/crypto as libp2p_crypto
import logos_delivery/api/types
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
import logos_delivery/api/reliable_channel_manager_api
import logos_delivery/api/messaging_client_api
import ./events
import ./segmentation/segmentation
import ./scalable_data_sync/scalable_data_sync
import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager,
encryption
types, send_service, reliable_channel_manager_api, segmentation, scalable_data_sync,
rate_limit_manager, encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## Wire-format spec marker for the Reliable Channel layer, as defined
@ -44,14 +45,6 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## on breaking on-the-wire changes; implementations pin one version.
type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
MessagePersistence {.pure.} = enum
Persistent
Ephemeral

View File

@ -13,9 +13,9 @@ import stew/byteutils
import brokers/broker_context
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/api/types
import logos_delivery/api/reliable_channel_manager_api
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency
@ -34,7 +34,7 @@ type
## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults
## will move here in a follow-up PR); kept so each layer owns its own config.
ReliableChannelManager* = ref object
ReliableChannelManager* = ref object of IReliableChannelManager
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler: SendHandler
@ -94,13 +94,13 @@ proc sdsPersistence(): Option[Persistence] =
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
method createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
): Result[ChannelId, string] {.raises: [].} =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
@ -146,7 +146,7 @@ proc createReliableChannel*(
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
method closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
@ -158,7 +158,7 @@ proc closeChannel*(
await chn.stop()
return ok()
proc send*(
method send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],

View File

@ -1,15 +0,0 @@
## Core identifier types for the Reliable Channel API.
import std/hashes
import logos_delivery/api/types as api_types
import ./scalable_data_sync/scalable_data_sync
export scalable_data_sync
export api_types
type ChannelId* = SdsChannelID
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))

View File

@ -11,6 +11,8 @@
import results, chronos, chronicles
import logos_delivery/api/logos_delivery_api
export logos_delivery_api
import logos_delivery/waku/waku
export waku
import logos_delivery/messaging/messaging_client
@ -35,7 +37,8 @@ type
messaging*: MessagingClientConf
reliableChannel*: ReliableChannelManagerConf
LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer.
LogosDelivery* = ref object of ILogosDelivery
## Entry point. Holds one instance of each API layer.
waku*: Waku
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
@ -79,7 +82,7 @@ proc new*(
)
)
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Starts each layer bottom-up: transport first, then messaging, then channels.
if self.waku.isNil():
return err("Waku node is not initialized")
@ -99,7 +102,7 @@ proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Stops in reverse order so higher layers drain before their dependencies.
await self.reliableChannelManager.stop()
await self.messagingClient.stop()
@ -109,7 +112,7 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
method isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
if self.waku.isNil():
return err("Waku node is not initialized")
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())

View File

@ -6,6 +6,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api]
import
logos_delivery/waku/[
waku_core,
@ -13,7 +14,6 @@ import
waku_store/client,
waku_store/common,
waku_filter_v2/client,
events/message_events,
events/health_events,
waku_node,
node/subscription_manager,

View File

@ -5,6 +5,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[sequtils, tables, options, typetraits]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import logos_delivery/api/messaging_client_api
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
logos_delivery/waku/[
@ -18,7 +19,6 @@ import
rln/rln,
waku_lightpush/client,
waku_lightpush/callbacks,
events/message_events,
]
logScope:

View File

@ -2,6 +2,7 @@ import results, chronos
import chronicles
import
logos_delivery/api/types,
logos_delivery/api/messaging_client_api,
logos_delivery/waku/node/[waku_node, subscription_manager],
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
@ -13,7 +14,7 @@ type
## follow-up PR. Today it only carries the p2p reliability toggle.
useP2PReliability*: bool
MessagingClient* = ref object
MessagingClient* = ref object of IMessagingClient
node: WakuNode
sendService*: SendService
recvService*: RecvService
@ -46,26 +47,25 @@ proc stop*(self: MessagingClient) {.async.} =
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
if self.isNil():
return err("MessagingClient is not initialized")
return ok()
proc subscribe*(
method subscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
): Future[Result[void, string]] {.async: (raises: []).} =
?checkApiAvailability(self)
return self.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(
method unsubscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
): Result[void, string] {.raises: [].} =
?checkApiAvailability(self)
return self.node.subscriptionManager.unsubscribe(contentTopic)
proc send*(
method send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request

View File

@ -1,9 +1,3 @@
import
./[
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events,
]
import ./[delivery_events, health_events, peer_events, discovery_events]
export
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events
export delivery_events, health_events, peer_events, discovery_events

View File

@ -1,15 +1,10 @@
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health]
import logos_delivery/waku/waku_core/topics
from logos_delivery/api/logos_delivery_api import EventConnectionStatusChange
import logos_delivery/waku/node/health_monitor/topic_health
from logos_delivery/waku/waku_core/topics import ContentTopic, PubsubTopic
export protocol_health, topic_health
# Notify health changes to node connectivity
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
export topic_health, EventConnectionStatusChange
# Notify health changes to a subscribed topic
# TODO: emit content topic health change events when subscribe/unsubscribe

View File

@ -1,35 +0,0 @@
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/[waku_core/message, waku_core/topics]
export types
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -9,6 +9,7 @@ import
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
logos_delivery/api/types,
logos_delivery/api/logos_delivery_api,
logos_delivery/waku/[
waku_relay,
rln,

View File

@ -1,6 +1,6 @@
import chronos
import logos_delivery/waku/waku_core
from logos_delivery/waku/waku_core/topics import PubsubTopic
type TopicHealth* = enum
UNHEALTHY

View File

@ -2,6 +2,7 @@ import logos_delivery/waku/compat/option_valueor
import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results
import libp2p/[peerid, peerinfo]
import brokers/broker_context
import logos_delivery/api/kernel_api
import
logos_delivery/waku/[
@ -16,7 +17,6 @@ import
waku_filter_v2/client as filter_client,
waku_filter_v2/protocol as filter_protocol,
events/health_events,
events/message_events,
events/peer_events,
requests/health_requests,
node/peer_manager,

View File

@ -60,9 +60,9 @@ import
requests/node_requests,
requests/health_requests,
events/health_events,
events/message_events,
events/peer_events,
],
logos_delivery/api/kernel_api,
logos_delivery/waku/discovery/waku_kademlia,
logos_delivery/waku/net/[bound_ports, net_config],
./peer_manager,

View File

@ -33,7 +33,6 @@ import
node/waku_node,
node/subscription_manager,
node/peer_manager,
events/message_events,
]
export waku_relay.WakuRelayHandler

View File

@ -1,10 +0,0 @@
# Request utils.
{.push raises: [].}
import libp2p/crypto/crypto, stew/byteutils
proc generateRequestId*(rng: crypto.Rng): string =
var bytes: array[10, byte]
rng.generate(bytes)
return byteutils.toHex(bytes)

View File

@ -22,6 +22,7 @@ import
metrics/chronos_httpserver,
brokers/broker_context,
logos_delivery/api/types,
logos_delivery/api/kernel_api,
logos_delivery/waku/[
waku_core,
waku_node,
@ -65,7 +66,7 @@ const git_version* {.strdefine.} = "n/a"
const FilterOpTimeout = 5.seconds
type Waku* = ref object
type Waku* = ref object of IKernel
stateInfo*: WakuStateInfo
conf*: WakuConf
rng*: crypto.Rng
@ -577,29 +578,31 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
## Kernel API realization
##
# --- topic construction ---
proc buildContentTopic*(
method buildContentTopic*(
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async.} =
): Future[Result[ContentTopic, string]] {.async: (raises: []).} =
try:
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
except CatchableError as e:
return err(e.msg)
proc buildPubsubTopic*(
method buildPubsubTopic*(
self: Waku, topicName: string
): Future[Result[PubsubTopic, string]] {.async.} =
): Future[Result[PubsubTopic, string]] {.async: (raises: []).} =
try:
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
except CatchableError as e:
return err(e.msg)
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
method defaultPubsubTopic*(
self: Waku
): Future[Result[PubsubTopic, string]] {.async: (raises: []).} =
return ok(DefaultPubsubTopic)
# --- relay ---
proc relayPublish*(
method relayPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async.} =
): Future[Result[int, string]] {.async: (raises: []).} =
try:
if self.node.wakuRelay.isNil():
return err("relayPublish: WakuRelay not mounted")
@ -611,9 +614,9 @@ proc relayPublish*(
except CatchableError as e:
return err(e.msg)
proc relaySubscribe*(
method relaySubscribe*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.node.wakuRelay.isNil():
return err("relaySubscribe: WakuRelay not mounted")
@ -627,9 +630,9 @@ proc relaySubscribe*(
except CatchableError as e:
return err(e.msg)
proc relayUnsubscribe*(
method relayUnsubscribe*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.node.wakuRelay.isNil():
return err("relayUnsubscribe: WakuRelay not mounted")
@ -641,9 +644,9 @@ proc relayUnsubscribe*(
except CatchableError as e:
return err(e.msg)
proc relayAddProtectedShard*(
method relayAddProtectedShard*(
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.node.wakuRelay.isNil():
return err("relayAddProtectedShard: WakuRelay not mounted")
@ -657,9 +660,9 @@ proc relayAddProtectedShard*(
except CatchableError as e:
return err(e.msg)
proc relayConnectedPeers*(
method relayConnectedPeers*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.} =
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
if self.node.wakuRelay.isNil():
return err("relayConnectedPeers: WakuRelay not mounted")
@ -671,9 +674,9 @@ proc relayConnectedPeers*(
except CatchableError as e:
return err(e.msg)
proc relayPeersInMesh*(
method relayPeersInMesh*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.} =
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
if self.node.wakuRelay.isNil():
return err("relayPeersInMesh: WakuRelay not mounted")
@ -686,12 +689,12 @@ proc relayPeersInMesh*(
return err(e.msg)
# --- filter ---
proc filterSubscribe*(
method filterSubscribe*(
self: Waku,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
@ -706,12 +709,12 @@ proc filterSubscribe*(
except CatchableError as e:
return err(e.msg)
proc filterUnsubscribe*(
method filterUnsubscribe*(
self: Waku,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
@ -726,9 +729,9 @@ proc filterUnsubscribe*(
except CatchableError as e:
return err(e.msg)
proc filterUnsubscribeAll*(
method filterUnsubscribeAll*(
self: Waku, peer: string
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
@ -744,9 +747,9 @@ proc filterUnsubscribeAll*(
return err(e.msg)
# --- lightpush ---
proc lightpushPublish*(
method lightpushPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async.} =
): Future[Result[string, string]] {.async: (raises: []).} =
try:
if self.node.wakuLegacyLightpushClient.isNil():
return err("wakuLegacyLightpushClient is not mounted")
@ -766,9 +769,9 @@ proc lightpushPublish*(
return err(e.msg)
# --- store ---
proc storeQuery*(
method storeQuery*(
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async.} =
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []).} =
try:
if self.node.wakuStoreClient.isNil():
return err("wakuStoreClient is not mounted")
@ -788,18 +791,18 @@ proc storeQuery*(
return err(e.msg)
# --- peer management ---
proc connect*(
method connect*(
self: Waku, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc disconnectPeerById*(
method disconnectPeerById*(
self: Waku, peerId: string
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
let pId = PeerId.init(peerId).valueOr:
return err($error)
@ -808,16 +811,18 @@ proc disconnectPeerById*(
except CatchableError as e:
return err(e.msg)
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
method disconnectAllPeers*(
self: Waku
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
await self.node.peerManager.disconnectAllPeers()
return ok(true)
except CatchableError as e:
return err(e.msg)
proc dialPeer*(
method dialPeer*(
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
return err($error)
@ -828,9 +833,9 @@ proc dialPeer*(
except CatchableError as e:
return err(e.msg)
proc dialPeerById*(
method dialPeerById*(
self: Waku, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
let pId = PeerId.init(peerId).valueOr:
return err($error)
@ -841,13 +846,17 @@ proc dialPeerById*(
except CatchableError as e:
return err(e.msg)
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
method peerIdsFromPeerstore*(
self: Waku
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
except CatchableError as e:
return err(e.msg)
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
method connectedPeersInfo*(
self: Waku
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
return ok(
self.node.peerManager.switch.peerStore
@ -858,16 +867,18 @@ proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.asyn
except CatchableError as e:
return err(e.msg)
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
method connectedPeers*(
self: Waku
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
except CatchableError as e:
return err(e.msg)
proc peerIdsByProtocol*(
method peerIdsByProtocol*(
self: Waku, protocol: string
): Future[Result[seq[string], string]] {.async.} =
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
return ok(
self.node.peerManager.switch.peerStore
@ -879,9 +890,9 @@ proc peerIdsByProtocol*(
return err(e.msg)
# --- discovery ---
proc dnsDiscovery*(
method dnsDiscovery*(
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async.} =
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
let dnsNameServers = @[parseIpAddress(nameServer)]
let discoveredPeers = (
@ -898,9 +909,9 @@ proc dnsDiscovery*(
except CatchableError as e:
return err(e.msg)
proc discv5UpdateBootnodes*(
method discv5UpdateBootnodes*(
self: Waku, bootnodes: seq[string]
): Future[Result[bool, string]] {.async.} =
): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
@ -911,7 +922,7 @@ proc discv5UpdateBootnodes*(
except CatchableError as e:
return err(e.msg)
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
method startDiscv5*(self: Waku): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
@ -921,7 +932,7 @@ proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
except CatchableError as e:
return err(e.msg)
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
method stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async: (raises: []).} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
@ -930,9 +941,9 @@ proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
except CatchableError as e:
return err(e.msg)
proc peerExchangeRequest*(
method peerExchangeRequest*(
self: Waku, numPeers: uint64
): Future[Result[int, string]] {.async.} =
): Future[Result[int, string]] {.async: (raises: []).} =
try:
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err("failed peer exchange: " & $error)
@ -941,37 +952,39 @@ proc peerExchangeRequest*(
return err(e.msg)
# --- debug / info ---
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
method version*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
return ok(WakuNodeVersionString)
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
method listenAddresses*(
self: Waku
): Future[Result[seq[string], string]] {.async: (raises: []).} =
try:
return ok(self.node.info().listenAddresses)
except CatchableError as e:
return err(e.msg)
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
method myEnr*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
try:
return ok(self.node.enr.toURI())
except CatchableError as e:
return err(e.msg)
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
method myPeerId*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
try:
return ok($self.node.peerId())
except CatchableError as e:
return err(e.msg)
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
method metrics*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
{.gcsafe.}:
try:
return ok(defaultRegistry.toText())
except CatchableError as e:
return err(e.msg)
proc pingPeer*(
method pingPeer*(
self: Waku, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async.} =
): Future[Result[int64, string]] {.async: (raises: []).} =
try:
let peerInfo = parsePeerInfo(peerAddr).valueOr:
return err("pingPeer failed to parse peer addr: " & $error)

View File

@ -4,10 +4,10 @@ import logos_delivery/waku/compat/option_valueor
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid, libp2p/stream/connection
import logos_delivery/api/types
import
../waku_core/peers,
../node/peer_manager,
../utils/requests,
../waku_core,
./common,
./protocol_metrics,

View File

@ -10,8 +10,8 @@
## that could be used also as a lightpush client, helping testing and development.
import results, chronos, std/options, metrics
import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec, ../utils/requests
import logos_delivery/api/types
import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec
proc handleSelfLightPushRequest*(
self: WakuLightPush, pubSubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult] {.async.} =

View File

@ -4,10 +4,10 @@ import logos_delivery/waku/compat/option_valueor
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import logos_delivery/api/types
import
../waku_core/peers,
../node/peer_manager,
../utils/requests,
../waku_core,
./common,
./protocol_metrics,

View File

@ -10,14 +10,8 @@
## that could be used also as a lightpush client, helping testing and development.
import results, chronos, chronicles, std/options, metrics, stew/byteutils
import
../waku_core,
./protocol,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../utils/requests
import logos_delivery/api/types
import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec, ./protocol_metrics
proc handleSelfLightPushRequest*(
self: WakuLegacyLightPush, pubSubTopic: PubsubTopic, message: WakuMessage

View File

@ -9,8 +9,8 @@ import
chronos,
metrics,
bearssl/rand
import
../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
import logos_delivery/api/types
import ../node/peer_manager, ./protocol_metrics, ./common, ./rpc_codec
logScope:
topics = "waku store client"

View File

@ -5,6 +5,7 @@ import chronos, testutils/unittests, stew/byteutils
import libp2p/[peerid, peerinfo, crypto/crypto]
import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync]
import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api]
import ../waku_archive/archive_utils
import logos_delivery/messaging/messaging_client
import logos_delivery/messaging/delivery_service/recv_service
@ -14,7 +15,6 @@ import
logos_delivery/waku/[
waku_node,
waku_core,
events/message_events,
events/health_events,
waku_relay/protocol,
waku_archive,

View File

@ -4,6 +4,7 @@ import std/strutils
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync]
import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api]
import ../waku_archive/archive_utils
import logos_delivery, logos_delivery/waku/[waku_node, waku_core, waku_relay/protocol]
import logos_delivery/waku/factory/waku_conf

View File

@ -5,6 +5,7 @@ import chronos, testutils/unittests, stew/byteutils
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync]
import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api]
import logos_delivery/messaging/messaging_client
import
@ -12,7 +13,6 @@ import
logos_delivery/waku/[
waku_node,
waku_core,
events/message_events,
waku_relay/protocol,
node/waku_node/filter,
node/subscription_manager,

View File

@ -7,10 +7,11 @@ import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync]
import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api]
import logos_delivery
import logos_delivery/waku/[waku_node, waku_core]
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/events/message_events as waku_message_events
import tools/confutils/cli_args
import logos_delivery/channels/reliable_channel_manager
@ -99,9 +100,8 @@ suite "Reliable Channel - ingress":
meta: LipWireReliableChannelVersion.toBytes(),
)
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
MessageReceivedEvent.emit(
brokerCtx, MessageReceivedEvent(messageHash: "", message: inboundMsg)
)
let arrived = await received.withTimeout(TestTimeout)
@ -151,9 +151,8 @@ suite "Reliable Channel - ingress":
meta: @[], ## no Reliable Channel spec marker
)
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
MessageReceivedEvent.emit(
brokerCtx, MessageReceivedEvent(messageHash: "", message: inboundMsg)
)
## Give the event broker a chance to fan out.
@ -217,9 +216,8 @@ suite "Reliable Channel - send state machine":
await sleepAsync(5.milliseconds)
check sendCalls == 1
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""),
MessageSentEvent.emit(
brokerCtx, MessageSentEvent(requestId: fakeMsgReqId, messageHash: "")
)
let finalised = await sentFut.withTimeout(1.seconds)
@ -296,9 +294,8 @@ suite "Reliable Channel - send state machine":
await sleepAsync(5.milliseconds)
check msgReqIds.len == 2
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
MessageSentEvent.emit(
brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "")
)
let sentArrived = await sentFut.withTimeout(1.seconds)
check sentArrived
@ -308,11 +305,9 @@ suite "Reliable Channel - send state machine":
## segment is still `InFlight`.
check not erroredFut.finished()
waku_message_events.MessageErrorEvent.emit(
MessageErrorEvent.emit(
brokerCtx,
waku_message_events.MessageErrorEvent(
requestId: msgReqIds[1], messageHash: "", error: "synthetic"
),
MessageErrorEvent(requestId: msgReqIds[1], messageHash: "", error: "synthetic"),
)
let erroredArrived = await erroredFut.withTimeout(1.seconds)
check erroredArrived
@ -364,9 +359,8 @@ suite "Reliable Channel - send state machine":
let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
if msgReqIds.len == 2:
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
MessageSentEvent.emit(
brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "")
)
await sleepAsync(50.milliseconds)
sendsReturned.inc()
@ -419,9 +413,8 @@ suite "Reliable Channel - send state machine":
## Finalise the second segment from the outside. If the race
## corrupted state, `channelReqId2`'s entry would never reach
## `inflightMessagingIds` and this event would silently miss.
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: msgReqIds[1], messageHash: ""),
MessageSentEvent.emit(
brokerCtx, MessageSentEvent(requestId: msgReqIds[1], messageHash: "")
)
let arrived = await bothFinalised.withTimeout(2.seconds)
@ -556,9 +549,9 @@ suite "Reliable Channel - SDS lifecycle":
).expect("wrap m2")
## m2 first: missing dependency m1 -> parked, nothing delivered.
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "hash-m2", message: sdsWakuMessage(contentTopic, wire2)
),
)
@ -566,9 +559,9 @@ suite "Reliable Channel - SDS lifecycle":
check deliveries.len == 0
## m1 arrives: m1 delivered, then the parked m2 released after it.
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "hash-m1", message: sdsWakuMessage(contentTopic, wire1)
),
)
@ -622,15 +615,15 @@ suite "Reliable Channel - SDS lifecycle":
).expect("wrap")
## Same envelope twice (different hashes) — the second must be suppressed.
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "dup-hash-1", message: sdsWakuMessage(contentTopic, wire)
),
)
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "dup-hash-2", message: sdsWakuMessage(contentTopic, wire)
),
)
@ -678,9 +671,9 @@ suite "Reliable Channel - SDS lifecycle":
)
).expect("wrap")
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "foreign-hash", message: sdsWakuMessage(contentTopic, wire)
),
)
@ -738,9 +731,9 @@ suite "Reliable Channel - SDS lifecycle":
)
).expect("wrap")
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "restore-hash-1", message: sdsWakuMessage(contentTopic, wire)
),
)
@ -768,9 +761,9 @@ suite "Reliable Channel - SDS lifecycle":
.expect("re-createReliableChannel")
## Replay the same envelope. Only a restored history suppresses it.
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "restore-hash-2", message: sdsWakuMessage(contentTopic, wire)
),
)
@ -820,9 +813,9 @@ suite "Reliable Channel - SDS protocol semantics":
).expect("wrap m1")
let m1 = deserializeMessage(wire1).expect("deserialize m1")
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "semantics-hash-1", message: sdsWakuMessage(contentTopic, wire1)
),
)
@ -916,9 +909,9 @@ suite "Reliable Channel - SDS protocol semantics":
)
).expect("wrap ack carrier")
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "ack-hash-1", message: sdsWakuMessage(contentTopic, ackCarrier)
),
)
@ -982,9 +975,9 @@ suite "Reliable Channel - SDS protocol semantics":
## Deepest first: m3, then m2 — both must be parked.
for i in [2, 1]:
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "chain-hash-" & $(i + 1),
message: sdsWakuMessage(contentTopic, wires[i]),
),
@ -993,9 +986,9 @@ suite "Reliable Channel - SDS protocol semantics":
check deliveries.len == 0
## The root arrives: everything drains in causal order.
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "chain-hash-1", message: sdsWakuMessage(contentTopic, wires[0])
),
)
@ -1054,9 +1047,9 @@ suite "Reliable Channel - SDS protocol semantics":
)
let syncWire = serializeMessage(syncMsg).expect("serialize sync")
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "sync-hash-1", message: sdsWakuMessage(contentTopic, syncWire)
),
)
@ -1070,9 +1063,9 @@ suite "Reliable Channel - SDS protocol semantics":
appPayload, "sync-m1", SdsChannelID(channelId)
)
).expect("wrap")
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "sync-hash-2", message: sdsWakuMessage(contentTopic, wire)
),
)
@ -1145,9 +1138,9 @@ suite "Reliable Channel - SDS protocol semantics":
appPayload, "unique-m" & $i, SdsChannelID(channelId)
)
).expect("wrap " & $i)
waku_message_events.MessageReceivedEvent.emit(
MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
MessageReceivedEvent(
messageHash: "unique-hash-" & $i, message: sdsWakuMessage(contentTopic, wire)
),
)