This commit is contained in:
NagyZoltanPeter 2026-06-24 21:06:45 +02:00
parent 9c744bfeb8
commit 7fd722c09a
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
2 changed files with 8 additions and 22 deletions

View File

@ -52,7 +52,7 @@ proc start*(self: MessagingClient): Result[void, string] =
?MessagingSend.setProvider(
self.brokerCtx,
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
return await self.send(envelope)
return await self.send(envelope),
)
self.started = true

View File

@ -188,9 +188,7 @@ suite "Reliable Channel - send state machine":
var sendCalls = 0
MessagingSend.replaceProvider(
brokerCtx,
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
sendCalls.inc
return ok(fakeMsgReqId),
).isOkOr:
@ -254,9 +252,7 @@ suite "Reliable Channel - send state machine":
var msgReqIds: seq[RequestId]
MessagingSend.replaceProvider(
brokerCtx,
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
return ok(id),
@ -355,9 +351,7 @@ suite "Reliable Channel - send state machine":
var sendsReturned = 0
MessagingSend.replaceProvider(
brokerCtx,
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Call 2 fires the first segment's terminal event and then
## yields, so the listener task runs while the second segment
## is still mid-`await` in `onReadyToSend` — the exact race
@ -458,9 +452,7 @@ suite "Reliable Channel - SDS persistence":
MessagingSend.replaceProvider(
globalBrokerContext(),
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
return ok(RequestId("persist-msg-req-1")),
).isOkOr:
raiseAssert "replaceProvider failed: " & error
@ -801,9 +793,7 @@ suite "Reliable Channel - SDS protocol semantics":
var capturedWires: seq[seq[byte]]
MessagingSend.replaceProvider(
brokerCtx,
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(envelope.payload)
return ok(RequestId("semantics-req-" & $capturedWires.len)),
@ -871,9 +861,7 @@ suite "Reliable Channel - SDS protocol semantics":
var capturedWires: seq[seq[byte]]
MessagingSend.replaceProvider(
brokerCtx,
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(envelope.payload)
return ok(RequestId("ack-req-" & $capturedWires.len)),
@ -1110,9 +1098,7 @@ suite "Reliable Channel - SDS protocol semantics":
var capturedWires: seq[seq[byte]]
MessagingSend.replaceProvider(
brokerCtx,
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.async.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(envelope.payload)
return ok(RequestId("unique-req-" & $capturedWires.len)),