mirror of https://github.com/waku-org/nwaku.git
updated review code
This commit is contained in:
parent
9eedf812ab
commit
b182bdb801
|
@ -11,7 +11,7 @@ import
|
||||||
../../../waku/waku_core,
|
../../../waku/waku_core,
|
||||||
../../../waku/waku_core/message/digest,
|
../../../waku/waku_core/message/digest,
|
||||||
../../../waku/waku_archive,
|
../../../waku/waku_archive,
|
||||||
../../../waku/waku_archive/driver/postgres_driver,
|
../../../waku/waku_archive/driver/postgres_driver,
|
||||||
../../../waku/waku_archive/retention_policy,
|
../../../waku/waku_archive/retention_policy,
|
||||||
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
|
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
|
||||||
../../../waku/waku_archive/retention_policy/retention_policy_size,
|
../../../waku/waku_archive/retention_policy/retention_policy_size,
|
||||||
|
@ -57,9 +57,8 @@ suite "Waku Archive - Retention policy":
|
||||||
## Then
|
## Then
|
||||||
let numMessages = (waitFor driver.getMessagesCount()).tryGet()
|
let numMessages = (waitFor driver.getMessagesCount()).tryGet()
|
||||||
check:
|
check:
|
||||||
# Expected number of messages is 120 because
|
# Expected number of messages is 115 because
|
||||||
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
|
# (capacity = 100) + (half of the overflow window = 15)
|
||||||
# the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store
|
|
||||||
numMessages == 115
|
numMessages == 115
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
|
@ -68,52 +67,51 @@ suite "Waku Archive - Retention policy":
|
||||||
test "size retention policy - windowed message deletion":
|
test "size retention policy - windowed message deletion":
|
||||||
## Given
|
## Given
|
||||||
let driver = newTestPostgresDriver()
|
let driver = newTestPostgresDriver()
|
||||||
let dbEngine = driver.getDbType()
|
|
||||||
|
|
||||||
# make sure that the db is empty to before test begins
|
# make sure that the db is empty to before test begins
|
||||||
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
|
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
|
||||||
# if there are messages in db, empty them
|
# if there are messages in db, delete them before the test begins
|
||||||
if storedMsg.len > 0:
|
if storedMsg.len > 0:
|
||||||
let now = getNanosecondTime(getTime().toUnixFloat())
|
let now = getNanosecondTime(getTime().toUnixFloat())
|
||||||
require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk()
|
require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk()
|
||||||
require (waitFor driver.performVacuum()).isOk()
|
require (waitFor driver.performVacuum()).isOk()
|
||||||
|
|
||||||
# in bytes
|
# get the minimum/empty size of the database
|
||||||
let sizeLimit:int64 = 10422851
|
let sizeLimit = int64((waitFor driver.getDatabaseSize()).tryGet())
|
||||||
let excess = 800
|
let num_messages = 100
|
||||||
|
let retryLimit = 4
|
||||||
|
|
||||||
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
|
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
|
||||||
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
##
|
# create a number of messages to increase DB size
|
||||||
|
for i in 1..num_messages:
|
||||||
# create a number of messages so that the size of the DB overshoots
|
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
|
||||||
for i in 1..excess:
|
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp))
|
||||||
let msg = fakeWakuMessage(payload= @[byte (i*10)], contentTopic=DefaultContentTopic, ts=Timestamp(i))
|
|
||||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp))
|
|
||||||
|
|
||||||
# waitFor is used to synchronously wait for the futures to complete.
|
# waitFor is used to synchronously wait for the futures to complete.
|
||||||
discard waitFor allFinished(putFutures)
|
discard waitFor allFinished(putFutures)
|
||||||
|
sleep(150)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
# calculate the current database size
|
# calculate the current database size
|
||||||
var sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())
|
let sizeBeforeRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet())
|
||||||
|
var sizeAfterRetPolicy:int64 = sizeBeforeRetPolicy
|
||||||
|
var retryCounter = 0
|
||||||
|
|
||||||
if (sizeDB < sizeLimit):
|
while (sizeAfterRetPolicy >= sizeBeforeRetPolicy) and (retryCounter < retryLimit):
|
||||||
# we put a warning/debug that the size of the DB is less than the limit already
|
# execute the retention policy
|
||||||
echo ("WARNING: DB size is less than the limit. This test may not work as expected")
|
require (waitFor retentionPolicy.execute(driver)).isOk()
|
||||||
|
|
||||||
require (waitFor retentionPolicy.execute(driver)).isOk()
|
# get the updated DB size post vacuum
|
||||||
|
sizeAfterRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet())
|
||||||
# get the updated DB size post vacuum
|
retryCounter += 1
|
||||||
sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())
|
sleep(150)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# size of the database is used to check if the storage limit has been preserved
|
# check if the size of the database has been reduced after executing the retention policy
|
||||||
# check the current database size with the limitSize provided by the user
|
sizeAfterRetPolicy < sizeBeforeRetPolicy
|
||||||
# it should be lower
|
|
||||||
sizeDB <= sizeLimit
|
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
(waitFor driver.close()).expect("driver to close")
|
(waitFor driver.close()).expect("driver to close")
|
||||||
|
|
|
@ -85,7 +85,7 @@ proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T
|
||||||
return QueueDriver(items: items, capacity: capacity)
|
return QueueDriver(items: items, capacity: capacity)
|
||||||
|
|
||||||
method getDbType*(driver: QueueDriver): string =
|
method getDbType*(driver: QueueDriver): string =
|
||||||
return "sqlite"
|
return "queue"
|
||||||
|
|
||||||
proc contains*(driver: QueueDriver, index: Index): bool =
|
proc contains*(driver: QueueDriver, index: Index): bool =
|
||||||
## Return `true` if the store queue already contains the `index`, `false` otherwise.
|
## Return `true` if the store queue already contains the `index`, `false` otherwise.
|
||||||
|
|
|
@ -71,4 +71,12 @@ method execute*(p: CapacityRetentionPolicy,
|
||||||
(await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)).isOkOr:
|
(await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)).isOkOr:
|
||||||
return err("deleting oldest messages failed: " & error)
|
return err("deleting oldest messages failed: " & error)
|
||||||
|
|
||||||
|
# perform vacuum
|
||||||
|
let resVaccum = await driver.performVacuum()
|
||||||
|
if resVaccum.isErr():
|
||||||
|
return err("vacuumming failed: " & resVaccum.error)
|
||||||
|
|
||||||
|
# sleep to give it some time to complete vacuuming
|
||||||
|
await sleepAsync(350)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
|
@ -49,41 +49,35 @@ method execute*(p: SizeRetentionPolicy,
|
||||||
|
|
||||||
## when db size overshoots the database limit, shread 20% of outdated messages
|
## when db size overshoots the database limit, shread 20% of outdated messages
|
||||||
# get size of database
|
# get size of database
|
||||||
var dbSize = (await driver.getDatabaseSize()).valueOr:
|
let dbSize = (await driver.getDatabaseSize()).valueOr:
|
||||||
return err("failed to get database size: " & $error)
|
return err("failed to get database size: " & $error)
|
||||||
|
|
||||||
# database size in bytes
|
# database size in bytes
|
||||||
var totalSizeOfDB: int64 = int64(dbSize)
|
let totalSizeOfDB: int64 = int64(dbSize)
|
||||||
let retryLimit = 2
|
|
||||||
var retryCounter:int = 0
|
|
||||||
|
|
||||||
if totalSizeOfDB < p.sizeLimit:
|
if totalSizeOfDB < p.sizeLimit:
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
# NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
|
|
||||||
# if vacumming is done automatically then we aim to check DB size periodially for efficient
|
|
||||||
# retention policy implementation.
|
|
||||||
while (totalSizeOfDB > p.sizeLimit) and (retryLimit > retryCounter):
|
|
||||||
# to shread/delete messsges, get the total row/message count
|
|
||||||
let numMessages = (await driver.getMessagesCount()).valueOr:
|
|
||||||
return err("failed to get messages count: " & error)
|
|
||||||
|
|
||||||
# 80% of the total messages are to be kept, delete others
|
# NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
|
||||||
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
|
# if vacumming is done automatically then we aim to check DB size periodially for efficient
|
||||||
|
# retention policy implementation.
|
||||||
|
# to shread/delete messsges, get the total row/message count
|
||||||
|
let numMessages = (await driver.getMessagesCount()).valueOr:
|
||||||
|
return err("failed to get messages count: " & error)
|
||||||
|
|
||||||
(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
|
# 80% of the total messages are to be kept, delete others
|
||||||
return err("deleting oldest messages failed: " & error)
|
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
|
||||||
|
echo ("deleting oldest messages not within limit: " & $pageDeleteWindow)
|
||||||
|
|
||||||
# perform vacuum
|
(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
|
||||||
let resVaccum = await driver.performVacuum()
|
return err("deleting oldest messages failed: " & error)
|
||||||
if resVaccum.isErr():
|
|
||||||
return err("vacuumming failed: " & resVaccum.error)
|
# perform vacuum
|
||||||
# recompute the DB size to check if the size has actually reduced or not
|
let resVaccum = await driver.performVacuum()
|
||||||
|
if resVaccum.isErr():
|
||||||
dbSize = (await driver.getDatabaseSize()).valueOr:
|
return err("vacuumming failed: " & resVaccum.error)
|
||||||
return err("failed to get database size: " & $error)
|
|
||||||
totalSizeOfDB = int64(dbSize)
|
# sleep to give it some time to complete vacuuming
|
||||||
retryCounter += 1
|
await sleepAsync(350)
|
||||||
sleep(150*retryCounter)
|
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
|
@ -49,4 +49,12 @@ method execute*(p: TimeRetentionPolicy,
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
return err("failed to delete oldest messages: " & res.error)
|
return err("failed to delete oldest messages: " & res.error)
|
||||||
|
|
||||||
|
# perform vacuum
|
||||||
|
let resVaccum = await driver.performVacuum()
|
||||||
|
if resVaccum.isErr():
|
||||||
|
return err("vacuumming failed: " & resVaccum.error)
|
||||||
|
|
||||||
|
# sleep to give it some time to complete vacuuming
|
||||||
|
await sleepAsync(350)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
Loading…
Reference in New Issue