From fc810eaf638af444c4e2002a21305fd78bca2d53 Mon Sep 17 00:00:00 2001
From: Abhimanyu
Date: Sat, 30 Sep 2023 11:10:52 +0530
Subject: [PATCH] chore: add size retention policy (#2093)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* chore: add retention policy with GB or MB limitation #1885

* chore: add retention policy with GB or MB limitation

* chore: updated code post review- retention policy

* ci: extract discordNotify to separate file

Signed-off-by: Jakub Sokołowski

* ci: push images to new wakuorg/nwaku repo

Signed-off-by: Jakub Sokołowski

* ci: enforce default Docker image tags strictly

Signed-off-by: Jakub Sokołowski

* ci: push GIT_REF if it looks like a version

Signed-off-by: Jakub Sokołowski

* fix: update wakuv2 fleet DNS discovery enrtree

https://github.com/status-im/infra-misc/issues/171

* chore: resolving DNS IP and publishing it when no extIp is provided (#2030)

* feat(coverage): Add simple coverage (#2067)

* Add test aggregator to all directories.
* Implement coverage script.

* fix(ci): fix name of discord notify method

Also use absolute path to load Groovy script.

Signed-off-by: Jakub Sokołowski

* chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version (#2080)

* chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version

* add more metrics, refactor how most metrics are calculated

* rework metrics table fillup

* reset connErr to make sure we honour successful reconnection

* chore(cbindings): Adding cpp example that integrates the `libwaku` (#2079)

---------

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>

* fix(ci): update the dependency list in pre-release WF (#2088)

* chore: adding NetConfig test suite (#2091)

---------

Signed-off-by: Jakub Sokołowski
Co-authored-by: Jakub Sokołowski
Co-authored-by: Anton Iakimov
Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Co-authored-by: Álex Cabeza Romero
Co-authored-by: Vaclav Pavlin
Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
---
 tests/waku_archive/test_retention_policy.nim  | 40 +++++++++
 waku/common/databases/db_sqlite.nim           |  3 +-
 waku/waku_archive/driver.nim                  |  9 ++
 .../driver/queue_driver/queue_driver.nim      | 12 +++
 .../driver/sqlite_driver/sqlite_driver.nim    | 12 +++
 .../waku_archive/retention_policy/builder.nim | 36 +++++++-
 .../retention_policy_size.nim                 | 86 +++++++++++++++++++
 7 files changed, 196 insertions(+), 2 deletions(-)
 create mode 100644 waku/waku_archive/retention_policy/retention_policy_size.nim
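
A minimal usage sketch of the new policy, based on the builder.nim and retention_policy_size.nim changes below; the "size:20Gb" policy string and the pre-built `driver` value are illustrative assumptions, not part of the patch:

    # Hypothetical wiring, assuming an already-constructed ArchiveDriver `driver`
    # and the waku_archive/chronos imports used by the test below.
    let policyRes = RetentionPolicy.new("size:20Gb")  # parsed by builder.nim below
    if policyRes.isOk() and policyRes.get().isSome():
      let policy: RetentionPolicy = policyRes.get().get()
      # prunes the oldest 20% of messages once the database grows past ~20 GB
      discard waitFor policy.execute(driver)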
diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim
index e08455d27..948759dd8 100644
--- a/tests/waku_archive/test_retention_policy.nim
+++ b/tests/waku_archive/test_retention_policy.nim
@@ -12,6 +12,7 @@ import
   ../../../waku/waku_archive/driver/sqlite_driver,
   ../../../waku/waku_archive/retention_policy,
   ../../../waku/waku_archive/retention_policy/retention_policy_capacity,
+  ../../../waku/waku_archive/retention_policy/retention_policy_size,
   ../testlib/common,
   ../testlib/wakucore
 
@@ -53,6 +54,45 @@ suite "Waku Archive - Retention policy":
 
     ## Cleanup
     (waitFor driver.close()).expect("driver to close")
 
+  test "size retention policy - windowed message deletion":
+    ## Given
+    let
+      # in megabytes
+      sizeLimit: float = 0.05
+      excess = 123
+
+    let driver = newTestArchiveDriver()
+
+    let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
+
+    ## When
+
+    var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
+    var retentionFutures = newSeq[Future[ArchiveDriverResult[void]]]()
+
+    for i in 1..excess:
+      let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
+      putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
+      retentionFutures.add(retentionPolicy.execute(driver))
+
+    # wait synchronously until all put and retention futures have completed
+    discard waitFor allFinished(putFutures & retentionFutures)
+
+    ## Then
+    # calculate the current database size
+    let pageSize = (waitFor driver.getPagesSize()).tryGet()
+    let pageCount = (waitFor driver.getPagesCount()).tryGet()
+    let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)
+
+    check:
+      # the database size is used to check that the storage limit is preserved:
+      # compare the current database size with the sizeLimit provided by the user;
+      # it should not be exceeded
+      sizeDB <= sizeLimit
+
+    ## Cleanup
+    (waitFor driver.close()).expect("driver to close")
+
   test "store capacity should be limited":
     ## Given
diff --git a/waku/common/databases/db_sqlite.nim b/waku/common/databases/db_sqlite.nim
index 48eeaa361..f8dab5f6a 100644
--- a/waku/common/databases/db_sqlite.nim
+++ b/waku/common/databases/db_sqlite.nim
@@ -484,4 +484,5 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
   if resVacuum.isErr():
     return err("failed to execute vacuum: " & resVacuum.error)
 
-  debug "finished sqlite database vacuuming"
\ No newline at end of file
+  debug "finished sqlite database vacuuming"
+  ok()
\ No newline at end of file
diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim
index c3f2accd1..4e2f33b5e 100644
--- a/waku/waku_archive/driver.nim
+++ b/waku/waku_archive/driver.nim
@@ -45,6 +45,15 @@ method getMessages*(driver: ArchiveDriver,
 method getMessagesCount*(driver: ArchiveDriver):
                          Future[ArchiveDriverResult[int64]] {.base, async.} = discard
 
+method getPagesCount*(driver: ArchiveDriver):
+                      Future[ArchiveDriverResult[int64]] {.base, async.} = discard
+
+method getPagesSize*(driver: ArchiveDriver):
+                     Future[ArchiveDriverResult[int64]] {.base, async.} = discard
+
+method performsVacuum*(driver: ArchiveDriver):
+                       Future[ArchiveDriverResult[void]] {.base, async.} = discard
+
 method getOldestMessageTimestamp*(driver: ArchiveDriver):
                                   Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard
 
diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim
index 9a8b54f85..86f72df98 100644
--- a/waku/waku_archive/driver/queue_driver/queue_driver.nim
+++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim
@@ -280,6 +280,18 @@ method getMessagesCount*(driver: QueueDriver):
                          Future[ArchiveDriverResult[int64]] {.async} =
   return ok(int64(driver.len()))
 
+method getPagesCount*(driver: QueueDriver):
+                      Future[ArchiveDriverResult[int64]] {.async} =
+  return ok(int64(driver.len()))
+
+method getPagesSize*(driver: QueueDriver):
+                     Future[ArchiveDriverResult[int64]] {.async} =
+  return ok(int64(driver.len()))
+
+method performsVacuum*(driver: QueueDriver):
+                       Future[ArchiveDriverResult[void]] {.async.} =
+  return err("interface method not implemented")
+
 method getOldestMessageTimestamp*(driver: QueueDriver):
                                   Future[ArchiveDriverResult[Timestamp]] {.async.} =
   return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
index 8746838a9..4ad9ff8b6 100644
--- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
+++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
@@ -109,6 +109,18 @@ method getMessagesCount*(s: SqliteDriver):
                          Future[ArchiveDriverResult[int64]] {.async.} =
   return s.db.getMessageCount()
 
+method getPagesCount*(s: SqliteDriver):
+                      Future[ArchiveDriverResult[int64]] {.async.} =
+  return s.db.getPageCount()
+
+method getPagesSize*(s: SqliteDriver):
+                     Future[ArchiveDriverResult[int64]] {.async.} =
+  return s.db.getPageSize()
+
+method performsVacuum*(s: SqliteDriver):
+                       Future[ArchiveDriverResult[void]] {.async.} =
+  return s.db.performSqliteVacuum()
+
 method getOldestMessageTimestamp*(s: SqliteDriver):
                                   Future[ArchiveDriverResult[Timestamp]] {.async.} =
   return s.db.selectOldestReceiverTimestamp()
diff --git a/waku/waku_archive/retention_policy/builder.nim b/waku/waku_archive/retention_policy/builder.nim
index 3cb84d797..d968e922e 100644
--- a/waku/waku_archive/retention_policy/builder.nim
+++ b/waku/waku_archive/retention_policy/builder.nim
@@ -11,7 +11,8 @@ import
 import
   ../retention_policy,
   ./retention_policy_time,
-  ./retention_policy_capacity
+  ./retention_policy_capacity,
+  ./retention_policy_size
 
 proc new*(T: type RetentionPolicy,
           retPolicy: string):
@@ -51,5 +52,38 @@ proc new*(T: type RetentionPolicy,
     let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
     return ok(some(retPolicy))
 
+  elif policy == "size":
+    var retentionSize: string
+    retentionSize = policyArgs
+
+    # capture the size unit, e.g. "Gb" or "Mb" (case-insensitive)
+    let sizeUnit = retentionSize.substr(retentionSize.len-2)
+    # capture the numeric part of the provided size as a string
+    let sizeQuantityStr = retentionSize.substr(0, retentionSize.len-3)
+    # holds the numeric value of the size
+    var sizeQuantity: float
+
+    if sizeUnit in ["gb", "Gb", "GB", "gB"]:
+      # parse the numeric part into a float
+      try:
+        sizeQuantity = parseFloat(sizeQuantityStr)
+      except ValueError:
+        return err("invalid size retention policy argument")
+      # GB values are converted to MB for uniform processing
+      sizeQuantity = sizeQuantity * 1024
+    elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
+      try:
+        sizeQuantity = parseFloat(sizeQuantityStr)
+      except ValueError:
+        return err("invalid size retention policy argument")
+    else:
+      return err("""invalid size retention value unit: expected "Mb" or "Gb" but got """ & sizeUnit)
+
+    if sizeQuantity <= 0:
+      return err("invalid size retention policy argument: a positive value is required")
+
+    let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
+    return ok(some(retPolicy))
+
   else:
     return err("unknown retention policy")
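
To make the accepted size formats concrete, a small self-contained sketch of the same parsing rule applied by the builder.nim hunk above; the helper name parseSizeToMegabytes is illustrative only and not part of this patch:

    import std/strutils

    # Illustrative helper mirroring the builder.nim logic: the last two characters
    # select the unit, the rest is parsed as a float, and GB values are normalised
    # to MB (x1024). Returns -1.0 for invalid input instead of an error result.
    proc parseSizeToMegabytes(retentionSize: string): float =
      if retentionSize.len < 3:
        return -1.0
      let sizeUnit = retentionSize.substr(retentionSize.len - 2)
      let sizeQuantityStr = retentionSize.substr(0, retentionSize.len - 3)
      var sizeQuantity: float
      try:
        sizeQuantity = parseFloat(sizeQuantityStr)
      except ValueError:
        return -1.0
      if sizeUnit.toLowerAscii() == "gb":
        return sizeQuantity * 1024
      elif sizeUnit.toLowerAscii() == "mb":
        return sizeQuantity
      else:
        return -1.0

    assert parseSizeToMegabytes("30Gb") == 30_720.0   # matches DefaultRetentionSize
    assert parseSizeToMegabytes("250Mb") == 250.0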
diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim
new file mode 100644
index 000000000..6e56ccc62
--- /dev/null
+++ b/waku/waku_archive/retention_policy/retention_policy_size.nim
@@ -0,0 +1,86 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  std/times,
+  stew/results,
+  chronicles,
+  chronos
+import
+  ../driver,
+  ../retention_policy
+
+logScope:
+  topics = "waku archive retention_policy"
+
+# default size limit is 30 GB, expressed in MB
+const DefaultRetentionSize*: float = 30_720
+
+# fraction of messages kept when pruning, i.e. the oldest 20% are removed
+const DeleteLimit = 0.80
+
+type
+  # SizeRetentionPolicy implements automatic deletion as follows:
+  #  - sizeLimit is the maximum size in megabytes (MB) the database may grow to
+  #  - the database size is reduced by deleting rows (messages)
+  #  - DeleteLimit is the fraction of messages kept when the limit is reached
+  #  - when the database size crosses sizeLimit, only that fraction of messages
+  #    is kept; the older messages are removed with
+  #    deleteOldestMessagesNotWithinLimit()
+  #  - after deletion, the fragmented space is reclaimed by running a vacuum
+  SizeRetentionPolicy* = ref object of RetentionPolicy
+    sizeLimit: float
+
+proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
+  SizeRetentionPolicy(
+    sizeLimit: size
+  )
+
+method execute*(p: SizeRetentionPolicy,
+                driver: ArchiveDriver):
+                Future[RetentionPolicyResult[void]] {.async.} =
+  ## when the database size overshoots the limit, shed the oldest 20% of messages
+
+  # the database size is derived from the page count and the page size
+  # get the page count of the "messages" database
+  var pageCountRes = await driver.getPagesCount()
+  if pageCountRes.isErr():
+    return err("failed to get Pages count: " & pageCountRes.error)
+
+  var pageCount: int64 = pageCountRes.value
+
+  # get the page size of the database, converted from bytes to KiB
+  let pageSizeRes = await driver.getPagesSize()
+  var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024)
+
+  if pageSize == 0:
+    return err("failed to get Page size: " & pageSizeRes.error)
+
+  # database size in megabytes (MB)
+  var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0
+
+  # check if the current database size crosses the size limit
+  if totalSizeOfDB < p.sizeLimit:
+    return ok()
+
+  # to delete the oldest messages, get the total message count first
+  var numMessagesRes = await driver.getMessagesCount()
+  if numMessagesRes.isErr():
+    return err("failed to get messages count: " & numMessagesRes.error)
+  var numMessages = numMessagesRes.value
+
+  # 80% of the total messages are kept, the rest are deleted
+  let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
+
+  let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)
+  if res.isErr():
+    return err("deleting oldest messages failed: " & res.error)
+
+  # vacuum to reclaim the space freed by the deleted rows
+  # this shrinks the database file
+  let resVacuum = await driver.performsVacuum()
+  if resVacuum.isErr():
+    return err("vacuuming failed: " & resVacuum.error)
+
+  return ok()
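
For a quick sanity check of the size estimate in execute(), a short sketch with example numbers; the 4096-byte page size and the page count are assumptions for illustration (4096 is a common SQLite default, not something this patch configures):

    # Illustration of the size estimate used by execute():
    # pageSize is reported in bytes, converted to KiB (div 1024), so
    # totalSizeOfDB = pageCount * pageSizeKiB / 1024 is expressed in MB.
    let
      pageSizeBytes = 4096'i64                   # assumed SQLite page_size
      pageCount     = 2_621_440'i64              # example page count
      pageSizeKiB   = pageSizeBytes div 1024
      totalSizeOfDB = float(pageCount * pageSizeKiB) / 1024.0   # 10_240.0 MB

    # With DefaultRetentionSize = 30_720 MB (30 GB) this example database is under
    # the cap, so execute() returns ok() without deleting anything. Once the cap is
    # crossed, only int(float(numMessages) * 0.80) of the newest messages are kept.
    echo totalSizeOfDB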