diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 11fbefba9..664b37834 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -20,7 +20,8 @@ import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import waku/node/delivery_service/delivery_service +import waku/api/api +import waku/factory/waku as waku_factory import waku/node/delivery_service/send_service import waku/waku_core/topics @@ -31,8 +32,8 @@ import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - delivery_service, send_service, events, segmentation, scalable_data_sync, - rate_limit_manager, encryption + api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager, + encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -80,7 +81,7 @@ type ## Spec-defined public type. Fields are private so callers cannot ## mutate internals and break invariants. Getters are added below ## for the few values consumers may need. - deliveryService: DeliveryService + waku: Waku channelId: ChannelId contentTopic: ContentTopic senderId: SdsParticipantID @@ -224,38 +225,41 @@ proc onReadyToSend( continue let wireBytes = seq[byte](encrypted) + ## The `meta` field carries the Reliable Channel wire-format spec + ## marker so the ingress side of any peer can route this WakuMessage + ## to its Reliable Channel layer. let envelope = MessageEnvelope( - contentTopic: self.contentTopic, payload: wireBytes, ephemeral: isEphemeral + contentTopic: self.contentTopic, + payload: wireBytes, + ephemeral: isEphemeral, + meta: LipWireReliableChannelVersion.toBytes(), ) - let messagingReqId = RequestId.new(self.rng) - let deliveryTask = DeliveryTask.new( - messagingReqId, envelope, self.brokerCtx - ).valueOr: + ## `waku.send` is not annotated `(raises: [])`, but this listener is. + ## Convert any raise to a Result error so the state machine handles + ## both failure modes (Result.err and exception) through one path. + let sendRes = + try: + await self.waku.send(envelope) + except CatchableError as e: + Result[RequestId, string].err("waku send raised: " & e.msg) + + let messagingReqId = sendRes.valueOr: MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( requestId: channelReqId, messageHash: "", - error: "delivery task setup failed: " & error, + error: "waku send failed: " & error, ), ) - self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed self.pruneCompletedChannelReqs() idx.inc() continue - ## Stamp the Reliable Channel wire-format spec marker so the ingress - ## side of any peer can route this WakuMessage to its Reliable - ## Channel layer. Done on the constructed WakuMessage rather than - ## via the envelope because `MessageEnvelope` does not expose a - ## `meta` field. - deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() - self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight - asyncSpawn self.deliveryService.sendService.send(deliveryTask) self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId) idx.inc() @@ -352,7 +356,7 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - deliveryService: DeliveryService, + waku: Waku, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -368,7 +372,7 @@ proc new*( ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. let chn = T( - deliveryService: deliveryService, + waku: waku, channelId: channelId, contentTopic: contentTopic, senderId: senderId, diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index b95a67263..e343d0ec4 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -14,6 +14,7 @@ import waku/api/api import waku/api/api_conf import waku/events/message_events as waku_message_events import waku/factory/waku as waku_factory +import waku/node/delivery_service/delivery_service import waku/waku_core/topics import ./reliable_channel @@ -23,11 +24,10 @@ export reliable_channel type ReliableChannelManager* = ref object channels: Table[ChannelId, ReliableChannel] - deliveryService: DeliveryService - ## Owned by the manager. The ownership chain is - ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. - ## Hidden so callers can't substitute their own and bypass the - ## manager's pipeline. + waku: Waku + ## Owned by the manager. The channel layer reaches the messaging + ## API through `waku.send(envelope)`; constructing DeliveryTasks + ## directly would breach the layer boundary. brokerCtx: BrokerContext proc new*( @@ -38,14 +38,14 @@ proc new*( ## TODO !! The proper ownership chain is: ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, ## and this will be implemented in the future. For now, `createNode` - ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## is called here to get a Waku instance, and the WakuNode is immediately discarded. ## This is a temporary workaround to get the API let waku = ?(await createNode(conf)) let manager = T( channels: initTable[ChannelId, ReliableChannel](), - deliveryService: waku.deliveryService, + waku: waku, brokerCtx: brokerCtx, ) @@ -55,11 +55,11 @@ proc start*(self: ReliableChannelManager): Result[void, string] = ## Bring the owned DeliveryService up. Separated from `new` so callers ## can register encryption providers / create channels before traffic ## starts flowing. - self.deliveryService.startDeliveryService() + self.waku.deliveryService.startDeliveryService() proc stop*(self: ReliableChannelManager) {.async.} = - if not self.deliveryService.isNil(): - await self.deliveryService.stopDeliveryService() + if not self.waku.isNil(): + await self.waku.deliveryService.stopDeliveryService() proc getChannelForTest*( self: ReliableChannelManager, channelId: ChannelId @@ -103,7 +103,7 @@ proc createReliableChannel*( ) let chn = ReliableChannel.new( - deliveryService = self.deliveryService, + waku = self.waku, channelId = channelId, contentTopic = contentTopic, senderId = senderId,