diff --git a/logos_delivery.nimble b/logos_delivery.nimble index efe118595..8c4d32cb5 100644 --- a/logos_delivery.nimble +++ b/logos_delivery.nimble @@ -65,7 +65,7 @@ requires "https://github.com/logos-messaging/nim-ffi#v0.1.3" requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5" -requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1" +requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.4" requires "https://github.com/vacp2p/nim-lsquic.git#v0.5.1" requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2" diff --git a/logos_delivery/api/reliable_channel_manager_api.nim b/logos_delivery/api/reliable_channel_manager_api.nim index 206ef0df7..ec337beb7 100644 --- a/logos_delivery/api/reliable_channel_manager_api.nim +++ b/logos_delivery/api/reliable_channel_manager_api.nim @@ -44,7 +44,6 @@ method createReliableChannel*( channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, - sendHandler: SendHandler = nil, ): Result[ChannelId, string] {.base.} = return err("Interface IReliableChannelManager.createReliableChannel not implemented") diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index c52d25653..975679557 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -22,10 +22,10 @@ import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto import logos_delivery/api/types -import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics import logos_delivery/api/reliable_channel_manager_api import logos_delivery/api/messaging_client_api +import logos_delivery/messaging/messaging_client import ./segmentation/segmentation import ./scalable_data_sync/scalable_data_sync @@ -33,7 +33,7 @@ import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - types, send_service, reliable_channel_manager_api, segmentation, scalable_data_sync, + types, reliable_channel_manager_api, segmentation, scalable_data_sync, rate_limit_manager, encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" @@ -83,7 +83,6 @@ type ## Spec-defined public type. Fields are private so callers cannot ## mutate internals and break invariants. Getters are added below ## for the few values consumers may need. - sendHandler: SendHandler channelId: ChannelId contentTopic: ContentTopic senderId: SdsParticipantID @@ -262,14 +261,7 @@ proc onReadyToSend( meta: LipWireReliableChannelVersion.toBytes(), ) - ## `sendHandler` is not annotated `(raises: [])`, but this listener is. - ## Convert any raise to a Result error so the state machine handles - ## both failure modes (Result.err and exception) through one path. - let sendRes = - try: - await self.sendHandler(envelope) - except CatchableError as e: - Result[RequestId, string].err("messaging send raised: " & e.msg) + let sendRes = await MessagingSend.request(self.brokerCtx, envelope) let messagingReqId = sendRes.valueOr: MessageErrorEvent.emit( @@ -359,14 +351,9 @@ proc dispatchRepair(self: ReliableChannel, wire: seq[byte]) {.async: (raises: [] meta: LipWireReliableChannelVersion.toBytes(), ) - let sendRes = - try: - await self.sendHandler(envelope) - except CatchableError as e: - Result[RequestId, string].err("messaging send raised: " & e.msg) - if sendRes.isErr(): + (await MessagingSend.request(self.brokerCtx, envelope)).isOkOr: debug "SDS repair rebroadcast dropped: dispatch failed", - channelId = self.channelId, error = sendRes.error + channelId = self.channelId, error = error proc onMessageReceived( self: ReliableChannel, messageHash: string, payload: seq[byte] @@ -414,7 +401,6 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - sendHandler: SendHandler, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -429,12 +415,7 @@ proc new*( ## should be wiring up. Encryption is delegated to the `Encrypt`/ ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. - ## - ## `sendHandler` is the egress dispatch. The owning `ReliableChannelManager` - ## typically constructs it as a closure over `MessagingClient.send`. Tests - ## pass a fake to drive the send state machine without touching the network. let chn = T( - sendHandler: sendHandler, channelId: channelId, contentTopic: contentTopic, senderId: senderId, diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 88d1d1787..2dbe924de 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -36,38 +36,14 @@ type ReliableChannelManager* = ref object of IReliableChannelManager channels: Table[ChannelId, ReliableChannel] - messagingClient: MessagingClient ## The channel layer chains onto messaging. - sendHandler: SendHandler - ## Default egress dispatch for channels created through this manager. - ## Built in `new` as a closure over `MessagingClient.send` so the channel - ## layer itself stays callable-only. brokerCtx: BrokerContext proc new*( T: type ReliableChannelManager, conf: ReliableChannelManagerConf, - messagingClient: MessagingClient, brokerCtx: BrokerContext = globalBrokerContext(), ): Result[T, string] = - ## The reliable channel layer chains onto the messaging layer: its default - ## egress is `MessagingClient.send`, wrapped here so callers never wire the - ## handler themselves. - if messagingClient.isNil(): - return err("messaging client is required") - - let defaultSendHandler: SendHandler = proc( - envelope: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - return await messagingClient.send(envelope) - - return ok( - T( - channels: initTable[ChannelId, ReliableChannel](), - messagingClient: messagingClient, - sendHandler: defaultSendHandler, - brokerCtx: brokerCtx, - ) - ) + return ok(T(channels: initTable[ChannelId, ReliableChannel](), brokerCtx: brokerCtx)) proc start*(self: ReliableChannelManager): Result[void, string] = ## Placeholder: per-channel listeners are installed in `ReliableChannel.new`, @@ -99,9 +75,8 @@ method createReliableChannel*( channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, - sendHandler: SendHandler = nil, ): Result[ChannelId, string] {.raises: [].} = - ## Spec entry point. The `sendHandler` and `rng` the channel needs are + ## Spec entry point. The `rng` the channel needs are ## sourced from the owning `ReliableChannelManager` rather than passed ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` ## request brokers — the application installs its own providers @@ -109,9 +84,6 @@ method createReliableChannel*( ## ## Segmentation, SDS and rate-limit configs will eventually be read ## from the node's `NodeConfig`. Defaults for now. - ## - ## `sendHandler` defaults to the manager's default (constructed at mount - ## from `MessagingClient.send`); tests pass a fake to bypass the network. if self.channels.hasKey(channelId): return err("channel already exists: " & channelId) @@ -130,10 +102,7 @@ method createReliableChannel*( epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch ) - let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler - let chn = ReliableChannel.new( - sendHandler = effectiveSendHandler, channelId = channelId, contentTopic = contentTopic, senderId = senderId, diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index f739851e7..6bee5c87f 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -70,7 +70,7 @@ proc new*( return err("failed to create MessagingClient: " & error) let reliableChannelManager = ReliableChannelManager.new( - layerConf.reliableChannel, messagingClient, waku.brokerCtx + layerConf.reliableChannel, waku.brokerCtx ).valueOr: return err("failed to create ReliableChannelManager: " & error) diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 6db970a72..58fcbb36d 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -1,5 +1,6 @@ import results, chronos import chronicles +import brokers/[request_broker, broker_context] import logos_delivery/api/types, logos_delivery/api/messaging_client_api, @@ -7,6 +8,11 @@ import logos_delivery/messaging/delivery_service/[recv_service, send_service], logos_delivery/messaging/delivery_service/send_service/delivery_task +RequestBroker: + proc MessagingSend( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async.} + type MessagingClientConf* = object ## Per-layer config object for the messaging API. @@ -15,6 +21,7 @@ type useP2PReliability*: bool MessagingClient* = ref object of IMessagingClient + brokerCtx: BrokerContext node: WakuNode sendService*: SendService recvService*: RecvService @@ -27,19 +34,34 @@ proc new*( ## `WakuNode` (Waku's core) for transport while exposing its own send/recv API. let sendService = ?SendService.new(conf.useP2PReliability, node) let recvService = RecvService.new(node) - ok(T(node: node, sendService: sendService, recvService: recvService)) + ok( + T( + node: node, + sendService: sendService, + recvService: recvService, + brokerCtx: node.brokerCtx, + ) + ) proc start*(self: MessagingClient): Result[void, string] = if self.started: return ok() self.recvService.startRecvService() self.sendService.startSendService() + + ?MessagingSend.setProvider( + self.brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + return await self.send(envelope), + ) + self.started = true ok() proc stop*(self: MessagingClient) {.async.} = if not self.started: return + MessagingSend.clearProvider(self.brokerCtx) await self.sendService.stopSendService() await self.recvService.stopRecvService() self.started = false diff --git a/logos_delivery/waku/persistency/backend_thread.nim b/logos_delivery/waku/persistency/backend_thread.nim index e32e5c209..28f328729 100644 --- a/logos_delivery/waku/persistency/backend_thread.nim +++ b/logos_delivery/waku/persistency/backend_thread.nim @@ -162,13 +162,13 @@ proc registerProviders(backend: KvBackend, ctx: BrokerContext): Result[void, str return ok() -proc clearProviders(ctx: BrokerContext) = +proc clearProviders(ctx: BrokerContext) {.async.} = KvGet.clearProvider(ctx) KvExists.clearProvider(ctx) KvScan.clearProvider(ctx) KvCount.clearProvider(ctx) KvDelete.clearProvider(ctx) - PersistEvent.dropAllListeners(ctx) + await PersistEvent.dropAllListeners(ctx) # ── thread proc ───────────────────────────────────────────────────────── @@ -217,7 +217,7 @@ proc storageThreadMain(arg: ptr StorageThreadArg) {.thread.} = except CatchableError as e: error "storage thread loop crashed", err = e.msg - clearProviders(arg.ctx) + waitFor clearProviders(arg.ctx) backend.close() # ── lifecycle ─────────────────────────────────────────────────────────── diff --git a/logos_delivery/waku/persistency/persistency.nim b/logos_delivery/waku/persistency/persistency.nim index fca6ad34d..d6ac28a1e 100644 --- a/logos_delivery/waku/persistency/persistency.nim +++ b/logos_delivery/waku/persistency/persistency.nim @@ -272,7 +272,7 @@ proc hasJob*(p: Persistency, jobId: string): bool {.inline.} = proc persist*(t: Job, ops: seq[TxOp]): Future[void] {.async.} = ## Emit a batched persist event. The handler treats >1 ops as a single ## BEGIN IMMEDIATE/COMMIT transaction (see backend_sqlite.applyOps). - await PersistEvent.emit(t.context, PersistEvent(ops: ops)) + PersistEvent.emit(t.context, PersistEvent(ops: ops)) proc persist*(t: Job, op: TxOp): Future[void] {.async.} = await persist(t, @[op]) diff --git a/nimble.lock b/nimble.lock index 18ebde258..03bb7a434 100644 --- a/nimble.lock +++ b/nimble.lock @@ -328,8 +328,8 @@ } }, "brokers": { - "version": "#v3.1.1", - "vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2", + "version": "#v3.1.4", + "vcsRevision": "8ae9e963b0b4478c93e6f888be6a46654da787de", "url": "https://github.com/NagyZoltanPeter/nim-brokers.git", "downloadMethod": "git", "dependencies": [ @@ -341,7 +341,7 @@ "cbor_serialization" ], "checksums": { - "sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40" + "sha1": "2949398033c1b3a2586f0afd7e92f46b9a0a412e" } }, "stint": { diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 719ed857c..c6a53bae4 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -186,16 +186,16 @@ suite "Reliable Channel - send state machine": setNoopEncryption() var sendCalls = 0 - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - sendCalls.inc - return ok(fakeMsgReqId) + MessagingSend.replaceProvider( + brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + sendCalls.inc + return ok(fakeMsgReqId), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") let sentFut = newFuture[RequestId]("channel-sent") @@ -250,17 +250,17 @@ suite "Reliable Channel - send state machine": setNoopEncryption() var msgReqIds: seq[RequestId] - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) - msgReqIds.add(id) - return ok(id) + MessagingSend.replaceProvider( + brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") let sentFut = newFuture[RequestId]("channel-sent") @@ -349,27 +349,27 @@ suite "Reliable Channel - send state machine": var msgReqIds: seq[RequestId] var sendsReturned = 0 - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - ## Call 2 fires the first segment's terminal event and then - ## yields, so the listener task runs while the second segment - ## is still mid-`await` in `onReadyToSend` — the exact race - ## window the regression test targets. - let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1)) - msgReqIds.add(id) - if msgReqIds.len == 2: - MessageSentEvent.emit( - brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "") - ) - await sleepAsync(50.milliseconds) - sendsReturned.inc() - return ok(id) + MessagingSend.replaceProvider( + brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + ## Call 2 fires the first segment's terminal event and then + ## yields, so the listener task runs while the second segment + ## is still mid-`await` in `onReadyToSend` — the exact race + ## window the regression test targets. + let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + if msgReqIds.len == 2: + MessageSentEvent.emit( + brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "") + ) + await sleepAsync(50.milliseconds) + sendsReturned.inc() + return ok(id), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") var finalisedReqIds: seq[RequestId] @@ -450,15 +450,15 @@ suite "Reliable Channel - SDS persistence": setNoopEncryption() - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - return ok(RequestId("persist-msg-req-1")) + MessagingSend.replaceProvider( + globalBrokerContext(), + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + return ok(RequestId("persist-msg-req-1")), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") discard (await manager.send(channelId, "persist me".toBytes())).expect("send") @@ -791,17 +791,17 @@ suite "Reliable Channel - SDS protocol semantics": setNoopEncryption() var capturedWires: seq[seq[byte]] - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - ## Noop encryption is identity, so the envelope payload IS the SDS wire. - capturedWires.add(env.payload) - return ok(RequestId("semantics-req-" & $capturedWires.len)) + MessagingSend.replaceProvider( + brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + ## Noop encryption is identity, so the envelope payload IS the SDS wire. + capturedWires.add(envelope.payload) + return ok(RequestId("semantics-req-" & $capturedWires.len)), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") let remotePeer = @@ -859,16 +859,17 @@ suite "Reliable Channel - SDS protocol semantics": setNoopEncryption() var capturedWires: seq[seq[byte]] - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - capturedWires.add(env.payload) - return ok(RequestId("ack-req-" & $capturedWires.len)) + MessagingSend.replaceProvider( + brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + ## Noop encryption is identity, so the envelope payload IS the SDS wire. + capturedWires.add(envelope.payload) + return ok(RequestId("ack-req-" & $capturedWires.len)), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") discard (await manager.send(channelId, "needs ack".toBytes())).expect("send") @@ -1095,16 +1096,17 @@ suite "Reliable Channel - SDS protocol semantics": setNoopEncryption() var capturedWires: seq[seq[byte]] - let fakeSend: SendHandler = proc( - env: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - capturedWires.add(env.payload) - return ok(RequestId("unique-req-" & $capturedWires.len)) + MessagingSend.replaceProvider( + brokerCtx, + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} = + ## Noop encryption is identity, so the envelope payload IS the SDS wire. + capturedWires.add(envelope.payload) + return ok(RequestId("unique-req-" & $capturedWires.len)), + ).isOkOr: + raiseAssert "replaceProvider failed: " & error discard manager - .createReliableChannel( - channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend - ) + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) .expect("createReliableChannel") var deliveries: seq[seq[byte]] diff --git a/tests/persistency/test_lifecycle.nim b/tests/persistency/test_lifecycle.nim index 2626cb01b..3006672da 100644 --- a/tests/persistency/test_lifecycle.nim +++ b/tests/persistency/test_lifecycle.nim @@ -156,7 +156,7 @@ suite "Persistency lifecycle": let ev = PersistEvent( ops: @[TxOp(category: "msg", key: k, kind: txPut, payload: payloadBytes("hello"))] ) - await PersistEvent.emit(t.context, ev) + PersistEvent.emit(t.context, ev) let ckOk2 = await t.pollExists("msg", k) check ckOk2 @@ -178,7 +178,7 @@ suite "Persistency lifecycle": check a.context != b.context let k = key("shared", 1'i64) - await PersistEvent.emit( + PersistEvent.emit( a.context, PersistEvent( ops: @[ @@ -188,7 +188,7 @@ suite "Persistency lifecycle": ] ), ) - await PersistEvent.emit( + PersistEvent.emit( b.context, PersistEvent( ops: @[ @@ -255,7 +255,7 @@ suite "Persistency lifecycle": ops.add( TxOp(category: "msg", key: key("c", i), kind: txPut, payload: payloadBytes($i)) ) - await PersistEvent.emit(t.context, PersistEvent(ops: ops)) + PersistEvent.emit(t.context, PersistEvent(ops: ops)) # Wait for the last insert to land. let ckOk5 = await t.pollExists("msg", key("c", 5'i64)) check ckOk5 @@ -285,7 +285,7 @@ suite "Persistency lifecycle": let r1 = aw7.get() check r1.existed == false - await PersistEvent.emit( + PersistEvent.emit( t.context, PersistEvent( ops: @[TxOp(category: "msg", key: k, kind: txPut, payload: payloadBytes("v"))]