2022-11-22 18:40:24 +00:00
|
|
|
{.used.}
|
|
|
|
|
|
|
|
import
|
2023-10-10 09:59:09 +00:00
|
|
|
std/[sequtils,times],
|
2022-11-22 18:40:24 +00:00
|
|
|
stew/results,
|
|
|
|
testutils/unittests,
|
|
|
|
chronos
|
|
|
|
import
|
2023-06-22 09:27:40 +00:00
|
|
|
../../../waku/common/databases/db_sqlite,
|
2023-08-09 17:11:50 +00:00
|
|
|
../../../waku/waku_core,
|
2023-11-22 16:32:56 +00:00
|
|
|
../../../waku/waku_core/message/digest,
|
2023-08-09 17:11:50 +00:00
|
|
|
../../../waku/waku_archive,
|
|
|
|
../../../waku/waku_archive/driver/sqlite_driver,
|
|
|
|
../../../waku/waku_archive/retention_policy,
|
|
|
|
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
|
2023-10-10 09:59:09 +00:00
|
|
|
../../../waku/waku_archive/retention_policy/retention_policy_size,
|
2023-02-13 10:43:49 +00:00
|
|
|
../testlib/common,
|
2023-04-05 14:01:51 +00:00
|
|
|
../testlib/wakucore
|
2022-11-22 18:40:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
proc newTestDatabase(): SqliteDatabase =
  ## Opens a fresh SQLite database at the `":memory:"` path and unwraps the
  ## creation result with `tryGet`.
  let opened = SqliteDatabase.new(":memory:")
  return opened.tryGet()
|
|
|
|
|
|
|
|
proc newTestArchiveDriver(): ArchiveDriver =
  ## Builds a `SqliteDriver` on top of a new test database (see
  ## `newTestDatabase`) and unwraps the construction result with `tryGet`.
  return SqliteDriver.new(newTestDatabase()).tryGet()
