mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-27 11:59:26 +00:00
fix - Cap store checks on propagated messages by MessagingClient (#3965)
* Frame send service store checks and cap to 1 min per task * Add test for store not verify case
This commit is contained in:
parent
5309ce294b
commit
8501d051a1
@ -24,6 +24,9 @@ type DeliveryTask* = ref object
|
||||
tryCount*: int
|
||||
state*: DeliveryState
|
||||
deliveryTime*: Moment
|
||||
firstPropagatedTime*: Option[Moment]
|
||||
## Set once on the first successful propagation; never reset on re-publish.
|
||||
## Anchors the store-validation time cap (see propagationAge).
|
||||
propagateEventEmitted*: bool
|
||||
errorDesc*: string
|
||||
|
||||
@ -74,5 +77,13 @@ proc deliveryAge*(self: DeliveryTask): timer.Duration =
|
||||
else:
|
||||
ZeroDuration
|
||||
|
||||
proc propagationAge*(self: DeliveryTask): timer.Duration =
|
||||
## Time elapsed since the message was first successfully propagated.
|
||||
## Stable across re-publishes; ZeroDuration until first propagation.
|
||||
if self.firstPropagatedTime.isSome():
|
||||
timer.Moment.now() - self.firstPropagatedTime.get()
|
||||
else:
|
||||
ZeroDuration
|
||||
|
||||
proc isEphemeral*(self: DeliveryTask): bool =
|
||||
return self.msg.ephemeral
|
||||
|
||||
@ -71,6 +71,8 @@ method sendImpl*(
|
||||
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
|
||||
task.state = DeliveryState.SuccessfullyPropagated
|
||||
task.deliveryTime = Moment.now()
|
||||
if task.firstPropagatedTime.isNone():
|
||||
task.firstPropagatedTime = some(Moment.now())
|
||||
# TODO: with a simple retry processor it might be more accurate to say `Sent`
|
||||
else:
|
||||
# Controversial state, publish says ok but no peer. It should not happen.
|
||||
|
||||
@ -76,6 +76,8 @@ method sendImpl*(self: RelaySendProcessor, task: DeliveryTask) {.async.} =
|
||||
noOfPeers = noOfPublishedPeers
|
||||
task.state = DeliveryState.SuccessfullyPropagated
|
||||
task.deliveryTime = Moment.now()
|
||||
if task.firstPropagatedTime.isNone():
|
||||
task.firstPropagatedTime = some(Moment.now())
|
||||
else:
|
||||
# It shall not happen, but still covering it
|
||||
task.state = self.fallbackStateToSet
|
||||
|
||||
@ -59,6 +59,7 @@ type SendService* = ref object of RootObj
|
||||
|
||||
node: WakuNode
|
||||
checkStoreForMessages: bool
|
||||
lastStoreCheckTime: Moment ## throttles store validation queries to ArchiveTime cadence
|
||||
|
||||
proc setupSendProcessorChain(
|
||||
peerManager: PeerManager,
|
||||
@ -117,6 +118,7 @@ proc new*(
|
||||
sendProcessor: sendProcessorChain,
|
||||
node: w,
|
||||
checkStoreForMessages: checkStoreForMessages,
|
||||
lastStoreCheckTime: Moment.now(),
|
||||
)
|
||||
|
||||
return ok(sendService)
|
||||
@ -163,11 +165,20 @@ proc checkStoredMessages(self: SendService) {.async.} =
|
||||
if not self.checkStoreForMessages:
|
||||
return
|
||||
|
||||
# Throttle store queries so they run at most every ArchiveTime (3s), regardless
|
||||
# of the 1s service loop cadence.
|
||||
if Moment.now() - self.lastStoreCheckTime < ArchiveTime:
|
||||
return
|
||||
|
||||
let tasksToValidate = self.taskCache.filterIt(
|
||||
it.state == DeliveryState.SuccessfullyPropagated and it.deliveryAge() > ArchiveTime and
|
||||
not it.isEphemeral()
|
||||
it.state == DeliveryState.SuccessfullyPropagated and
|
||||
it.propagationAge() > ArchiveTime and not it.isEphemeral()
|
||||
)
|
||||
|
||||
if tasksToValidate.len() == 0:
|
||||
return
|
||||
|
||||
self.lastStoreCheckTime = Moment.now()
|
||||
await self.checkMsgsInStore(tasksToValidate)
|
||||
|
||||
proc reportTaskResult(self: SendService, task: DeliveryTask) =
|
||||
@ -200,7 +211,10 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) =
|
||||
# rest of the states are intermediate and does not translate to event
|
||||
discard
|
||||
|
||||
if task.messageAge() > MaxTimeInCache:
|
||||
# Only tasks that never propagated are reported as hard send failures here.
|
||||
# Propagated-but-not-store-validated tasks are handled (warn + drop, no event)
|
||||
# in evaluateAndCleanUp.
|
||||
if task.firstPropagatedTime.isNone() and task.messageAge() > MaxTimeInCache:
|
||||
error "Failed to send message",
|
||||
requestId = task.requestId,
|
||||
msgHash = task.msgHash.to0xHex(),
|
||||
@ -229,6 +243,25 @@ proc evaluateAndCleanUp(self: SendService) =
|
||||
)
|
||||
)
|
||||
|
||||
# Store validation timed out: the message was propagated but never confirmed in a
|
||||
# store node within MaxTimeInCache (measured from first propagation). Warn and drop
|
||||
# without emitting an app event.
|
||||
for task in self.taskCache:
|
||||
if task.firstPropagatedTime.isSome() and
|
||||
task.state != DeliveryState.SuccessfullyValidated and
|
||||
task.propagationAge() > MaxTimeInCache:
|
||||
warn "Message propagated but not validated by a store node within time window; stop trying.",
|
||||
requestId = task.requestId,
|
||||
msgHash = task.msgHash.to0xHex(),
|
||||
propagationAge = task.propagationAge()
|
||||
|
||||
self.taskCache.keepItIf(
|
||||
not (
|
||||
it.firstPropagatedTime.isSome() and it.state != DeliveryState.SuccessfullyValidated and
|
||||
it.propagationAge() > MaxTimeInCache
|
||||
)
|
||||
)
|
||||
|
||||
proc trySendMessages(self: SendService) {.async.} =
|
||||
let tasksToSend = self.taskCache.filterIt(it.state == DeliveryState.NextRoundRetry)
|
||||
|
||||
|
||||
@ -472,3 +472,60 @@ suite "Waku API - Send":
|
||||
eventManager.validate({SendEventOutcome.Error}, requestId)
|
||||
(await node.stop()).isOkOr:
|
||||
raiseAssert "Failed to stop node: " & error
|
||||
|
||||
asyncTest "Store validation times out without event":
|
||||
## The message propagates successfully, but the only reachable store peer never
|
||||
## receives/archives it (it is outside the relay propagation path), so store
|
||||
## validation never confirms. After MaxTimeInCache the task must be dropped with a
|
||||
## warn log and NO app event: Propagated fires, but neither Sent nor Error - the
|
||||
## missing Sent event is the signal that delivery could not be validated.
|
||||
var isolatedStoreNode: WakuNode
|
||||
lockNewGlobalBrokerContext:
|
||||
isolatedStoreNode =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
isolatedStoreNode.mountMetadata(3, @[0'u16]).isOkOr:
|
||||
raiseAssert "Failed to mount metadata: " & error
|
||||
(await isolatedStoreNode.mountRelay()).isOkOr:
|
||||
raiseAssert "Failed to mount relay"
|
||||
let archiveDriver = newSqliteArchiveDriver()
|
||||
isolatedStoreNode.mountArchive(archiveDriver).isOkOr:
|
||||
raiseAssert "Failed to mount archive: " & error
|
||||
await isolatedStoreNode.mountStore()
|
||||
await isolatedStoreNode.mountLibp2pPing()
|
||||
await isolatedStoreNode.start()
|
||||
# Deliberately NOT subscribed to the topic and NOT wired into the relay mesh, so
|
||||
# it can answer store queries but never holds the published message.
|
||||
let isolatedStoreNodePeerInfo = isolatedStoreNode.peerInfo.toRemotePeerInfo()
|
||||
|
||||
var node: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
# Propagate via relayNode1; store queries can only reach the isolated store node.
|
||||
await node.node.connectToNodes(@[relayNode1PeerInfo, isolatedStoreNodePeerInfo])
|
||||
|
||||
let eventManager = newSendEventListenerManager(node.brokerCtx)
|
||||
defer:
|
||||
await eventManager.teardown()
|
||||
|
||||
let envelope = MessageEnvelope.init(
|
||||
ContentTopic("/waku/2/default-content/proto"), "test payload"
|
||||
)
|
||||
|
||||
let requestId = (await node.send(envelope)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
# Must outlive MaxTimeInCache (1 min) so the store-validation timeout drop fires.
|
||||
const eventTimeout = 65.seconds
|
||||
discard await eventManager.waitForEvents(eventTimeout)
|
||||
|
||||
eventManager.validate({SendEventOutcome.Propagated}, requestId)
|
||||
|
||||
await isolatedStoreNode.stop()
|
||||
(await node.stop()).isOkOr:
|
||||
raiseAssert "Failed to stop node: " & error
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user