mirror of
https://github.com/waku-org/nwaku.git
synced 2025-03-01 07:20:54 +00:00
fix: extended Postgres code to support retention policy + refactoring (#2244)
* updated Postgres retention policy code + refactoring * Update waku/waku_archive/driver/postgres_driver/postgres_driver.nim Co-authored-by: Simon-Pierre Vivier <simvivier@status.im> * updated code review changes * data unit fixed, processing everything in bytes now --------- Co-authored-by: Simon-Pierre Vivier <simvivier@status.im>
This commit is contained in:
parent
110de90ff6
commit
a1ed517f9c
@ -62,8 +62,8 @@ suite "Waku Archive - Retention policy":
|
|||||||
test "size retention policy - windowed message deletion":
|
test "size retention policy - windowed message deletion":
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
# in megabytes
|
# in bytes
|
||||||
sizeLimit:float = 0.05
|
sizeLimit:int64 = 52428
|
||||||
excess = 325
|
excess = 325
|
||||||
|
|
||||||
let driver = newTestArchiveDriver()
|
let driver = newTestArchiveDriver()
|
||||||
@ -92,9 +92,7 @@ suite "Waku Archive - Retention policy":
|
|||||||
|
|
||||||
## Then
|
## Then
|
||||||
# calculate the current database size
|
# calculate the current database size
|
||||||
let pageSize = (waitFor driver.getPagesSize()).tryGet()
|
let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())
|
||||||
let pageCount = (waitFor driver.getPagesCount()).tryGet()
|
|
||||||
let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)
|
|
||||||
|
|
||||||
# NOTE: since vacuumin is done manually, this needs to be revisited if vacuuming done automatically
|
# NOTE: since vacuumin is done manually, this needs to be revisited if vacuuming done automatically
|
||||||
|
|
||||||
@ -105,7 +103,7 @@ suite "Waku Archive - Retention policy":
|
|||||||
require (sizeDB >= sizeLimit)
|
require (sizeDB >= sizeLimit)
|
||||||
require (waitFor retentionPolicy.execute(driver)).isOk()
|
require (waitFor retentionPolicy.execute(driver)).isOk()
|
||||||
|
|
||||||
# get the number or rows from DB
|
# get the number or rows from database
|
||||||
let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()
|
let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
@ -295,6 +295,20 @@ proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
|||||||
|
|
||||||
return ok(count)
|
return ok(count)
|
||||||
|
|
||||||
|
proc getDatabaseSize*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||||
|
# get the database page size in bytes
|
||||||
|
var pageSize: int64 = ?db.getPageSize()
|
||||||
|
|
||||||
|
if pageSize == 0:
|
||||||
|
return err("failed to get page size ")
|
||||||
|
|
||||||
|
# get the database page count
|
||||||
|
let pageCount = ?db.getPageCount()
|
||||||
|
|
||||||
|
let databaseSize = (pageSize * pageCount)
|
||||||
|
|
||||||
|
return ok(databaseSize)
|
||||||
|
|
||||||
proc gatherSqlitePageStats*(db: SqliteDatabase):
|
proc gatherSqlitePageStats*(db: SqliteDatabase):
|
||||||
DatabaseResult[(int64, int64, int64)] =
|
DatabaseResult[(int64, int64, int64)] =
|
||||||
let
|
let
|
||||||
|
@ -52,6 +52,9 @@ method getPagesCount*(driver: ArchiveDriver):
|
|||||||
method getPagesSize*(driver: ArchiveDriver):
|
method getPagesSize*(driver: ArchiveDriver):
|
||||||
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
|
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
|
||||||
|
|
||||||
|
method getDatabaseSize*(driver: ArchiveDriver):
|
||||||
|
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
|
||||||
|
|
||||||
method performVacuum*(driver: ArchiveDriver):
|
method performVacuum*(driver: ArchiveDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
||||||
|
|
||||||
|
@ -434,6 +434,15 @@ proc getInt(s: PostgresDriver,
|
|||||||
|
|
||||||
return ok(retInt)
|
return ok(retInt)
|
||||||
|
|
||||||
|
method getDatabaseSize*(s: PostgresDriver):
|
||||||
|
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
|
|
||||||
|
let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
|
||||||
|
return err("error in getDatabaseSize: " & error)
|
||||||
|
|
||||||
|
let databaseSize: int64 = int64(intRes)
|
||||||
|
return ok(databaseSize)
|
||||||
|
|
||||||
method getMessagesCount*(s: PostgresDriver):
|
method getMessagesCount*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[int64]] {.async.} =
|
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
|
|
||||||
|
@ -289,6 +289,10 @@ method getPagesSize*(driver: QueueDriver):
|
|||||||
Future[ArchiveDriverResult[int64]] {.async} =
|
Future[ArchiveDriverResult[int64]] {.async} =
|
||||||
return ok(int64(driver.len()))
|
return ok(int64(driver.len()))
|
||||||
|
|
||||||
|
method getDatabasesSize*(driver: QueueDriver):
|
||||||
|
Future[ArchiveDriverResult[int64]] {.async} =
|
||||||
|
return ok(int64(driver.len()))
|
||||||
|
|
||||||
method performVacuum*(driver: QueueDriver):
|
method performVacuum*(driver: QueueDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
return err("interface method not implemented")
|
return err("interface method not implemented")
|
||||||
|
@ -120,6 +120,10 @@ method getPagesSize*(s: SqliteDriver):
|
|||||||
Future[ArchiveDriverResult[int64]] {.async.} =
|
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
return s.db.getPageSize()
|
return s.db.getPageSize()
|
||||||
|
|
||||||
|
method getDatabaseSize*(s: SqliteDriver):
|
||||||
|
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
|
return s.db.getDatabaseSize()
|
||||||
|
|
||||||
method performVacuum*(s: SqliteDriver):
|
method performVacuum*(s: SqliteDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
return s.db.performSqliteVacuum()
|
return s.db.performSqliteVacuum()
|
||||||
|
@ -56,24 +56,27 @@ proc new*(T: type RetentionPolicy,
|
|||||||
var retentionSize: string
|
var retentionSize: string
|
||||||
retentionSize = policyArgs
|
retentionSize = policyArgs
|
||||||
|
|
||||||
# captures the size unit such as Gb or Mb
|
# captures the size unit such as GB or MB
|
||||||
let sizeUnit = retentionSize.substr(retentionSize.len-2)
|
let sizeUnit = retentionSize.substr(retentionSize.len-2)
|
||||||
# captures the string type number data of the size provided
|
# captures the string type number data of the size provided
|
||||||
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
|
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
|
||||||
# to hold the numeric value data of size
|
# to hold the numeric value data of size
|
||||||
var sizeQuantity: float
|
var inptSizeQuantity: float
|
||||||
|
var sizeQuantity: int64
|
||||||
|
|
||||||
if sizeUnit in ["gb", "Gb", "GB", "gB"]:
|
if sizeUnit in ["gb", "Gb", "GB", "gB"]:
|
||||||
# parse the actual value into integer type var
|
# parse the actual value into integer type var
|
||||||
try:
|
try:
|
||||||
sizeQuantity = parseFloat(sizeQuantityStr)
|
inptSizeQuantity = parseFloat(sizeQuantityStr)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
|
return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
|
||||||
# Gb data is converted into Mb for uniform processing
|
# GB data is converted into bytes for uniform processing
|
||||||
sizeQuantity = sizeQuantity * 1024
|
sizeQuantity = int64(inptSizeQuantity * 1024.0 * 1024.0 * 1024.0)
|
||||||
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
|
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
|
||||||
try:
|
try:
|
||||||
sizeQuantity = parseFloat(sizeQuantityStr)
|
inptSizeQuantity = parseFloat(sizeQuantityStr)
|
||||||
|
# MB data is converted into bytes for uniform processing
|
||||||
|
sizeQuantity = int64(inptSizeQuantity * 1024.0 * 1024.0)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return err("invalid size retention policy argument")
|
return err("invalid size retention policy argument")
|
||||||
else:
|
else:
|
||||||
|
@ -16,22 +16,22 @@ import
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "waku archive retention_policy"
|
topics = "waku archive retention_policy"
|
||||||
|
|
||||||
# default size is 30 Gb
|
# default size is 30 GiB or 32212254720.0 in bytes
|
||||||
const DefaultRetentionSize*: float = 30_720
|
const DefaultRetentionSize*: int64 = 32212254720
|
||||||
|
|
||||||
# to remove 20% of the outdated data from database
|
# to remove 20% of the outdated data from database
|
||||||
const DeleteLimit = 0.80
|
const DeleteLimit = 0.80
|
||||||
|
|
||||||
type
|
type
|
||||||
# SizeRetentionPolicy implements auto delete as follows:
|
# SizeRetentionPolicy implements auto delete as follows:
|
||||||
# - sizeLimit is the size in megabytes (Mbs) the database can grow upto
|
# - sizeLimit is the size in bytes the database can grow upto
|
||||||
# to reduce the size of the databases, remove the rows/number-of-messages
|
# to reduce the size of the databases, remove the rows/number-of-messages
|
||||||
# DeleteLimit is the total number of messages to delete beyond this limit
|
# DeleteLimit is the total number of messages to delete beyond this limit
|
||||||
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
|
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
|
||||||
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
|
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
|
||||||
# upon deletion process the fragmented space is retrieve back using Vacuum process.
|
# upon deletion process the fragmented space is retrieve back using Vacuum process.
|
||||||
SizeRetentionPolicy* = ref object of RetentionPolicy
|
SizeRetentionPolicy* = ref object of RetentionPolicy
|
||||||
sizeLimit: float
|
sizeLimit: int64
|
||||||
|
|
||||||
proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
|
proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
|
||||||
SizeRetentionPolicy(
|
SizeRetentionPolicy(
|
||||||
@ -42,21 +42,12 @@ method execute*(p: SizeRetentionPolicy,
|
|||||||
driver: ArchiveDriver):
|
driver: ArchiveDriver):
|
||||||
Future[RetentionPolicyResult[void]] {.async.} =
|
Future[RetentionPolicyResult[void]] {.async.} =
|
||||||
## when db size overshoots the database limit, shread 20% of outdated messages
|
## when db size overshoots the database limit, shread 20% of outdated messages
|
||||||
|
# get size of database
|
||||||
|
let dbSize = (await driver.getDatabaseSize()).valueOr:
|
||||||
|
return err("failed to get database size: " & $error)
|
||||||
|
|
||||||
# get page size of database
|
# database size in bytes
|
||||||
let pageSizeRes = await driver.getPagesSize()
|
let totalSizeOfDB: int64 = int64(dbSize)
|
||||||
let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024)
|
|
||||||
|
|
||||||
if pageSize == 0:
|
|
||||||
return err("failed to get Page size: " & pageSizeRes.error)
|
|
||||||
|
|
||||||
# to get the size of the database, pageCount and PageSize is required
|
|
||||||
# get page count in "messages" database
|
|
||||||
let pageCount = (await driver.getPagesCount()).valueOr:
|
|
||||||
return err("failed to get Pages count: " & $error)
|
|
||||||
|
|
||||||
# database size in megabytes (Mb)
|
|
||||||
let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0
|
|
||||||
|
|
||||||
if totalSizeOfDB < p.sizeLimit:
|
if totalSizeOfDB < p.sizeLimit:
|
||||||
return ok()
|
return ok()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user