archive: simplify and enhance async retention policy application (#2278)

* Avoid using timer and just use an infinite async loop that can be
cancelled at any time.
This commit is contained in:
Ivan FB 2023-12-11 08:50:40 +01:00 committed by GitHub
parent 45b0be8e75
commit 77c5ba7669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 41 deletions

View File

@ -740,7 +740,6 @@ proc filterUnsubscribeAll*(node: WakuNode,
# yet incompatible to handle both type of filters - use specific filter registration instead
## Waku archive
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes
proc mountArchive*(node: WakuNode,
driver: ArchiveDriver,
retentionPolicy = none(RetentionPolicy)):
@ -760,18 +759,7 @@ proc mountArchive*(node: WakuNode,
except CatchableError:
return err("exception in mountArchive: " & getCurrentExceptionMsg())
if retentionPolicy.isSome():
try:
debug "executing message retention policy"
let retPolRes = waitFor node.wakuArchive.executeMessageRetentionPolicy()
if retPolRes.isErr():
return err("error in mountArchive: " & retPolRes.error)
except CatchableError:
return err("exception in mountArch-ret-pol: " & getCurrentExceptionMsg())
node.wakuArchive.startMessageRetentionPolicyPeriodicTask(
WakuArchiveDefaultRetentionPolicyInterval)
asyncSpawn node.wakuArchive.start()
return ok()
## Waku store
@ -1174,6 +1162,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuRlnRelay.isNil():
await node.wakuRlnRelay.stop()
if not node.wakuArchive.isNil():
await node.wakuArchive.stop()
node.started = false
proc isReady*(node: WakuNode): Future[bool] {.async.} =

View File

@ -68,6 +68,7 @@ type
driver*: ArchiveDriver # TODO: Make this field private. Remove asterisk
validator: MessageValidator
retentionPolicy: RetentionPolicy
retPolicyFut: Future[Result[void, string]] ## retention policy cancelable future
proc new*(T: type WakuArchive,
driver: ArchiveDriver,
@ -189,19 +190,25 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
return ok(ArchiveResponse(messages: messages, cursor: cursor))
# Retention policy
const WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)
proc loopApplyRetentionPolicy*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
proc executeMessageRetentionPolicy*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.retentionPolicy.isNil():
return err("retentionPolicy is Nil in executeMessageRetentionPolicy")
if w.driver.isNil():
return err("driver is Nil in executeMessageRetentionPolicy")
let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
return err("failed execution of retention policy: " & retPolicyRes.error)
while true:
debug "executing message retention policy"
let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error=retPolicyRes.error
await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
return ok()
@ -218,26 +225,8 @@ proc reportStoredMessagesMetric*(w: WakuArchive):
return ok()
proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive,
interval: timer.Duration) =
# Start the periodic message retention policy task
# https://github.com/nim-lang/Nim/issues/17369
proc start*(self: WakuArchive) {.async.} =
self.retPolicyFut = self.loopApplyRetentionPolicy()
var executeRetentionPolicy: CallbackFunc
executeRetentionPolicy =
CallbackFunc(
proc (arg: pointer) {.gcsafe, raises: [].} =
try:
let retPolRes = waitFor w.executeMessageRetentionPolicy()
if retPolRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "error in periodic retention policy", error = retPolRes.error
except CatchableError:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "exception in periodic retention policy",
error = getCurrentExceptionMsg()
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
proc stop*(self: WakuArchive) {.async.} =
self.retPolicyFut.cancel()