From 161a10ecb0e0e0a3c329876bcf5b705f1b7411e5 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 6 Mar 2024 20:50:22 +0100 Subject: [PATCH] feat: Postgres partition implementation (#2506) * postgres: first step to implement partition management * postgres_driver: use of times.now().toTime().toUnix() instead of Moment.now() * postgres migrations: set new version to 2 * test_driver_postgres: use of assert instead of require and avoid using times.now() * postgres_driver: better implementation of the reset method with partitions * Remove createMessageTable, init, and deleteMessageTable procs * postgres: ensure we use the version 15.4 in tests * postgres_driver.nim: enhance debug logs partition addition * ci.yml: ensure logs are printed without colors * postgres_driver: starting the loop factory in an asynchronous task * postgres_driver: log partition name and size when removing a partition --- .github/workflows/ci.yml | 4 +- tests/postgres-docker-compose.yml | 2 +- tests/waku_archive/test_driver_postgres.nim | 52 ++- waku/waku_archive/driver.nim | 3 +- waku/waku_archive/driver/builder.nim | 13 + waku/waku_archive/driver/postgres_driver.nim | 11 +- .../driver/postgres_driver/migrations.nim | 2 +- .../postgres_driver/partitions_manager.nim | 105 +++++ .../postgres_driver/postgres_driver.nim | 405 +++++++++++++----- .../driver/queue_driver/queue_driver.nim | 3 +- .../driver/sqlite_driver/sqlite_driver.nim | 3 +- 11 files changed, 450 insertions(+), 153 deletions(-) create mode 100644 waku/waku_archive/driver/postgres_driver/partitions_manager.nim diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 481a5f82a..ce99e29a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ concurrency: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC}" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" jobs: changes: # changes detection @@ -115,7 +115,7 @@ jobs: fi if [ ${{ runner.os }} == "Linux" ]; then - sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine + sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18 fi make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=1 test testwakunode2 diff --git a/tests/postgres-docker-compose.yml b/tests/postgres-docker-compose.yml index 99f93e6ab..396a14257 100644 --- a/tests/postgres-docker-compose.yml +++ b/tests/postgres-docker-compose.yml @@ -2,7 +2,7 @@ version: "3.8" services: db: - image: postgres:9.6-alpine + image: postgres:15.4-alpine3.18 restart: always environment: POSTGRES_PASSWORD: test123 diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 0f0d26605..92e6aad2b 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils,times,options], + std/[sequtils,options], testutils/unittests, chronos import @@ -13,8 +13,6 @@ import ../testlib/testasync, ../testlib/postgres -proc now():int64 = getTime().toUnix() - proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor = @@ -56,7 +54,7 @@ suite "Postgres driver": # Actually, the diff randomly goes between 1 and 2 seconds. # although in theory it should spend 1s because we establish 100 # connections and we spawn 100 tasks that spend ~1s each. - require diff < 20 + assert diff < 20_000_000_000 asyncTest "Insert a message": const contentTopic = "test-content-topic" @@ -69,14 +67,14 @@ suite "Postgres driver": assert putRes.isOk(), putRes.error let storedMsg = (await driver.getAllMessages()).tryGet() - require: - storedMsg.len == 1 - storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, actualMsg, digest, storeTimestamp) = item - actualMsg.contentTopic == contentTopic and - pubsubTopic == DefaultPubsubTopic and - toHex(computedDigest.data) == toHex(digest) and - toHex(actualMsg.payload) == toHex(msg.payload) + + assert storedMsg.len == 1 + + let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0] + assert actualMsg.contentTopic == contentTopic + assert pubsubTopic == DefaultPubsubTopic + assert toHex(computedDigest.data) == toHex(digest) + assert toHex(actualMsg.payload) == toHex(msg.payload) asyncTest "Insert and query message": const contentTopic1 = "test-content-topic-1" @@ -96,21 +94,21 @@ suite "Postgres driver": let countMessagesRes = await driver.getMessagesCount() - require countMessagesRes.isOk() and countMessagesRes.get() == 2 + assert countMessagesRes.isOk(), $countMessagesRes.error + assert countMessagesRes.get() == 2 var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1]) - require messagesRes.isOk() - require messagesRes.get().len == 1 + assert messagesRes.isOk(), $messagesRes.error + assert messagesRes.get().len == 1 # Get both content topics, check ordering messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, contentTopic2]) assert messagesRes.isOk(), messagesRes.error - require: - messagesRes.get().len == 2 and - messagesRes.get()[0][1].contentTopic == contentTopic1 + assert messagesRes.get().len == 2 + assert messagesRes.get()[0][1].contentTopic == contentTopic1 # Descending order messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, @@ -118,9 +116,8 @@ suite "Postgres driver": ascendingOrder = false) assert messagesRes.isOk(), messagesRes.error - require: - messagesRes.get().len == 2 and - messagesRes.get()[0][1].contentTopic == contentTopic2 + assert messagesRes.get().len == 2 + assert messagesRes.get()[0][1].contentTopic == contentTopic2 # cursor # Get both content topics @@ -130,8 +127,8 @@ suite "Postgres driver": cursor = some( computeTestCursor(pubsubTopic1, messagesRes.get()[1][1]))) - require messagesRes.isOk() - require messagesRes.get().len == 1 + assert messagesRes.isOk() + assert messagesRes.get().len == 1 # Get both content topics but one pubsub topic messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, @@ -139,16 +136,15 @@ suite "Postgres driver": pubsubTopic = some(pubsubTopic1)) assert messagesRes.isOk(), messagesRes.error - require: - messagesRes.get().len == 1 and - messagesRes.get()[0][1].contentTopic == contentTopic1 + assert messagesRes.get().len == 1 + assert messagesRes.get()[0][1].contentTopic == contentTopic1 # Limit messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, contentTopic2], maxPageSize = 1) assert messagesRes.isOk(), messagesRes.error - require messagesRes.get().len == 1 + assert messagesRes.get().len == 1 asyncTest "Insert true duplicated messages": # Validates that two completely equal messages can not be stored. @@ -164,5 +160,5 @@ suite "Postgres driver": putRes = await driver.put(DefaultPubsubTopic, msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp) - require not putRes.isOk() + assert not putRes.isOk() diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index fccb14bd1..49ab57deb 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -73,7 +73,8 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver, Future[ArchiveDriverResult[void]] {.base, async.} = discard method decreaseDatabaseSize*(driver: ArchiveDriver, - targetSizeInBytes: int64): + targetSizeInBytes: int64, + forceRemoval: bool = false): Future[ArchiveDriverResult[void]] {.base, async.} = discard method close*(driver: ArchiveDriver): diff --git a/waku/waku_archive/driver/builder.nim b/waku/waku_archive/driver/builder.nim index e1c93e6a0..e32dce201 100644 --- a/waku/waku_archive/driver/builder.nim +++ b/waku/waku_archive/driver/builder.nim @@ -108,6 +108,19 @@ proc new*(T: type ArchiveDriver, if migrateRes.isErr(): return err("ArchiveDriver build failed in migration: " & $migrateRes.error) + ## This should be started once we make sure the 'messages' table exists + ## Hence, this should be run after the migration is completed. + asyncSpawn driver.startPartitionFactory(onFatalErrorAction) + + info "waiting for a partition to be created" + for i in 0..<100: + if driver.containsAnyPartition(): + break + await sleepAsync(chronos.milliseconds(100)) + + if not driver.containsAnyPartition(): + onFatalErrorAction("a partition could not be created") + return ok(driver) else: diff --git a/waku/waku_archive/driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver.nim index 496005cbe..c7e908344 100644 --- a/waku/waku_archive/driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver.nim @@ -3,6 +3,13 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import ./postgres_driver/postgres_driver +import + ./postgres_driver/postgres_driver, + ./postgres_driver/partitions_manager, + ./postgres_driver/postgres_healthcheck + +export + postgres_driver, + partitions_manager, + postgres_healthcheck -export postgres_driver diff --git a/waku/waku_archive/driver/postgres_driver/migrations.nim b/waku/waku_archive/driver/postgres_driver/migrations.nim index 74cfb4530..92f658db9 100644 --- a/waku/waku_archive/driver/postgres_driver/migrations.nim +++ b/waku/waku_archive/driver/postgres_driver/migrations.nim @@ -13,7 +13,7 @@ import logScope: topics = "waku archive migration" -const SchemaVersion* = 1 # increase this when there is an update in the database schema +const SchemaVersion* = 2 # increase this when there is an update in the database schema proc breakIntoStatements*(script: string): seq[string] = ## Given a full migration script, that can potentially contain a list diff --git a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim new file mode 100644 index 000000000..a9d90d8e9 --- /dev/null +++ b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim @@ -0,0 +1,105 @@ + +## This module is aimed to handle the creation and truncation of partition tables +## in order to limit the space occupied in disk by the database. +## +## The created partitions are referenced by the 'storedAt' field. +## + +import + std/deques +import + chronos, + chronicles + +logScope: + topics = "waku archive partitions_manager" + +## The time range has seconds resolution +type TimeRange* = tuple[beginning: int64, `end`: int64] + +type + Partition = object + name: string + timeRange: TimeRange + + PartitionManager* = ref object + partitions: Deque[Partition] # FIFO of partition table names. The first is the oldest partition + +proc new*(T: type PartitionManager): T = + return PartitionManager() + +proc getPartitionFromDateTime*(self: PartitionManager, + targetMoment: int64): + Result[Partition, string] = + ## Returns the partition name that might store a message containing the passed timestamp. + ## In order words, it simply returns the partition name which contains the given timestamp. + ## targetMoment - represents the time of interest, measured in seconds since epoch. + + if self.partitions.len == 0: + return err("There are no partitions") + + for partition in self.partitions: + let timeRange = partition.timeRange + + let beginning = timeRange.beginning + let `end` = timeRange.`end` + + if beginning <= targetMoment and targetMoment < `end`: + return ok(partition) + + return err("Couldn't find a partition table for given time: " & $targetMoment) + +proc getNewestPartition*(self: PartitionManager): Result[Partition, string] = + if self.partitions.len == 0: + return err("there are no partitions allocated") + + let newestPartition = self.partitions.peekLast + return ok(newestPartition) + +proc getOldestPartition*(self: PartitionManager): Result[Partition, string] = + if self.partitions.len == 0: + return err("there are no partitions allocated") + + let oldestPartition = self.partitions.peekFirst + return ok(oldestPartition) + +proc addPartitionInfo*(self: PartitionManager, + partitionName: string, + beginning: int64, + `end`: int64) = + ## The given partition range has seconds resolution. + ## We just store information of the new added partition merely to keep track of it. + let partitionInfo = Partition(name: partitionName, timeRange: (beginning, `end`)) + trace "Adding partition info" + self.partitions.addLast(partitionInfo) + +proc removeOldestPartitionName*(self: PartitionManager) = + ## Simply removed the partition from the tracked/known partitions queue. + ## Just remove it and ignore it. + discard self.partitions.popFirst() + +proc isEmpty*(self: PartitionManager): bool = + return self.partitions.len == 0 + +proc getLastMoment*(partition: Partition): int64 = + ## Considering the time range covered by the partition, this + ## returns the `end` time (number of seconds since epoch) of such range. + let lastTimeInSec = partition.timeRange.`end` + return lastTimeInSec + +proc containsMoment*(partition: Partition, time: int64): bool = + ## Returns true if the given moment is contained within the partition window, + ## 'false' otherwise. + ## time - number of seconds since epoch + if partition.timeRange.beginning <= time and + time < partition.timeRange.`end`: + return true + + return false + +proc getName*(partition: Partition): string = + return partition.name + +func `==`*(a, b: Partition): bool {.inline.} = + return a.name == b.name + diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 71efa491d..bfabc268c 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[nre,options,sequtils,strutils,times,strformat], + std/[nre,options,sequtils,strutils,strformat,times], stew/[results,byteutils], db_postgres, postgres, @@ -16,33 +16,17 @@ import ../../common, ../../driver, ../../../common/databases/db_postgres as waku_postgres, - ./postgres_healthcheck - -export postgres_driver + ./postgres_healthcheck, + ./partitions_manager type PostgresDriver* = ref object of ArchiveDriver ## Establish a separate pools for read/write operations writeConnPool: PgAsyncPool readConnPool: PgAsyncPool -proc dropTableQuery(): string = - "DROP TABLE messages" - -proc dropVersionTableQuery(): string = - "DROP TABLE version" - -proc createTableQuery(): string = - "CREATE TABLE IF NOT EXISTS messages (" & - " pubsubTopic VARCHAR NOT NULL," & - " contentTopic VARCHAR NOT NULL," & - " payload VARCHAR," & - " version INTEGER NOT NULL," & - " timestamp BIGINT NOT NULL," & - " id VARCHAR NOT NULL," & - " messageHash VARCHAR NOT NULL," & - " storedAt BIGINT NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & - ");" + ## Partition container + partitionMngr: PartitionManager + futLoopPartitionFactory: Future[void] const InsertRowStmtName = "InsertRow" const InsertRowStmtDefinition = @@ -111,52 +95,17 @@ proc new*(T: type PostgresDriver, if not isNil(onFatalErrorAction): asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction) - return ok(PostgresDriver(writeConnPool: writeConnPool, - readConnPool: readConnPool)) - -proc performWriteQuery*(s: PostgresDriver, - query: string): Future[ArchiveDriverResult[void]] {.async.} = - ## Executes a query that changes the database state - ## TODO: we can reduce the code a little with this proc - (await s.writeConnPool.pgQuery(query)).isOkOr: - return err(fmt"error in {query}: {error}") - - return ok() - -proc createMessageTable*(s: PostgresDriver): - Future[ArchiveDriverResult[void]] {.async.} = - - let execRes = await s.writeConnPool.pgQuery(createTableQuery()) - if execRes.isErr(): - return err("error in createMessageTable: " & execRes.error) - - return ok() - -proc deleteMessageTable(s: PostgresDriver): - Future[ArchiveDriverResult[void]] {.async.} = - - let execRes = await s.writeConnPool.pgQuery(dropTableQuery()) - if execRes.isErr(): - return err("error in deleteMessageTable: " & execRes.error) - - return ok() - -proc deleteVersionTable(s: PostgresDriver): - Future[ArchiveDriverResult[void]] {.async.} = - - let execRes = await s.writeConnPool.pgQuery(dropVersionTableQuery()) - if execRes.isErr(): - return err("error in deleteVersionTable: " & execRes.error) - - return ok() + let driver = PostgresDriver(writeConnPool: writeConnPool, + readConnPool: readConnPool, + partitionMngr: PartitionManager.new()) + return ok(driver) proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = - ## This is only used for testing purposes, to set a fresh database at the beginning of each test - (await s.deleteMessageTable()).isOkOr: - return err("error deleting message table: " & $error) - (await s.deleteVersionTable()).isOkOr: - return err("error deleting version table: " & $error) - return ok() + ## Clear the database partitions + let targetSize = 0 + let forceRemoval = true + let ret = await s.decreaseDatabaseSize(targetSize, forceRemoval) + return ret proc rowCallbackImpl(pqResult: ptr PGresult, outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) = @@ -219,6 +168,8 @@ method put*(s: PostgresDriver, let version = $message.version let timestamp = $message.timestamp + debug "put PostgresDriver", timestamp = timestamp + return await s.writeConnPool.runStmt(InsertRowStmtName, InsertRowStmtDefinition, @[digest, @@ -258,6 +209,33 @@ method getAllMessages*(s: PostgresDriver): return ok(rows) +proc getPartitionsList(s: PostgresDriver): Future[ArchiveDriverResult[seq[string]]] {.async.} = + ## Retrieves the seq of partition table names. + ## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"] + + var partitions: seq[string] + proc rowCallback(pqResult: ptr PGresult) = + for iRow in 0..= newestPartition.getLastMoment(): + debug "creating a new partition to contain current messages" + ## There is no partition to contain the current time. + ## This happens if the node has been stopped for quite a long time. + ## Then, let's create the needed partition to contain 'now'. + (await self.addPartition(now, PartitionsRangeInterval)).isOkOr: + onFatalError("could not add the next partition: " & $error) + + await sleepAsync(DefaultDatabasePartitionCheckTimeInterval) + +proc startPartitionFactory*(self: PostgresDriver, + onFatalError: OnFatalErrorHandler) {.async.} = + + self.futLoopPartitionFactory = self.loopPartitionFactory(onFatalError) + +proc getTableSize*(self: PostgresDriver, + tableName: string): Future[ArchiveDriverResult[string]] {.async.} = + ## Returns a human-readable representation of the size for the requested table. + ## tableName - table of interest. + + let tableSize = (await self.getStr(fmt""" + SELECT pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size" + FROM pg_class C + where relname = '{tableName}'""")).valueOr: + return err("error in getDatabaseSize: " & error) + + return ok(tableSize) + +proc removeOldestPartition(self: PostgresDriver, + forceRemoval: bool = false, ## To allow cleanup in tests + ): + Future[ArchiveDriverResult[void]] {.async.} = + ## Indirectly called from the retention policy + + let oldestPartition = self.partitionMngr.getOldestPartition().valueOr: + return err("could not remove oldest partition: " & $error) + + if not forceRemoval: + let now = times.now().toTime().toUnix() + let currentPartitionRes = self.partitionMngr.getPartitionFromDateTime(now) + if currentPartitionRes.isOk(): + ## The database contains a partition that would store current messages. + + if currentPartitionRes.get() == oldestPartition: + debug "Skipping to remove the current partition" + return ok() + + var partSize = "" + let partSizeRes = await self.getTableSize(oldestPartition.getName()) + if partSizeRes.isOk(): + partSize = partSizeRes.get() + + ## In the following lines is where the partition removal happens. + ## Detach and remove the partition concurrently to not block the parent table (messages) + let detachPartitionQuery = + "ALTER TABLE messages DETACH PARTITION " & oldestPartition.getName() & " CONCURRENTLY;" + debug "removeOldestPartition", query = detachPartitionQuery + (await self.performWriteQuery(detachPartitionQuery)).isOkOr: + return err(fmt"error in {detachPartitionQuery}: " & $error) + + ## Drop the partition + let dropPartitionQuery = "DROP TABLE " & oldestPartition.getName() + debug "removeOldestPartition drop partition", query = dropPartitionQuery + (await self.performWriteQuery(dropPartitionQuery)).isOkOr: + return err(fmt"error in {dropPartitionQuery}: " & $error) + + debug "removed partition", partition_name = oldestPartition.getName(), partition_size = partSize + self.partitionMngr.removeOldestPartitionName() + + return ok() + +proc containsAnyPartition*(self: PostgresDriver): bool = + return not self.partitionMngr.isEmpty() + +method decreaseDatabaseSize*(driver: PostgresDriver, + targetSizeInBytes: int64, + forceRemoval: bool = false): + Future[ArchiveDriverResult[void]] {.async.} = + var dbSize = (await driver.getDatabaseSize()).valueOr: + return err("decreaseDatabaseSize failed to get database size: " & $error) + + ## database size in bytes + var totalSizeOfDB: int64 = int64(dbSize) + + if totalSizeOfDB <= targetSizeInBytes: + return ok() + + debug "start reducing database size", targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB + + while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition(): + (await driver.removeOldestPartition(forceRemoval)).isOkOr: + return err("decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error) + + dbSize = (await driver.getDatabaseSize()).valueOr: + return err("decreaseDatabaseSize inside loop failed to get database size: " & $error) + + let newCurrentSize = int64(dbSize) + if newCurrentSize == totalSizeOfDB: + return err("the previous partition removal didn't clear database size") + + totalSizeOfDB = newCurrentSize + + debug "reducing database size", targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB + + return ok() + method existsTable*(s: PostgresDriver, tableName: string): Future[ArchiveDriverResult[bool]] {.async.} = let query: string = fmt""" @@ -628,3 +799,5 @@ proc getCurrentVersion*(s: PostgresDriver): return err("error in getMessagesCount: " & $error) return ok(res) + + diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 27d9ae963..aebc53e6f 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -322,7 +322,8 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, return err("interface method not implemented") method decreaseDatabaseSize*(driver: QueueDriver, - targetSizeInBytes: int64): + targetSizeInBytes: int64, + forceRemoval: bool = false): Future[ArchiveDriverResult[void]] {.async.} = return err("interface method not implemented") diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 5a74928c3..c8fd087e6 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -147,7 +147,8 @@ method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver, return s.db.deleteOldestMessagesNotWithinLimit(limit) method decreaseDatabaseSize*(driver: SqliteDriver, - targetSizeInBytes: int64): + targetSizeInBytes: int64, + forceRemoval: bool = false): Future[ArchiveDriverResult[void]] {.async.} = ## To remove 20% of the outdated data from database