chore: Simplification of store legacy code (#2931)

Ivan FB 2024-07-30 14:05:23 +02:00 committed by GitHub
parent aed2a1130e
commit d4e8a0dab6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 109 additions and 1070 deletions

View File

@ -25,7 +25,6 @@ import
./waku_archive_legacy/test_driver_queue,
./waku_archive_legacy/test_driver_sqlite_query,
./waku_archive_legacy/test_driver_sqlite,
./waku_archive_legacy/test_retention_policy,
./waku_archive_legacy/test_waku_archive
const os* {.strdefine.} = ""

View File

@ -4,11 +4,13 @@ import std/[sequtils, options], testutils/unittests, chronos
import
waku/waku_archive_legacy,
waku/waku_archive_legacy/driver/postgres_driver,
waku/waku_archive/driver/postgres_driver as new_postgres_driver,
waku/waku_core,
waku/waku_core/message/digest,
../testlib/wakucore,
../testlib/testasync,
../testlib/postgres_legacy
../testlib/postgres_legacy,
../testlib/postgres as new_postgres
proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(
@ -21,22 +23,39 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
suite "Postgres driver":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver
var driver {.threadvar.}: postgres_driver.PostgresDriver
## We need to artificially create an instance of the "newDriver"
## because it is the only one in charge of creating partitions.
## We will clean up the legacy store soon and this file will be removed.
var newDriver {.threadvar.}: new_postgres_driver.PostgresDriver
asyncSetup:
let driverRes = await newTestPostgresDriver()
let driverRes = await postgres_legacy.newTestPostgresDriver()
if driverRes.isErr():
assert false, driverRes.error
driver = PostgresDriver(driverRes.get())
driver = postgres_driver.PostgresDriver(driverRes.get())
let newDriverRes = await new_postgres.newTestPostgresDriver()
if newDriverRes.isErr():
assert false, newDriverRes.error
newDriver = new_postgres_driver.PostgresDriver(newDriverRes.get())
asyncTeardown:
let resetRes = await driver.reset()
var resetRes = await driver.reset()
if resetRes.isErr():
assert false, resetRes.error
(await driver.close()).expect("driver to close")
resetRes = await newDriver.reset()
if resetRes.isErr():
assert false, resetRes.error
(await newDriver.close()).expect("driver to close")
asyncTest "Asynchronous queries":
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

View File

@ -9,12 +9,15 @@ import
waku/waku_archive_legacy,
waku/waku_archive_legacy/driver as driver_module,
waku/waku_archive_legacy/driver/postgres_driver,
waku/waku_archive/driver/postgres_driver as new_postgres_driver,
waku/waku_core,
waku/waku_core/message/digest,
../testlib/common,
../testlib/wakucore,
../testlib/testasync,
../testlib/postgres_legacy
../testlib/postgres_legacy,
../testlib/postgres as new_postgres,
../testlib/testutils
logScope:
topics = "test archive postgres driver"
@ -36,22 +39,39 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
suite "Postgres driver - queries":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver
var driver {.threadvar.}: postgres_driver.PostgresDriver
## We need to artificially create an instance of the "newDriver"
## because it is the only one in charge of creating partitions.
## We will clean up the legacy store soon and this file will be removed.
var newDriver {.threadvar.}: new_postgres_driver.PostgresDriver
asyncSetup:
let driverRes = await newTestPostgresDriver()
let driverRes = await postgres_legacy.newTestPostgresDriver()
if driverRes.isErr():
assert false, driverRes.error
driver = PostgresDriver(driverRes.get())
driver = postgres_driver.PostgresDriver(driverRes.get())
let newDriverRes = await new_postgres.newTestPostgresDriver()
if newDriverRes.isErr():
assert false, newDriverRes.error
newDriver = new_postgres_driver.PostgresDriver(newDriverRes.get())
asyncTeardown:
let resetRes = await driver.reset()
var resetRes = await driver.reset()
if resetRes.isErr():
assert false, resetRes.error
(await driver.close()).expect("driver to close")
resetRes = await newDriver.reset()
if resetRes.isErr():
assert false, resetRes.error
(await newDriver.close()).expect("driver to close")
asyncTest "no content topic":
## Given
const contentTopic = "test-content-topic"
@ -1790,7 +1810,8 @@ suite "Postgres driver - queries":
check:
filteredMessages.len == 0
asyncTest "Get oldest and newest message timestamp":
xasyncTest "Get oldest and newest message timestamp":
## This test no longer makes sense because this behavior is now always controlled by the newDriver
const contentTopic = "test-content-topic"
let timeOrigin = now()
@ -1842,7 +1863,8 @@ suite "Postgres driver - queries":
assert res.isOk(), res.error
assert res.get() == newestTime, "Failed to retrieve the newest timestamp"
asyncTest "Delete messages older than certain timestamp":
xasyncTest "Delete messages older than certain timestamp":
## This test no longer makes sense because this behavior is now always controlled by the newDriver
const contentTopic = "test-content-topic"
let timeOrigin = now()
@ -1884,7 +1906,8 @@ suite "Postgres driver - queries":
assert res.isOk(), res.error
assert res.get() == 3, "Failed to retrieve the # of messages after deletion"
asyncTest "Keep last n messages":
xasyncTest "Keep last n messages":
## This test no longer makes sense because this behavior is now always controlled by the newDriver
const contentTopic = "test-content-topic"
let timeOrigin = now()

View File

@ -1,169 +0,0 @@
{.used.}
import std/[sequtils, times], stew/results, testutils/unittests, chronos
import
waku/common/databases/db_sqlite,
waku/waku_core,
waku/waku_core/message/digest,
waku/waku_archive_legacy,
waku/waku_archive_legacy/driver/sqlite_driver,
waku/waku_archive_legacy/retention_policy,
waku/waku_archive_legacy/retention_policy/retention_policy_capacity,
waku/waku_archive_legacy/retention_policy/retention_policy_size,
../waku_archive_legacy/archive_utils,
../testlib/common,
../testlib/wakucore
suite "Waku Archive - Retention policy":
test "capacity retention policy - windowed message deletion":
## Given
let
capacity = 100
excess = 60
let driver = newSqliteArchiveDriver()
let retentionPolicy: RetentionPolicy =
CapacityRetentionPolicy.new(capacity = capacity)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
## When
for i in 1 .. capacity + excess:
let msg = fakeWakuMessage(
payload = @[byte i], contentTopic = DefaultContentTopic, ts = Timestamp(i)
)
putFutures.add(
driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
)
discard waitFor allFinished(putFutures)
require (waitFor retentionPolicy.execute(driver)).isOk()
## Then
let numMessages = (waitFor driver.getMessagesCount()).tryGet()
check:
# Expected number of messages is 115 because the policy trims the store down to
# (capacity = 100) + (half of the overflow window = 15)
# the window size changes when changing `MaxOverflow = 1.3` in retention_policy_capacity
numMessages == 115
## Cleanup
(waitFor driver.close()).expect("driver to close")
test "size retention policy - windowed message deletion":
## Given
let
# in bytes
sizeLimit: int64 = 52428
excess = 325
let driver = newSqliteArchiveDriver()
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.new(size = sizeLimit)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
# make sure that the db is empty before the test begins
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
# if there are messages in db, empty them
if storedMsg.len > 0:
let now = getNanosecondTime(getTime().toUnixFloat())
require (waitFor driver.deleteMessagesOlderThanTimestamp(ts = now)).isOk()
require (waitFor driver.performVacuum()).isOk()
## When
##
# create a number of messages so that the size of the DB overshoots
for i in 1 .. excess:
let msg = fakeWakuMessage(
payload = @[byte i], contentTopic = DefaultContentTopic, ts = Timestamp(i)
)
putFutures.add(
driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
)
# waitFor is used to synchronously wait for the futures to complete.
discard waitFor allFinished(putFutures)
## Then
# calculate the current database size
let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())
# NOTE: since vacuuming is done manually, this needs to be revisited if vacuuming is done automatically
# get the rows count pre-deletion
let rowsCountBeforeDeletion = (waitFor driver.getMessagesCount()).tryGet()
# execute the policy provided the current db size overflows; this results in row deletion
require (sizeDB >= sizeLimit)
require (waitFor retentionPolicy.execute(driver)).isOk()
# get the number of rows from the database
let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()
check:
# the row count is used to check that the policy enforced the storage limit:
# after execution the table should hold at most as many rows as before
rowCountAfterDeletion <= rowsCountBeforeDeletion
## Cleanup
(waitFor driver.close()).expect("driver to close")
test "store capacity should be limited":
## Given
const capacity = 5
const contentTopic = "test-content-topic"
let
driver = newSqliteArchiveDriver()
retentionPolicy: RetentionPolicy =
CapacityRetentionPolicy.new(capacity = capacity)
let messages =
@[
fakeWakuMessage(contentTopic = DefaultContentTopic, ts = ts(0)),
fakeWakuMessage(contentTopic = DefaultContentTopic, ts = ts(1)),
fakeWakuMessage(contentTopic = contentTopic, ts = ts(2)),
fakeWakuMessage(contentTopic = contentTopic, ts = ts(3)),
fakeWakuMessage(contentTopic = contentTopic, ts = ts(4)),
fakeWakuMessage(contentTopic = contentTopic, ts = ts(5)),
fakeWakuMessage(contentTopic = contentTopic, ts = ts(6)),
]
## When
for msg in messages:
require (
waitFor driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()
require (waitFor retentionPolicy.execute(driver)).isOk()
## Then
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
check:
storedMsg.len == capacity
storedMsg.all do(item: auto) -> bool:
let (pubsubTopic, msg, _, _, _) = item
msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic
## Cleanup
(waitFor driver.close()).expect("driver to close")
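A worked note on the last test above (illustrative): with capacity = 5 and MaxOverflow = 1.3, the capacity policy computes totalCapacity = 6 and deleteWindow = 0, so after the seventh put only the five newest messages, ts(2)..ts(6), survive; those are exactly the ones published on `contentTopic`, which is what the final check asserts.

let capacity = 5
let totalCapacity = int(float(capacity) * 1.3) # 6: deletion triggers at this count
let deleteWindow = int(float(capacity) * (1.3 - 1)) div 2 # 0: no extra slack is kept
assert capacity + deleteWindow == 5 # only the five newest messages remain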

View File

@ -21,8 +21,6 @@ import
../waku_archive/retention_policy/builder as policy_builder,
../waku_archive/driver as driver,
../waku_archive/driver/builder as driver_builder,
../waku_archive_legacy/retention_policy as legacy_policy,
../waku_archive_legacy/retention_policy/builder as legacy_policy_builder,
../waku_archive_legacy/driver as legacy_driver,
../waku_archive_legacy/driver/builder as legacy_driver_builder,
../waku_store,
@ -232,13 +230,7 @@ proc setupProtocols(
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
let retPolicyRes =
legacy_policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)
let mountArcRes =
node.mountLegacyArchive(archiveDriverRes.get(), retPolicyRes.get())
let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

View File

@ -703,17 +703,11 @@ proc mountArchive*(
return ok()
proc mountLegacyArchive*(
node: WakuNode,
driver: waku_archive_legacy.ArchiveDriver,
retentionPolicy = none(waku_archive_legacy.RetentionPolicy),
node: WakuNode, driver: waku_archive_legacy.ArchiveDriver
): Result[void, string] =
node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(
driver = driver, retentionPolicy = retentionPolicy
).valueOr:
node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(driver = driver).valueOr:
return err("error in mountLegacyArchive: " & error)
node.wakuLegacyArchive.start()
return ok()
## Legacy Waku Store

View File

@ -1,7 +1,6 @@
import
./waku_archive_legacy/common,
./waku_archive_legacy/archive,
./waku_archive_legacy/driver,
./waku_archive_legacy/retention_policy
./waku_archive_legacy/driver
export common, archive, driver, retention_policy
export common, archive, driver

View File

@ -12,7 +12,6 @@ import
import
../common/paging,
./driver,
./retention_policy,
../waku_core,
../waku_core/message/digest,
./common,
@ -45,11 +44,6 @@ type WakuArchive* = ref object
validator: MessageValidator
retentionPolicy: Option[RetentionPolicy]
retentionPolicyHandle: Future[void]
metricsHandle: Future[void]
proc validate*(msg: WakuMessage): Result[void, string] =
if msg.ephemeral:
# Ephemeral message, do not store
@ -72,16 +66,12 @@ proc validate*(msg: WakuMessage): Result[void, string] =
return ok()
proc new*(
T: type WakuArchive,
driver: ArchiveDriver,
validator: MessageValidator = validate,
retentionPolicy = none(RetentionPolicy),
T: type WakuArchive, driver: ArchiveDriver, validator: MessageValidator = validate
): Result[T, string] =
if driver.isNil():
return err("archive driver is Nil")
let archive =
WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
let archive = WakuArchive(driver: driver, validator: validator)
return ok(archive)
@ -280,44 +270,3 @@ proc findMessagesV2*(
reverse(messages)
return ok(ArchiveResponse(messages: messages, cursor: cursor))
proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
debug "executing message retention policy"
let policy = self.retentionPolicy.get()
while true:
(await policy.execute(self.driver)).isOkOr:
waku_legacy_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error = error
await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
proc periodicMetricReport(self: WakuArchive) {.async.} =
while true:
let countRes = (await self.driver.getMessagesCount())
if countRes.isErr():
error "loopReportStoredMessagesMetric failed to get messages count",
error = countRes.error
else:
let count = countRes.get()
waku_legacy_archive_messages.set(count, labelValues = ["stored"])
await sleepAsync(WakuArchiveDefaultMetricsReportInterval)
proc start*(self: WakuArchive) =
if self.retentionPolicy.isSome():
self.retentionPolicyHandle = self.periodicRetentionPolicy()
self.metricsHandle = self.periodicMetricReport()
proc stopWait*(self: WakuArchive) {.async.} =
var futures: seq[Future[void]]
if self.retentionPolicy.isSome() and not self.retentionPolicyHandle.isNil():
futures.add(self.retentionPolicyHandle.cancelAndWait())
if not self.metricsHandle.isNil:
futures.add(self.metricsHandle.cancelAndWait())
await noCancel(allFutures(futures))

View File

@ -17,7 +17,6 @@ export sqlite_driver, queue_driver
when defined(postgres):
import ## These imports add dependency with an external libpq library
./postgres_driver/migrations as archive_postgres_driver_migrations,
./postgres_driver
export postgres_driver
@ -94,26 +93,6 @@ proc new*(
return err("failed to init postgres archive driver: " & res.error)
let driver = res.get()
# Database migration
if migrate:
let migrateRes = await archive_postgres_driver_migrations.migrate(driver)
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)
## This should be started once we make sure the 'messages' table exists
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)
info "waiting for a partition to be created"
for i in 0 ..< 100:
if driver.containsAnyPartition():
break
await sleepAsync(chronos.milliseconds(100))
if not driver.containsAnyPartition():
onFatalErrorAction("a partition could not be created")
return ok(driver)
else:
return err(

View File

@ -3,9 +3,6 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import
./postgres_driver/postgres_driver,
./postgres_driver/partitions_manager,
./postgres_driver/postgres_healthcheck
import ./postgres_driver/postgres_driver
export postgres_driver, partitions_manager, postgres_healthcheck
export postgres_driver

View File

@ -1,89 +0,0 @@
{.push raises: [].}
import std/strutils, results, chronicles, chronos
import
../../../common/databases/common,
../../../../migrations/message_store_postgres/pg_migration_manager,
../postgres_driver
logScope:
topics = "waku archive migration"
const SchemaVersion* = 6 # increase this when there is an update in the database schema
proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script that can contain multiple SQL statements,
## this proc splits it into the individual statements that should be
## executed one after the other.
var statements = newSeq[string]()
let lines = script.split('\n')
var simpleStmt: string
var plSqlStatement: string
var insidePlSqlScript = false
for line in lines:
if line.strip().len == 0:
continue
if insidePlSqlScript:
if line.contains("END $$"):
## End of the Pl/SQL script
plSqlStatement &= line
statements.add(plSqlStatement)
plSqlStatement = ""
insidePlSqlScript = false
continue
else:
plSqlStatement &= line & "\n"
if line.contains("DO $$"):
## Beginning of the Pl/SQL script
insidePlSqlScript = true
plSqlStatement &= line & "\n"
if not insidePlSqlScript:
if line.contains(';'):
## End of simple statement
simpleStmt &= line
statements.add(simpleStmt)
simpleStmt = ""
else:
simpleStmt &= line & "\n"
return statements
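A minimal usage sketch of the splitter above (illustrative; the script content is made up and not part of this commit): a plain statement followed by a PL/pgSQL block yields two isolated statements.

let script =
  """
ALTER TABLE messages ADD COLUMN meta VARCHAR;
DO $$
BEGIN
  RAISE NOTICE 'migrating';
END $$;
"""
for statement in script.breakIntoStatements():
  echo statement
  # 1st iteration: ALTER TABLE messages ADD COLUMN meta VARCHAR;
  # 2nd iteration: the whole DO $$ ... END $$; block as a single statement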
proc migrate*(
driver: PostgresDriver, targetVersion = SchemaVersion
): Future[DatabaseResult[void]] {.async.} =
debug "starting message store's postgres database migration"
let currentVersion = (await driver.getCurrentVersion()).valueOr:
return err("migrate error could not retrieve current version: " & $error)
if currentVersion == targetVersion:
debug "database schema is up to date",
currentVersion = currentVersion, targetVersion = targetVersion
return ok()
info "database schema is outdated",
currentVersion = currentVersion, targetVersion = targetVersion
# Load migration scripts
let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion)
# Run the migration scripts
for script in scripts:
for statement in script.breakIntoStatements():
debug "executing migration statement", statement = statement
(await driver.performWriteQuery(statement)).isOkOr:
error "failed to execute migration statement",
statement = statement, error = error
return err("failed to execute migration statement")
debug "migration statement executed succesfully", statement = statement
debug "finished message store's postgres database migration"
return ok()

View File

@ -1,102 +0,0 @@
## This module is aimed to handle the creation and truncation of partition tables
## in order to limit the space occupied in disk by the database.
##
## The created partitions are referenced by the 'storedAt' field.
##
import std/deques
import chronos, chronicles
logScope:
topics = "waku archive partitions_manager"
## The time range has seconds resolution
type TimeRange* = tuple[beginning: int64, `end`: int64]
type
Partition = object
name: string
timeRange: TimeRange
PartitionManager* = ref object
partitions: Deque[Partition]
# FIFO of partition table names. The first is the oldest partition
proc new*(T: type PartitionManager): T =
return PartitionManager()
proc getPartitionFromDateTime*(
self: PartitionManager, targetMoment: int64
): Result[Partition, string] =
## Returns the partition that might store a message with the given timestamp.
## In other words, it returns the partition whose time range contains the given timestamp.
## targetMoment - the time of interest, measured in seconds since epoch.
if self.partitions.len == 0:
return err("There are no partitions")
for partition in self.partitions:
let timeRange = partition.timeRange
let beginning = timeRange.beginning
let `end` = timeRange.`end`
if beginning <= targetMoment and targetMoment < `end`:
return ok(partition)
return err("Couldn't find a partition table for given time: " & $targetMoment)
proc getNewestPartition*(self: PartitionManager): Result[Partition, string] =
if self.partitions.len == 0:
return err("there are no partitions allocated")
let newestPartition = self.partitions.peekLast
return ok(newestPartition)
proc getOldestPartition*(self: PartitionManager): Result[Partition, string] =
if self.partitions.len == 0:
return err("there are no partitions allocated")
let oldestPartition = self.partitions.peekFirst
return ok(oldestPartition)
proc addPartitionInfo*(
self: PartitionManager, partitionName: string, beginning: int64, `end`: int64
) =
## The given partition range has seconds resolution.
## We store the newly added partition's info merely to keep track of it.
let partitionInfo = Partition(name: partitionName, timeRange: (beginning, `end`))
trace "Adding partition info"
self.partitions.addLast(partitionInfo)
proc removeOldestPartitionName*(self: PartitionManager) =
## Simply removes the partition from the tracked/known partitions queue
## and ignores it afterwards.
discard self.partitions.popFirst()
proc isEmpty*(self: PartitionManager): bool =
return self.partitions.len == 0
proc getLastMoment*(partition: Partition): int64 =
## Considering the time range covered by the partition, this
## returns the `end` time (number of seconds since epoch) of such range.
let lastTimeInSec = partition.timeRange.`end`
return lastTimeInSec
proc getPartitionStartTimeInNanosec*(partition: Partition): int64 =
return partition.timeRange.beginning * 1_000_000_000
proc containsMoment*(partition: Partition, time: int64): bool =
## Returns true if the given moment is contained within the partition window,
## 'false' otherwise.
## time - number of seconds since epoch
if partition.timeRange.beginning <= time and time < partition.timeRange.`end`:
return true
return false
proc getName*(partition: Partition): string =
return partition.name
func `==`*(a, b: Partition): bool {.inline.} =
return a.name == b.name
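A minimal usage sketch of the manager above (illustrative; the partition names and timestamps are made up, and `results` is assumed to be in scope): track two consecutive one-hour partitions and look one up by timestamp.

let manager = PartitionManager.new()
manager.addPartitionInfo("messages_1708449815_1708453415", 1708449815, 1708453415)
manager.addPartitionInfo("messages_1708453415_1708457015", 1708453415, 1708457015)
let partitionRes = manager.getPartitionFromDateTime(1708450000)
if partitionRes.isOk():
  echo partitionRes.get().getName() # messages_1708449815_1708453415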

View File

@ -15,19 +15,13 @@ import
../../../waku_core,
../../common,
../../driver,
../../../common/databases/db_postgres as waku_postgres,
./postgres_healthcheck,
./partitions_manager
../../../common/databases/db_postgres as waku_postgres
type PostgresDriver* = ref object of ArchiveDriver
## Establish separate pools for read/write operations
writeConnPool: PgAsyncPool
readConnPool: PgAsyncPool
## Partition container
partitionMngr: PartitionManager
futLoopPartitionFactory: Future[void]
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
"""INSERT INTO messages (id, messageHash, contentTopic, payload, pubsubTopic,
@ -134,17 +128,7 @@ proc new*(
let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
return err("error creating write conn pool PgAsyncPool")
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)
let driver = PostgresDriver(
writeConnPool: writeConnPool,
readConnPool: readConnPool,
partitionMngr: PartitionManager.new(),
)
let driver = PostgresDriver(writeConnPool: writeConnPool, readConnPool: readConnPool)
return ok(driver)
proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
@ -267,38 +251,6 @@ method getAllMessages*(
return ok(rows)
proc getPartitionsList(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[string]]] {.async.} =
## Retrieves the seq of partition table names.
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
var partitions: seq[string]
proc rowCallback(pqResult: ptr PGresult) =
for iRow in 0 ..< pqResult.pqNtuples():
let partitionName = $(pqgetvalue(pqResult, iRow, 0))
partitions.add(partitionName)
(
await s.readConnPool.pgQuery(
"""
SELECT child.relname AS partition_name
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='messages'
ORDER BY partition_name ASC
""",
newSeq[string](0),
rowCallback,
)
).isOkOr:
return err("getPartitionsList failed in query: " & $error)
return ok(partitions)
proc getMessagesArbitraryQuery(
s: PostgresDriver,
contentTopic: seq[ContentTopic] = @[],
@ -764,20 +716,7 @@ method getMessagesCount*(
method getOldestMessageTimestamp*(
s: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
## In some cases it could happen that we have
## empty partitions which are older than the currently stored rows.
## In those cases the start time of the oldest partition is taken as the oldest timestamp.
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition: " & $error)
let oldestPartitionTimeNanoSec = oldestPartition.getPartitionStartTimeInNanosec()
let intRes = await s.getInt("SELECT MIN(timestamp) FROM messages")
if intRes.isErr():
## Just return the oldest partition time considering the partitions set
return ok(Timestamp(oldestPartitionTimeNanoSec))
return ok(Timestamp(min(intRes.get(), oldestPartitionTimeNanoSec)))
return err("not implemented because legacy will get deprecated")
method getNewestMessageTimestamp*(
s: PostgresDriver
@ -791,22 +730,20 @@ method getNewestMessageTimestamp*(
method deleteOldestMessagesNotWithinLimit*(
s: PostgresDriver, limit: int
): Future[ArchiveDriverResult[void]] {.async.} =
let execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages WHERE id NOT IN
(
SELECT id FROM messages ORDER BY timestamp DESC LIMIT ?
);""",
@[$limit],
)
if execRes.isErr():
return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
## Will be completely removed when the legacy store is deprecated
# let execRes = await s.writeConnPool.pgQuery(
# """DELETE FROM messages WHERE id NOT IN
# (
# SELECT id FROM messages ORDER BY timestamp DESC LIMIT ?
# );""",
# @[$limit],
# )
# if execRes.isErr():
# return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
return ok()
method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
## Cancel the partition factory loop
s.futLoopPartitionFactory.cancel()
## Close the database connection
let writeCloseRes = await s.writeConnPool.close()
let readCloseRes = await s.readConnPool.close()
@ -850,250 +787,41 @@ proc performWriteQuery*(
return ok()
proc addPartition(
self: PostgresDriver, startTime: Timestamp, duration: timer.Duration
): Future[ArchiveDriverResult[void]] {.async.} =
## Creates a partition table that will store the messages that fall in the range
## `startTime` <= timestamp < `startTime + duration`.
## `startTime` is measured in seconds since epoch
let beginning = startTime
let `end` = (startTime + duration.seconds)
let fromInSec: string = $beginning
let untilInSec: string = $`end`
let fromInNanoSec: string = fromInSec & "000000000"
let untilInNanoSec: string = untilInSec & "000000000"
let partitionName = "messages_" & fromInSec & "_" & untilInSec
let createPartitionQuery =
"CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
"messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"
(await self.performWriteQuery(createPartitionQuery)).isOkOr:
return err(fmt"error adding partition [{partitionName}]: " & $error)
debug "new partition added", query = createPartitionQuery
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
return ok()
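Illustrative values for the DDL built by addPartition above (the start time is made up): the bounds are declared in nanoseconds while the partition name keeps the seconds resolution.

let startTime = 1708449815'i64 # seconds since epoch, one-hour duration assumed
let partitionName = "messages_" & $startTime & "_" & $(startTime + 3600)
echo "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
  "messages FOR VALUES FROM ('" & $startTime & "000000000') TO ('" &
  $(startTime + 3600) & "000000000');"
# CREATE TABLE IF NOT EXISTS messages_1708449815_1708453415 PARTITION OF messages
# FOR VALUES FROM ('1708449815000000000') TO ('1708453415000000000');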
proc initializePartitionsInfo(
self: PostgresDriver
): Future[ArchiveDriverResult[void]] {.async.} =
let partitionNamesRes = await self.getPartitionsList()
if not partitionNamesRes.isOk():
return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
else:
let partitionNames = partitionNamesRes.get()
for partitionName in partitionNames:
## partitionName contains something like 'messages_1708449815_1708449875'
let bothTimes = partitionName.replace("messages_", "")
let times = bothTimes.split("_")
if times.len != 2:
return err(fmt"loopPartitionFactory wrong partition name {partitionName}")
var beginning: int64
try:
beginning = parseInt(times[0])
except ValueError:
return err("Could not parse beginning time: " & getCurrentExceptionMsg())
var `end`: int64
try:
`end` = parseInt(times[1])
except ValueError:
return err("Could not parse end time: " & getCurrentExceptionMsg())
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
return ok()
const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10)
const PartitionsRangeInterval = timer.hours(1) ## Time range covered by each partition
proc loopPartitionFactory(
self: PostgresDriver, onFatalError: OnFatalErrorHandler
) {.async.} =
## Loop proc that continuously checks whether we need to create a new partition.
## Notice that the deletion of partitions is handled by the retention policy modules.
debug "starting loopPartitionFactory"
if PartitionsRangeInterval < DefaultDatabasePartitionCheckTimeInterval:
onFatalError(
"partition factory partition range interval should be bigger than check interval"
)
## First of all, let's make the 'partition_manager' aware of the current partitions
(await self.initializePartitionsInfo()).isOkOr:
onFatalError("issue in loopPartitionFactory: " & $error)
while true:
trace "Check if we need to create a new partition"
let now = times.now().toTime().toUnix()
if self.partitionMngr.isEmpty():
debug "adding partition because now there aren't more partitions"
(await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
onFatalError("error when creating a new partition from empty state: " & $error)
else:
let newestPartitionRes = self.partitionMngr.getNewestPartition()
if newestPartitionRes.isErr():
onFatalError("could not get newest partition: " & $newestPartitionRes.error)
let newestPartition = newestPartitionRes.get()
if newestPartition.containsMoment(now):
debug "creating a new partition for the future"
## The current used partition is the last one that was created.
## Thus, let's create another partition for the future.
(
await self.addPartition(
newestPartition.getLastMoment(), PartitionsRangeInterval
)
).isOkOr:
onFatalError("could not add the next partition for 'now': " & $error)
elif now >= newestPartition.getLastMoment():
debug "creating a new partition to contain current messages"
## There is no partition to contain the current time.
## This happens if the node has been stopped for quite a long time.
## Then, let's create the needed partition to contain 'now'.
(await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
onFatalError("could not add the next partition: " & $error)
await sleepAsync(DefaultDatabasePartitionCheckTimeInterval)
proc startPartitionFactory*(
self: PostgresDriver, onFatalError: OnFatalErrorHandler
) {.async.} =
self.futLoopPartitionFactory = self.loopPartitionFactory(onFatalError)
proc getTableSize*(
self: PostgresDriver, tableName: string
): Future[ArchiveDriverResult[string]] {.async.} =
## Returns a human-readable representation of the size for the requested table.
## tableName - table of interest.
let tableSize = (
await self.getStr(
fmt"""
SELECT pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size"
FROM pg_class C
where relname = '{tableName}'"""
)
).valueOr:
return err("error in getDatabaseSize: " & error)
return ok(tableSize)
proc removePartition(
self: PostgresDriver, partitionName: string
): Future[ArchiveDriverResult[void]] {.async.} =
var partSize = ""
let partSizeRes = await self.getTableSize(partitionName)
if partSizeRes.isOk():
partSize = partSizeRes.get()
## Detach and remove the partition concurrently to not block the parent table (messages)
let detachPartitionQuery =
"ALTER TABLE messages DETACH PARTITION " & partitionName & " CONCURRENTLY;"
debug "removeOldestPartition", query = detachPartitionQuery
(await self.performWriteQuery(detachPartitionQuery)).isOkOr:
return err(fmt"error in {detachPartitionQuery}: " & $error)
## Drop the partition
let dropPartitionQuery = "DROP TABLE " & partitionName
debug "removeOldestPartition drop partition", query = dropPartitionQuery
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
return err(fmt"error in {dropPartitionQuery}: " & $error)
debug "removed partition", partition_name = partitionName, partition_size = partSize
self.partitionMngr.removeOldestPartitionName()
return ok()
proc removePartitionsOlderThan(
self: PostgresDriver, tsInNanoSec: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
## Removes old partitions that don't contain the specified timestamp
let tsInSec = Timestamp(float(tsInNanoSec) / 1_000_000_000)
var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition in removePartitionOlderThan: " & $error)
while not oldestPartition.containsMoment(tsInSec):
(await self.removePartition(oldestPartition.getName())).isOkOr:
return err("issue in removePartitionsOlderThan: " & $error)
oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err(
"could not get partition in removePartitionOlderThan in while loop: " & $error
)
## We reached the partition that contains the target timestamp and we don't want to remove it
return ok()
proc removeOldestPartition(
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
): Future[ArchiveDriverResult[void]] {.async.} =
## Indirectly called from the retention policy
let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not remove oldest partition: " & $error)
if not forceRemoval:
let now = times.now().toTime().toUnix()
let currentPartitionRes = self.partitionMngr.getPartitionFromDateTime(now)
if currentPartitionRes.isOk():
## The database contains a partition that would store current messages.
if currentPartitionRes.get() == oldestPartition:
debug "Skipping to remove the current partition"
return ok()
return await self.removePartition(oldestPartition.getName())
proc containsAnyPartition*(self: PostgresDriver): bool =
return not self.partitionMngr.isEmpty()
method decreaseDatabaseSize*(
driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
): Future[ArchiveDriverResult[void]] {.async.} =
var dbSize = (await driver.getDatabaseSize()).valueOr:
return err("decreaseDatabaseSize failed to get database size: " & $error)
## This is completely disabled and only the non-legacy driver
## will take care of that
# var dbSize = (await driver.getDatabaseSize()).valueOr:
# return err("decreaseDatabaseSize failed to get database size: " & $error)
## database size in bytes
var totalSizeOfDB: int64 = int64(dbSize)
# ## database size in bytes
# var totalSizeOfDB: int64 = int64(dbSize)
if totalSizeOfDB <= targetSizeInBytes:
return ok()
# if totalSizeOfDB <= targetSizeInBytes:
# return ok()
debug "start reducing database size",
targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB
# debug "start reducing database size",
# targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB
while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition():
(await driver.removeOldestPartition(forceRemoval)).isOkOr:
return err(
"decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error
)
# while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition():
# (await driver.removeOldestPartition(forceRemoval)).isOkOr:
# return err(
# "decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error
# )
dbSize = (await driver.getDatabaseSize()).valueOr:
return
err("decreaseDatabaseSize inside loop failed to get database size: " & $error)
# dbSize = (await driver.getDatabaseSize()).valueOr:
# return
# err("decreaseDatabaseSize inside loop failed to get database size: " & $error)
let newCurrentSize = int64(dbSize)
if newCurrentSize == totalSizeOfDB:
return err("the previous partition removal didn't clear database size")
# let newCurrentSize = int64(dbSize)
# if newCurrentSize == totalSizeOfDB:
# return err("the previous partition removal didn't clear database size")
totalSizeOfDB = newCurrentSize
# totalSizeOfDB = newCurrentSize
debug "reducing database size",
targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB
# debug "reducing database size",
# targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB
return ok()
@ -1146,14 +874,14 @@ method deleteMessagesOlderThanTimestamp*(
): Future[ArchiveDriverResult[void]] {.async.} =
## First of all, let's remove the older partitions so that we can reduce
## the database size.
(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
return err("error while removing older partitions: " & $error)
# (await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
# return err("error while removing older partitions: " & $error)
(
await s.writeConnPool.pgQuery(
"DELETE FROM messages WHERE timestamp < " & $tsNanoSec
)
).isOkOr:
return err("error in deleteMessagesOlderThanTimestamp: " & $error)
# (
# await s.writeConnPool.pgQuery(
# "DELETE FROM messages WHERE timestamp < " & $tsNanoSec
# )
# ).isOkOr:
# return err("error in deleteMessagesOlderThanTimestamp: " & $error)
return ok()

View File

@ -1,41 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import chronos, results
import ../../../common/databases/db_postgres, ../../../common/error_handling
## Simple query to validate that postgres is up and serving requests
const HealthCheckQuery = "SELECT version();"
const CheckConnectivityInterval = 60.seconds
const MaxNumTrials = 20
const TrialInterval = 1.seconds
proc checkConnectivity*(
connPool: PgAsyncPool, onFatalErrorAction: OnFatalErrorHandler
) {.async.} =
while true:
(await connPool.pgQuery(HealthCheckQuery)).isOkOr:
## The connection failed once. Let's try reconnecting for a while.
## Notice that the 'exec' proc tries to establish a new connection.
block errorBlock:
## Force close all the opened connections. No need to close gracefully.
(await connPool.resetConnPool()).isOkOr:
onFatalErrorAction("checkConnectivity resetConnPool error: " & error)
var numTrial = 0
while numTrial < MaxNumTrials:
let res = await connPool.pgQuery(HealthCheckQuery)
if res.isOk():
## Connection resumed. Let's go back to the normal healthcheck.
break errorBlock
await sleepAsync(TrialInterval)
numTrial.inc()
## The connection couldn't be resumed. Let's inform the upper layers.
onFatalErrorAction("postgres health check error: " & error)
await sleepAsync(CheckConnectivityInterval)
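A usage sketch (illustrative; `readConnPool`, `writeConnPool` and `onFatalErrorAction` are assumed to be in scope): before this commit the legacy postgres driver spawned one such health-check loop per connection pool, as in the postgres_driver.nim hunk above.

if not isNil(onFatalErrorAction):
  asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)
  asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)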

View File

@ -1,16 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import results, chronos
import ./driver
type RetentionPolicyResult*[T] = Result[T, string]
type RetentionPolicy* = ref object of RootObj
method execute*(
p: RetentionPolicy, store: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.base, async.} =
discard

View File

@ -1,88 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[strutils, options], regex, results
import
../retention_policy,
./retention_policy_time,
./retention_policy_capacity,
./retention_policy_size
proc new*(
T: type RetentionPolicy, retPolicy: string
): RetentionPolicyResult[Option[RetentionPolicy]] =
let retPolicy = retPolicy.toLower
# Validate the retention policy format
if retPolicy == "" or retPolicy == "none":
return ok(none(RetentionPolicy))
const StoreMessageRetentionPolicyRegex = re2"^\w+:\d*\.?\d+((g|m)b)?$"
if not retPolicy.match(StoreMessageRetentionPolicyRegex):
return err("invalid 'store message retention policy' format: " & retPolicy)
# Apply the retention policy, if any
let rententionPolicyParts = retPolicy.split(":", 1)
let
policy = rententionPolicyParts[0]
policyArgs = rententionPolicyParts[1]
if policy == "time":
var retentionTimeSeconds: int64
try:
retentionTimeSeconds = parseInt(policyArgs)
except ValueError:
return err("invalid time retention policy argument")
let retPolicy: RetentionPolicy = TimeRetentionPolicy.new(retentionTimeSeconds)
return ok(some(retPolicy))
elif policy == "capacity":
var retentionCapacity: int
try:
retentionCapacity = parseInt(policyArgs)
except ValueError:
return err("invalid capacity retention policy argument")
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.new(retentionCapacity)
return ok(some(retPolicy))
elif policy == "size":
var retentionSize: string
retentionSize = policyArgs
# captures the size unit such as GB or MB
let sizeUnit = retentionSize.substr(retentionSize.len - 2)
# captures the numeric part of the size as a string
let sizeQuantityStr = retentionSize.substr(0, retentionSize.len - 3)
# holds the numeric value of the size
var inptSizeQuantity: float
var sizeQuantity: int64
var sizeMultiplier: float
try:
inptSizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
case sizeUnit
of "gb":
sizeMultiplier = 1024.0 * 1024.0 * 1024.0
of "mb":
sizeMultiplier = 1024.0 * 1024.0
else:
return err (
"""invalid size retention value unit: expected "Mb" or "Gb" but got """ &
sizeUnit
)
# quantity is converted into bytes for uniform processing
sizeQuantity = int64(inptSizeQuantity * sizeMultiplier)
if sizeQuantity <= 0:
return err("invalid size retention policy argument: a non-zero value is required")
let retPolicy: RetentionPolicy = SizeRetentionPolicy.new(sizeQuantity)
return ok(some(retPolicy))
else:
return err("unknown retention policy")

View File

@ -1,68 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import results, chronicles, chronos
import ../driver, ../retention_policy
logScope:
topics = "waku archive retention_policy"
const DefaultCapacity*: int = 25_000
const MaxOverflow = 1.3
type
# CapacityRetentionPolicy implements auto deletion as follows:
# - The sqlite DB will store up to `totalCapacity = capacity * MaxOverflow` messages,
# giving an overflow window of `overflowWindow = capacity * (MaxOverflow - 1)`.
#
# - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
# deleted. The number of messages that get deleted is `deleteWindow = overflowWindow / 2`,
# bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
#
# The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
# to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
# `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
# `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
# So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
# compensate that by keeping half of the overflow window.
CapacityRetentionPolicy* = ref object of RetentionPolicy
capacity: int
# represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
totalCapacity: int # = capacity * MaxOverflow
deleteWindow: int
# = capacity * (MaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
proc calculateTotalCapacity(capacity: int, overflow: float): int =
int(float(capacity) * overflow)
proc calculateOverflowWindow(capacity: int, overflow: float): int =
int(float(capacity) * (overflow - 1))
proc calculateDeleteWindow(capacity: int, overflow: float): int =
calculateOverflowWindow(capacity, overflow) div 2
proc new*(T: type CapacityRetentionPolicy, capacity = DefaultCapacity): T =
let
totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
CapacityRetentionPolicy(
capacity: capacity, totalCapacity: totalCapacity, deleteWindow: deleteWindow
)
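A worked example of the windowing above with the default values (illustrative):

let policy = CapacityRetentionPolicy.new() # capacity = 25_000, MaxOverflow = 1.3
# totalCapacity = int(25_000 * 1.3) = 32_500
# deleteWindow = int(25_000 * (1.3 - 1)) div 2 = 3_750
# execute() is a no-op until 32_500 messages are stored; it then deletes the
# oldest rows so that capacity + deleteWindow = 28_750 messages remain.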
method execute*(
p: CapacityRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)
if numMessages < p.totalCapacity:
return ok()
(await driver.deleteOldestMessagesNotWithinLimit(limit = p.capacity + p.deleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)
return ok()

View File

@ -1,27 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import results, chronicles, chronos
import ../driver, ../retention_policy
logScope:
topics = "waku archive retention_policy"
# default size is 30 GiB, i.e. 32212254720 bytes
const DefaultRetentionSize*: int64 = 32212254720
type SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: int64
proc new*(T: type SizeRetentionPolicy, size = DefaultRetentionSize): T =
SizeRetentionPolicy(sizeLimit: size)
method execute*(
p: SizeRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
(await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr:
return err("decreaseDatabaseSize failed: " & $error)
return ok()

View File

@ -1,40 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/times, results, chronicles, chronos
import ../../waku_core, ../driver, ../retention_policy
logScope:
topics = "waku archive retention_policy"
const DefaultRetentionTime*: int64 = 30.days.seconds
type TimeRetentionPolicy* = ref object of RetentionPolicy
retentionTime: chronos.Duration
proc new*(T: type TimeRetentionPolicy, retentionTime = DefaultRetentionTime): T =
TimeRetentionPolicy(retentionTime: retentionTime.seconds)
method execute*(
p: TimeRetentionPolicy, driver: ArchiveDriver
): Future[RetentionPolicyResult[void]] {.async.} =
## Delete messages once the oldest one exceeds the retention time by more than 10% (batch delete for efficiency)
let omtRes = await driver.getOldestMessageTimestamp()
if omtRes.isErr():
return err("failed to get oldest message timestamp: " & omtRes.error)
let now = getNanosecondTime(getTime().toUnixFloat())
let retentionTimestamp = now - p.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
if thresholdTimestamp <= omtRes.value:
return ok()
let res = await driver.deleteMessagesOlderThanTimestamp(ts = retentionTimestamp)
if res.isErr():
return err("failed to delete oldest messages: " & res.error)
return ok()
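A worked note on the threshold above with the default 30-day retention (illustrative):

let policy = TimeRetentionPolicy.new() # retentionTime = 30 days
# retentionTimestamp = now - 30 days
# thresholdTimestamp = retentionTimestamp - 30 days div 10 = now - 33 days
# execute() is a no-op while the oldest stored message is newer than 33 days;
# once it is older, every message older than 30 days is deleted in one batch.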