Merge 4995b86d134547c922b3eb1722593632fbf193bc into 8f9ddc80cec1719de64fffd245df6e17612a41f6

This commit is contained in:
NagyZoltanPeter 2026-06-25 10:17:34 +02:00 committed by GitHub
commit d2ab955d2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 114 additions and 141 deletions

View File

@ -65,7 +65,7 @@ requires "https://github.com/logos-messaging/nim-ffi#v0.1.3"
requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5"
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1"
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.4"
requires "https://github.com/vacp2p/nim-lsquic.git#v0.5.1"
requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2"

View File

@ -44,7 +44,6 @@ method createReliableChannel*(
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] {.base.} =
return err("Interface IReliableChannelManager.createReliableChannel not implemented")

View File

@ -22,10 +22,10 @@ import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto
import logos_delivery/api/types
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
import logos_delivery/api/reliable_channel_manager_api
import logos_delivery/api/messaging_client_api
import logos_delivery/messaging/messaging_client
import ./segmentation/segmentation
import ./scalable_data_sync/scalable_data_sync
@ -33,7 +33,7 @@ import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
types, send_service, reliable_channel_manager_api, segmentation, scalable_data_sync,
types, reliable_channel_manager_api, segmentation, scalable_data_sync,
rate_limit_manager, encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
@ -83,7 +83,6 @@ type
## Spec-defined public type. Fields are private so callers cannot
## mutate internals and break invariants. Getters are added below
## for the few values consumers may need.
sendHandler: SendHandler
channelId: ChannelId
contentTopic: ContentTopic
senderId: SdsParticipantID
@ -262,14 +261,7 @@ proc onReadyToSend(
meta: LipWireReliableChannelVersion.toBytes(),
)
## `sendHandler` is not annotated `(raises: [])`, but this listener is.
## Convert any raise to a Result error so the state machine handles
## both failure modes (Result.err and exception) through one path.
let sendRes =
try:
await self.sendHandler(envelope)
except CatchableError as e:
Result[RequestId, string].err("messaging send raised: " & e.msg)
let sendRes = await MessagingSend.request(self.brokerCtx, envelope)
let messagingReqId = sendRes.valueOr:
MessageErrorEvent.emit(
@ -359,14 +351,9 @@ proc dispatchRepair(self: ReliableChannel, wire: seq[byte]) {.async: (raises: []
meta: LipWireReliableChannelVersion.toBytes(),
)
let sendRes =
try:
await self.sendHandler(envelope)
except CatchableError as e:
Result[RequestId, string].err("messaging send raised: " & e.msg)
if sendRes.isErr():
(await MessagingSend.request(self.brokerCtx, envelope)).isOkOr:
debug "SDS repair rebroadcast dropped: dispatch failed",
channelId = self.channelId, error = sendRes.error
channelId = self.channelId, error = error
proc onMessageReceived(
self: ReliableChannel, messageHash: string, payload: seq[byte]
@ -414,7 +401,6 @@ proc onMessageReceived(
proc new*(
T: type ReliableChannel,
sendHandler: SendHandler,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
@ -429,12 +415,7 @@ proc new*(
## should be wiring up. Encryption is delegated to the `Encrypt`/
## `Decrypt` request brokers, so the channel keeps no per-instance
## encryption state either.
##
## `sendHandler` is the egress dispatch. The owning `ReliableChannelManager`
## typically constructs it as a closure over `MessagingClient.send`. Tests
## pass a fake to drive the send state machine without touching the network.
let chn = T(
sendHandler: sendHandler,
channelId: channelId,
contentTopic: contentTopic,
senderId: senderId,

View File

@ -36,38 +36,14 @@ type
ReliableChannelManager* = ref object of IReliableChannelManager
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler: SendHandler
## Default egress dispatch for channels created through this manager.
## Built in `new` as a closure over `MessagingClient.send` so the channel
## layer itself stays callable-only.
brokerCtx: BrokerContext
proc new*(
T: type ReliableChannelManager,
conf: ReliableChannelManagerConf,
messagingClient: MessagingClient,
brokerCtx: BrokerContext = globalBrokerContext(),
): Result[T, string] =
## The reliable channel layer chains onto the messaging layer: its default
## egress is `MessagingClient.send`, wrapped here so callers never wire the
## handler themselves.
if messagingClient.isNil():
return err("messaging client is required")
let defaultSendHandler: SendHandler = proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await messagingClient.send(envelope)
return ok(
T(
channels: initTable[ChannelId, ReliableChannel](),
messagingClient: messagingClient,
sendHandler: defaultSendHandler,
brokerCtx: brokerCtx,
)
)
return ok(T(channels: initTable[ChannelId, ReliableChannel](), brokerCtx: brokerCtx))
proc start*(self: ReliableChannelManager): Result[void, string] =
## Placeholder: per-channel listeners are installed in `ReliableChannel.new`,
@ -99,9 +75,8 @@ method createReliableChannel*(
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] {.raises: [].} =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## Spec entry point. The `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
@ -109,9 +84,6 @@ method createReliableChannel*(
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
##
## `sendHandler` defaults to the manager's default (constructed at mount
## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
@ -130,10 +102,7 @@ method createReliableChannel*(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
let chn = ReliableChannel.new(
sendHandler = effectiveSendHandler,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,

View File

@ -70,7 +70,7 @@ proc new*(
return err("failed to create MessagingClient: " & error)
let reliableChannelManager = ReliableChannelManager.new(
layerConf.reliableChannel, messagingClient, waku.brokerCtx
layerConf.reliableChannel, waku.brokerCtx
).valueOr:
return err("failed to create ReliableChannelManager: " & error)

View File

@ -1,5 +1,6 @@
import results, chronos
import chronicles
import brokers/[request_broker, broker_context]
import
logos_delivery/api/types,
logos_delivery/api/messaging_client_api,
@ -7,6 +8,11 @@ import
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
RequestBroker:
proc MessagingSend(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.}
type
MessagingClientConf* = object
## Per-layer config object for the messaging API.
@ -15,6 +21,7 @@ type
useP2PReliability*: bool
MessagingClient* = ref object of IMessagingClient
brokerCtx: BrokerContext
node: WakuNode
sendService*: SendService
recvService*: RecvService
@ -27,19 +34,34 @@ proc new*(
## `WakuNode` (Waku's core) for transport while exposing its own send/recv API.
let sendService = ?SendService.new(conf.useP2PReliability, node)
let recvService = RecvService.new(node)
ok(T(node: node, sendService: sendService, recvService: recvService))
ok(
T(
node: node,
sendService: sendService,
recvService: recvService,
brokerCtx: node.brokerCtx,
)
)
proc start*(self: MessagingClient): Result[void, string] =
if self.started:
return ok()
self.recvService.startRecvService()
self.sendService.startSendService()
?MessagingSend.setProvider(
self.brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
return await self.send(envelope),
)
self.started = true
ok()
proc stop*(self: MessagingClient) {.async.} =
if not self.started:
return
MessagingSend.clearProvider(self.brokerCtx)
await self.sendService.stopSendService()
await self.recvService.stopRecvService()
self.started = false

View File

@ -162,13 +162,13 @@ proc registerProviders(backend: KvBackend, ctx: BrokerContext): Result[void, str
return ok()
proc clearProviders(ctx: BrokerContext) =
proc clearProviders(ctx: BrokerContext) {.async.} =
KvGet.clearProvider(ctx)
KvExists.clearProvider(ctx)
KvScan.clearProvider(ctx)
KvCount.clearProvider(ctx)
KvDelete.clearProvider(ctx)
PersistEvent.dropAllListeners(ctx)
await PersistEvent.dropAllListeners(ctx)
# ── thread proc ─────────────────────────────────────────────────────────
@ -217,7 +217,7 @@ proc storageThreadMain(arg: ptr StorageThreadArg) {.thread.} =
except CatchableError as e:
error "storage thread loop crashed", err = e.msg
clearProviders(arg.ctx)
waitFor clearProviders(arg.ctx)
backend.close()
# ── lifecycle ───────────────────────────────────────────────────────────

View File

@ -272,7 +272,7 @@ proc hasJob*(p: Persistency, jobId: string): bool {.inline.} =
proc persist*(t: Job, ops: seq[TxOp]): Future[void] {.async.} =
## Emit a batched persist event. The handler treats >1 ops as a single
## BEGIN IMMEDIATE/COMMIT transaction (see backend_sqlite.applyOps).
await PersistEvent.emit(t.context, PersistEvent(ops: ops))
PersistEvent.emit(t.context, PersistEvent(ops: ops))
proc persist*(t: Job, op: TxOp): Future[void] {.async.} =
await persist(t, @[op])

View File

@ -328,8 +328,8 @@
}
},
"brokers": {
"version": "#v3.1.1",
"vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2",
"version": "#v3.1.4",
"vcsRevision": "8ae9e963b0b4478c93e6f888be6a46654da787de",
"url": "https://github.com/NagyZoltanPeter/nim-brokers.git",
"downloadMethod": "git",
"dependencies": [
@ -341,7 +341,7 @@
"cbor_serialization"
],
"checksums": {
"sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40"
"sha1": "2949398033c1b3a2586f0afd7e92f46b9a0a412e"
}
},
"stint": {

View File

@ -186,16 +186,16 @@ suite "Reliable Channel - send state machine":
setNoopEncryption()
var sendCalls = 0
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
sendCalls.inc
return ok(fakeMsgReqId)
MessagingSend.replaceProvider(
brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
sendCalls.inc
return ok(fakeMsgReqId),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
let sentFut = newFuture[RequestId]("channel-sent")
@ -250,17 +250,17 @@ suite "Reliable Channel - send state machine":
setNoopEncryption()
var msgReqIds: seq[RequestId]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
return ok(id)
MessagingSend.replaceProvider(
brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
return ok(id),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
let sentFut = newFuture[RequestId]("channel-sent")
@ -349,27 +349,27 @@ suite "Reliable Channel - send state machine":
var msgReqIds: seq[RequestId]
var sendsReturned = 0
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
## Call 2 fires the first segment's terminal event and then
## yields, so the listener task runs while the second segment
## is still mid-`await` in `onReadyToSend` — the exact race
## window the regression test targets.
let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
if msgReqIds.len == 2:
MessageSentEvent.emit(
brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "")
)
await sleepAsync(50.milliseconds)
sendsReturned.inc()
return ok(id)
MessagingSend.replaceProvider(
brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Call 2 fires the first segment's terminal event and then
## yields, so the listener task runs while the second segment
## is still mid-`await` in `onReadyToSend` — the exact race
## window the regression test targets.
let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
if msgReqIds.len == 2:
MessageSentEvent.emit(
brokerCtx, MessageSentEvent(requestId: msgReqIds[0], messageHash: "")
)
await sleepAsync(50.milliseconds)
sendsReturned.inc()
return ok(id),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var finalisedReqIds: seq[RequestId]
@ -450,15 +450,15 @@ suite "Reliable Channel - SDS persistence":
setNoopEncryption()
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return ok(RequestId("persist-msg-req-1"))
MessagingSend.replaceProvider(
globalBrokerContext(),
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
return ok(RequestId("persist-msg-req-1")),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
discard (await manager.send(channelId, "persist me".toBytes())).expect("send")
@ -791,17 +791,17 @@ suite "Reliable Channel - SDS protocol semantics":
setNoopEncryption()
var capturedWires: seq[seq[byte]]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(env.payload)
return ok(RequestId("semantics-req-" & $capturedWires.len))
MessagingSend.replaceProvider(
brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(envelope.payload)
return ok(RequestId("semantics-req-" & $capturedWires.len)),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
let remotePeer =
@ -859,16 +859,17 @@ suite "Reliable Channel - SDS protocol semantics":
setNoopEncryption()
var capturedWires: seq[seq[byte]]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
capturedWires.add(env.payload)
return ok(RequestId("ack-req-" & $capturedWires.len))
MessagingSend.replaceProvider(
brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(envelope.payload)
return ok(RequestId("ack-req-" & $capturedWires.len)),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
discard (await manager.send(channelId, "needs ack".toBytes())).expect("send")
@ -1095,16 +1096,17 @@ suite "Reliable Channel - SDS protocol semantics":
setNoopEncryption()
var capturedWires: seq[seq[byte]]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
capturedWires.add(env.payload)
return ok(RequestId("unique-req-" & $capturedWires.len))
MessagingSend.replaceProvider(
brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(envelope.payload)
return ok(RequestId("unique-req-" & $capturedWires.len)),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var deliveries: seq[seq[byte]]

View File

@ -156,7 +156,7 @@ suite "Persistency lifecycle":
let ev = PersistEvent(
ops: @[TxOp(category: "msg", key: k, kind: txPut, payload: payloadBytes("hello"))]
)
await PersistEvent.emit(t.context, ev)
PersistEvent.emit(t.context, ev)
let ckOk2 = await t.pollExists("msg", k)
check ckOk2
@ -178,7 +178,7 @@ suite "Persistency lifecycle":
check a.context != b.context
let k = key("shared", 1'i64)
await PersistEvent.emit(
PersistEvent.emit(
a.context,
PersistEvent(
ops: @[
@ -188,7 +188,7 @@ suite "Persistency lifecycle":
]
),
)
await PersistEvent.emit(
PersistEvent.emit(
b.context,
PersistEvent(
ops: @[
@ -255,7 +255,7 @@ suite "Persistency lifecycle":
ops.add(
TxOp(category: "msg", key: key("c", i), kind: txPut, payload: payloadBytes($i))
)
await PersistEvent.emit(t.context, PersistEvent(ops: ops))
PersistEvent.emit(t.context, PersistEvent(ops: ops))
# Wait for the last insert to land.
let ckOk5 = await t.pollExists("msg", key("c", 5'i64))
check ckOk5
@ -285,7 +285,7 @@ suite "Persistency lifecycle":
let r1 = aw7.get()
check r1.existed == false
await PersistEvent.emit(
PersistEvent.emit(
t.context,
PersistEvent(
ops: @[TxOp(category: "msg", key: k, kind: txPut, payload: payloadBytes("v"))]