allow union of several retention policies (#3766)

* refactor retention policy to allow union of several retention policies
* fix a bug in the time retention policy
* add removal of orphan partitions, if any exist
* use nim-http-utils 0.4.1
Ivan FB 2026-03-19 21:37:04 +01:00 committed by GitHub
parent fdf4e839ff
commit 11461aed44
15 changed files with 323 additions and 55 deletions
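
Before the per-file diffs, a minimal usage sketch of the new behaviour in Nim. It mirrors the builder calls from the tests and conf_builder changes below; the DB URL and the use of the string form of withRetentionPolicies (the form the CLI path uses) are assumptions for illustration, not a drop-in snippet:

import results
import waku/factory/conf_builder/conf_builder

var builder = WakuConfBuilder.init()
builder.storeServiceConf.withEnabled(true)
builder.storeServiceConf.withDbUrl("sqlite://store.db") # illustrative DB URL
# Semicolon-separated; split into @["time:86400", "size:1GB"] by the builder.
builder.storeServiceConf.withRetentionPolicies("time:86400;size:1GB")
let wakuConf = builder.build().valueOr:
  raiseAssert error
# Both policies run on every retention pass, so a message is deleted as soon
# as either the time limit or the size limit would drop it (union semantics).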

View File

@@ -1,7 +1,11 @@
{.used.}
import std/options, results, stint, testutils/unittests
import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config
import std/[options, strutils], results, stint, testutils/unittests, chronos
import
waku/api/api_conf,
waku/factory/waku_conf,
waku/factory/networks_config,
waku/factory/conf_builder/conf_builder
suite "LibWaku Conf - toWakuConf":
test "Minimal configuration":
@@ -277,3 +281,60 @@ suite "LibWaku Conf - toWakuConf":
check:
wakuConf.staticNodes.len == 1
wakuConf.staticNodes[0] == entryNodes[1]
suite "WakuConfBuilder - store retention policies":
test "Multiple retention policies":
## Given
var b = WakuConfBuilder.init()
b.storeServiceConf.withEnabled(true)
b.storeServiceConf.withDbUrl("sqlite://test.db")
b.storeServiceConf.withRetentionPolicies(@["time:86400", "capacity:10000"])
## When
let wakuConf = b.build().valueOr:
raiseAssert error
## Then
require wakuConf.storeServiceConf.isSome()
let storeConf = wakuConf.storeServiceConf.get()
check storeConf.retentionPolicies == @["time:86400", "capacity:10000"]
test "Duplicated retention policies returns error":
## Given
var b = WakuConfBuilder.init()
b.storeServiceConf.withEnabled(true)
b.storeServiceConf.withDbUrl("sqlite://test.db")
b.storeServiceConf.withRetentionPolicies(@["time:86400", "time:800", "capacity:10000"])
## When
let wakuConfRes = b.build()
## Then
check wakuConfRes.isErr()
check wakuConfRes.error.contains("duplicated retention policy type")
test "Incorrect retention policy type returns error":
## Given
var b = WakuConfBuilder.init()
b.storeServiceConf.withEnabled(true)
b.storeServiceConf.withDbUrl("sqlite://test.db")
b.storeServiceConf.withRetentionPolicies(@["capaity:10000"])
## When
let wakuConfRes = b.build()
## Then
check wakuConfRes.isErr()
check wakuConfRes.error.contains("unknown retention policy type")
test "Store disabled - no retention policy applied":
## Given
var b = WakuConfBuilder.init()
# storeServiceConf not enabled
## When
let wakuConf = b.build().valueOr:
raiseAssert error
## Then
check wakuConf.storeServiceConf.isNone()

View File

@@ -1,7 +1,7 @@
{.used.}
import
std/options,
std/[options, strutils],
testutils/unittests,
chronos,
libp2p/crypto/[crypto, secp],
@@ -261,6 +261,85 @@ suite "Waku external config - Shards":
## Then
assert res.isErr(), "Invalid shard was accepted"
suite "Waku external config - store retention policy":
test "Default retention policy":
## Given
var conf = defaultWakuNodeConf().get()
conf.store = true
conf.storeMessageDbUrl = "sqlite://test.db"
# storeMessageRetentionPolicy keeps its default: "time:<2 days in seconds>"
## When
let res = conf.toWakuConf()
## Then
assert res.isOk(), $res.error
let wakuConf = res.get()
require wakuConf.storeServiceConf.isSome()
check wakuConf.storeServiceConf.get().retentionPolicies ==
@["time:" & $2.days.seconds]
test "Single custom retention policy":
## Given
var conf = defaultWakuNodeConf().get()
conf.store = true
conf.storeMessageDbUrl = "sqlite://test.db"
conf.storeMessageRetentionPolicy = "capacity:50000"
## When
let res = conf.toWakuConf()
## Then
assert res.isOk(), $res.error
let wakuConf = res.get()
require wakuConf.storeServiceConf.isSome()
check wakuConf.storeServiceConf.get().retentionPolicies == @["capacity:50000"]
test "Retention policies with whitespace around semicolons and colons":
## Given
var conf = defaultWakuNodeConf().get()
conf.store = true
conf.storeMessageDbUrl = "sqlite://test.db"
conf.storeMessageRetentionPolicy = "time:3600 ; capacity:10000 ; size : 30GB"
## When
let res = conf.toWakuConf()
## Then
assert res.isOk(), $res.error
let wakuConf = res.get()
require wakuConf.storeServiceConf.isSome()
check wakuConf.storeServiceConf.get().retentionPolicies ==
@["time:3600", "capacity:10000", "size:30GB"]
test "Invalid retention policy type returns error":
## Given
var conf = defaultWakuNodeConf().get()
conf.store = true
conf.storeMessageDbUrl = "sqlite://test.db"
conf.storeMessageRetentionPolicy = "foo:1234"
## When
let res = conf.toWakuConf()
## Then
check res.isErr()
check res.error.contains("unknown retention policy type")
test "Duplicated retention policy type returns error":
## Given
var conf = defaultWakuNodeConf().get()
conf.store = true
conf.storeMessageDbUrl = "sqlite://test.db"
conf.storeMessageRetentionPolicy = "time:3600;time:7200;capacity:10000"
## When
let res = conf.toWakuConf()
## Then
check res.isErr()
check res.error.contains("duplicated retention policy type")
suite "Waku external config - http url parsing":
test "Basic HTTP URLs without authentication":
check string(parseCmdArg(EthRpcUrl, "https://example.com/path")) ==

View File

@@ -347,7 +347,7 @@ hence would have reachability issues.""",
storeMessageRetentionPolicy* {.
desc:
"Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'. Size retention policy: 'size:<xMB/xGB>'. Set to 'none' to disable.",
"Message store retention policy. Multiple policies may be provided as a semicolon-separated string and are applied as a union. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'. Size retention policy: 'size:<xMB/xGB>'. Set to 'none' to disable. Example: 'time:3600;size:1GB;capacity:100'.",
defaultValue: "time:" & $2.days.seconds,
name: "store-message-retention-policy"
.}: string
@@ -991,7 +991,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.storeServiceConf.withEnabled(n.store)
b.storeServiceConf.withSupportV2(n.legacyStore)
b.storeServiceConf.withRetentionPolicy(n.storeMessageRetentionPolicy)
b.storeServiceConf.withRetentionPolicies(n.storeMessageRetentionPolicy)
b.storeServiceConf.withDbUrl(n.storeMessageDbUrl)
b.storeServiceConf.withDbVacuum(n.storeMessageDbVacuum)
b.storeServiceConf.withDbMigration(n.storeMessageDbMigration)

@@ -1 +1 @@
Subproject commit 08db609467a0e2b5a6e8ce118bb83dba7a8a9375
Subproject commit f142cb2e8bd812dd002a6493b6082827bb248592

View File

@@ -1,4 +1,5 @@
import chronicles, std/options, results, chronos
import std/[options, strutils, sequtils]
import chronicles, results, chronos
import ../waku_conf, ./store_sync_conf_builder
logScope:
@@ -15,7 +16,7 @@ type StoreServiceConfBuilder* = object
dbVacuum*: Option[bool]
supportV2*: Option[bool]
maxNumDbConnections*: Option[int]
retentionPolicy*: Option[string]
retentionPolicies*: seq[string]
resume*: Option[bool]
storeSyncConf*: StoreSyncConfBuilder
@@ -42,12 +43,43 @@ proc withMaxNumDbConnections*(
) =
b.maxNumDbConnections = some(maxNumDbConnections)
proc withRetentionPolicy*(b: var StoreServiceConfBuilder, retentionPolicy: string) =
b.retentionPolicy = some(retentionPolicy)
proc withRetentionPolicies*(b: var StoreServiceConfBuilder, retentionPolicies: string) =
b.retentionPolicies = retentionPolicies
.multiReplace((" ", ""), ("\t", ""))
.split(";")
.mapIt(it.strip())
.filterIt(it.len > 0)
proc withResume*(b: var StoreServiceConfBuilder, resume: bool) =
b.resume = some(resume)
const ValidRetentionPolicyTypes = ["time", "capacity", "size"]
proc validateRetentionPolicies(policies: seq[string]): Result[void, string] =
var seen: seq[string]
for p in policies:
let policy = p.multiReplace((" ", ""), ("\t", ""))
let parts = policy.split(":", 1)
if parts.len != 2 or parts[1] == "":
return err(
"invalid retention policy format: '" & policy & "', expected '<type>:<value>'"
)
let policyType = parts[0].toLowerAscii()
if policyType notin ValidRetentionPolicyTypes:
return err(
"unknown retention policy type: '" & policyType &
"', valid types are: time, capacity, size"
)
if policyType in seen:
return err("duplicated retention policy type: '" & policyType & "'")
seen.add(policyType)
return ok()
proc build*(b: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string] =
if not b.enabled.get(false):
return ok(none(StoreServiceConf))
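
To make the parsing in withRetentionPolicies and the checks in validateRetentionPolicies above concrete, a small sketch (same-module scope is assumed, since validateRetentionPolicies is not exported; the expected values follow directly from the code as written):

import std/[strutils, sequtils]

let raw = "time:3600 ; capacity:10000 ; size : 30GB"
let policies = raw
  .multiReplace((" ", ""), ("\t", ""))
  .split(";")
  .mapIt(it.strip())
  .filterIt(it.len > 0)
doAssert policies == @["time:3600", "capacity:10000", "size:30GB"]
doAssert validateRetentionPolicies(policies).isOk()
doAssert validateRetentionPolicies(@["time:3600", "time:7200"]).isErr() # duplicated type
doAssert validateRetentionPolicies(@["capaity:10000"]).isErr()          # unknown type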
@@ -58,6 +90,14 @@ proc build*(b: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string
let storeSyncConf = b.storeSyncConf.build().valueOr:
return err("Store Sync Conf failed to build")
let retentionPolicies =
if b.retentionPolicies.len == 0:
@["time:" & $2.days.seconds]
else:
validateRetentionPolicies(b.retentionPolicies).isOkOr:
return err("invalid retention policies: " & error)
b.retentionPolicies
return ok(
some(
StoreServiceConf(
@@ -66,7 +106,7 @@ proc build*(b: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string
dbVacuum: b.dbVacuum.get(false),
supportV2: b.supportV2.get(false),
maxNumDbConnections: b.maxNumDbConnections.get(50),
retentionPolicy: b.retentionPolicy.get("time:" & $2.days.seconds),
retentionPolicies: retentionPolicies,
resume: b.resume.get(false),
storeSyncConf: storeSyncConf,
)

View File

@@ -206,10 +206,10 @@ proc setupProtocols(
).valueOr:
return err("failed to setup archive driver: " & error)
let retPolicy = policy.RetentionPolicy.new(storeServiceConf.retentionPolicy).valueOr:
let retPolicies = policy.RetentionPolicy.new(storeServiceConf.retentionPolicies).valueOr:
return err("failed to create retention policy: " & error)
node.mountArchive(archiveDriver, retPolicy).isOkOr:
node.mountArchive(archiveDriver, retPolicies).isOkOr:
return err("failed to mount waku archive protocol: " & error)
if storeServiceConf.supportV2:

View File

@@ -57,7 +57,7 @@ type StoreServiceConf* {.requiresInit.} = object
dbVacuum*: bool
supportV2*: bool
maxNumDbConnections*: int
retentionPolicy*: string
retentionPolicies*: seq[string]
resume*: bool
storeSyncConf*: Option[StoreSyncConf]

View File

@@ -39,10 +39,10 @@ logScope:
proc mountArchive*(
node: WakuNode,
driver: waku_archive.ArchiveDriver,
retentionPolicy = none(waku_archive.RetentionPolicy),
retentionPolicies = newSeq[waku_archive.RetentionPolicy](),
): Result[void, string] =
node.wakuArchive = waku_archive.WakuArchive.new(
driver = driver, retentionPolicy = retentionPolicy
driver = driver, retentionPolicies = retentionPolicies
).valueOr:
return err("error in mountArchive: " & error)

View File

@@ -45,7 +45,7 @@ type WakuArchive* = ref object
validator: MessageValidator
retentionPolicy: Option[RetentionPolicy]
retentionPolicies: seq[RetentionPolicy]
retentionPolicyHandle: Future[void]
metricsHandle: Future[void]
@@ -72,13 +72,17 @@ proc new*(
T: type WakuArchive,
driver: ArchiveDriver,
validator: MessageValidator = validate,
retentionPolicy = none(RetentionPolicy),
retentionPolicies = newSeq[RetentionPolicy](0),
): Result[T, string] =
if driver.isNil():
return err("archive driver is Nil")
let archive =
WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
if retentionPolicies.len == 0:
return err("at least one retention policy must be provided")
let archive = WakuArchive(
driver: driver, validator: validator, retentionPolicies: retentionPolicies
)
return ok(archive)
@@ -253,16 +257,15 @@ proc findMessages*(
)
proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
let policy = self.retentionPolicy.get()
while true:
info "executing message retention policy"
(await policy.execute(self.driver)).isOkOr:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error = error
await sleepAsync(WakuArchiveDefaultRetentionPolicyIntervalWhenError)
## in case of error, let's try again faster
continue
for policy in self.retentionPolicies:
info "executing message retention policy", policy = $policy
(await policy.execute(self.driver)).isOkOr:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", policy = $policy, error = error
await sleepAsync(WakuArchiveDefaultRetentionPolicyIntervalWhenError)
## in case of error, let's try again faster
continue
await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
@@ -279,7 +282,7 @@ proc periodicMetricReport(self: WakuArchive) {.async.} =
await sleepAsync(WakuArchiveDefaultMetricsReportInterval)
proc start*(self: WakuArchive) =
if self.retentionPolicy.isSome():
if self.retentionPolicies.len > 0:
self.retentionPolicyHandle = self.periodicRetentionPolicy()
self.metricsHandle = self.periodicMetricReport()
@@ -287,7 +290,7 @@ proc start*(self: WakuArchive) =
proc stopWait*(self: WakuArchive) {.async.} =
var futures: seq[Future[void]]
if self.retentionPolicy.isSome() and not self.retentionPolicyHandle.isNil():
if not self.retentionPolicyHandle.isNil():
futures.add(self.retentionPolicyHandle.cancelAndWait())
if not self.metricsHandle.isNil:

View File

@@ -16,6 +16,9 @@ import
./postgres_healthcheck,
./partitions_manager
logScope:
topics = "postgres driver"
type PostgresDriver* = ref object of ArchiveDriver
## Establish separate pools for read/write operations
writeConnPool: PgAsyncPool
@@ -367,6 +370,7 @@ proc getPartitionsList(
): Future[ArchiveDriverResult[seq[string]]] {.async.} =
## Retrieves the seq of partition table names.
## e.g.: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
## This returns the partitions that are attached to the main messages table.
var partitions: seq[string]
proc rowCallback(pqResult: ptr PGresult) =
for iRow in 0 ..< pqResult.pqNtuples():
@@ -393,6 +397,49 @@ proc getPartitionsList(
return ok(partitions)
## fwd declaration. The implementation is below.
proc dropPartition(
self: PostgresDriver, partitionName: string
): Future[ArchiveDriverResult[void]] {.async.}
proc dropOrphanPartitions(
s: PostgresDriver
): Future[ArchiveDriverResult[void]] {.async.} =
## Tries to remove partitions that weren't correctly removed during retention policy execution.
## An orphan partition is a partition that is not attached to the main messages table.
## Therefore, it is not used for queries and can be safely removed.
var partitions: seq[string]
proc rowCallback(pqResult: ptr PGresult) =
for iRow in 0 ..< pqResult.pqNtuples():
let partitionName = $(pqgetvalue(pqResult, iRow, 0))
partitions.add(partitionName)
(
await s.readConnPool.pgQuery(
"""
SELECT c.relname AS partition_name
FROM pg_class c
LEFT JOIN pg_inherits i ON i.inhrelid = c.oid
WHERE c.relname LIKE 'messages_%'
AND c.relname != 'messages_lookup'
AND c.relkind = 'r' -- only regular tables
AND i.inhrelid IS NULL -- detached partition
ORDER BY partition_name
""",
newSeq[string](0),
rowCallback,
)
).isOkOr:
return err("dropOrphanPartitions failed in query: " & $error)
for partition in partitions:
info "orphan partition found", partitionName = partition
(await s.dropPartition(partition)).isOkOr:
error "failed to drop orphan partition", partitionName = partition, error = $error
continue
return ok()
proc getTimeCursor(
s: PostgresDriver, hashHex: string
): Future[ArchiveDriverResult[Option[Timestamp]]] {.async.} =
@@ -1259,11 +1306,18 @@ proc loopPartitionFactory(
self: PostgresDriver, onFatalError: OnFatalErrorHandler
) {.async.} =
## Loop proc that continuously checks whether we need to create a new partition.
## Notice that the deletion of partitions is handled by the retention policy modules.
## Notice that the deletion of partitions is mostly handled by the retention policy modules.
## This loop only removes orphan partitions which were detached but not properly removed by the
## retention policy module due to some error. However, the main task of this loop is to create
## new partitions when needed.
info "starting loopPartitionFactory"
while true:
trace "loopPartitionFactory iteration started"
(await self.dropOrphanPartitions()).isOkOr:
onFatalError("error when dropping orphan partitions: " & $error)
trace "Check if a new partition is needed"
## Let's make the 'partition_manager' aware of the current partitions
@@ -1321,14 +1375,24 @@ proc getTableSize*(
return ok(tableSize)
proc removePartition(
proc dropPartition(
self: PostgresDriver, partitionName: string
): Future[ArchiveDriverResult[void]] {.async.} =
let dropPartitionQuery = "DROP TABLE " & partitionName
info "drop partition", query = dropPartitionQuery
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
return err(fmt"error in dropPartition: {dropPartitionQuery}: " & $error)
return ok()
proc detachAndDropPartition(
self: PostgresDriver, partition: Partition
): Future[ArchiveDriverResult[void]] {.async.} =
## Removes the desired partition and also removes the rows from messages_lookup table
## Detaches and drops the desired partition and also removes the rows from messages_lookup table
## that belong to the partition time range
let partitionName = partition.getName()
info "beginning of removePartition", partitionName
info "beginning of detachAndDropPartition", partitionName
let partSize = (await self.getTableSize(partitionName)).valueOr("")
@@ -1351,11 +1415,8 @@ proc removePartition(
else:
return err(fmt"error in {detachPartitionQuery}: " & $error)
## Drop the partition
let dropPartitionQuery = "DROP TABLE " & partitionName
info "removeOldestPartition drop partition", query = dropPartitionQuery
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
return err(fmt"error in {dropPartitionQuery}: " & $error)
## Drop partition
?(await self.dropPartition(partitionName))
info "removed partition", partition_name = partitionName, partition_size = partSize
self.partitionMngr.removeOldestPartitionName()
@@ -1380,8 +1441,18 @@ proc removePartitionsOlderThan(
var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition in removePartitionOlderThan: " & $error)
while not oldestPartition.containsMoment(tsInSec):
(await self.removePartition(oldestPartition)).isOkOr:
debug "oldest partition info",
partitionName = oldestPartition.getName(),
partitionLastMoment = oldestPartition.getLastMoment(),
tsInSec
while oldestPartition.getLastMoment() < tsInSec:
info "start removing partition whose first record is older than the specified timestamp",
partitionName = oldestPartition.getName(),
partitionFirstMoment = oldestPartition.getLastMoment(),
tsInSec
(await self.detachAndDropPartition(oldestPartition)).isOkOr:
return err("issue in removePartitionsOlderThan: " & $error)
oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
@@ -1409,7 +1480,7 @@ proc removeOldestPartition(
info "Skipping to remove the current partition"
return ok()
return await self.removePartition(oldestPartition)
return await self.detachAndDropPartition(oldestPartition)
proc containsAnyPartition*(self: PostgresDriver): bool =
return not self.partitionMngr.isEmpty()

View File

@@ -11,3 +11,6 @@ method execute*(
p: RetentionPolicy, store: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.base, async.} =
discard
method `$`*(p: RetentionPolicy): string {.base, gcsafe.} =
"unknown retention policy"

View File

@@ -7,7 +7,7 @@ import
./retention_policy_capacity,
./retention_policy_size
proc new*(
proc new(
T: type RetentionPolicy, retPolicy: string
): RetentionPolicyResult[Option[RetentionPolicy]] =
let retPolicy = retPolicy.toLower
@@ -83,3 +83,14 @@ proc new*(
return ok(some(retPolicy))
else:
return err("unknown retention policy")
proc new*(
T: typedesc[RetentionPolicy], retPolicies: seq[string]
): RetentionPolicyResult[seq[RetentionPolicy]] =
var policies: seq[RetentionPolicy]
for retPolicy in retPolicies:
let policy = RetentionPolicy.new(retPolicy).valueOr:
return err(error)
if policy.isSome():
policies.add(policy.get())
return ok(policies)
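
A brief usage sketch of the seq-based constructor above, as node_factory now wires it up (the policy strings are illustrative; error handling via valueOr follows the rest of the codebase):

let retentionPolicies = RetentionPolicy.new(@["time:86400", "capacity:100000"]).valueOr:
  raiseAssert "invalid retention policy: " & error
doAssert retentionPolicies.len == 2
# Entries that resolve to no policy (e.g. "none") are skipped, so the resulting
# seq can be empty; WakuArchive.new then rejects it with
# "at least one retention policy must be provided".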

View File

@@ -50,6 +50,9 @@ proc new*(T: type CapacityRetentionPolicy, capacity = DefaultCapacity): T =
capacity: capacity, totalCapacity: totalCapacity, deleteWindow: deleteWindow
)
method `$`*(p: CapacityRetentionPolicy): string =
"capacity:" & $p.capacity
method execute*(
p: CapacityRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =

View File

@@ -15,6 +15,9 @@ type SizeRetentionPolicy* = ref object of RetentionPolicy
proc new*(T: type SizeRetentionPolicy, size = DefaultRetentionSize): T =
SizeRetentionPolicy(sizeLimit: size)
method `$`*(p: SizeRetentionPolicy): string =
"size:" & $p.sizeLimit & "b"
method execute*(
p: SizeRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =

View File

@@ -6,29 +6,23 @@ import ../../waku_core, ../driver, ../retention_policy
logScope:
topics = "waku archive retention_policy"
const DefaultRetentionTime*: int64 = 30.days.seconds
type TimeRetentionPolicy* = ref object of RetentionPolicy
retentionTime: chronos.Duration
proc new*(T: type TimeRetentionPolicy, retentionTime = DefaultRetentionTime): T =
proc new*(T: type TimeRetentionPolicy, retentionTime: int64): T =
TimeRetentionPolicy(retentionTime: retentionTime.seconds)
method `$`*(p: TimeRetentionPolicy): string =
"time:" & $p.retentionTime.seconds
method execute*(
p: TimeRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
## Delete messages that exceed the retention time
info "beginning of executing message retention policy - time"
let omt = (await driver.getOldestMessageTimestamp()).valueOr:
return err("failed to get oldest message timestamp: " & error)
let now = getNanosecondTime(getTime().toUnixFloat())
let retentionTimestamp = now - p.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
if thresholdTimestamp <= omt:
return ok()
(await driver.deleteMessagesOlderThanTimestamp(ts = retentionTimestamp)).isOkOr:
return err("failed to delete oldest messages: " & error)