From e44daec1c4ecccb716ec1eebeda587c8c4b93552 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 29 May 2026 21:41:30 +0200 Subject: [PATCH] nph fix --- channels/reliable_channel.nim | 34 ++++++++----------- channels/reliable_channel_manager.nim | 4 +-- .../test_reliable_channel_send_receive.nim | 8 ++--- 3 files changed, 17 insertions(+), 29 deletions(-) diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 1caeb2ba2..c3fbe5d77 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -60,13 +60,11 @@ type ## messaging layer has its own richer `DeliveryState` (retries, ## propagated-vs-validated); here we only model what's needed to ## decide when a `channelReqId` is fully accounted for. - AwaitingRateLimit - ## Pushed by `send`; not yet released by rate_limit_manager. + AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager. InFlight ## Released by rate_limit_manager and handed to delivery_service; ## `messagingReqId` is now set. - Confirmed - ## `MessageSentEvent` arrived for `messagingReqId`. + Confirmed ## `MessageSentEvent` arrived for `messagingReqId`. Failed ## `MessageErrorEvent` arrived for `messagingReqId`, or the local ## delivery-task construction failed before any id was reachable. @@ -164,7 +162,7 @@ proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = ## belong to this channel simply don't match and are no-ops. self.pendingMessagingRequests.applyItIf( it.segmentSendState == SegmentSendState.InFlight and - it.messagingReqId == some(messagingReqId) + it.messagingReqId == some(messagingReqId) ): it.segmentSendState = SegmentSendState.Confirmed self.pruneCompletedChannelReqs() @@ -173,7 +171,7 @@ proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) = ## Symmetric to `onMessageSent` but for `MessageErrorEvent`. self.pendingMessagingRequests.applyItIf( it.segmentSendState == SegmentSendState.InFlight and - it.messagingReqId == some(messagingReqId) + it.messagingReqId == some(messagingReqId) ): it.segmentSendState = SegmentSendState.Failed self.pruneCompletedChannelReqs() @@ -196,7 +194,9 @@ proc onReadyToSend( ## live on until every sibling of their `channelReqId` is final, ## so we walk past those to find the next one that was awaiting for this batch. while idx < self.pendingMessagingRequests.len and - self.pendingMessagingRequests[idx].segmentSendState != SegmentSendState.AwaitingRateLimit: + self.pendingMessagingRequests[idx].segmentSendState != + SegmentSendState.AwaitingRateLimit + : idx.inc() if idx >= self.pendingMessagingRequests.len: ## rate_limit_manager emitted more messages than we have pending — @@ -219,9 +219,7 @@ proc onReadyToSend( MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( - requestId: channelReqId, - messageHash: "", - error: "encryption failed: " & error, + requestId: channelReqId, messageHash: "", error: "encryption failed: " & error ), ) ## Encryption failed *before* we could hand the segment to the @@ -260,9 +258,7 @@ proc onReadyToSend( MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( - requestId: channelReqId, - messageHash: "", - error: "waku send failed: " & error, + requestId: channelReqId, messageHash: "", error: "waku send failed: " & error ), ) self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed @@ -390,9 +386,9 @@ proc new*( ## the send state machine without touching the network. let resolvedSendHandler = if sendHandler.isNil(): - proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. - async: (raises: [CatchableError]), gcsafe - .} = + proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = return await waku.send(envelope) else: sendHandler @@ -445,15 +441,13 @@ proc new*( discard MessageSentEvent.listen( chn.brokerCtx, proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} = - chn.onMessageSent(evt.requestId) - , + chn.onMessageSent(evt.requestId), ) discard MessageErrorEvent.listen( chn.brokerCtx, proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = - chn.onMessageError(evt.requestId) - , + chn.onMessageError(evt.requestId), ) return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index 79f59ac3f..747f755b4 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -44,9 +44,7 @@ proc new*( let waku = ?(await createNode(conf)) let manager = T( - channels: initTable[ChannelId, ReliableChannel](), - waku: waku, - brokerCtx: brokerCtx, + channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx ) return ok(manager) diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 91e3a9685..2f49182a2 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -175,9 +175,7 @@ suite "Reliable Channel - send state machine": var sendCalls = 0 let fakeSend: SendHandler = proc( env: MessageEnvelope - ): Future[Result[RequestId, string]] {. - async: (raises: [CatchableError]), gcsafe - .} = + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = sendCalls.inc return ok(fakeMsgReqId) @@ -242,9 +240,7 @@ suite "Reliable Channel - send state machine": var msgReqIds: seq[RequestId] let fakeSend: SendHandler = proc( env: MessageEnvelope - ): Future[Result[RequestId, string]] {. - async: (raises: [CatchableError]), gcsafe - .} = + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) msgReqIds.add(id) return ok(id)