nwaku/waku/waku_archive/retention_policy/retention_policy_time.nim
Ivan FB d530528259
refactor: new proc to foster different size retention policy implementations (#2463)
* new proc to foster different size retention policy implementations
  The new proc, decreaseDatabaseSize, will have a different implementation
  for each driver. For example, future commits will implement a size
  retention policy in Postgres by means of partition management.
* RetentionPolicy: use of new instead of init for ref object types
* waku_archive: fix signatures in decreaseDatabaseSize methods
* retention_policy_size: minor cleanup of comments and imports
2024-02-22 16:55:37 +01:00

53 lines
1.4 KiB
Nim

# Module-wide exception tracking: from here on, routines are checked against
# the pushed `raises` list. Compilers older than 1.4 get `Defect` in the list;
# newer ones get an empty list.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
import
std/times,
stew/results,
chronicles,
chronos
import
../../waku_core,
../driver,
../retention_policy
# Log topics attached to the chronicles log statements of this module.
logScope:
  topics = "waku archive retention_policy"
# Default retention window used by `TimeRetentionPolicy.new` when the caller
# does not pass one. The constructor converts this value with `.seconds`.
# NOTE(review): intended as 30 days expressed in seconds; both std/times and
# chronos export `days`, so confirm the chronos overload is the one selected.
const
  DefaultRetentionTime*: int64 = 30.days.seconds
type
  TimeRetentionPolicy* = ref object of RetentionPolicy
    ## Retention policy that decides what to delete from the message
    ## timestamps and a fixed retention window.
    retentionTime: chronos.Duration ## Window subtracted from "now" in `execute`.
proc new*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
  ## Builds a `TimeRetentionPolicy`.
  ##
  ## `retentionTime` is converted with `.seconds` before being stored in the
  ## policy's `chronos.Duration` field; it defaults to `DefaultRetentionTime`.
  let window = retentionTime.seconds
  TimeRetentionPolicy(retentionTime: window)
method execute*(p: TimeRetentionPolicy,
                driver: ArchiveDriver):
                Future[RetentionPolicyResult[void]] {.async.} =
  ## Applies the time-based retention policy to `driver`.
  ##
  ## Nothing is deleted until the oldest stored message is older than the
  ## retention window plus an extra 10% of that window, so deletions happen
  ## in batches. Once that point is passed, the driver is asked to delete
  ## every message older than `now - retentionTime`.
  ##
  ## Returns `ok()` when no deletion was needed or when it succeeded, and an
  ## `err` carrying a prefixed driver error message otherwise.

  let oldestRes = await driver.getOldestMessageTimestamp()
  if oldestRes.isErr():
    return err("failed to get oldest message timestamp: " & oldestRes.error)

  # `cutoff` is the deletion boundary; `batchThreshold` lies a further 10% of
  # the window in the past and only decides whether a deletion runs at all.
  let
    windowNs = p.retentionTime.nanoseconds
    nowNs = getNanosecondTime(getTime().toUnixFloat())
    cutoff = nowNs - windowNs
    batchThreshold = cutoff - windowNs div 10

  # Oldest message has not yet passed the slack threshold: skip this round.
  if oldestRes.value >= batchThreshold:
    return ok()

  let deleteRes = await driver.deleteMessagesOlderThanTimestamp(ts=cutoff)
  if deleteRes.isErr():
    return err("failed to delete oldest messages: " & deleteRes.error)

  return ok()