mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 23:27:27 +00:00
f54ba10bc7
* queue driver refactor (#2753) * chore(archive): archive refactor (#2752) * chore(archive): sqlite driver refactor (#2754) * chore(archive): postgres driver refactor (#2755) * chore(archive): renaming & copies (#2751) * posgres legacy: stop using the storedAt field * migration script 6: we still need the id column The id column is needed because it contains the message digest which is used in store v2, and we need to keep support to store v2 for a while * legacy archive: set target migration version to 6 * waku_node: try to use wakuLegacyArchive if wakuArchive is nil * node_factory, waku_node: mount legacy and future store simultaneously We want the nwaku node to simultaneously support store-v2 requests and store-v3 requests. Only the legacy archive is in charge of archiving messages, and the archived information is suitable to fulfill both store-v2 and store-v3 needs. * postgres_driver: adding temporary code until store-v2 is removed --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Co-authored-by: Ivan Folgueira Bande <ivansete@status.im>
156 lines
5.1 KiB
Nim
156 lines
5.1 KiB
Nim
{.used.}
|
|
|
|
import std/[sequtils, times], stew/results, testutils/unittests, chronos
|
|
import
|
|
waku/[
|
|
common/databases/db_sqlite,
|
|
waku_core,
|
|
waku_core/message/digest,
|
|
waku_archive,
|
|
waku_archive/driver/sqlite_driver,
|
|
waku_archive/retention_policy,
|
|
waku_archive/retention_policy/retention_policy_capacity,
|
|
waku_archive/retention_policy/retention_policy_size,
|
|
],
|
|
../waku_archive/archive_utils,
|
|
../testlib/wakucore
|
|
|
|
suite "Waku Archive - Retention policy":
|
|
test "capacity retention policy - windowed message deletion":
  ## Stores more messages than the configured capacity, runs the capacity
  ## retention policy once, and checks how many messages are left.

  ## Given
  let
    capacity = 100
    excess = 60

  let driver = newSqliteArchiveDriver()

  let retentionPolicy: RetentionPolicy =
    CapacityRetentionPolicy.new(capacity = capacity)
  var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

  ## When
  for i in 1 .. capacity + excess:
    let msg = fakeWakuMessage(
      payload = @[byte i], contentTopic = DefaultContentTopic, ts = Timestamp(i)
    )
    putFutures.add(
      driver.put(computeMessageHash(DefaultPubsubTopic, msg), DefaultPubsubTopic, msg)
    )

  # Every insert must have succeeded, otherwise the message count asserted
  # below would be meaningless.
  for putFut in waitFor(allFinished(putFutures)):
    require(putFut.read().isOk())

  let res = waitFor retentionPolicy.execute(driver)
  # `doAssert` (unlike `assert`) is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  ## Then
  let numMessages = (waitFor driver.getMessagesCount()).tryGet()
  check:
    # Expected number of messages is 115 because
    # (capacity = 100) + (half of the overflow window = 15).
    # The policy is executed only once, after all the messages were stored.
    # The window size changes when changing `const maxStoreOverflow = 1.3` in sqlite_store
    numMessages == 115

  ## Cleanup
  (waitFor driver.close()).expect("driver to close")
|
|
|
|
test "size retention policy - windowed message deletion":
  ## Fills the database until it reaches the configured size limit, runs the
  ## size retention policy once, and checks that rows were deleted.

  ## Given
  let
    # in bytes
    sizeLimit: int64 = 52428
    excess = 325

  let driver = newSqliteArchiveDriver()

  let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.new(size = sizeLimit)
  var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

  # make sure that the db is empty before the test begins
  let storedMsg = (waitFor driver.getAllMessages()).tryGet()
  # if there are messages in the db, remove them
  if storedMsg.len > 0:
    let now = getNanosecondTime(getTime().toUnixFloat())
    require (waitFor driver.deleteMessagesOlderThanTimestamp(ts = now)).isOk()
    require (waitFor driver.performVacuum()).isOk()

  ## When

  # create a number of messages so that the size of the DB overshoots
  for i in 1 .. excess:
    let msg = fakeWakuMessage(
      payload = @[byte i], contentTopic = DefaultContentTopic, ts = Timestamp(i)
    )
    putFutures.add(
      driver.put(computeMessageHash(DefaultPubsubTopic, msg), DefaultPubsubTopic, msg)
    )

  # waitFor is used to synchronously wait for the futures to complete.
  # Every insert must have succeeded, otherwise the size precondition and the
  # row counts below would be meaningless.
  for putFut in waitFor(allFinished(putFutures)):
    require(putFut.read().isOk())

  ## Then
  # calculate the current database size
  let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())

  # NOTE: since vacuuming is done manually, this needs to be revisited if
  # vacuuming is done automatically

  # get the rows count pre-deletion
  let rowCountBeforeDeletion = (waitFor driver.getMessagesCount()).tryGet()

  # execute the policy provided the current db size overflows; this results in
  # rows deletion
  require (sizeDB >= sizeLimit)
  require (waitFor retentionPolicy.execute(driver)).isOk()

  # get the number of rows from the database
  let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()

  check:
    # The database was at or over the size limit (required above), so the
    # policy is expected to have deleted rows: the row count must strictly
    # decrease. The row count is compared instead of the database size because
    # no vacuum is performed by this test after the deletion.
    rowCountAfterDeletion < rowCountBeforeDeletion

  ## Cleanup
  (waitFor driver.close()).expect("driver to close")
|
|
|
|
test "store capacity should be limited":
  ## Inserts seven messages one at a time, running the capacity retention
  ## policy after each insert, and checks that only `capacity` messages
  ## remain and that all of them carry the test content topic.

  ## Given
  const capacity = 5
  const contentTopic = "test-content-topic"

  let driver = newSqliteArchiveDriver()
  let retentionPolicy: RetentionPolicy =
    CapacityRetentionPolicy.new(capacity = capacity)

  # The two oldest messages use the default content topic; the check at the
  # end expects none of them to be left in the store.
  let oldestMessages =
    @[
      fakeWakuMessage(contentTopic = DefaultContentTopic, ts = ts(0)),
      fakeWakuMessage(contentTopic = DefaultContentTopic, ts = ts(1)),
    ]

  # The five newest messages use the test content topic.
  let newestMessages =
    @[
      fakeWakuMessage(contentTopic = contentTopic, ts = ts(2)),
      fakeWakuMessage(contentTopic = contentTopic, ts = ts(3)),
      fakeWakuMessage(contentTopic = contentTopic, ts = ts(4)),
      fakeWakuMessage(contentTopic = contentTopic, ts = ts(5)),
      fakeWakuMessage(contentTopic = contentTopic, ts = ts(6)),
    ]

  ## When
  for wakuMsg in oldestMessages & newestMessages:
    # store the message ...
    let msgHash = computeMessageHash(DefaultPubsubTopic, wakuMsg)
    let putRes = waitFor(driver.put(msgHash, DefaultPubsubTopic, wakuMsg))
    require putRes.isOk()

    # ... and apply the retention policy right after every insert
    let policyRes = waitFor(retentionPolicy.execute(driver))
    require policyRes.isOk()

  ## Then
  let storedMsg = (waitFor driver.getAllMessages()).tryGet()

  # Each stored row is a tuple whose second field is the pubsub topic and
  # whose third field is the message.
  let onlyExpectedRowsLeft = storedMsg.allIt(
    it[2].contentTopic == contentTopic and it[1] == DefaultPubsubTopic
  )

  check:
    storedMsg.len == capacity
    onlyExpectedRowsLeft

  ## Cleanup
  (waitFor driver.close()).expect("driver to close")
|