mirror of https://github.com/waku-org/nwaku.git
chore: add size retention policy (#2093)
* chore: add retention policy with GB or MB limitation #1885 * chore: add retention policy with GB or MB limitation * chore: updated code post review- retention policy * ci: extract discordNotify to separate file Signed-off-by: Jakub Sokołowski <jakub@status.im> * ci: push images to new wakuorg/nwaku repo Signed-off-by: Jakub Sokołowski <jakub@status.im> * ci: enforce default Docker image tags strictly Signed-off-by: Jakub Sokołowski <jakub@status.im> * ci: push GIT_REF if it looks like a version Signed-off-by: Jakub Sokołowski <jakub@status.im> * fix: update wakuv2 fleet DNS discovery enrtree https://github.com/status-im/infra-misc/issues/171 * chore: resolving DNS IP and publishing it when no extIp is provided (#2030) * feat(coverage): Add simple coverage (#2067) * Add test aggregator to all directories. * Implement coverage script. * fix(ci): fix name of discord notify method Also use absolute path to load Groovy script. Signed-off-by: Jakub Sokołowski <jakub@status.im> * chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version (#2080) * chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version * add more metrics, refactor how most metrics are calculated * rework metrics table fillup * reset connErr to make sure we honour successful reconnection * chore(cbindings): Adding cpp example that integrates the 'libwaku' (#2079) * Adding cpp example that integrates the `libwaku` --------- Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> * fix(ci): update the dependency list in pre-release WF (#2088) * chore: adding NetConfig test suite (#2091) --------- Signed-off-by: Jakub Sokołowski <jakub@status.im> Co-authored-by: Jakub Sokołowski <jakub@status.im> Co-authored-by: Anton Iakimov <yakimant@gmail.com> Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Co-authored-by: Álex Cabeza Romero <alex93cabeza@gmail.com> Co-authored-by: 
Vaclav Pavlin <vaclav@status.im> Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
This commit is contained in:
parent
23b49ca53d
commit
8897ae1a2f
|
@ -12,6 +12,7 @@ import
|
||||||
../../../waku/waku_archive/driver/sqlite_driver,
|
../../../waku/waku_archive/driver/sqlite_driver,
|
||||||
../../../waku/waku_archive/retention_policy,
|
../../../waku/waku_archive/retention_policy,
|
||||||
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
|
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
|
||||||
|
../../../waku/waku_archive/retention_policy/retention_policy_size,
|
||||||
../testlib/common,
|
../testlib/common,
|
||||||
../testlib/wakucore
|
../testlib/wakucore
|
||||||
|
|
||||||
|
@ -54,6 +55,45 @@ suite "Waku Archive - Retention policy":
|
||||||
## Cleanup
|
## Cleanup
|
||||||
(waitFor driver.close()).expect("driver to close")
|
(waitFor driver.close()).expect("driver to close")
|
||||||
|
|
||||||
|
test "size retention policy - windowed message deletion":
  ## Verifies that concurrent inserts interleaved with retention-policy runs
  ## keep the database size under the configured limit.

  ## Given
  let
    # limit expressed in megabytes
    sizeLimit: float = 0.05
    # number of messages inserted, chosen to overshoot the size limit
    excess = 123

  let driver = newTestArchiveDriver()
  let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)

  ## When
  var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
  var retentionFutures = newSeq[Future[ArchiveDriverResult[void]]]()

  for i in 1..excess:
    let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
    putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
    retentionFutures.add(retentionPolicy.execute(driver))

  # synchronously wait until every insert and every retention pass is done
  discard waitFor allFinished(putFutures & retentionFutures)

  ## Then
  # derive the current database size from page count and page size
  let pageSize = (waitFor driver.getPagesSize()).tryGet()
  let pageCount = (waitFor driver.getPagesCount()).tryGet()
  let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)

  check:
    # the measured database size must not exceed the user-provided limit
    sizeDB <= sizeLimit

  ## Cleanup
  (waitFor driver.close()).expect("driver to close")