|
|
|
|
|
|
|
|
|
|
|
|
suite "Waku Archive - Retention policy":
  ## Exercises the capacity- and size-based retention policies against a
  ## SQLite-backed archive driver created by `newTestArchiveDriver`.

  test "capacity retention policy - windowed message deletion":
    ## Given
    let
      capacity = 100
      excess = 60

    let driver = newTestArchiveDriver()

    let retentionPolicy: RetentionPolicy =
      CapacityRetentionPolicy.init(capacity=capacity)
    var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

    ## When
    # Insert `capacity + excess` messages; the puts are only awaited as a
    # group below.
    for i in 1..capacity+excess:
      let msg = fakeWakuMessage(
        payload= @[byte i],
        contentTopic=DefaultContentTopic,
        ts=Timestamp(i)
      )
      putFutures.add(driver.put(
        DefaultPubsubTopic,
        msg,
        computeDigest(msg),
        computeMessageHash(DefaultPubsubTopic, msg),
        msg.timestamp
      ))

    # The individual put results are intentionally not inspected here.
    discard waitFor allFinished(putFutures)

    # The policy is executed exactly once, after every message was submitted.
    require (waitFor retentionPolicy.execute(driver)).isOk()

    ## Then
    let numMessages = (waitFor driver.getMessagesCount()).tryGet()
    check:
      # Expected number of messages is 115:
      # (capacity = 100) + (half of the overflow window = 15).
      # No messages are added after the policy runs, so nothing else remains.
      # NOTE(review): the window size presumably derives from the policy's
      # overflow factor (`maxStoreOverflow = 1.3`) - confirm if that changes.
      numMessages == 115

    ## Cleanup
    (waitFor driver.close()).expect("driver to close")

  test "size retention policy - windowed message deletion":
    ## Given
    let
      # in megabytes
      sizeLimit: float = 0.05
      excess = 325

    let driver = newTestArchiveDriver()

    let retentionPolicy: RetentionPolicy =
      SizeRetentionPolicy.init(size=sizeLimit)
    var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

    # make sure that the db is empty before the test begins
    let storedMsg = (waitFor driver.getAllMessages()).tryGet()
    # if there are messages in the db, remove them and reclaim the space
    if storedMsg.len > 0:
      let now = getNanosecondTime(getTime().toUnixFloat())
      require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk()
      require (waitFor driver.performVacuum()).isOk()

    ## When
    # create a number of messages so that the size of the DB overshoots
    # NOTE(review): `byte i` is applied to values above 255 here (excess is
    # 325) - confirm the conversion wraps as intended on the targeted compiler.
    for i in 1..excess:
      let msg = fakeWakuMessage(
        payload= @[byte i],
        contentTopic=DefaultContentTopic,
        ts=Timestamp(i)
      )
      putFutures.add(driver.put(
        DefaultPubsubTopic,
        msg,
        computeDigest(msg),
        computeMessageHash(DefaultPubsubTopic, msg),
        msg.timestamp
      ))

    # waitFor is used to synchronously wait for the futures to complete.
    discard waitFor allFinished(putFutures)

    ## Then
    # calculate the current database size, in megabytes
    let pageSize = (waitFor driver.getPagesSize()).tryGet()
    let pageCount = (waitFor driver.getPagesCount()).tryGet()
    let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)

    # NOTE: since vacuuming is done manually, this needs to be revisited if
    # vacuuming is ever done automatically

    # get the rows count pre-deletion
    let rowsCountBeforeDeletion = (waitFor driver.getMessagesCount()).tryGet()

    # the test is only meaningful when the db size overflows the limit;
    # executing the policy is then expected to result in rows being deleted
    require (sizeDB >= sizeLimit)
    require (waitFor retentionPolicy.execute(driver)).isOk()

    # get the number of rows left in the DB
    let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()

    check:
      # The row count is used as the observable effect of the policy: it must
      # not have grown after the policy ran.
      # NOTE(review): this does not compare the database size against
      # `sizeLimit`, and `<=` also passes when nothing was deleted - consider
      # tightening.
      rowCountAfterDeletion <= rowsCountBeforeDeletion

    ## Cleanup
    (waitFor driver.close()).expect("driver to close")

  test "store capacity should be limited":
    ## Given
    const capacity = 5
    const contentTopic = "test-content-topic"

    let
      driver = newTestArchiveDriver()
      retentionPolicy: RetentionPolicy =
        CapacityRetentionPolicy.init(capacity=capacity)

    # Two messages on the default content topic followed by five on
    # `contentTopic`, with increasing timestamps.
    let messages = @[
      fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)),
      fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(1)),

      fakeWakuMessage(contentTopic=contentTopic, ts=ts(2)),
      fakeWakuMessage(contentTopic=contentTopic, ts=ts(3)),
      fakeWakuMessage(contentTopic=contentTopic, ts=ts(4)),
      fakeWakuMessage(contentTopic=contentTopic, ts=ts(5)),
      fakeWakuMessage(contentTopic=contentTopic, ts=ts(6))
    ]

    ## When
    # Unlike the tests above, the policy is executed after every single put.
    for msg in messages:
      require (waitFor driver.put(
        DefaultPubsubTopic,
        msg,
        computeDigest(msg),
        computeMessageHash(DefaultPubsubTopic, msg),
        msg.timestamp
      )).isOk()
      require (waitFor retentionPolicy.execute(driver)).isOk()

    ## Then
    let storedMsg = (waitFor driver.getAllMessages()).tryGet()
    check:
      storedMsg.len == capacity
      # Only the messages published on `contentTopic` are expected to remain.
      storedMsg.all do (item: auto) -> bool:
        # The digest and store timestamp of each row are not needed here.
        let (pubsubTopic, msg, _, _) = item
        msg.contentTopic == contentTopic and
        pubsubTopic == DefaultPubsubTopic

    ## Cleanup
    (waitFor driver.close()).expect("driver to close")
|
2023-10-10 09:59:09 +00:00
|
|
|
|