From 8e644bf1ca56ae962363773a0079a79bdf0ec4bc Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Mon, 2 Oct 2023 11:07:13 +0200 Subject: [PATCH] Revert "chore: add size retention policy (#2093)" (#2097) This reverts commit fc810eaf638af444c4e2002a21305fd78bca2d53. (cherry picked from commit b213b2c385b0534481448cd6e30af18e183d0504) --- tests/waku_archive/test_retention_policy.nim | 40 --------- waku/common/databases/db_sqlite.nim | 3 +- waku/waku_archive/driver.nim | 9 -- .../driver/queue_driver/queue_driver.nim | 12 --- .../driver/sqlite_driver/sqlite_driver.nim | 12 --- .../waku_archive/retention_policy/builder.nim | 36 +------- .../retention_policy_size.nim | 86 ------------------- 7 files changed, 2 insertions(+), 196 deletions(-) delete mode 100644 waku/waku_archive/retention_policy/retention_policy_size.nim diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 948759dd8..e08455d27 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -12,7 +12,6 @@ import ../../../waku/waku_archive/driver/sqlite_driver, ../../../waku/waku_archive/retention_policy, ../../../waku/waku_archive/retention_policy/retention_policy_capacity, - ../../../waku/waku_archive/retention_policy/retention_policy_size, ../testlib/common, ../testlib/wakucore @@ -54,45 +53,6 @@ suite "Waku Archive - Retention policy": ## Cleanup (waitFor driver.close()).expect("driver to close") - - test "size retention policy - windowed message deletion": - ## Given - let - # in megabytes - sizeLimit:float = 0.05 - excess = 123 - - let driver = newTestArchiveDriver() - - let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) - - ## When - - var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() - var retentionFutures = newSeq[Future[ArchiveDriverResult[void]]]() - - for i in 1..excess: - let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) - putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)) - retentionFutures.add(retentionPolicy.execute(driver)) - - # waitFor is used to synchronously wait for the futures to complete. - discard waitFor allFinished(putFutures & retentionFutures) - - ## Then - # calculate the current database size - let pageSize = (waitFor driver.getPagesSize()).tryGet() - let pageCount = (waitFor driver.getPagesCount()).tryGet() - let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) - - check: - # size of the database is used to check if the storage limit has been preserved - # check the current database size with the limitSize provided by the user - # it should be lower - sizeDB <= sizeLimit - - ## Cleanup - (waitFor driver.close()).expect("driver to close") test "store capacity should be limited": ## Given diff --git a/waku/common/databases/db_sqlite.nim b/waku/common/databases/db_sqlite.nim index f8dab5f6a..48eeaa361 100644 --- a/waku/common/databases/db_sqlite.nim +++ b/waku/common/databases/db_sqlite.nim @@ -484,5 +484,4 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] = if resVacuum.isErr(): return err("failed to execute vacuum: " & resVacuum.error) - debug "finished sqlite database vacuuming" - ok() \ No newline at end of file + debug "finished sqlite database vacuuming" \ No newline at end of file diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 4e2f33b5e..c3f2accd1 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -45,15 +45,6 @@ method getMessages*(driver: ArchiveDriver, method getMessagesCount*(driver: ArchiveDriver): Future[ArchiveDriverResult[int64]] {.base, async.} = discard -method getPagesCount*(driver: ArchiveDriver): - Future[ArchiveDriverResult[int64]] {.base, async.} = discard - -method getPagesSize*(driver: ArchiveDriver): - Future[ArchiveDriverResult[int64]] {.base, async.} = discard - -method performsVacuum*(driver: ArchiveDriver): - Future[ArchiveDriverResult[void]] {.base, async.} = discard - method getOldestMessageTimestamp*(driver: ArchiveDriver): Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 86f72df98..9a8b54f85 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -280,18 +280,6 @@ method getMessagesCount*(driver: QueueDriver): Future[ArchiveDriverResult[int64]] {.async} = return ok(int64(driver.len())) -method getPagesCount*(driver: QueueDriver): - Future[ArchiveDriverResult[int64]] {.async} = - return ok(int64(driver.len())) - -method getPagesSize*(driver: QueueDriver): - Future[ArchiveDriverResult[int64]] {.async} = - return ok(int64(driver.len())) - -method performsVacuum*(driver: QueueDriver): - Future[ArchiveDriverResult[void]] {.async.} = - return err("interface method not implemented") - method getOldestMessageTimestamp*(driver: QueueDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 4ad9ff8b6..8746838a9 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -109,18 +109,6 @@ method getMessagesCount*(s: SqliteDriver): Future[ArchiveDriverResult[int64]] {.async.} = return s.db.getMessageCount() -method getPagesCount*(s: SqliteDriver): - Future[ArchiveDriverResult[int64]] {.async.} = - return s.db.getPageCount() - -method getPagesSize*(s: SqliteDriver): - Future[ArchiveDriverResult[int64]] {.async.} = - return s.db.getPageSize() - -method performsVacuum*(s: SqliteDriver): - Future[ArchiveDriverResult[void]] {.async.} = - return s.db.performSqliteVacuum() - method getOldestMessageTimestamp*(s: SqliteDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = return s.db.selectOldestReceiverTimestamp() diff --git a/waku/waku_archive/retention_policy/builder.nim b/waku/waku_archive/retention_policy/builder.nim index d968e922e..3cb84d797 100644 --- a/waku/waku_archive/retention_policy/builder.nim +++ b/waku/waku_archive/retention_policy/builder.nim @@ -11,8 +11,7 @@ import import ../retention_policy, ./retention_policy_time, - ./retention_policy_capacity, - ./retention_policy_size + ./retention_policy_capacity proc new*(T: type RetentionPolicy, retPolicy: string): @@ -52,38 +51,5 @@ proc new*(T: type RetentionPolicy, let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) return ok(some(retPolicy)) - elif policy == "size": - var retentionSize: string - retentionSize = policyArgs - - # captures the size unit such as Gb or Mb - let sizeUnit = retentionSize.substr(retentionSize.len-2) - # captures the string type number data of the size provided - let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3) - # to hold the numeric value data of size - var sizeQuantity: float - - if sizeUnit in ["gb", "Gb", "GB", "gB"]: - # parse the actual value into integer type var - try: - sizeQuantity = parseFloat(sizeQuantityStr) - except ValueError: - return err("invalid size retention policy argument") - # Gb data is converted into Mb for uniform processing - sizeQuantity = sizeQuantity * 1024 - elif sizeUnit in ["mb", "Mb", "MB", "mB"]: - try: - sizeQuantity = parseFloat(sizeQuantityStr) - except ValueError: - return err("invalid size retention policy argument") - else: - return err ("""invalid size retention value unit: expected "Mb" or "Gb" but got """ & sizeUnit ) - - if sizeQuantity <= 0: - return err("invalid size retention policy argument: a non-zero value is required") - - let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity) - return ok(some(retPolicy)) - else: return err("unknown retention policy") diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim deleted file mode 100644 index 6e56ccc62..000000000 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ /dev/null @@ -1,86 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/times, - stew/results, - chronicles, - chronos -import - ../driver, - ../retention_policy - -logScope: - topics = "waku archive retention_policy" - -# default size is 30 Gb -const DefaultRetentionSize*: float = 30_720 - -# to remove 20% of the outdated data from database -const DeleteLimit = 0.80 - -type - # SizeRetentionPolicy implements auto delete as follows: - # - sizeLimit is the size in megabytes (Mbs) the database can grow upto - # to reduce the size of the databases, remove the rows/number-of-messages - # DeleteLimit is the total number of messages to delete beyond this limit - # when the database size crosses the sizeLimit, then only a fraction of messages are kept, - # rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(), - # upon deletion process the fragmented space is retrieve back using Vacuum process. - SizeRetentionPolicy* = ref object of RetentionPolicy - sizeLimit: float - -proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = - SizeRetentionPolicy( - sizeLimit: size - ) - -method execute*(p: SizeRetentionPolicy, - driver: ArchiveDriver): - Future[RetentionPolicyResult[void]] {.async.} = - ## when db size overshoots the database limit, shread 20% of outdated messages - - # to get the size of the database, pageCount and PageSize is required - # get page count in "messages" database - var pageCountRes = await driver.getPagesCount() - if pageCountRes.isErr(): - return err("failed to get Pages count: " & pageCountRes.error) - - var pageCount: int64 = pageCountRes.value - - # get page size of database - let pageSizeRes = await driver.getPagesSize() - var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) - - if pageSize == 0: - return err("failed to get Page size: " & pageSizeRes.error) - - # database size in megabytes (Mb) - var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 - - # check if current databse size crosses the db size limit - if totalSizeOfDB < p.sizeLimit: - return ok() - - # to shread/delete messsges, get the total row/message count - var numMessagesRes = await driver.getMessagesCount() - if numMessagesRes.isErr(): - return err("failed to get messages count: " & numMessagesRes.error) - var numMessages = numMessagesRes.value - - # 80% of the total messages are to be kept, delete others - let pageDeleteWindow = int(float(numMessages) * DeleteLimit) - - let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow) - if res.isErr(): - return err("deleting oldest messages failed: " & res.error) - - # vacuum to get the deleted pages defragments to save storage space - # this will resize the database size - let resVaccum = await driver.performsVacuum() - if resVaccum.isErr(): - return err("vacuumming failed: " & resVaccum.error) - - return ok()