From 11461aed44e777ffde5655405c8fbce9c0c2e9c3 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Thu, 19 Mar 2026 21:37:04 +0100 Subject: [PATCH] allow union of several retention policies (#3766) * refactor retention policy to allow union of several retention policies * bug fix time retention policy * add removal of orphan partitions if any * use nim-http-utils 0.4.1 --- tests/api/test_node_conf.nim | 65 ++++++++++++- tests/wakunode2/test_cli_args.nim | 81 +++++++++++++++- tools/confutils/cli_args.nim | 4 +- vendor/nim-http-utils | 2 +- .../store_service_conf_builder.nim | 50 +++++++++- waku/factory/node_factory.nim | 4 +- waku/factory/waku_conf.nim | 2 +- waku/node/kernel_api/store.nim | 4 +- waku/waku_archive/archive.nim | 33 ++++--- .../postgres_driver/postgres_driver.nim | 95 ++++++++++++++++--- waku/waku_archive/retention_policy.nim | 3 + .../waku_archive/retention_policy/builder.nim | 13 ++- .../retention_policy_capacity.nim | 3 + .../retention_policy_size.nim | 3 + .../retention_policy_time.nim | 16 +--- 15 files changed, 323 insertions(+), 55 deletions(-) diff --git a/tests/api/test_node_conf.nim b/tests/api/test_node_conf.nim index 232ffc7d2..255dec8b2 100644 --- a/tests/api/test_node_conf.nim +++ b/tests/api/test_node_conf.nim @@ -1,7 +1,11 @@ {.used.} -import std/options, results, stint, testutils/unittests -import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config +import std/[options, strutils], results, stint, testutils/unittests, chronos +import + waku/api/api_conf, + waku/factory/waku_conf, + waku/factory/networks_config, + waku/factory/conf_builder/conf_builder suite "LibWaku Conf - toWakuConf": test "Minimal configuration": @@ -277,3 +281,60 @@ suite "LibWaku Conf - toWakuConf": check: wakuConf.staticNodes.len == 1 wakuConf.staticNodes[0] == entryNodes[1] + +suite "WakuConfBuilder - store retention policies": + test "Multiple retention policies": + ## Given + var b = WakuConfBuilder.init() + b.storeServiceConf.withEnabled(true) + b.storeServiceConf.withDbUrl("sqlite://test.db") + b.storeServiceConf.withRetentionPolicies(@["time:86400", "capacity:10000"]) + + ## When + let wakuConf = b.build().valueOr: + raiseAssert error + + ## Then + require wakuConf.storeServiceConf.isSome() + let storeConf = wakuConf.storeServiceConf.get() + check storeConf.retentionPolicies == @["time:86400", "capacity:10000"] + + test "Duplicated retention policies returns error": + ## Given + var b = WakuConfBuilder.init() + b.storeServiceConf.withEnabled(true) + b.storeServiceConf.withDbUrl("sqlite://test.db") + b.storeServiceConf.withRetentionPolicies(@["time:86400", "time:800", "capacity:10000"]) + + ## When + let wakuConfRes = b.build() + + ## Then + check wakuConfRes.isErr() + check wakuConfRes.error.contains("duplicated retention policy type") + + test "Incorrect retention policy type returns error": + ## Given + var b = WakuConfBuilder.init() + b.storeServiceConf.withEnabled(true) + b.storeServiceConf.withDbUrl("sqlite://test.db") + b.storeServiceConf.withRetentionPolicies(@["capaity:10000"]) + + ## When + let wakuConfRes = b.build() + + ## Then + check wakuConfRes.isErr() + check wakuConfRes.error.contains("unknown retention policy type") + + test "Store disabled - no retention policy applied": + ## Given + var b = WakuConfBuilder.init() + # storeServiceConf not enabled + + ## When + let wakuConf = b.build().valueOr: + raiseAssert error + + ## Then + check wakuConf.storeServiceConf.isNone() diff --git a/tests/wakunode2/test_cli_args.nim b/tests/wakunode2/test_cli_args.nim index dabc78083..5108b4a9d 100644 --- a/tests/wakunode2/test_cli_args.nim +++ b/tests/wakunode2/test_cli_args.nim @@ -1,7 +1,7 @@ {.used.} import - std/options, + std/[options, strutils], testutils/unittests, chronos, libp2p/crypto/[crypto, secp], @@ -261,6 +261,85 @@ suite "Waku external config - Shards": ## Then assert res.isErr(), "Invalid shard was accepted" +suite "Waku external config - store retention policy": + test "Default retention policy": + ## Given + var conf = defaultWakuNodeConf().get() + conf.store = true + conf.storeMessageDbUrl = "sqlite://test.db" + # storeMessageRetentionPolicy keeps its default: "time:<2 days in seconds>" + + ## When + let res = conf.toWakuConf() + + ## Then + assert res.isOk(), $res.error + let wakuConf = res.get() + require wakuConf.storeServiceConf.isSome() + check wakuConf.storeServiceConf.get().retentionPolicies == + @["time:" & $2.days.seconds] + + test "Single custom retention policy": + ## Given + var conf = defaultWakuNodeConf().get() + conf.store = true + conf.storeMessageDbUrl = "sqlite://test.db" + conf.storeMessageRetentionPolicy = "capacity:50000" + + ## When + let res = conf.toWakuConf() + + ## Then + assert res.isOk(), $res.error + let wakuConf = res.get() + require wakuConf.storeServiceConf.isSome() + check wakuConf.storeServiceConf.get().retentionPolicies == @["capacity:50000"] + + test "Retention policies with whitespace around semicolons and colons": + ## Given + var conf = defaultWakuNodeConf().get() + conf.store = true + conf.storeMessageDbUrl = "sqlite://test.db" + conf.storeMessageRetentionPolicy = "time:3600 ; capacity:10000 ; size : 30GB" + + ## When + let res = conf.toWakuConf() + + ## Then + assert res.isOk(), $res.error + let wakuConf = res.get() + require wakuConf.storeServiceConf.isSome() + check wakuConf.storeServiceConf.get().retentionPolicies == + @["time:3600", "capacity:10000", "size:30GB"] + + test "Invalid retention policy type returns error": + ## Given + var conf = defaultWakuNodeConf().get() + conf.store = true + conf.storeMessageDbUrl = "sqlite://test.db" + conf.storeMessageRetentionPolicy = "foo:1234" + + ## When + let res = conf.toWakuConf() + + ## Then + check res.isErr() + check res.error.contains("unknown retention policy type") + + test "Duplicated retention policy type returns error": + ## Given + var conf = defaultWakuNodeConf().get() + conf.store = true + conf.storeMessageDbUrl = "sqlite://test.db" + conf.storeMessageRetentionPolicy = "time:3600;time:7200;capacity:10000" + + ## When + let res = conf.toWakuConf() + + ## Then + check res.isErr() + check res.error.contains("duplicated retention policy type") + suite "Waku external config - http url parsing": test "Basic HTTP URLs without authentication": check string(parseCmdArg(EthRpcUrl, "https://example.com/path")) == diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index e6b3fc97d..994bc6a2d 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -347,7 +347,7 @@ hence would have reachability issues.""", storeMessageRetentionPolicy* {. desc: - "Message store retention policy. Time retention policy: 'time:'. Capacity retention policy: 'capacity:'. Size retention policy: 'size:'. Set to 'none' to disable.", + "Message store retention policy. Multiple policies may be provided as a semicolon-separated string and are applied as a union. Time retention policy: 'time:'. Capacity retention policy: 'capacity:'. Size retention policy: 'size:'. Set to 'none' to disable. Example: 'time:3600;size:1GB;capacity:100'.", defaultValue: "time:" & $2.days.seconds, name: "store-message-retention-policy" .}: string @@ -991,7 +991,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.storeServiceConf.withEnabled(n.store) b.storeServiceConf.withSupportV2(n.legacyStore) - b.storeServiceConf.withRetentionPolicy(n.storeMessageRetentionPolicy) + b.storeServiceConf.withRetentionPolicies(n.storeMessageRetentionPolicy) b.storeServiceConf.withDbUrl(n.storeMessageDbUrl) b.storeServiceConf.withDbVacuum(n.storeMessageDbVacuum) b.storeServiceConf.withDbMigration(n.storeMessageDbMigration) diff --git a/vendor/nim-http-utils b/vendor/nim-http-utils index 08db60946..f142cb2e8 160000 --- a/vendor/nim-http-utils +++ b/vendor/nim-http-utils @@ -1 +1 @@ -Subproject commit 08db609467a0e2b5a6e8ce118bb83dba7a8a9375 +Subproject commit f142cb2e8bd812dd002a6493b6082827bb248592 diff --git a/waku/factory/conf_builder/store_service_conf_builder.nim b/waku/factory/conf_builder/store_service_conf_builder.nim index d5d48c34d..30c743e01 100644 --- a/waku/factory/conf_builder/store_service_conf_builder.nim +++ b/waku/factory/conf_builder/store_service_conf_builder.nim @@ -1,4 +1,5 @@ -import chronicles, std/options, results, chronos +import std/[options, strutils, sequtils] +import chronicles, results, chronos import ../waku_conf, ./store_sync_conf_builder logScope: @@ -15,7 +16,7 @@ type StoreServiceConfBuilder* = object dbVacuum*: Option[bool] supportV2*: Option[bool] maxNumDbConnections*: Option[int] - retentionPolicy*: Option[string] + retentionPolicies*: seq[string] resume*: Option[bool] storeSyncConf*: StoreSyncConfBuilder @@ -42,12 +43,43 @@ proc withMaxNumDbConnections*( ) = b.maxNumDbConnections = some(maxNumDbConnections) -proc withRetentionPolicy*(b: var StoreServiceConfBuilder, retentionPolicy: string) = - b.retentionPolicy = some(retentionPolicy) +proc withRetentionPolicies*(b: var StoreServiceConfBuilder, retentionPolicies: string) = + b.retentionPolicies = retentionPolicies + .multiReplace((" ", ""), ("\t", "")) + .split(";") + .mapIt(it.strip()) + .filterIt(it.len > 0) proc withResume*(b: var StoreServiceConfBuilder, resume: bool) = b.resume = some(resume) +const ValidRetentionPolicyTypes = ["time", "capacity", "size"] + +proc validateRetentionPolicies(policies: seq[string]): Result[void, string] = + var seen: seq[string] + + for p in policies: + let policy = p.multiReplace((" ", ""), ("\t", "")) + let parts = policy.split(":", 1) + if parts.len != 2 or parts[1] == "": + return err( + "invalid retention policy format: '" & policy & "', expected ':'" + ) + + let policyType = parts[0].toLowerAscii() + if policyType notin ValidRetentionPolicyTypes: + return err( + "unknown retention policy type: '" & policyType & + "', valid types are: time, capacity, size" + ) + + if policyType in seen: + return err("duplicated retention policy type: '" & policyType & "'") + + seen.add(policyType) + + return ok() + proc build*(b: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string] = if not b.enabled.get(false): return ok(none(StoreServiceConf)) @@ -58,6 +90,14 @@ proc build*(b: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string let storeSyncConf = b.storeSyncConf.build().valueOr: return err("Store Sync Conf failed to build") + let retentionPolicies = + if b.retentionPolicies.len == 0: + @["time:" & $2.days.seconds] + else: + validateRetentionPolicies(b.retentionPolicies).isOkOr: + return err("invalid retention policies: " & error) + b.retentionPolicies + return ok( some( StoreServiceConf( @@ -66,7 +106,7 @@ proc build*(b: StoreServiceConfBuilder): Result[Option[StoreServiceConf], string dbVacuum: b.dbVacuum.get(false), supportV2: b.supportV2.get(false), maxNumDbConnections: b.maxNumDbConnections.get(50), - retentionPolicy: b.retentionPolicy.get("time:" & $2.days.seconds), + retentionPolicies: retentionPolicies, resume: b.resume.get(false), storeSyncConf: storeSyncConf, ) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 488d07c06..86ab75d77 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -206,10 +206,10 @@ proc setupProtocols( ).valueOr: return err("failed to setup archive driver: " & error) - let retPolicy = policy.RetentionPolicy.new(storeServiceConf.retentionPolicy).valueOr: + let retPolicies = policy.RetentionPolicy.new(storeServiceConf.retentionPolicies).valueOr: return err("failed to create retention policy: " & error) - node.mountArchive(archiveDriver, retPolicy).isOkOr: + node.mountArchive(archiveDriver, retPolicies).isOkOr: return err("failed to mount waku archive protocol: " & error) if storeServiceConf.supportV2: diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 89ffb366c..3b99d1070 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -57,7 +57,7 @@ type StoreServiceConf* {.requiresInit.} = object dbVacuum*: bool supportV2*: bool maxNumDbConnections*: int - retentionPolicy*: string + retentionPolicies*: seq[string] resume*: bool storeSyncConf*: Option[StoreSyncConf] diff --git a/waku/node/kernel_api/store.nim b/waku/node/kernel_api/store.nim index 7edae7966..ca9917163 100644 --- a/waku/node/kernel_api/store.nim +++ b/waku/node/kernel_api/store.nim @@ -39,10 +39,10 @@ logScope: proc mountArchive*( node: WakuNode, driver: waku_archive.ArchiveDriver, - retentionPolicy = none(waku_archive.RetentionPolicy), + retentionPolicies = newSeq[waku_archive.RetentionPolicy](), ): Result[void, string] = node.wakuArchive = waku_archive.WakuArchive.new( - driver = driver, retentionPolicy = retentionPolicy + driver = driver, retentionPolicies = retentionPolicies ).valueOr: return err("error in mountArchive: " & error) diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 707c757a3..4c4ac4bba 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -45,7 +45,7 @@ type WakuArchive* = ref object validator: MessageValidator - retentionPolicy: Option[RetentionPolicy] + retentionPolicies: seq[RetentionPolicy] retentionPolicyHandle: Future[void] metricsHandle: Future[void] @@ -72,13 +72,17 @@ proc new*( T: type WakuArchive, driver: ArchiveDriver, validator: MessageValidator = validate, - retentionPolicy = none(RetentionPolicy), + retentionPolicies = newSeq[RetentionPolicy](0), ): Result[T, string] = if driver.isNil(): return err("archive driver is Nil") - let archive = - WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy) + if retentionPolicies.len == 0: + return err("at least one retention policy must be provided") + + let archive = WakuArchive( + driver: driver, validator: validator, retentionPolicies: retentionPolicies + ) return ok(archive) @@ -253,16 +257,15 @@ proc findMessages*( ) proc periodicRetentionPolicy(self: WakuArchive) {.async.} = - let policy = self.retentionPolicy.get() - while true: - info "executing message retention policy" - (await policy.execute(self.driver)).isOkOr: - waku_archive_errors.inc(labelValues = [retPolicyFailure]) - error "failed execution of retention policy", error = error - await sleepAsync(WakuArchiveDefaultRetentionPolicyIntervalWhenError) - ## in case of error, let's try again faster - continue + for policy in self.retentionPolicies: + info "executing message retention policy", policy = $policy + (await policy.execute(self.driver)).isOkOr: + waku_archive_errors.inc(labelValues = [retPolicyFailure]) + error "failed execution of retention policy", policy = $policy, error = error + await sleepAsync(WakuArchiveDefaultRetentionPolicyIntervalWhenError) + ## in case of error, let's try again faster + continue await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval) @@ -279,7 +282,7 @@ proc periodicMetricReport(self: WakuArchive) {.async.} = await sleepAsync(WakuArchiveDefaultMetricsReportInterval) proc start*(self: WakuArchive) = - if self.retentionPolicy.isSome(): + if self.retentionPolicies.len > 0: self.retentionPolicyHandle = self.periodicRetentionPolicy() self.metricsHandle = self.periodicMetricReport() @@ -287,7 +290,7 @@ proc start*(self: WakuArchive) = proc stopWait*(self: WakuArchive) {.async.} = var futures: seq[Future[void]] - if self.retentionPolicy.isSome() and not self.retentionPolicyHandle.isNil(): + if not self.retentionPolicyHandle.isNil(): futures.add(self.retentionPolicyHandle.cancelAndWait()) if not self.metricsHandle.isNil: diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 842d7cbc2..c6e50d0dd 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -16,6 +16,9 @@ import ./postgres_healthcheck, ./partitions_manager +logScope: + topics = "postgres driver" + type PostgresDriver* = ref object of ArchiveDriver ## Establish a separate pools for read/write operations writeConnPool: PgAsyncPool @@ -367,6 +370,7 @@ proc getPartitionsList( ): Future[ArchiveDriverResult[seq[string]]] {.async.} = ## Retrieves the seq of partition table names. ## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"] + ## This returns the partitions that are attached to the main messages table. var partitions: seq[string] proc rowCallback(pqResult: ptr PGresult) = for iRow in 0 ..< pqResult.pqNtuples(): @@ -393,6 +397,49 @@ proc getPartitionsList( return ok(partitions) +## fwd declaration. The implementation is below. +proc dropPartition( + self: PostgresDriver, partitionName: string +): Future[ArchiveDriverResult[void]] {.async.} + +proc dropOrphanPartitions( + s: PostgresDriver +): Future[ArchiveDriverResult[void]] {.async.} = + ## Tries to remove partitions that weren't correctly removed during retention policy execution. + ## Orphan partition is a partition that is not attached to the main messages table. + ## Therefore, it is not used for queries and can be safely removed. + var partitions: seq[string] + proc rowCallback(pqResult: ptr PGresult) = + for iRow in 0 ..< pqResult.pqNtuples(): + let partitionName = $(pqgetvalue(pqResult, iRow, 0)) + partitions.add(partitionName) + + ( + await s.readConnPool.pgQuery( + """ + SELECT c.relname AS partition_name + FROM pg_class c + LEFT JOIN pg_inherits i ON i.inhrelid = c.oid + WHERE c.relname LIKE 'messages_%' + AND c.relname != 'messages_lookup' + AND c.relkind = 'r' -- only regular tables + AND i.inhrelid IS NULL -- detached partition + ORDER BY partition_name + """, + newSeq[string](0), + rowCallback, + ) + ).isOkOr: + return err("dropOrphanPartitions failed in query: " & $error) + + for partition in partitions: + info "orphan partition found", partitionName = partition + (await s.dropPartition(partition)).isOkOr: + error "failed to drop orphan partition", partitionName = partition, error = $error + continue + + return ok() + proc getTimeCursor( s: PostgresDriver, hashHex: string ): Future[ArchiveDriverResult[Option[Timestamp]]] {.async.} = @@ -1259,11 +1306,18 @@ proc loopPartitionFactory( self: PostgresDriver, onFatalError: OnFatalErrorHandler ) {.async.} = ## Loop proc that continuously checks whether we need to create a new partition. - ## Notice that the deletion of partitions is handled by the retention policy modules. + ## Notice that the deletion of partitions is mostly handled by the retention policy modules. + ## This loop only removes orphan partitions which were detached but not properly removed by the + ## retention policy module due to some error. However, the main task of this loop is to create + ## new partitions when needed. info "starting loopPartitionFactory" while true: + trace "loopPartitionFactory iteration started" + (await self.dropOrphanPartitions()).isOkOr: + onFatalError("error when dropping orphan partitions: " & $error) + trace "Check if a new partition is needed" ## Let's make the 'partition_manager' aware of the current partitions @@ -1321,14 +1375,24 @@ proc getTableSize*( return ok(tableSize) -proc removePartition( +proc dropPartition( + self: PostgresDriver, partitionName: string +): Future[ArchiveDriverResult[void]] {.async.} = + let dropPartitionQuery = "DROP TABLE " & partitionName + info "drop partition", query = dropPartitionQuery + (await self.performWriteQuery(dropPartitionQuery)).isOkOr: + return err(fmt"error in dropPartition: {dropPartitionQuery}: " & $error) + + return ok() + +proc detachAndDropPartition( self: PostgresDriver, partition: Partition ): Future[ArchiveDriverResult[void]] {.async.} = - ## Removes the desired partition and also removes the rows from messages_lookup table + ## Detaches and drops the desired partition and also removes the rows from messages_lookup table ## whose rows belong to the partition time range let partitionName = partition.getName() - info "beginning of removePartition", partitionName + info "beginning of detachAndDropPartition", partitionName let partSize = (await self.getTableSize(partitionName)).valueOr("") @@ -1351,11 +1415,8 @@ proc removePartition( else: return err(fmt"error in {detachPartitionQuery}: " & $error) - ## Drop the partition - let dropPartitionQuery = "DROP TABLE " & partitionName - info "removeOldestPartition drop partition", query = dropPartitionQuery - (await self.performWriteQuery(dropPartitionQuery)).isOkOr: - return err(fmt"error in {dropPartitionQuery}: " & $error) + ## Drop partition + ?(await self.dropPartition(partitionName)) info "removed partition", partition_name = partitionName, partition_size = partSize self.partitionMngr.removeOldestPartitionName() @@ -1380,8 +1441,18 @@ proc removePartitionsOlderThan( var oldestPartition = self.partitionMngr.getOldestPartition().valueOr: return err("could not get oldest partition in removePartitionOlderThan: " & $error) - while not oldestPartition.containsMoment(tsInSec): - (await self.removePartition(oldestPartition)).isOkOr: + debug "oldest partition info", + partitionName = oldestPartition.getName(), + partitionLastMoment = oldestPartition.getLastMoment(), + tsInSec + + while oldestPartition.getLastMoment() < tsInSec: + info "start removing partition whose first record is older than the specified timestamp", + partitionName = oldestPartition.getName(), + partitionFirstMoment = oldestPartition.getLastMoment(), + tsInSec + + (await self.detachAndDropPartition(oldestPartition)).isOkOr: return err("issue in removePartitionsOlderThan: " & $error) oldestPartition = self.partitionMngr.getOldestPartition().valueOr: @@ -1409,7 +1480,7 @@ proc removeOldestPartition( info "Skipping to remove the current partition" return ok() - return await self.removePartition(oldestPartition) + return await self.detachAndDropPartition(oldestPartition) proc containsAnyPartition*(self: PostgresDriver): bool = return not self.partitionMngr.isEmpty() diff --git a/waku/waku_archive/retention_policy.nim b/waku/waku_archive/retention_policy.nim index d4b75ee1f..c2663fb66 100644 --- a/waku/waku_archive/retention_policy.nim +++ b/waku/waku_archive/retention_policy.nim @@ -11,3 +11,6 @@ method execute*( p: RetentionPolicy, store: ArchiveDriver ): Future[RetentionPolicyResult[void]] {.base, async.} = discard + +method `$`*(p: RetentionPolicy): string {.base, gcsafe.} = + "unknown retention policy" diff --git a/waku/waku_archive/retention_policy/builder.nim b/waku/waku_archive/retention_policy/builder.nim index 6cb131bbc..7e777f4a0 100644 --- a/waku/waku_archive/retention_policy/builder.nim +++ b/waku/waku_archive/retention_policy/builder.nim @@ -7,7 +7,7 @@ import ./retention_policy_capacity, ./retention_policy_size -proc new*( +proc new( T: type RetentionPolicy, retPolicy: string ): RetentionPolicyResult[Option[RetentionPolicy]] = let retPolicy = retPolicy.toLower @@ -83,3 +83,14 @@ proc new*( return ok(some(retPolicy)) else: return err("unknown retention policy") + +proc new*( + T: typedesc[RetentionPolicy], retPolicies: seq[string] +): RetentionPolicyResult[seq[RetentionPolicy]] = + var policies: seq[RetentionPolicy] + for retPolicy in retPolicies: + let policy = RetentionPolicy.new(retPolicy).valueOr: + return err(error) + if policy.isSome(): + policies.add(policy.get()) + return ok(policies) diff --git a/waku/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/waku_archive/retention_policy/retention_policy_capacity.nim index ed4dd2339..ff4da6861 100644 --- a/waku/waku_archive/retention_policy/retention_policy_capacity.nim +++ b/waku/waku_archive/retention_policy/retention_policy_capacity.nim @@ -50,6 +50,9 @@ proc new*(T: type CapacityRetentionPolicy, capacity = DefaultCapacity): T = capacity: capacity, totalCapacity: totalCapacity, deleteWindow: deleteWindow ) +method `$`*(p: CapacityRetentionPolicy): string = + "capacity:" & $p.capacity + method execute*( p: CapacityRetentionPolicy, driver: ArchiveDriver ): Future[RetentionPolicyResult[void]] {.async.} = diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index e60aba303..416d95ec0 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -15,6 +15,9 @@ type SizeRetentionPolicy* = ref object of RetentionPolicy proc new*(T: type SizeRetentionPolicy, size = DefaultRetentionSize): T = SizeRetentionPolicy(sizeLimit: size) +method `$`*(p: SizeRetentionPolicy): string = + "size:" & $p.sizeLimit & "b" + method execute*( p: SizeRetentionPolicy, driver: ArchiveDriver ): Future[RetentionPolicyResult[void]] {.async.} = diff --git a/waku/waku_archive/retention_policy/retention_policy_time.nim b/waku/waku_archive/retention_policy/retention_policy_time.nim index 6d4c0815a..12f056c7b 100644 --- a/waku/waku_archive/retention_policy/retention_policy_time.nim +++ b/waku/waku_archive/retention_policy/retention_policy_time.nim @@ -6,29 +6,23 @@ import ../../waku_core, ../driver, ../retention_policy logScope: topics = "waku archive retention_policy" -const DefaultRetentionTime*: int64 = 30.days.seconds - type TimeRetentionPolicy* = ref object of RetentionPolicy retentionTime: chronos.Duration -proc new*(T: type TimeRetentionPolicy, retentionTime = DefaultRetentionTime): T = +proc new*(T: type TimeRetentionPolicy, retentionTime: int64): T = TimeRetentionPolicy(retentionTime: retentionTime.seconds) +method `$`*(p: TimeRetentionPolicy): string = + "time:" & $p.retentionTime.seconds + method execute*( p: TimeRetentionPolicy, driver: ArchiveDriver ): Future[RetentionPolicyResult[void]] {.async.} = - ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency) + ## Delete messages that exceed the retention time info "beginning of executing message retention policy - time" - let omt = (await driver.getOldestMessageTimestamp()).valueOr: - return err("failed to get oldest message timestamp: " & error) - let now = getNanosecondTime(getTime().toUnixFloat()) let retentionTimestamp = now - p.retentionTime.nanoseconds - let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10 - - if thresholdTimestamp <= omt: - return ok() (await driver.deleteMessagesOlderThanTimestamp(ts = retentionTimestamp)).isOkOr: return err("failed to delete oldest messages: " & error)