diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 5233d6924..582985950 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -29,7 +29,7 @@ suite "Waku Archive - Retention policy": let driver = newSqliteArchiveDriver() - let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) + let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity) var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() ## When @@ -61,7 +61,7 @@ suite "Waku Archive - Retention policy": let driver = newSqliteArchiveDriver() - let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) + let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.new(size=sizeLimit) var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() # make sure that the db is empty to before test begins @@ -115,7 +115,7 @@ suite "Waku Archive - Retention policy": let driver = newSqliteArchiveDriver() - retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) + retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity) let messages = @[ fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)), diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index a315ea0b5..fb3826663 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -72,6 +72,10 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver, limit: int): Future[ArchiveDriverResult[void]] {.base, async.} = discard +method decreaseDatabaseSize*(driver: ArchiveDriver, + targetSizeInBytes: int64): + Future[ArchiveDriverResult[void]] {.base, async.} = discard + method close*(driver: ArchiveDriver): Future[ArchiveDriverResult[void]] {.base, async.} = discard diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index f5adbfb2c..443eb0f38 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -502,6 +502,40 @@ method deleteOldestMessagesNotWithinLimit*( return ok() +method decreaseDatabaseSize*(driver: PostgresDriver, + targetSizeInBytes: int64): + Future[ArchiveDriverResult[void]] {.async.} = + ## TODO: refactor this implementation and use partition management instead + ## To remove 20% of the outdated data from database + const DeleteLimit = 0.80 + + ## when db size overshoots the database limit, shread 20% of outdated messages + ## get size of database + let dbSize = (await driver.getDatabaseSize()).valueOr: + return err("failed to get database size: " & $error) + + ## database size in bytes + let totalSizeOfDB: int64 = int64(dbSize) + + if totalSizeOfDB < targetSizeInBytes: + return ok() + + ## to shread/delete messsges, get the total row/message count + let numMessages = (await driver.getMessagesCount()).valueOr: + return err("failed to get messages count: " & error) + + ## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows + ## if vacumming is done automatically then we aim to check DB size periodially for efficient + ## retention policy implementation. + + ## 80% of the total messages are to be kept, delete others + let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + + (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: + return err("deleting oldest messages failed: " & error) + + return ok() + method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = ## Close the database connection diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 9fd266e6d..2db8a3d84 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -317,6 +317,11 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, # TODO: Implement this message_store method return err("interface method not implemented") +method decreaseDatabaseSize*(driver: QueueDriver, + targetSizeInBytes: int64): + Future[ArchiveDriverResult[void]] {.async.} = + return err("interface method not implemented") + method close*(driver: QueueDriver): Future[ArchiveDriverResult[void]] {.async.} = return ok() diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 1d0cbc5bc..88d1974ef 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -146,6 +146,40 @@ method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver, Future[ArchiveDriverResult[void]] {.async.} = return s.db.deleteOldestMessagesNotWithinLimit(limit) +method decreaseDatabaseSize*(driver: SqliteDriver, + targetSizeInBytes: int64): + Future[ArchiveDriverResult[void]] {.async.} = + + ## To remove 20% of the outdated data from database + const DeleteLimit = 0.80 + + ## when db size overshoots the database limit, shread 20% of outdated messages + ## get size of database + let dbSize = (await driver.getDatabaseSize()).valueOr: + return err("failed to get database size: " & $error) + + ## database size in bytes + let totalSizeOfDB: int64 = int64(dbSize) + + if totalSizeOfDB < targetSizeInBytes: + return ok() + + ## to shread/delete messsges, get the total row/message count + let numMessages = (await driver.getMessagesCount()).valueOr: + return err("failed to get messages count: " & error) + + ## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows + ## if vacumming is done automatically then we aim to check DB size periodially for efficient + ## retention policy implementation. + + ## 80% of the total messages are to be kept, delete others + let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + + (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: + return err("deleting oldest messages failed: " & error) + + return ok() + method close*(s: SqliteDriver): Future[ArchiveDriverResult[void]] {.async.} = ## Close the database connection diff --git a/waku/waku_archive/retention_policy/builder.nim b/waku/waku_archive/retention_policy/builder.nim index 0588eb4bb..86cbf68a9 100644 --- a/waku/waku_archive/retention_policy/builder.nim +++ b/waku/waku_archive/retention_policy/builder.nim @@ -39,7 +39,7 @@ proc new*(T: type RetentionPolicy, except ValueError: return err("invalid time retention policy argument") - let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds) + let retPolicy: RetentionPolicy = TimeRetentionPolicy.new(retentionTimeSeconds) return ok(some(retPolicy)) elif policy == "capacity": @@ -49,7 +49,7 @@ proc new*(T: type RetentionPolicy, except ValueError: return err("invalid capacity retention policy argument") - let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) + let retPolicy: RetentionPolicy = CapacityRetentionPolicy.new(retentionCapacity) return ok(some(retPolicy)) elif policy == "size": @@ -85,7 +85,7 @@ proc new*(T: type RetentionPolicy, if sizeQuantity <= 0: return err("invalid size retention policy argument: a non-zero value is required") - let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity) + let retPolicy: RetentionPolicy = SizeRetentionPolicy.new(sizeQuantity) return ok(some(retPolicy)) else: diff --git a/waku/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/waku_archive/retention_policy/retention_policy_capacity.nim index bb1d146cf..b4d36e2db 100644 --- a/waku/waku_archive/retention_policy/retention_policy_capacity.nim +++ b/waku/waku_archive/retention_policy/retention_policy_capacity.nim @@ -47,7 +47,7 @@ proc calculateOverflowWindow(capacity: int, overflow: float): int = proc calculateDeleteWindow(capacity: int, overflow: float): int = calculateOverflowWindow(capacity, overflow) div 2 -proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T = +proc new*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T = let totalCapacity = calculateTotalCapacity(capacity, MaxOverflow) deleteWindow = calculateDeleteWindow(capacity, MaxOverflow) diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index 8b512230b..21bb152df 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -4,11 +4,9 @@ else: {.push raises: [].} import - std/times, stew/results, chronicles, - chronos, - os + chronos import ../driver, ../retention_policy @@ -19,21 +17,11 @@ logScope: # default size is 30 GiB or 32212254720.0 in bytes const DefaultRetentionSize*: int64 = 32212254720 -# to remove 20% of the outdated data from database -const DeleteLimit = 0.80 - type - # SizeRetentionPolicy implements auto delete as follows: - # - sizeLimit is the size in bytes the database can grow upto - # to reduce the size of the databases, remove the rows/number-of-messages - # DeleteLimit is the total number of messages to delete beyond this limit - # when the database size crosses the sizeLimit, then only a fraction of messages are kept, - # rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(), - # upon deletion process the fragmented space is retrieve back using Vacuum process. SizeRetentionPolicy* = ref object of RetentionPolicy - sizeLimit: int64 + sizeLimit: int64 -proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = +proc new*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = SizeRetentionPolicy( sizeLimit: size ) @@ -41,29 +29,8 @@ proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = method execute*(p: SizeRetentionPolicy, driver: ArchiveDriver): Future[RetentionPolicyResult[void]] {.async.} = - ## when db size overshoots the database limit, shread 20% of outdated messages - # get size of database - let dbSize = (await driver.getDatabaseSize()).valueOr: - return err("failed to get database size: " & $error) - # database size in bytes - let totalSizeOfDB: int64 = int64(dbSize) - - if totalSizeOfDB < p.sizeLimit: - return ok() - - # to shread/delete messsges, get the total row/message count - let numMessages = (await driver.getMessagesCount()).valueOr: - return err("failed to get messages count: " & error) - - # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows - # if vacumming is done automatically then we aim to check DB size periodially for efficient - # retention policy implementation. - - # 80% of the total messages are to be kept, delete others - let pageDeleteWindow = int(float(numMessages) * DeleteLimit) - - (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: - return err("deleting oldest messages failed: " & error) + (await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr: + return err("decreaseDatabaseSize failed: " & $error) return ok() diff --git a/waku/waku_archive/retention_policy/retention_policy_time.nim b/waku/waku_archive/retention_policy/retention_policy_time.nim index 27622f2e4..caa47e8b6 100644 --- a/waku/waku_archive/retention_policy/retention_policy_time.nim +++ b/waku/waku_archive/retention_policy/retention_policy_time.nim @@ -24,7 +24,7 @@ type TimeRetentionPolicy* = ref object of RetentionPolicy retentionTime: chronos.Duration -proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T = +proc new*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T = TimeRetentionPolicy( retentionTime: retentionTime.seconds )