mirror of https://github.com/waku-org/nwaku.git
refactor: new proc to foster different size retention policy implementations (#2463)
* new proc to foster different size retention policy implementations. The new proc, decreaseDatabaseSize, will have a different implementation for each driver. For example, in future commits we will implement a size retention policy in Postgres based on partition management.
* RetentionPolicy: use `new` instead of `init` for ref object types
* waku_archive: fix signatures in decreaseDatabaseSize methods
* retention_policy_size: minor cleanup of comments and imports
This commit is contained in:
parent
f6332ac646
commit
d530528259
|
@ -29,7 +29,7 @@ suite "Waku Archive - Retention policy":
|
||||||
|
|
||||||
let driver = newSqliteArchiveDriver()
|
let driver = newSqliteArchiveDriver()
|
||||||
|
|
||||||
let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
|
let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity)
|
||||||
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
|
@ -61,7 +61,7 @@ suite "Waku Archive - Retention policy":
|
||||||
|
|
||||||
let driver = newSqliteArchiveDriver()
|
let driver = newSqliteArchiveDriver()
|
||||||
|
|
||||||
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
|
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.new(size=sizeLimit)
|
||||||
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
||||||
|
|
||||||
# make sure that the db is empty before the test begins
|
# make sure that the db is empty before the test begins
|
||||||
|
@ -115,7 +115,7 @@ suite "Waku Archive - Retention policy":
|
||||||
|
|
||||||
let
|
let
|
||||||
driver = newSqliteArchiveDriver()
|
driver = newSqliteArchiveDriver()
|
||||||
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
|
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity)
|
||||||
|
|
||||||
let messages = @[
|
let messages = @[
|
||||||
fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)),
|
fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)),
|
||||||
|
|
|
@ -72,6 +72,10 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
|
||||||
limit: int):
|
limit: int):
|
||||||
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
||||||
|
|
||||||
|
method decreaseDatabaseSize*(driver: ArchiveDriver,
                             targetSizeInBytes: int64):
                             Future[ArchiveDriverResult[void]] {.base, async.} =
  ## Base declaration of the size-reduction hook of an archive driver.
  ##
  ## `targetSizeInBytes` is the size limit, in bytes, handed over by the
  ## caller. The base body performs no work; the concrete drivers
  ## (Postgres, Queue, Sqlite) provide their own overrides.
  discard
