Revert "chore: add size retention policy (#2093)" (#2097)

This reverts commit 8897ae1a2f9ef4dac98f982a6250ed378bc3d234.

(cherry picked from commit b213b2c385b0534481448cd6e30af18e183d0504)
This commit is contained in:
Hanno Cornelius 2023-10-02 11:07:13 +02:00 committed by GitHub
parent 8897ae1a2f
commit c019016545
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 2 additions and 196 deletions

View File

@ -12,7 +12,6 @@ import
../../../waku/waku_archive/driver/sqlite_driver,
../../../waku/waku_archive/retention_policy,
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
../../../waku/waku_archive/retention_policy/retention_policy_size,
../testlib/common,
../testlib/wakucore
@ -54,45 +53,6 @@ suite "Waku Archive - Retention policy":
## Cleanup
(waitFor driver.close()).expect("driver to close")
test "size retention policy - windowed message deletion":
## Given
let
# in megabytes
sizeLimit:float = 0.05
excess = 123
let driver = newTestArchiveDriver()
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
## When
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
var retentionFutures = newSeq[Future[ArchiveDriverResult[void]]]()
for i in 1..excess:
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
retentionFutures.add(retentionPolicy.execute(driver))
# waitFor is used to synchronously wait for the futures to complete.
discard waitFor allFinished(putFutures & retentionFutures)
## Then
# calculate the current database size
let pageSize = (waitFor driver.getPagesSize()).tryGet()
let pageCount = (waitFor driver.getPagesCount()).tryGet()
let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)
check:
# size of the database is used to check if the storage limit has been preserved
# check the current database size with the limitSize provided by the user
# it should be lower
sizeDB <= sizeLimit
## Cleanup
(waitFor driver.close()).expect("driver to close")
test "store capacity should be limited":
## Given

View File

@ -484,5 +484,4 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"
ok()
debug "finished sqlite database vacuuming"

View File

@ -45,15 +45,6 @@ method getMessages*(driver: ArchiveDriver,
method getMessagesCount*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
method getPagesCount*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
method getPagesSize*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
method performsVacuum*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard
method getOldestMessageTimestamp*(driver: ArchiveDriver):
Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard

View File

@ -280,18 +280,6 @@ method getMessagesCount*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
method getPagesCount*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
method getPagesSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
method performsVacuum*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return err("interface method not implemented")
method getOldestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)

View File

@ -109,18 +109,6 @@ method getMessagesCount*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getMessageCount()
method getPagesCount*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getPageCount()
method getPagesSize*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getPageSize()
method performsVacuum*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.performSqliteVacuum()
method getOldestMessageTimestamp*(s: SqliteDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return s.db.selectOldestReceiverTimestamp()

View File

@ -11,8 +11,7 @@ import
import
../retention_policy,
./retention_policy_time,
./retention_policy_capacity,
./retention_policy_size
./retention_policy_capacity
proc new*(T: type RetentionPolicy,
retPolicy: string):
@ -52,38 +51,5 @@ proc new*(T: type RetentionPolicy,
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))
elif policy == "size":
var retentionSize: string
retentionSize = policyArgs
# captures the size unit such as Gb or Mb
let sizeUnit = retentionSize.substr(retentionSize.len-2)
# captures the string type number data of the size provided
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
# to hold the numeric value data of size
var sizeQuantity: float
if sizeUnit in ["gb", "Gb", "GB", "gB"]:
# parse the actual value into integer type var
try:
sizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument")
# Gb data is converted into Mb for uniform processing
sizeQuantity = sizeQuantity * 1024
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
try:
sizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument")
else:
return err ("""invalid size retention value unit: expected "Mb" or "Gb" but got """ & sizeUnit )
if sizeQuantity <= 0:
return err("invalid size retention policy argument: a non-zero value is required")
let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
return ok(some(retPolicy))
else:
return err("unknown retention policy")

View File

@ -1,86 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/times,
stew/results,
chronicles,
chronos
import
../driver,
../retention_policy
logScope:
topics = "waku archive retention_policy"
# default size is 30 Gb
const DefaultRetentionSize*: float = 30_720
# to remove 20% of the outdated data from database
const DeleteLimit = 0.80
type
# SizeRetentionPolicy implements auto delete as follows:
# - sizeLimit is the size in megabytes (Mbs) the database can grow upto
# to reduce the size of the databases, remove the rows/number-of-messages
# DeleteLimit is the total number of messages to delete beyond this limit
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
# upon deletion process the fragmented space is retrieve back using Vacuum process.
SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: float
proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
SizeRetentionPolicy(
sizeLimit: size
)
method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## when db size overshoots the database limit, shread 20% of outdated messages
# to get the size of the database, pageCount and PageSize is required
# get page count in "messages" database
var pageCountRes = await driver.getPagesCount()
if pageCountRes.isErr():
return err("failed to get Pages count: " & pageCountRes.error)
var pageCount: int64 = pageCountRes.value
# get page size of database
let pageSizeRes = await driver.getPagesSize()
var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024)
if pageSize == 0:
return err("failed to get Page size: " & pageSizeRes.error)
# database size in megabytes (Mb)
var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0
# check if current databse size crosses the db size limit
if totalSizeOfDB < p.sizeLimit:
return ok()
# to shread/delete messsges, get the total row/message count
var numMessagesRes = await driver.getMessagesCount()
if numMessagesRes.isErr():
return err("failed to get messages count: " & numMessagesRes.error)
var numMessages = numMessagesRes.value
# 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)
if res.isErr():
return err("deleting oldest messages failed: " & res.error)
# vacuum to get the deleted pages defragments to save storage space
# this will resize the database size
let resVaccum = await driver.performsVacuum()
if resVaccum.isErr():
return err("vacuumming failed: " & resVaccum.error)
return ok()