||||||
test "store capacity should be limited":
|
test "store capacity should be limited":
|
||||||
## Given
|
## Given
|
||||||
const capacity = 5
|
const capacity = 5
|
||||||
|
|
|
@ -485,3 +485,4 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
return err("failed to execute vacuum: " & resVacuum.error)
|
return err("failed to execute vacuum: " & resVacuum.error)
|
||||||
|
|
||||||
debug "finished sqlite database vacuuming"
|
debug "finished sqlite database vacuuming"
|
||||||
|
ok()
|
|
@ -45,6 +45,15 @@ method getMessages*(driver: ArchiveDriver,
|
||||||
method getMessagesCount*(driver: ArchiveDriver):
|
method getMessagesCount*(driver: ArchiveDriver):
|
||||||
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
|
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
|
||||||
|
|
||||||
|
method getPagesCount*(driver: ArchiveDriver):
      Future[ArchiveDriverResult[int64]] {.base, async.} =
  ## Base interface: number of pages in the backing database.
  discard

method getPagesSize*(driver: ArchiveDriver):
      Future[ArchiveDriverResult[int64]] {.base, async.} =
  ## Base interface: size of a single database page.
  ## NOTE(review): unit (bytes vs. other) is driver-defined — confirm that
  ## all implementations agree before combining with getPagesCount.
  discard

method performsVacuum*(driver: ArchiveDriver):
      Future[ArchiveDriverResult[void]] {.base, async.} =
  ## Base interface: compact the backing database, reclaiming freed space.
  discard
|
||||||
|
|
||||||
method getOldestMessageTimestamp*(driver: ArchiveDriver):
|
method getOldestMessageTimestamp*(driver: ArchiveDriver):
|
||||||
Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard
|
Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard
|
||||||
|
|
||||||
|
|
|
@ -280,6 +280,18 @@ method getMessagesCount*(driver: QueueDriver):
|
||||||
Future[ArchiveDriverResult[int64]] {.async} =
|
Future[ArchiveDriverResult[int64]] {.async} =
|
||||||
return ok(int64(driver.len()))
|
return ok(int64(driver.len()))
|
||||||
|
|
||||||
|
method getPagesCount*(driver: QueueDriver):
      Future[ArchiveDriverResult[int64]] {.async.} =
  ## The in-memory queue has no notion of database pages; the message
  ## count is returned as a stand-in.
  return ok(int64(driver.len()))

method getPagesSize*(driver: QueueDriver):
      Future[ArchiveDriverResult[int64]] {.async.} =
  ## NOTE(review): this returns the message count, not a size in bytes —
  ## any caller computing database size as pageCount * pageSize will get
  ## messageCount^2 for this driver. Confirm the intended semantics.
  return ok(int64(driver.len()))

method performsVacuum*(driver: QueueDriver):
      Future[ArchiveDriverResult[void]] {.async.} =
  ## Vacuuming does not apply to the in-memory queue driver.
  return err("interface method not implemented")
|
||||||
|
|
||||||
method getOldestMessageTimestamp*(driver: QueueDriver):
|
method getOldestMessageTimestamp*(driver: QueueDriver):
|
||||||
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
||||||
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
|
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
|
||||||
|
|
|
@ -109,6 +109,18 @@ method getMessagesCount*(s: SqliteDriver):
|
||||||
Future[ArchiveDriverResult[int64]] {.async.} =
|
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
return s.db.getMessageCount()
|
return s.db.getMessageCount()
|
||||||
|
|
||||||
|
method getPagesCount*(s: SqliteDriver):
      Future[ArchiveDriverResult[int64]] {.async.} =
  ## Number of pages in the SQLite database, delegated to the db handle.
  return s.db.getPageCount()

method getPagesSize*(s: SqliteDriver):
      Future[ArchiveDriverResult[int64]] {.async.} =
  ## Page size of the SQLite database, delegated to the db handle.
  return s.db.getPageSize()

method performsVacuum*(s: SqliteDriver):
      Future[ArchiveDriverResult[void]] {.async.} =
  ## Runs a VACUUM pass on the SQLite database to reclaim freed space.
  return s.db.performSqliteVacuum()
|
||||||
|
|
||||||
method getOldestMessageTimestamp*(s: SqliteDriver):
|
method getOldestMessageTimestamp*(s: SqliteDriver):
|
||||||
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
||||||
return s.db.selectOldestReceiverTimestamp()
|
return s.db.selectOldestReceiverTimestamp()
|
||||||
|
|
|
@ -11,7 +11,8 @@ import
|
||||||
import
|
import
|
||||||
../retention_policy,
|
../retention_policy,
|
||||||
./retention_policy_time,
|
./retention_policy_time,
|
||||||
./retention_policy_capacity
|
./retention_policy_capacity,
|
||||||
|
./retention_policy_size
|
||||||
|
|
||||||
proc new*(T: type RetentionPolicy,
|
proc new*(T: type RetentionPolicy,
|
||||||
retPolicy: string):
|
retPolicy: string):
|
||||||
|
@ -51,5 +52,38 @@ proc new*(T: type RetentionPolicy,
|
||||||
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
|
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
|
||||||
return ok(some(retPolicy))
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
|
elif policy == "size":
|
||||||
|
var retentionSize: string
|
||||||
|
retentionSize = policyArgs
|
||||||
|
|
||||||
|
# captures the size unit such as Gb or Mb
|
||||||
|
let sizeUnit = retentionSize.substr(retentionSize.len-2)
|
||||||
|
# captures the string type number data of the size provided
|
||||||
|
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
|
||||||
|
# to hold the numeric value data of size
|
||||||
|
var sizeQuantity: float
|
||||||
|
|
||||||
|
if sizeUnit in ["gb", "Gb", "GB", "gB"]:
|
||||||
|
# parse the actual value into integer type var
|
||||||
|
try:
|
||||||
|
sizeQuantity = parseFloat(sizeQuantityStr)
|
||||||
|
except ValueError:
|
||||||
|
return err("invalid size retention policy argument")
|
||||||
|
# Gb data is converted into Mb for uniform processing
|
||||||
|
sizeQuantity = sizeQuantity * 1024
|
||||||
|
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
|
||||||
|
try:
|
||||||
|
sizeQuantity = parseFloat(sizeQuantityStr)
|
||||||
|
except ValueError:
|
||||||
|
return err("invalid size retention policy argument")
|
||||||
|
else:
|
||||||
|
return err ("""invalid size retention value unit: expected "Mb" or "Gb" but got """ & sizeUnit )
|
||||||
|
|
||||||
|
if sizeQuantity <= 0:
|
||||||
|
return err("invalid size retention policy argument: a non-zero value is required")
|
||||||
|
|
||||||
|
let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
|
||||||
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return err("unknown retention policy")
|
return err("unknown retention policy")
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}

import
  std/times,
  stew/results,
  chronicles,
  chronos
import
  ../driver,
  ../retention_policy

logScope:
  topics = "waku archive retention_policy"

# default size limit is 30 GB, expressed in MB
const DefaultRetentionSize*: float = 30_720

# fraction of messages kept when the limit is exceeded
# (i.e. the oldest 20% of messages are deleted)
const DeleteLimit = 0.80

type
  # SizeRetentionPolicy implements auto-delete as follows:
  # - sizeLimit is the size in megabytes (MB) the database may grow up to
  # - when the database size crosses sizeLimit, only the newest
  #   DeleteLimit fraction of messages is kept; the outdated rest is
  #   deleted using deleteOldestMessagesNotWithinLimit()
  # - after deletion, the fragmented space is reclaimed with a vacuum pass
  SizeRetentionPolicy* = ref object of RetentionPolicy
    sizeLimit: float

proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
  ## Creates a size-based retention policy.
  ## `size` is the database size limit in megabytes.
  SizeRetentionPolicy(
    sizeLimit: size
  )

method execute*(p: SizeRetentionPolicy,
                driver: ArchiveDriver):
                Future[RetentionPolicyResult[void]] {.async.} =
  ## When the database size overshoots the configured limit, delete the
  ## oldest ~20% of messages and vacuum the database.

  # the database size is derived from pageCount * pageSize
  let pageCountRes = await driver.getPagesCount()
  if pageCountRes.isErr():
    return err("failed to get Pages count: " & pageCountRes.error)
  let pageCount: int64 = pageCountRes.value

  # page size as reported by the driver (bytes for SQLite).
  # Check the Result explicitly: the previous `valueOr(0)` sentinel
  # misfired for legal page sizes below 1 KB and then read `.error`
  # on an ok Result.
  let pageSizeRes = await driver.getPagesSize()
  if pageSizeRes.isErr():
    return err("failed to get Page size: " & pageSizeRes.error)
  let pageSize: int64 = pageSizeRes.value

  # database size in megabytes (MB); matches the computation used by the
  # retention-policy test suite (bytes / (1024 * 1024))
  let totalSizeOfDB: float = float(pageSize * pageCount) / (1024.0 * 1024.0)

  # nothing to do while the database is within its size limit
  if totalSizeOfDB < p.sizeLimit:
    return ok()

  # to delete messages, the total row/message count is needed
  let numMessagesRes = await driver.getMessagesCount()
  if numMessagesRes.isErr():
    return err("failed to get messages count: " & numMessagesRes.error)
  let numMessages = numMessagesRes.value

  # 80% of the newest messages are kept, the rest is deleted
  let retainCount = int(float(numMessages) * DeleteLimit)

  let res = await driver.deleteOldestMessagesNotWithinLimit(limit=retainCount)
  if res.isErr():
    return err("deleting oldest messages failed: " & res.error)

  # vacuum to defragment the deleted pages and reclaim storage space;
  # this shrinks the database file
  let resVaccum = await driver.performsVacuum()
  if resVaccum.isErr():
    return err("vacuumming failed: " & resVaccum.error)

  return ok()
|
Loading…
Reference in New Issue