|
||||||
|
|
||||||
method close*(driver: ArchiveDriver):
|
method close*(driver: ArchiveDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
||||||
|
|
||||||
|
|
|
@ -502,6 +502,40 @@ method deleteOldestMessagesNotWithinLimit*(
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
method decreaseDatabaseSize*(driver: PostgresDriver,
                             targetSizeInBytes: int64):
                             Future[ArchiveDriverResult[void]] {.async.} =
  ## Reduces the amount of stored messages once the database size is no
  ## longer below `targetSizeInBytes`.
  ##
  ## The current database size is queried first; while it is still below the
  ## target nothing is deleted and `ok()` is returned. Otherwise the message
  ## count is read and the driver is asked to keep only 80% of that count,
  ## deleting the oldest messages that fall outside of that limit.
  ##
  ## Returns an error when querying the size, querying the message count, or
  ## the deletion itself fails.
  ##
  ## TODO: refactor this implementation and use partition management instead
  ##
  ## NOTE(review): the comment this replaces mentioned manual SQLite
  ## vacuuming; it looks copied from the SQLite driver - confirm whether any
  ## equivalent concern applies to Postgres.

  ## Share of the stored messages that is kept per pass (20% gets removed)
  const KeepRatio = 0.80

  let sizeRes = await driver.getDatabaseSize()
  if sizeRes.isErr():
    return err("failed to get database size: " & $(sizeRes.error))

  ## Current database size in bytes
  let currentSizeInBytes: int64 = int64(sizeRes.get())

  if currentSizeInBytes < targetSizeInBytes:
    # Still below the limit: nothing to delete
    return ok()

  ## The number of rows/messages is needed to derive how many are kept
  let countRes = await driver.getMessagesCount()
  if countRes.isErr():
    return err("failed to get messages count: " & countRes.error)

  ## 80% of the total messages are to be kept, the others are deleted
  let keepCount = int(float(countRes.get()) * KeepRatio)

  let deleteRes = await driver.deleteOldestMessagesNotWithinLimit(limit=keepCount)
  if deleteRes.isErr():
    return err("deleting oldest messages failed: " & deleteRes.error)

  return ok()
|
||||||
|
|
||||||
method close*(s: PostgresDriver):
|
method close*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
## Close the database connection
|
## Close the database connection
|
||||||
|
|
|
@ -317,6 +317,11 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,
|
||||||
# TODO: Implement this message_store method
|
# TODO: Implement this message_store method
|
||||||
return err("interface method not implemented")
|
return err("interface method not implemented")
|
||||||
|
|
||||||
|
method decreaseDatabaseSize*(driver: QueueDriver,
                             targetSizeInBytes: int64):
                             Future[ArchiveDriverResult[void]] {.async.} =
  ## Size reduction is not implemented for the queue driver.
  ##
  ## `targetSizeInBytes` is ignored and every call returns an error.
  return err("interface method not implemented")
|
||||||
|
|
||||||
method close*(driver: QueueDriver):
|
method close*(driver: QueueDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
return ok()
|
return ok()
|
||||||
|
|
|
@ -146,6 +146,40 @@ method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver,
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
return s.db.deleteOldestMessagesNotWithinLimit(limit)
|
return s.db.deleteOldestMessagesNotWithinLimit(limit)
|
||||||
|
|
||||||
|
method decreaseDatabaseSize*(driver: SqliteDriver,
                             targetSizeInBytes: int64):
                             Future[ArchiveDriverResult[void]] {.async.} =
  ## Reduces the amount of stored messages once the database size is no
  ## longer below `targetSizeInBytes`.
  ##
  ## The current database size is queried first; while it is still below the
  ## target nothing is deleted and `ok()` is returned. Otherwise the message
  ## count is read and the driver is asked to keep only 80% of that count,
  ## deleting the oldest messages that fall outside of that limit.
  ##
  ## Returns an error when querying the size, querying the message count, or
  ## the deletion itself fails.
  ##
  ## NOTE: SQLite vacuuming is done manually, so a percentage of the rows is
  ## deleted here. If vacuuming is done automatically then we aim to check
  ## the DB size periodically for an efficient retention policy
  ## implementation.

  ## Share of the stored messages that is kept per pass (20% gets removed)
  const KeepRatio = 0.80

  let sizeRes = await driver.getDatabaseSize()
  if sizeRes.isErr():
    return err("failed to get database size: " & $(sizeRes.error))

  ## Current database size in bytes
  let currentSizeInBytes: int64 = int64(sizeRes.get())

  if currentSizeInBytes < targetSizeInBytes:
    # Still below the limit: nothing to delete
    return ok()

  ## The number of rows/messages is needed to derive how many are kept
  let countRes = await driver.getMessagesCount()
  if countRes.isErr():
    return err("failed to get messages count: " & countRes.error)

  ## 80% of the total messages are to be kept, the others are deleted
  let keepCount = int(float(countRes.get()) * KeepRatio)

  let deleteRes = await driver.deleteOldestMessagesNotWithinLimit(limit=keepCount)
  if deleteRes.isErr():
    return err("deleting oldest messages failed: " & deleteRes.error)

  return ok()
|
||||||
|
|
||||||
method close*(s: SqliteDriver):
|
method close*(s: SqliteDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
## Close the database connection
|
## Close the database connection
|
||||||
|
|
|
@ -39,7 +39,7 @@ proc new*(T: type RetentionPolicy,
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return err("invalid time retention policy argument")
|
return err("invalid time retention policy argument")
|
||||||
|
|
||||||
let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
|
let retPolicy: RetentionPolicy = TimeRetentionPolicy.new(retentionTimeSeconds)
|
||||||
return ok(some(retPolicy))
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
elif policy == "capacity":
|
elif policy == "capacity":
|
||||||
|
@ -49,7 +49,7 @@ proc new*(T: type RetentionPolicy,
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return err("invalid capacity retention policy argument")
|
return err("invalid capacity retention policy argument")
|
||||||
|
|
||||||
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
|
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.new(retentionCapacity)
|
||||||
return ok(some(retPolicy))
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
elif policy == "size":
|
elif policy == "size":
|
||||||
|
@ -85,7 +85,7 @@ proc new*(T: type RetentionPolicy,
|
||||||
if sizeQuantity <= 0:
|
if sizeQuantity <= 0:
|
||||||
return err("invalid size retention policy argument: a non-zero value is required")
|
return err("invalid size retention policy argument: a non-zero value is required")
|
||||||
|
|
||||||
let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
|
let retPolicy: RetentionPolicy = SizeRetentionPolicy.new(sizeQuantity)
|
||||||
return ok(some(retPolicy))
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -47,7 +47,7 @@ proc calculateOverflowWindow(capacity: int, overflow: float): int =
|
||||||
proc calculateDeleteWindow(capacity: int, overflow: float): int =
|
proc calculateDeleteWindow(capacity: int, overflow: float): int =
|
||||||
calculateOverflowWindow(capacity, overflow) div 2
|
calculateOverflowWindow(capacity, overflow) div 2
|
||||||
|
|
||||||
proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
|
proc new*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
|
||||||
let
|
let
|
||||||
totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
|
totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
|
||||||
deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
|
deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
|
||||||
|
|
|
@ -4,11 +4,9 @@ else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/times,
|
|
||||||
stew/results,
|
stew/results,
|
||||||
chronicles,
|
chronicles,
|
||||||
chronos,
|
chronos
|
||||||
os
|
|
||||||
import
|
import
|
||||||
../driver,
|
../driver,
|
||||||
../retention_policy
|
../retention_policy
|
||||||
|
@ -19,21 +17,11 @@ logScope:
|
||||||
# default size is 30 GiB or 32212254720.0 in bytes
|
# default size is 30 GiB or 32212254720.0 in bytes
|
||||||
const DefaultRetentionSize*: int64 = 32212254720
|
const DefaultRetentionSize*: int64 = 32212254720
|
||||||
|
|
||||||
# to remove 20% of the outdated data from database
|
|
||||||
const DeleteLimit = 0.80
|
|
||||||
|
|
||||||
type
|
type
|
||||||
# SizeRetentionPolicy implements auto delete as follows:
|
|
||||||
# - sizeLimit is the size in bytes the database can grow upto
|
|
||||||
# to reduce the size of the databases, remove the rows/number-of-messages
|
|
||||||
# DeleteLimit is the total number of messages to delete beyond this limit
|
|
||||||
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
|
|
||||||
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
|
|
||||||
# upon deletion process the fragmented space is retrieve back using Vacuum process.
|
|
||||||
SizeRetentionPolicy* = ref object of RetentionPolicy
|
SizeRetentionPolicy* = ref object of RetentionPolicy
|
||||||
sizeLimit: int64
|
sizeLimit: int64
|
||||||
|
|
||||||
proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
|
proc new*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
|
||||||
SizeRetentionPolicy(
|
SizeRetentionPolicy(
|
||||||
sizeLimit: size
|
sizeLimit: size
|
||||||
)
|
)
|
||||||
|
@ -41,29 +29,8 @@ proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
|
||||||
method execute*(p: SizeRetentionPolicy,
|
method execute*(p: SizeRetentionPolicy,
|
||||||
driver: ArchiveDriver):
|
driver: ArchiveDriver):
|
||||||
Future[RetentionPolicyResult[void]] {.async.} =
|
Future[RetentionPolicyResult[void]] {.async.} =
|
||||||
## when db size overshoots the database limit, shread 20% of outdated messages
|
|
||||||
# get size of database
|
|
||||||
let dbSize = (await driver.getDatabaseSize()).valueOr:
|
|
||||||
return err("failed to get database size: " & $error)
|
|
||||||
|
|
||||||
# database size in bytes
|
(await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr:
|
||||||
let totalSizeOfDB: int64 = int64(dbSize)
|
return err("decreaseDatabaseSize failed: " & $error)
|
||||||
|
|
||||||
if totalSizeOfDB < p.sizeLimit:
|
|
||||||
return ok()
|
|
||||||
|
|
||||||
# to shread/delete messsges, get the total row/message count
|
|
||||||
let numMessages = (await driver.getMessagesCount()).valueOr:
|
|
||||||
return err("failed to get messages count: " & error)
|
|
||||||
|
|
||||||
# NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
|
|
||||||
# if vacumming is done automatically then we aim to check DB size periodially for efficient
|
|
||||||
# retention policy implementation.
|
|
||||||
|
|
||||||
# 80% of the total messages are to be kept, delete others
|
|
||||||
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
|
|
||||||
|
|
||||||
(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
|
|
||||||
return err("deleting oldest messages failed: " & error)
|
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
|
@ -24,7 +24,7 @@ type TimeRetentionPolicy* = ref object of RetentionPolicy
|
||||||
retentionTime: chronos.Duration
|
retentionTime: chronos.Duration
|
||||||
|
|
||||||
|
|
||||||
proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
|
proc new*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
|
||||||
TimeRetentionPolicy(
|
TimeRetentionPolicy(
|
||||||
retentionTime: retentionTime.seconds
|
retentionTime: retentionTime.seconds
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue