diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim index 7feebd4cf..ed9a6c214 100644 --- a/apps/liteprotocoltester/statistics.nim +++ b/apps/liteprotocoltester/statistics.nim @@ -10,6 +10,7 @@ import from std/sugar import `=>` +import logos_delivery/waku/compat/option_valueor import ./tester_message, ./lpt_metrics type diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim index 354ab3a98..8f0ee3921 100644 --- a/library/logos_delivery_api/node_api.nim +++ b/library/logos_delivery_api/node_api.nim @@ -3,9 +3,9 @@ import chronos, chronicles, results, ffi import logos_delivery, logos_delivery/waku/node/waku_node, - logos_delivery/waku/events/message_events, + logos_delivery/api/messaging_client_api, logos_delivery/api/types, - logos_delivery/waku/events/[message_events, health_events], + logos_delivery/waku/events/health_events, tools/confutils/conf_from_json, ../declare_lib, ../json_event @@ -116,7 +116,7 @@ proc logosdelivery_start_node( chronicles.error "MessageReceivedEvent.listen failed", err = $error return err("MessageReceivedEvent.listen failed: " & $error) - let ConnectionStatusChangeListener = EventConnectionStatusChange.listen( + discard EventConnectionStatusChange.listen( ctx.myLib[].waku.brokerCtx, proc(event: EventConnectionStatusChange) {.async: (raises: []).} = callEventCallback(ctx, "onConnectionStatusChange"): diff --git a/logos_delivery/api/kernel_api.nim b/logos_delivery/api/kernel_api.nim new file mode 100644 index 000000000..ea64eab22 --- /dev/null +++ b/logos_delivery/api/kernel_api.nim @@ -0,0 +1,202 @@ +import std/options +import chronos, results +import brokers/event_broker + +import logos_delivery/api/types as api_types +import logos_delivery/waku/waku_core/topics/pubsub_topic +import logos_delivery/waku/waku_store/common as store_types + +export event_broker +export api_types, pubsub_topic, store_types + +type IKernel* = ref object of RootObj + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageSeenEvent* = object + topic*: PubsubTopic + message*: WakuMessage + +# --- topic construction --- +method buildContentTopic*( + self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string +): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.buildContentTopic not implemented") + +method buildPubsubTopic*( + self: IKernel, topicName: string +): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.buildPubsubTopic not implemented") + +method defaultPubsubTopic*( + self: IKernel +): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.defaultPubsubTopic not implemented") + +# --- relay --- +method relayPublish*( + self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 +): Future[Result[int, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.relayPublish not implemented") + +method relaySubscribe*( + self: IKernel, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.relaySubscribe not implemented") + +method relayUnsubscribe*( + self: IKernel, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.relayUnsubscribe not implemented") + +method relayAddProtectedShard*( + self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.relayAddProtectedShard not implemented") + +method relayConnectedPeers*( + self: IKernel, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.relayConnectedPeers not implemented") + +method relayPeersInMesh*( + self: IKernel, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.relayPeersInMesh not implemented") + +# --- filter --- +method filterSubscribe*( + self: IKernel, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.filterSubscribe not implemented") + +method filterUnsubscribe*( + self: IKernel, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.filterUnsubscribe not implemented") + +method filterUnsubscribeAll*( + self: IKernel, peer: string +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.filterUnsubscribeAll not implemented") + +# --- lightpush --- +method lightpushPublish*( + self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string +): Future[Result[string, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.lightpushPublish not implemented") + +# --- store --- +method storeQuery*( + self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int +): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.storeQuery not implemented") + +# --- peer management --- +method connect*( + self: IKernel, peers: seq[string], timeoutMs: uint32 +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.connect not implemented") + +method disconnectPeerById*( + self: IKernel, peerId: string +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.disconnectPeerById not implemented") + +method disconnectAllPeers*( + self: IKernel +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.disconnectAllPeers not implemented") + +method dialPeer*( + self: IKernel, peerAddr: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.dialPeer not implemented") + +method dialPeerById*( + self: IKernel, peerId: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.dialPeerById not implemented") + +method peerIdsFromPeerstore*( + self: IKernel +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.peerIdsFromPeerstore not implemented") + +method connectedPeersInfo*( + self: IKernel +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.connectedPeersInfo not implemented") + +method connectedPeers*( + self: IKernel +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.connectedPeers not implemented") + +method peerIdsByProtocol*( + self: IKernel, protocol: string +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.peerIdsByProtocol not implemented") + +# --- discovery --- +method dnsDiscovery*( + self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.dnsDiscovery not implemented") + +method discv5UpdateBootnodes*( + self: IKernel, bootnodes: seq[string] +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.discv5UpdateBootnodes not implemented") + +method startDiscv5*( + self: IKernel +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.startDiscv5 not implemented") + +method stopDiscv5*( + self: IKernel +): Future[Result[bool, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.stopDiscv5 not implemented") + +method peerExchangeRequest*( + self: IKernel, numPeers: uint64 +): Future[Result[int, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.peerExchangeRequest not implemented") + +# --- debug / info --- +method version*( + self: IKernel +): Future[Result[string, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.version not implemented") + +method listenAddresses*( + self: IKernel +): Future[Result[seq[string], string]] {.async: (raises: []), base.} = + return err("Interface IKernel.listenAddresses not implemented") + +method myEnr*( + self: IKernel +): Future[Result[string, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.myEnr not implemented") + +method myPeerId*( + self: IKernel +): Future[Result[string, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.myPeerId not implemented") + +method metrics*( + self: IKernel +): Future[Result[string, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.metrics not implemented") + +method pingPeer*( + self: IKernel, peerAddr: string, timeoutMs: int +): Future[Result[int64, string]] {.async: (raises: []), base.} = + return err("Interface IKernel.pingPeer not implemented") diff --git a/logos_delivery/api/logos_delivery_api.nim b/logos_delivery/api/logos_delivery_api.nim new file mode 100644 index 000000000..162159d8e --- /dev/null +++ b/logos_delivery/api/logos_delivery_api.nim @@ -0,0 +1,33 @@ +## `LogosDelivery` is the project entry point. It is a pure concentrator: it +## owns exactly one instance of each API layer +## +## Waku <- MessagingClient <- ReliableChannelManager +## +## and chains them together (each layer drives the one below it). Every layer +## keeps its own, separate public API — `LogosDelivery` only wires them up and +## drives the shared `new` / `start` / `stop` lifecycle. + +{.push raises: [].} + +import results, chronos +import brokers/event_broker +import types as api_types + +export api_types, event_broker + +type + ## Entry point. Holds one instance of each API layer. + ILogosDelivery* = ref object of RootObj + +EventBroker: + type EventConnectionStatusChange* = object + connectionStatus*: ConnectionStatus + +method start*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} = + return err("ILogosDelivery.start not implemented") + +method stop*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} = + return err("ILogosDelivery.stop not implemented") + +method isOnline*(self: ILogosDelivery): Future[Result[bool, string]] {.async, base.} = + return err("ILogosDelivery.isOnline not implemented") diff --git a/logos_delivery/api/messaging_client_api.nim b/logos_delivery/api/messaging_client_api.nim new file mode 100644 index 000000000..46acec3bc --- /dev/null +++ b/logos_delivery/api/messaging_client_api.nim @@ -0,0 +1,48 @@ +import chronos, results +import brokers/event_broker + +import logos_delivery/api/types as api_types + +export event_broker, api_types + +type IMessagingClient* = ref object of RootObj + +EventBroker: + # Event emitted when a message is sent to the network + type MessageSentEvent* = object + requestId*: RequestId + messageHash*: string + +EventBroker: + # Event emitted when a message send operation fails + type MessageErrorEvent* = object + requestId*: RequestId + messageHash*: string + error*: string + +EventBroker: + # Confirmation that a message has been correctly delivered to some neighbouring nodes. + type MessagePropagatedEvent* = object + requestId*: RequestId + messageHash*: string + +EventBroker: + # Event emitted when a message is received via Waku + type MessageReceivedEvent* = object + messageHash*: string + message*: WakuMessage + +method subscribe*( + self: IMessagingClient, contentTopic: ContentTopic +): Future[Result[void, string]] {.async: (raises: []), base.} = + return err("Interface IMessagingClient.subscribe not implemented") + +method unsubscribe*( + self: IMessagingClient, contentTopic: ContentTopic +): Result[void, string] {.base, raises: [].} = + return err("Interface IMessagingClient.unsubscribe not implemented") + +method send*( + self: IMessagingClient, envelope: MessageEnvelope +): Future[Result[RequestId, string]] {.async: (raises: []), base.} = + return err("Interface IMessagingClient.send not implemented") diff --git a/logos_delivery/api/reliable_channel_manager_api.nim b/logos_delivery/api/reliable_channel_manager_api.nim new file mode 100644 index 000000000..206ef0df7 --- /dev/null +++ b/logos_delivery/api/reliable_channel_manager_api.nim @@ -0,0 +1,62 @@ +import chronos, results + +import brokers/event_broker + +import logos_delivery/api/types as api_types + +export event_broker, api_types + +type + IReliableChannelManager* = ref object of RootObj + + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Typically wraps `MessagingClient.send`; + ## tests inject a fake that records calls and returns canned + ## `RequestId`s so the send state machine can be exercised end-to-end + ## without a network. + +EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string + +method createReliableChannel*( + self: IReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + sendHandler: SendHandler = nil, +): Result[ChannelId, string] {.base.} = + return err("Interface IReliableChannelManager.createReliableChannel not implemented") + +method closeChannel*( + self: IReliableChannelManager, channelId: ChannelId +): Future[Result[void, string]] {.async: (raises: []), base.} = + return err("Interface IReliableChannelManager.closeChannel not implemented") + +method send*( + self: IReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Future[Result[RequestId, string]] {.async: (raises: []), base.} = + return err("Interface IReliableChannelManager.send not implemented") diff --git a/logos_delivery/api/types.nim b/logos_delivery/api/types.nim index 5757a8e82..415e4bf0b 100644 --- a/logos_delivery/api/types.nim +++ b/logos_delivery/api/types.nim @@ -1,14 +1,19 @@ -import logos_delivery/waku/compat/option_valueor -import libp2p/crypto/crypto {.push raises: [].} - +import std/hashes import bearssl/rand, std/times, chronos import stew/byteutils -import logos_delivery/waku/utils/requests as request_utils +import libp2p/crypto/crypto + +import logos_delivery/waku/compat/option_valueor + import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time] export content_topic, message +import types/sds_message_id + +export sds_message_id + type MessageEnvelope* = object contentTopic*: ContentTopic @@ -26,9 +31,16 @@ type PartiallyConnected Connected + ChannelId* = SdsChannelID + +proc generateRequestId*(rng: crypto.Rng): string = + var bytes: array[10, byte] + rng.generate(bytes) + return byteutils.toHex(bytes) + proc new*(T: typedesc[RequestId], rng: crypto.Rng): T = ## Generate a new RequestId using the provided RNG. - RequestId(request_utils.generateRequestId(rng)) + RequestId(generateRequestId(rng)) proc `$`*(r: RequestId): string {.inline.} = string(r) @@ -36,6 +48,10 @@ proc `$`*(r: RequestId): string {.inline.} = proc `==`*(a, b: RequestId): bool {.inline.} = string(a) == string(b) +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. + hash(string(r)) + proc init*( T: type MessageEnvelope, contentTopic: ContentTopic, diff --git a/logos_delivery/channels/events.nim b/logos_delivery/channels/events.nim deleted file mode 100644 index 5f69095e4..000000000 --- a/logos_delivery/channels/events.nim +++ /dev/null @@ -1,39 +0,0 @@ -## Reliable Channel event types emitted to API consumers. -## -## Lifecycle events for individual segments (sent / propagated / errored) -## are the same as the network-level ones the MessagingClient already -## emits — `requestId` is shared across layers — so we just re-export -## `waku/events/message_events` and avoid declaring duplicates. -## -## Only the channel-level `MessageReceivedEvent` carries data that has -## no analogue in the lower layer (reassembled application payload, -## senderId, channelId), so it lives here. - -import logos_delivery/waku/events/message_events as waku_message_events -import brokers/event_broker - -import ./types as channel_types - -export waku_message_events, channel_types, event_broker - -EventBroker: - type ChannelMessageReceivedEvent* = object - channelId*: ChannelId - senderId*: SdsParticipantID - payload*: seq[byte] - -EventBroker: - ## Emitted when every segment of a channel-level `send()` reached - ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the - ## `requestId` is the channel-layer parent returned by `send()`. - type ChannelMessageSentEvent* = object - channelId*: ChannelId - requestId*: RequestId - -EventBroker: - ## Emitted when a channel-level `send()` finalises with at least one - ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. - type ChannelMessageErrorEvent* = object - channelId*: ChannelId - requestId*: RequestId - error*: string diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 307dc17a4..c52d25653 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -24,16 +24,17 @@ import libp2p/crypto/crypto as libp2p_crypto import logos_delivery/api/types import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics +import logos_delivery/api/reliable_channel_manager_api +import logos_delivery/api/messaging_client_api -import ./events import ./segmentation/segmentation import ./scalable_data_sync/scalable_data_sync import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager, - encryption + types, send_service, reliable_channel_manager_api, segmentation, scalable_data_sync, + rate_limit_manager, encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -44,14 +45,6 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## on breaking on-the-wire changes; implementations pin one version. type - SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. - async: (raises: [CatchableError]), gcsafe - .} - ## Egress dispatch boundary. Typically wraps `MessagingClient.send`; - ## tests inject a fake that records calls and returns canned - ## `RequestId`s so the send state machine can be exercised end-to-end - ## without a network. - MessagePersistence {.pure.} = enum Persistent Ephemeral diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 29feab0b9..88d1d1787 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -13,9 +13,9 @@ import stew/byteutils import brokers/broker_context -import logos_delivery/waku/events/message_events as waku_message_events -import logos_delivery/messaging/messaging_client import logos_delivery/api/types +import logos_delivery/api/reliable_channel_manager_api +import logos_delivery/messaging/messaging_client import logos_delivery/waku/waku_core/topics import logos_delivery/waku/persistency/sds_persistency @@ -34,7 +34,7 @@ type ## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults ## will move here in a follow-up PR); kept so each layer owns its own config. - ReliableChannelManager* = ref object + ReliableChannelManager* = ref object of IReliableChannelManager channels: Table[ChannelId, ReliableChannel] messagingClient: MessagingClient ## The channel layer chains onto messaging. sendHandler: SendHandler @@ -94,13 +94,13 @@ proc sdsPersistence(): Option[Persistence] = return none(Persistence) return some(newSdsPersistence(job)) -proc createReliableChannel*( +method createReliableChannel*( self: ReliableChannelManager, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, sendHandler: SendHandler = nil, -): Result[ChannelId, string] = +): Result[ChannelId, string] {.raises: [].} = ## Spec entry point. The `sendHandler` and `rng` the channel needs are ## sourced from the owning `ReliableChannelManager` rather than passed ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` @@ -146,7 +146,7 @@ proc createReliableChannel*( self.channels[channelId] = chn return ok(channelId) -proc closeChannel*( +method closeChannel*( self: ReliableChannelManager, channelId: ChannelId ): Future[Result[void, string]] {.async: (raises: []).} = ## Stops the channel's SDS loops and releases the channel. Persisted SDS @@ -158,7 +158,7 @@ proc closeChannel*( await chn.stop() return ok() -proc send*( +method send*( self: ReliableChannelManager, channelId: ChannelId, appPayload: seq[byte], diff --git a/logos_delivery/channels/types.nim b/logos_delivery/channels/types.nim deleted file mode 100644 index 7730c5c58..000000000 --- a/logos_delivery/channels/types.nim +++ /dev/null @@ -1,15 +0,0 @@ -## Core identifier types for the Reliable Channel API. - -import std/hashes -import logos_delivery/api/types as api_types - -import ./scalable_data_sync/scalable_data_sync - -export scalable_data_sync -export api_types - -type ChannelId* = SdsChannelID - -proc hash*(r: RequestId): Hash = - ## Allows `RequestId` to be used as a `Table` key. - hash(string(r)) diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index f61202b9a..f739851e7 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -11,6 +11,8 @@ import results, chronos, chronicles +import logos_delivery/api/logos_delivery_api +export logos_delivery_api import logos_delivery/waku/waku export waku import logos_delivery/messaging/messaging_client @@ -35,7 +37,8 @@ type messaging*: MessagingClientConf reliableChannel*: ReliableChannelManagerConf - LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer. + LogosDelivery* = ref object of ILogosDelivery + ## Entry point. Holds one instance of each API layer. waku*: Waku messagingClient*: MessagingClient reliableChannelManager*: ReliableChannelManager @@ -79,7 +82,7 @@ proc new*( ) ) -proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = +method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = ## Starts each layer bottom-up: transport first, then messaging, then channels. if self.waku.isNil(): return err("Waku node is not initialized") @@ -99,7 +102,7 @@ proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = return ok() -proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = +method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = ## Stops in reverse order so higher layers drain before their dependencies. await self.reliableChannelManager.stop() await self.messagingClient.stop() @@ -109,7 +112,7 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = return ok() -proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} = +method isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} = if self.waku.isNil(): return err("Waku node is not initialized") return ok(self.waku.healthMonitor.onlineMonitor.amIOnline()) diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 90bdb0839..9840d5a20 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -6,6 +6,7 @@ import logos_delivery/waku/compat/option_valueor import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility import brokers/broker_context +import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api] import logos_delivery/waku/[ waku_core, @@ -13,7 +14,6 @@ import waku_store/client, waku_store/common, waku_filter_v2/client, - events/message_events, events/health_events, waku_node, node/subscription_manager, diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index 67f8de95f..a7be11a8b 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -5,6 +5,7 @@ import logos_delivery/waku/compat/option_valueor import std/[sequtils, tables, options, typetraits] import chronos, chronicles, libp2p/utility import brokers/broker_context +import logos_delivery/api/messaging_client_api import ./[send_processor, relay_processor, lightpush_processor, delivery_task], logos_delivery/waku/[ @@ -18,7 +19,6 @@ import rln/rln, waku_lightpush/client, waku_lightpush/callbacks, - events/message_events, ] logScope: diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 96cd13eb1..6db970a72 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -2,6 +2,7 @@ import results, chronos import chronicles import logos_delivery/api/types, + logos_delivery/api/messaging_client_api, logos_delivery/waku/node/[waku_node, subscription_manager], logos_delivery/messaging/delivery_service/[recv_service, send_service], logos_delivery/messaging/delivery_service/send_service/delivery_task @@ -13,7 +14,7 @@ type ## follow-up PR. Today it only carries the p2p reliability toggle. useP2PReliability*: bool - MessagingClient* = ref object + MessagingClient* = ref object of IMessagingClient node: WakuNode sendService*: SendService recvService*: RecvService @@ -46,26 +47,25 @@ proc stop*(self: MessagingClient) {.async.} = proc checkApiAvailability(self: MessagingClient): Result[void, string] = if self.isNil(): return err("MessagingClient is not initialized") - return ok() -proc subscribe*( +method subscribe*( self: MessagingClient, contentTopic: ContentTopic -): Future[Result[void, string]] {.async.} = +): Future[Result[void, string]] {.async: (raises: []).} = ?checkApiAvailability(self) return self.node.subscriptionManager.subscribe(contentTopic) -proc unsubscribe*( +method unsubscribe*( self: MessagingClient, contentTopic: ContentTopic -): Result[void, string] = +): Result[void, string] {.raises: [].} = ?checkApiAvailability(self) return self.node.subscriptionManager.unsubscribe(contentTopic) -proc send*( +method send*( self: MessagingClient, envelope: MessageEnvelope -): Future[Result[RequestId, string]] {.async.} = +): Future[Result[RequestId, string]] {.async: (raises: []).} = ## High-level messaging API send. Auto-subscribes to the content topic ## (so the local node sees its own gossipsub broadcast), builds a ## `DeliveryTask`, and hands it to the send service. Returns the request diff --git a/logos_delivery/waku/events/events.nim b/logos_delivery/waku/events/events.nim index 130d7c018..56d8eee7f 100644 --- a/logos_delivery/waku/events/events.nim +++ b/logos_delivery/waku/events/events.nim @@ -1,9 +1,3 @@ -import - ./[ - message_events, delivery_events, health_events, peer_events, lifecycle_events, - discovery_events, - ] +import ./[delivery_events, health_events, peer_events, discovery_events] -export - message_events, delivery_events, health_events, peer_events, lifecycle_events, - discovery_events +export delivery_events, health_events, peer_events, discovery_events diff --git a/logos_delivery/waku/events/health_events.nim b/logos_delivery/waku/events/health_events.nim index d19de776b..9dc70ae63 100644 --- a/logos_delivery/waku/events/health_events.nim +++ b/logos_delivery/waku/events/health_events.nim @@ -1,15 +1,10 @@ import brokers/event_broker -import logos_delivery/api/types -import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health] -import logos_delivery/waku/waku_core/topics +from logos_delivery/api/logos_delivery_api import EventConnectionStatusChange +import logos_delivery/waku/node/health_monitor/topic_health +from logos_delivery/waku/waku_core/topics import ContentTopic, PubsubTopic -export protocol_health, topic_health - -# Notify health changes to node connectivity -EventBroker: - type EventConnectionStatusChange* = object - connectionStatus*: ConnectionStatus +export topic_health, EventConnectionStatusChange # Notify health changes to a subscribed topic # TODO: emit content topic health change events when subscribe/unsubscribe diff --git a/logos_delivery/waku/events/message_events.nim b/logos_delivery/waku/events/message_events.nim deleted file mode 100644 index 2e4bece80..000000000 --- a/logos_delivery/waku/events/message_events.nim +++ /dev/null @@ -1,35 +0,0 @@ -import brokers/event_broker -import logos_delivery/api/types -import logos_delivery/waku/[waku_core/message, waku_core/topics] -export types - -EventBroker: - # Event emitted when a message is sent to the network - type MessageSentEvent* = object - requestId*: RequestId - messageHash*: string - -EventBroker: - # Event emitted when a message send operation fails - type MessageErrorEvent* = object - requestId*: RequestId - messageHash*: string - error*: string - -EventBroker: - # Confirmation that a message has been correctly delivered to some neighbouring nodes. - type MessagePropagatedEvent* = object - requestId*: RequestId - messageHash*: string - -EventBroker: - # Event emitted when a message is received via Waku - type MessageReceivedEvent* = object - messageHash*: string - message*: WakuMessage - -EventBroker: - # Internal event emitted when a message arrives from the network via any protocol - type MessageSeenEvent* = object - topic*: PubsubTopic - message*: WakuMessage diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index d75a793ac..adfd0a477 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -9,6 +9,7 @@ import libp2p/protocols/pubsub, libp2p/protocols/pubsub/rpc/messages, logos_delivery/api/types, + logos_delivery/api/logos_delivery_api, logos_delivery/waku/[ waku_relay, rln, diff --git a/logos_delivery/waku/node/health_monitor/topic_health.nim b/logos_delivery/waku/node/health_monitor/topic_health.nim index 9273f4835..a08a3c3a9 100644 --- a/logos_delivery/waku/node/health_monitor/topic_health.nim +++ b/logos_delivery/waku/node/health_monitor/topic_health.nim @@ -1,6 +1,6 @@ import chronos -import logos_delivery/waku/waku_core +from logos_delivery/waku/waku_core/topics import PubsubTopic type TopicHealth* = enum UNHEALTHY diff --git a/logos_delivery/waku/node/subscription_manager.nim b/logos_delivery/waku/node/subscription_manager.nim index 15b582ea6..2eadd2009 100644 --- a/logos_delivery/waku/node/subscription_manager.nim +++ b/logos_delivery/waku/node/subscription_manager.nim @@ -2,6 +2,7 @@ import logos_delivery/waku/compat/option_valueor import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results import libp2p/[peerid, peerinfo] import brokers/broker_context +import logos_delivery/api/kernel_api import logos_delivery/waku/[ @@ -16,7 +17,6 @@ import waku_filter_v2/client as filter_client, waku_filter_v2/protocol as filter_protocol, events/health_events, - events/message_events, events/peer_events, requests/health_requests, node/peer_manager, diff --git a/logos_delivery/waku/node/waku_node.nim b/logos_delivery/waku/node/waku_node.nim index 665f0ba98..613b3a9f3 100644 --- a/logos_delivery/waku/node/waku_node.nim +++ b/logos_delivery/waku/node/waku_node.nim @@ -60,9 +60,9 @@ import requests/node_requests, requests/health_requests, events/health_events, - events/message_events, events/peer_events, ], + logos_delivery/api/kernel_api, logos_delivery/waku/discovery/waku_kademlia, logos_delivery/waku/net/[bound_ports, net_config], ./peer_manager, diff --git a/logos_delivery/waku/node/waku_node/relay.nim b/logos_delivery/waku/node/waku_node/relay.nim index b1a2f9287..05e1d30db 100644 --- a/logos_delivery/waku/node/waku_node/relay.nim +++ b/logos_delivery/waku/node/waku_node/relay.nim @@ -33,7 +33,6 @@ import node/waku_node, node/subscription_manager, node/peer_manager, - events/message_events, ] export waku_relay.WakuRelayHandler diff --git a/logos_delivery/waku/utils/requests.nim b/logos_delivery/waku/utils/requests.nim deleted file mode 100644 index a7fd9a2a9..000000000 --- a/logos_delivery/waku/utils/requests.nim +++ /dev/null @@ -1,10 +0,0 @@ -# Request utils. - -{.push raises: [].} - -import libp2p/crypto/crypto, stew/byteutils - -proc generateRequestId*(rng: crypto.Rng): string = - var bytes: array[10, byte] - rng.generate(bytes) - return byteutils.toHex(bytes) diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index 067b5b6ec..ff9905ab5 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -22,6 +22,7 @@ import metrics/chronos_httpserver, brokers/broker_context, logos_delivery/api/types, + logos_delivery/api/kernel_api, logos_delivery/waku/[ waku_core, waku_node, @@ -65,7 +66,7 @@ const git_version* {.strdefine.} = "n/a" const FilterOpTimeout = 5.seconds -type Waku* = ref object +type Waku* = ref object of IKernel stateInfo*: WakuStateInfo conf*: WakuConf rng*: crypto.Rng @@ -577,29 +578,31 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = ## Kernel API realization ## # --- topic construction --- -proc buildContentTopic*( +method buildContentTopic*( self: Waku, appName: string, appVersion: uint32, name: string, encoding: string -): Future[Result[ContentTopic, string]] {.async.} = +): Future[Result[ContentTopic, string]] {.async: (raises: []).} = try: return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) except CatchableError as e: return err(e.msg) -proc buildPubsubTopic*( +method buildPubsubTopic*( self: Waku, topicName: string -): Future[Result[PubsubTopic, string]] {.async.} = +): Future[Result[PubsubTopic, string]] {.async: (raises: []).} = try: return ok(PubsubTopic(fmt"/waku/2/{topicName}")) except CatchableError as e: return err(e.msg) -proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = +method defaultPubsubTopic*( + self: Waku +): Future[Result[PubsubTopic, string]] {.async: (raises: []).} = return ok(DefaultPubsubTopic) # --- relay --- -proc relayPublish*( +method relayPublish*( self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 -): Future[Result[int, string]] {.async.} = +): Future[Result[int, string]] {.async: (raises: []).} = try: if self.node.wakuRelay.isNil(): return err("relayPublish: WakuRelay not mounted") @@ -611,9 +614,9 @@ proc relayPublish*( except CatchableError as e: return err(e.msg) -proc relaySubscribe*( +method relaySubscribe*( self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.node.wakuRelay.isNil(): return err("relaySubscribe: WakuRelay not mounted") @@ -627,9 +630,9 @@ proc relaySubscribe*( except CatchableError as e: return err(e.msg) -proc relayUnsubscribe*( +method relayUnsubscribe*( self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.node.wakuRelay.isNil(): return err("relayUnsubscribe: WakuRelay not mounted") @@ -641,9 +644,9 @@ proc relayUnsubscribe*( except CatchableError as e: return err(e.msg) -proc relayAddProtectedShard*( +method relayAddProtectedShard*( self: Waku, clusterId: uint16, shardId: uint16, publicKey: string -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.node.wakuRelay.isNil(): return err("relayAddProtectedShard: WakuRelay not mounted") @@ -657,9 +660,9 @@ proc relayAddProtectedShard*( except CatchableError as e: return err(e.msg) -proc relayConnectedPeers*( +method relayConnectedPeers*( self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: if self.node.wakuRelay.isNil(): return err("relayConnectedPeers: WakuRelay not mounted") @@ -671,9 +674,9 @@ proc relayConnectedPeers*( except CatchableError as e: return err(e.msg) -proc relayPeersInMesh*( +method relayPeersInMesh*( self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: if self.node.wakuRelay.isNil(): return err("relayPeersInMesh: WakuRelay not mounted") @@ -686,12 +689,12 @@ proc relayPeersInMesh*( return err(e.msg) # --- filter --- -proc filterSubscribe*( +method filterSubscribe*( self: Waku, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], peer: string, -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.node.wakuFilterClient.isNil(): return err("wakuFilterClient is not mounted") @@ -706,12 +709,12 @@ proc filterSubscribe*( except CatchableError as e: return err(e.msg) -proc filterUnsubscribe*( +method filterUnsubscribe*( self: Waku, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], peer: string, -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.node.wakuFilterClient.isNil(): return err("wakuFilterClient is not mounted") @@ -726,9 +729,9 @@ proc filterUnsubscribe*( except CatchableError as e: return err(e.msg) -proc filterUnsubscribeAll*( +method filterUnsubscribeAll*( self: Waku, peer: string -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.node.wakuFilterClient.isNil(): return err("wakuFilterClient is not mounted") @@ -744,9 +747,9 @@ proc filterUnsubscribeAll*( return err(e.msg) # --- lightpush --- -proc lightpushPublish*( +method lightpushPublish*( self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string -): Future[Result[string, string]] {.async.} = +): Future[Result[string, string]] {.async: (raises: []).} = try: if self.node.wakuLegacyLightpushClient.isNil(): return err("wakuLegacyLightpushClient is not mounted") @@ -766,9 +769,9 @@ proc lightpushPublish*( return err(e.msg) # --- store --- -proc storeQuery*( +method storeQuery*( self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int -): Future[Result[StoreQueryResponse, string]] {.async.} = +): Future[Result[StoreQueryResponse, string]] {.async: (raises: []).} = try: if self.node.wakuStoreClient.isNil(): return err("wakuStoreClient is not mounted") @@ -788,18 +791,18 @@ proc storeQuery*( return err(e.msg) # --- peer management --- -proc connect*( +method connect*( self: Waku, peers: seq[string], timeoutMs: uint32 -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") return ok(true) except CatchableError as e: return err(e.msg) -proc disconnectPeerById*( +method disconnectPeerById*( self: Waku, peerId: string -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: let pId = PeerId.init(peerId).valueOr: return err($error) @@ -808,16 +811,18 @@ proc disconnectPeerById*( except CatchableError as e: return err(e.msg) -proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = +method disconnectAllPeers*( + self: Waku +): Future[Result[bool, string]] {.async: (raises: []).} = try: await self.node.peerManager.disconnectAllPeers() return ok(true) except CatchableError as e: return err(e.msg) -proc dialPeer*( +method dialPeer*( self: Waku, peerAddr: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: return err($error) @@ -828,9 +833,9 @@ proc dialPeer*( except CatchableError as e: return err(e.msg) -proc dialPeerById*( +method dialPeerById*( self: Waku, peerId: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: let pId = PeerId.init(peerId).valueOr: return err($error) @@ -841,13 +846,17 @@ proc dialPeerById*( except CatchableError as e: return err(e.msg) -proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = +method peerIdsFromPeerstore*( + self: Waku +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) except CatchableError as e: return err(e.msg) -proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} = +method connectedPeersInfo*( + self: Waku +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: return ok( self.node.peerManager.switch.peerStore @@ -858,16 +867,18 @@ proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.asyn except CatchableError as e: return err(e.msg) -proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = +method connectedPeers*( + self: Waku +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() return ok(concat(inPeerIds, outPeerIds).mapIt($it)) except CatchableError as e: return err(e.msg) -proc peerIdsByProtocol*( +method peerIdsByProtocol*( self: Waku, protocol: string -): Future[Result[seq[string], string]] {.async.} = +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: return ok( self.node.peerManager.switch.peerStore @@ -879,9 +890,9 @@ proc peerIdsByProtocol*( return err(e.msg) # --- discovery --- -proc dnsDiscovery*( +method dnsDiscovery*( self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int -): Future[Result[seq[string], string]] {.async.} = +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: let dnsNameServers = @[parseIpAddress(nameServer)] let discoveredPeers = ( @@ -898,9 +909,9 @@ proc dnsDiscovery*( except CatchableError as e: return err(e.msg) -proc discv5UpdateBootnodes*( +method discv5UpdateBootnodes*( self: Waku, bootnodes: seq[string] -): Future[Result[bool, string]] {.async.} = +): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.wakuDiscv5.isNil(): return err("discv5 not started") @@ -911,7 +922,7 @@ proc discv5UpdateBootnodes*( except CatchableError as e: return err(e.msg) -proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = +method startDiscv5*(self: Waku): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.wakuDiscv5.isNil(): return err("discv5 not started") @@ -921,7 +932,7 @@ proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = except CatchableError as e: return err(e.msg) -proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = +method stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async: (raises: []).} = try: if self.wakuDiscv5.isNil(): return err("discv5 not started") @@ -930,9 +941,9 @@ proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = except CatchableError as e: return err(e.msg) -proc peerExchangeRequest*( +method peerExchangeRequest*( self: Waku, numPeers: uint64 -): Future[Result[int, string]] {.async.} = +): Future[Result[int, string]] {.async: (raises: []).} = try: let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: return err("failed peer exchange: " & $error) @@ -941,37 +952,39 @@ proc peerExchangeRequest*( return err(e.msg) # --- debug / info --- -proc version*(self: Waku): Future[Result[string, string]] {.async.} = +method version*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} = return ok(WakuNodeVersionString) -proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = +method listenAddresses*( + self: Waku +): Future[Result[seq[string], string]] {.async: (raises: []).} = try: return ok(self.node.info().listenAddresses) except CatchableError as e: return err(e.msg) -proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = +method myEnr*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} = try: return ok(self.node.enr.toURI()) except CatchableError as e: return err(e.msg) -proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = +method myPeerId*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} = try: return ok($self.node.peerId()) except CatchableError as e: return err(e.msg) -proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = +method metrics*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} = {.gcsafe.}: try: return ok(defaultRegistry.toText()) except CatchableError as e: return err(e.msg) -proc pingPeer*( +method pingPeer*( self: Waku, peerAddr: string, timeoutMs: int -): Future[Result[int64, string]] {.async.} = +): Future[Result[int64, string]] {.async: (raises: []).} = try: let peerInfo = parsePeerInfo(peerAddr).valueOr: return err("pingPeer failed to parse peer addr: " & $error) diff --git a/logos_delivery/waku/waku_lightpush/client.nim b/logos_delivery/waku/waku_lightpush/client.nim index 680970a51..e33b2875c 100644 --- a/logos_delivery/waku/waku_lightpush/client.nim +++ b/logos_delivery/waku/waku_lightpush/client.nim @@ -4,10 +4,10 @@ import logos_delivery/waku/compat/option_valueor import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils import libp2p/peerid, libp2p/stream/connection +import logos_delivery/api/types import ../waku_core/peers, ../node/peer_manager, - ../utils/requests, ../waku_core, ./common, ./protocol_metrics, diff --git a/logos_delivery/waku/waku_lightpush/self_req_handler.nim b/logos_delivery/waku/waku_lightpush/self_req_handler.nim index 06a0d3715..dcf8ba895 100644 --- a/logos_delivery/waku/waku_lightpush/self_req_handler.nim +++ b/logos_delivery/waku/waku_lightpush/self_req_handler.nim @@ -10,8 +10,8 @@ ## that could be used also as a lightpush client, helping testing and development. import results, chronos, std/options, metrics -import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec, ../utils/requests - +import logos_delivery/api/types +import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec proc handleSelfLightPushRequest*( self: WakuLightPush, pubSubTopic: Option[PubsubTopic], message: WakuMessage ): Future[WakuLightPushResult] {.async.} = diff --git a/logos_delivery/waku/waku_lightpush_legacy/client.nim b/logos_delivery/waku/waku_lightpush_legacy/client.nim index 511c3f543..ff7e694d3 100644 --- a/logos_delivery/waku/waku_lightpush_legacy/client.nim +++ b/logos_delivery/waku/waku_lightpush_legacy/client.nim @@ -4,10 +4,10 @@ import logos_delivery/waku/compat/option_valueor import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils import libp2p/peerid +import logos_delivery/api/types import ../waku_core/peers, ../node/peer_manager, - ../utils/requests, ../waku_core, ./common, ./protocol_metrics, diff --git a/logos_delivery/waku/waku_lightpush_legacy/self_req_handler.nim b/logos_delivery/waku/waku_lightpush_legacy/self_req_handler.nim index 3c5d09a9c..466d14f7c 100644 --- a/logos_delivery/waku/waku_lightpush_legacy/self_req_handler.nim +++ b/logos_delivery/waku/waku_lightpush_legacy/self_req_handler.nim @@ -10,14 +10,8 @@ ## that could be used also as a lightpush client, helping testing and development. import results, chronos, chronicles, std/options, metrics, stew/byteutils -import - ../waku_core, - ./protocol, - ./common, - ./rpc, - ./rpc_codec, - ./protocol_metrics, - ../utils/requests +import logos_delivery/api/types +import ../waku_core, ./protocol, ./common, ./rpc, ./rpc_codec, ./protocol_metrics proc handleSelfLightPushRequest*( self: WakuLegacyLightPush, pubSubTopic: PubsubTopic, message: WakuMessage diff --git a/logos_delivery/waku/waku_store/client.nim b/logos_delivery/waku/waku_store/client.nim index 21e2699e3..c93fad6a4 100644 --- a/logos_delivery/waku/waku_store/client.nim +++ b/logos_delivery/waku/waku_store/client.nim @@ -9,8 +9,8 @@ import chronos, metrics, bearssl/rand -import - ../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec +import logos_delivery/api/types +import ../node/peer_manager, ./protocol_metrics, ./common, ./rpc_codec logScope: topics = "waku store client" diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index 41c0f0477..ae8d6fcc2 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -5,6 +5,7 @@ import chronos, testutils/unittests, stew/byteutils import libp2p/[peerid, peerinfo, crypto/crypto] import brokers/broker_context import ../testlib/[common, wakucore, wakunode, testasync] +import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api] import ../waku_archive/archive_utils import logos_delivery/messaging/messaging_client import logos_delivery/messaging/delivery_service/recv_service @@ -14,7 +15,6 @@ import logos_delivery/waku/[ waku_node, waku_core, - events/message_events, events/health_events, waku_relay/protocol, waku_archive, diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index 0ad1cea4a..ffb8d3e46 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -4,6 +4,7 @@ import std/strutils import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo] import brokers/broker_context import ../testlib/[common, wakucore, wakunode, testasync] +import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api] import ../waku_archive/archive_utils import logos_delivery, logos_delivery/waku/[waku_node, waku_core, waku_relay/protocol] import logos_delivery/waku/factory/waku_conf diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index bf2851b02..50653108e 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -5,6 +5,7 @@ import chronos, testutils/unittests, stew/byteutils import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] import brokers/broker_context import ../testlib/[common, wakucore, wakunode, testasync] +import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api] import logos_delivery/messaging/messaging_client import @@ -12,7 +13,6 @@ import logos_delivery/waku/[ waku_node, waku_core, - events/message_events, waku_relay/protocol, node/waku_node/filter, node/subscription_manager, diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index b3d9a8f00..719ed857c 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -7,10 +7,11 @@ import brokers/broker_context import ../testlib/[common, wakucore, wakunode, testasync] +import logos_delivery/api/[types, logos_delivery_api, kernel_api, messaging_client_api] + import logos_delivery import logos_delivery/waku/[waku_node, waku_core] import logos_delivery/waku/factory/waku_conf -import logos_delivery/waku/events/message_events as waku_message_events import tools/confutils/cli_args import logos_delivery/channels/reliable_channel_manager @@ -99,9 +100,8 @@ suite "Reliable Channel - ingress": meta: LipWireReliableChannelVersion.toBytes(), ) - waku_message_events.MessageReceivedEvent.emit( - brokerCtx, - waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + MessageReceivedEvent.emit( + brokerCtx, MessageReceivedEvent(messageHash: "", message: inboundMsg) ) let arrived = await received.withTimeout(TestTimeout) @@ -151,9 +151,8 @@ suite "Reliable Channel - ingress": meta: @[], ## no Reliable Channel spec marker ) - waku_message_events.MessageReceivedEvent.emit( - brokerCtx, - waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + MessageReceivedEvent.emit( + brokerCtx, MessageReceivedEvent(messageHash: "", message: inboundMsg) ) ## Give the event broker a chance to fan out. @@ -217,9 +216,8 @@ suite "Reliable Channel - send state machine": await sleepAsync(5.milliseconds) check sendCalls == 1 - waku_message_events.MessageSentEvent.emit( - brokerCtx, - waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), + MessageSentEvent.emit( + brokerCtx, MessageSentEvent(requestId: fakeMsgReqId, messageHash: "") ) let finalised = await sentFut.withTimeout(1.seconds) @@ -296,9 +294,8 @@ suite "Reliable Channel - send state machine": await sleepAsync(5.milliseconds) check msgReqIds.len == 2 - waku_message_events.MessageSentEvent.emit( - brokerCtx, - waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + MessageSentEvent.emit( + brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "") ) let sentArrived = await sentFut.withTimeout(1.seconds) check sentArrived @@ -308,11 +305,9 @@ suite "Reliable Channel - send state machine": ## segment is still `InFlight`. check not erroredFut.finished() - waku_message_events.MessageErrorEvent.emit( + MessageErrorEvent.emit( brokerCtx, - waku_message_events.MessageErrorEvent( - requestId: msgReqIds[1], messageHash: "", error: "synthetic" - ), + MessageErrorEvent(requestId: msgReqIds[1], messageHash: "", error: "synthetic"), ) let erroredArrived = await erroredFut.withTimeout(1.seconds) check erroredArrived @@ -364,9 +359,8 @@ suite "Reliable Channel - send state machine": let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1)) msgReqIds.add(id) if msgReqIds.len == 2: - waku_message_events.MessageSentEvent.emit( - brokerCtx, - waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + MessageSentEvent.emit( + brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "") ) await sleepAsync(50.milliseconds) sendsReturned.inc() @@ -419,9 +413,8 @@ suite "Reliable Channel - send state machine": ## Finalise the second segment from the outside. If the race ## corrupted state, `channelReqId2`'s entry would never reach ## `inflightMessagingIds` and this event would silently miss. - waku_message_events.MessageSentEvent.emit( - brokerCtx, - waku_message_events.MessageSentEvent(requestId: msgReqIds[1], messageHash: ""), + MessageSentEvent.emit( + brokerCtx, MessageSentEvent(requestId: msgReqIds[1], messageHash: "") ) let arrived = await bothFinalised.withTimeout(2.seconds) @@ -556,9 +549,9 @@ suite "Reliable Channel - SDS lifecycle": ).expect("wrap m2") ## m2 first: missing dependency m1 -> parked, nothing delivered. - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "hash-m2", message: sdsWakuMessage(contentTopic, wire2) ), ) @@ -566,9 +559,9 @@ suite "Reliable Channel - SDS lifecycle": check deliveries.len == 0 ## m1 arrives: m1 delivered, then the parked m2 released after it. - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "hash-m1", message: sdsWakuMessage(contentTopic, wire1) ), ) @@ -622,15 +615,15 @@ suite "Reliable Channel - SDS lifecycle": ).expect("wrap") ## Same envelope twice (different hashes) — the second must be suppressed. - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "dup-hash-1", message: sdsWakuMessage(contentTopic, wire) ), ) - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "dup-hash-2", message: sdsWakuMessage(contentTopic, wire) ), ) @@ -678,9 +671,9 @@ suite "Reliable Channel - SDS lifecycle": ) ).expect("wrap") - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "foreign-hash", message: sdsWakuMessage(contentTopic, wire) ), ) @@ -738,9 +731,9 @@ suite "Reliable Channel - SDS lifecycle": ) ).expect("wrap") - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "restore-hash-1", message: sdsWakuMessage(contentTopic, wire) ), ) @@ -768,9 +761,9 @@ suite "Reliable Channel - SDS lifecycle": .expect("re-createReliableChannel") ## Replay the same envelope. Only a restored history suppresses it. - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "restore-hash-2", message: sdsWakuMessage(contentTopic, wire) ), ) @@ -820,9 +813,9 @@ suite "Reliable Channel - SDS protocol semantics": ).expect("wrap m1") let m1 = deserializeMessage(wire1).expect("deserialize m1") - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "semantics-hash-1", message: sdsWakuMessage(contentTopic, wire1) ), ) @@ -916,9 +909,9 @@ suite "Reliable Channel - SDS protocol semantics": ) ).expect("wrap ack carrier") - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "ack-hash-1", message: sdsWakuMessage(contentTopic, ackCarrier) ), ) @@ -982,9 +975,9 @@ suite "Reliable Channel - SDS protocol semantics": ## Deepest first: m3, then m2 — both must be parked. for i in [2, 1]: - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "chain-hash-" & $(i + 1), message: sdsWakuMessage(contentTopic, wires[i]), ), @@ -993,9 +986,9 @@ suite "Reliable Channel - SDS protocol semantics": check deliveries.len == 0 ## The root arrives: everything drains in causal order. - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "chain-hash-1", message: sdsWakuMessage(contentTopic, wires[0]) ), ) @@ -1054,9 +1047,9 @@ suite "Reliable Channel - SDS protocol semantics": ) let syncWire = serializeMessage(syncMsg).expect("serialize sync") - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "sync-hash-1", message: sdsWakuMessage(contentTopic, syncWire) ), ) @@ -1070,9 +1063,9 @@ suite "Reliable Channel - SDS protocol semantics": appPayload, "sync-m1", SdsChannelID(channelId) ) ).expect("wrap") - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "sync-hash-2", message: sdsWakuMessage(contentTopic, wire) ), ) @@ -1145,9 +1138,9 @@ suite "Reliable Channel - SDS protocol semantics": appPayload, "unique-m" & $i, SdsChannelID(channelId) ) ).expect("wrap " & $i) - waku_message_events.MessageReceivedEvent.emit( + MessageReceivedEvent.emit( brokerCtx, - waku_message_events.MessageReceivedEvent( + MessageReceivedEvent( messageHash: "unique-hash-" & $i, message: sdsWakuMessage(contentTopic, wire) ), )