mirror of https://github.com/waku-org/nwaku.git
feat: Postgres partition implementation (#2506)
* postgres: first step to implement partition management * postgres_driver: use of times.now().toTime().toUnix() instead of Moment.now() * postgres migrations: set new version to 2 * test_driver_postgres: use of assert instead of require and avoid using times.now() * postgres_driver: better implementation of the reset method with partitions * Remove createMessageTable, init, and deleteMessageTable procs * postgres: ensure we use the version 15.4 in tests * postgres_driver.nim: enhance debug logs partition addition * ci.yml: ensure logs are printed without colors * postgres_driver: starting the loop factory in an asynchronous task * postgres_driver: log partition name and size when removing a partition
This commit is contained in:
parent
beba14dcaa
commit
161a10ecb0
|
@ -13,7 +13,7 @@ concurrency:
|
|||
env:
|
||||
NPROC: 2
|
||||
MAKEFLAGS: "-j${NPROC}"
|
||||
NIMFLAGS: "--parallelBuild:${NPROC}"
|
||||
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"
|
||||
|
||||
jobs:
|
||||
changes: # changes detection
|
||||
|
@ -115,7 +115,7 @@ jobs:
|
|||
fi
|
||||
|
||||
if [ ${{ runner.os }} == "Linux" ]; then
|
||||
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
|
||||
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
|
||||
fi
|
||||
|
||||
make V=1 LOG_LEVEL=DEBUG QUICK_AND_DIRTY_COMPILER=1 POSTGRES=1 test testwakunode2
|
||||
|
|
|
@ -2,7 +2,7 @@ version: "3.8"
|
|||
|
||||
services:
|
||||
db:
|
||||
image: postgres:9.6-alpine
|
||||
image: postgres:15.4-alpine3.18
|
||||
restart: always
|
||||
environment:
|
||||
POSTGRES_PASSWORD: test123
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils,times,options],
|
||||
std/[sequtils,options],
|
||||
testutils/unittests,
|
||||
chronos
|
||||
import
|
||||
|
@ -13,8 +13,6 @@ import
|
|||
../testlib/testasync,
|
||||
../testlib/postgres
|
||||
|
||||
proc now():int64 = getTime().toUnix()
|
||||
|
||||
proc computeTestCursor(pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage):
|
||||
ArchiveCursor =
|
||||
|
@ -56,7 +54,7 @@ suite "Postgres driver":
|
|||
# Actually, the diff randomly goes between 1 and 2 seconds.
|
||||
# although in theory it should spend 1s because we establish 100
|
||||
# connections and we spawn 100 tasks that spend ~1s each.
|
||||
require diff < 20
|
||||
assert diff < 20_000_000_000
|
||||
|
||||
asyncTest "Insert a message":
|
||||
const contentTopic = "test-content-topic"
|
||||
|
@ -69,14 +67,14 @@ suite "Postgres driver":
|
|||
assert putRes.isOk(), putRes.error
|
||||
|
||||
let storedMsg = (await driver.getAllMessages()).tryGet()
|
||||
require:
|
||||
storedMsg.len == 1
|
||||
storedMsg.all do (item: auto) -> bool:
|
||||
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
|
||||
actualMsg.contentTopic == contentTopic and
|
||||
pubsubTopic == DefaultPubsubTopic and
|
||||
toHex(computedDigest.data) == toHex(digest) and
|
||||
toHex(actualMsg.payload) == toHex(msg.payload)
|
||||
|
||||
assert storedMsg.len == 1
|
||||
|
||||
let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0]
|
||||
assert actualMsg.contentTopic == contentTopic
|
||||
assert pubsubTopic == DefaultPubsubTopic
|
||||
assert toHex(computedDigest.data) == toHex(digest)
|
||||
assert toHex(actualMsg.payload) == toHex(msg.payload)
|
||||
|
||||
asyncTest "Insert and query message":
|
||||
const contentTopic1 = "test-content-topic-1"
|
||||
|
@ -96,21 +94,21 @@ suite "Postgres driver":
|
|||
|
||||
let countMessagesRes = await driver.getMessagesCount()
|
||||
|
||||
require countMessagesRes.isOk() and countMessagesRes.get() == 2
|
||||
assert countMessagesRes.isOk(), $countMessagesRes.error
|
||||
assert countMessagesRes.get() == 2
|
||||
|
||||
var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])
|
||||
|
||||
require messagesRes.isOk()
|
||||
require messagesRes.get().len == 1
|
||||
assert messagesRes.isOk(), $messagesRes.error
|
||||
assert messagesRes.get().len == 1
|
||||
|
||||
# Get both content topics, check ordering
|
||||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
|
||||
contentTopic2])
|
||||
assert messagesRes.isOk(), messagesRes.error
|
||||
|
||||
require:
|
||||
messagesRes.get().len == 2 and
|
||||
messagesRes.get()[0][1].contentTopic == contentTopic1
|
||||
assert messagesRes.get().len == 2
|
||||
assert messagesRes.get()[0][1].contentTopic == contentTopic1
|
||||
|
||||
# Descending order
|
||||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
|
||||
|
@ -118,9 +116,8 @@ suite "Postgres driver":
|
|||
ascendingOrder = false)
|
||||
assert messagesRes.isOk(), messagesRes.error
|
||||
|
||||
require:
|
||||
messagesRes.get().len == 2 and
|
||||
messagesRes.get()[0][1].contentTopic == contentTopic2
|
||||
assert messagesRes.get().len == 2
|
||||
assert messagesRes.get()[0][1].contentTopic == contentTopic2
|
||||
|
||||
# cursor
|
||||
# Get both content topics
|
||||
|
@ -130,8 +127,8 @@ suite "Postgres driver":
|
|||
cursor = some(
|
||||
computeTestCursor(pubsubTopic1,
|
||||
messagesRes.get()[1][1])))
|
||||
require messagesRes.isOk()
|
||||
require messagesRes.get().len == 1
|
||||
assert messagesRes.isOk()
|
||||
assert messagesRes.get().len == 1
|
||||
|
||||
# Get both content topics but one pubsub topic
|
||||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
|
||||
|
@ -139,16 +136,15 @@ suite "Postgres driver":
|
|||
pubsubTopic = some(pubsubTopic1))
|
||||
assert messagesRes.isOk(), messagesRes.error
|
||||
|
||||
require:
|
||||
messagesRes.get().len == 1 and
|
||||
messagesRes.get()[0][1].contentTopic == contentTopic1
|
||||
assert messagesRes.get().len == 1
|
||||
assert messagesRes.get()[0][1].contentTopic == contentTopic1
|
||||
|
||||
# Limit
|
||||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
|
||||
contentTopic2],
|
||||
maxPageSize = 1)
|
||||
assert messagesRes.isOk(), messagesRes.error
|
||||
require messagesRes.get().len == 1
|
||||
assert messagesRes.get().len == 1
|
||||
|
||||
asyncTest "Insert true duplicated messages":
|
||||
# Validates that two completely equal messages can not be stored.
|
||||
|
@ -164,5 +160,5 @@ suite "Postgres driver":
|
|||
|
||||
putRes = await driver.put(DefaultPubsubTopic,
|
||||
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
|
||||
require not putRes.isOk()
|
||||
assert not putRes.isOk()
|
||||
|
||||
|
|
|
@ -73,7 +73,8 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
|
|||
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
||||
|
||||
method decreaseDatabaseSize*(driver: ArchiveDriver,
|
||||
targetSizeInBytes: int64):
|
||||
targetSizeInBytes: int64,
|
||||
forceRemoval: bool = false):
|
||||
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
||||
|
||||
method close*(driver: ArchiveDriver):
|
||||
|
|
|
@ -108,6 +108,19 @@ proc new*(T: type ArchiveDriver,
|
|||
if migrateRes.isErr():
|
||||
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)
|
||||
|
||||
## This should be started once we make sure the 'messages' table exists
|
||||
## Hence, this should be run after the migration is completed.
|
||||
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)
|
||||
|
||||
info "waiting for a partition to be created"
|
||||
for i in 0..<100:
|
||||
if driver.containsAnyPartition():
|
||||
break
|
||||
await sleepAsync(chronos.milliseconds(100))
|
||||
|
||||
if not driver.containsAnyPartition():
|
||||
onFatalErrorAction("a partition could not be created")
|
||||
|
||||
return ok(driver)
|
||||
|
||||
else:
|
||||
|
|
|
@ -3,6 +3,13 @@ when (NimMajor, NimMinor) < (1, 4):
|
|||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import ./postgres_driver/postgres_driver
|
||||
import
|
||||
./postgres_driver/postgres_driver,
|
||||
./postgres_driver/partitions_manager,
|
||||
./postgres_driver/postgres_healthcheck
|
||||
|
||||
export
|
||||
postgres_driver,
|
||||
partitions_manager,
|
||||
postgres_healthcheck
|
||||
|
||||
export postgres_driver
|
||||
|
|
|
@ -13,7 +13,7 @@ import
|
|||
logScope:
|
||||
topics = "waku archive migration"
|
||||
|
||||
const SchemaVersion* = 1 # increase this when there is an update in the database schema
|
||||
const SchemaVersion* = 2 # increase this when there is an update in the database schema
|
||||
|
||||
proc breakIntoStatements*(script: string): seq[string] =
|
||||
## Given a full migration script, that can potentially contain a list
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
|
||||
## This module is aimed to handle the creation and truncation of partition tables
|
||||
## in order to limit the space occupied in disk by the database.
|
||||
##
|
||||
## The created partitions are referenced by the 'storedAt' field.
|
||||
##
|
||||
|
||||
import
|
||||
std/deques
|
||||
import
|
||||
chronos,
|
||||
chronicles
|
||||
|
||||
logScope:
|
||||
topics = "waku archive partitions_manager"
|
||||
|
||||
## The time range has seconds resolution
|
||||
type TimeRange* = tuple[beginning: int64, `end`: int64]
|
||||
|
||||
type
|
||||
Partition = object
|
||||
name: string
|
||||
timeRange: TimeRange
|
||||
|
||||
PartitionManager* = ref object
|
||||
partitions: Deque[Partition] # FIFO of partition table names. The first is the oldest partition
|
||||
|
||||
proc new*(T: type PartitionManager): T =
|
||||
return PartitionManager()
|
||||
|
||||
proc getPartitionFromDateTime*(self: PartitionManager,
|
||||
targetMoment: int64):
|
||||
Result[Partition, string] =
|
||||
## Returns the partition name that might store a message containing the passed timestamp.
|
||||
## In order words, it simply returns the partition name which contains the given timestamp.
|
||||
## targetMoment - represents the time of interest, measured in seconds since epoch.
|
||||
|
||||
if self.partitions.len == 0:
|
||||
return err("There are no partitions")
|
||||
|
||||
for partition in self.partitions:
|
||||
let timeRange = partition.timeRange
|
||||
|
||||
let beginning = timeRange.beginning
|
||||
let `end` = timeRange.`end`
|
||||
|
||||
if beginning <= targetMoment and targetMoment < `end`:
|
||||
return ok(partition)
|
||||
|
||||
return err("Couldn't find a partition table for given time: " & $targetMoment)
|
||||
|
||||
proc getNewestPartition*(self: PartitionManager): Result[Partition, string] =
|
||||
if self.partitions.len == 0:
|
||||
return err("there are no partitions allocated")
|
||||
|
||||
let newestPartition = self.partitions.peekLast
|
||||
return ok(newestPartition)
|
||||
|
||||
proc getOldestPartition*(self: PartitionManager): Result[Partition, string] =
|
||||
if self.partitions.len == 0:
|
||||
return err("there are no partitions allocated")
|
||||
|
||||
let oldestPartition = self.partitions.peekFirst
|
||||
return ok(oldestPartition)
|
||||
|
||||
proc addPartitionInfo*(self: PartitionManager,
|
||||
partitionName: string,
|
||||
beginning: int64,
|
||||
`end`: int64) =
|
||||
## The given partition range has seconds resolution.
|
||||
## We just store information of the new added partition merely to keep track of it.
|
||||
let partitionInfo = Partition(name: partitionName, timeRange: (beginning, `end`))
|
||||
trace "Adding partition info"
|
||||
self.partitions.addLast(partitionInfo)
|
||||
|
||||
proc removeOldestPartitionName*(self: PartitionManager) =
|
||||
## Simply removed the partition from the tracked/known partitions queue.
|
||||
## Just remove it and ignore it.
|
||||
discard self.partitions.popFirst()
|
||||
|
||||
proc isEmpty*(self: PartitionManager): bool =
|
||||
return self.partitions.len == 0
|
||||
|
||||
proc getLastMoment*(partition: Partition): int64 =
|
||||
## Considering the time range covered by the partition, this
|
||||
## returns the `end` time (number of seconds since epoch) of such range.
|
||||
let lastTimeInSec = partition.timeRange.`end`
|
||||
return lastTimeInSec
|
||||
|
||||
proc containsMoment*(partition: Partition, time: int64): bool =
|
||||
## Returns true if the given moment is contained within the partition window,
|
||||
## 'false' otherwise.
|
||||
## time - number of seconds since epoch
|
||||
if partition.timeRange.beginning <= time and
|
||||
time < partition.timeRange.`end`:
|
||||
return true
|
||||
|
||||
return false
|
||||
|
||||
proc getName*(partition: Partition): string =
|
||||
return partition.name
|
||||
|
||||
func `==`*(a, b: Partition): bool {.inline.} =
|
||||
return a.name == b.name
|
||||
|
|
@ -4,7 +4,7 @@ else:
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[nre,options,sequtils,strutils,times,strformat],
|
||||
std/[nre,options,sequtils,strutils,strformat,times],
|
||||
stew/[results,byteutils],
|
||||
db_postgres,
|
||||
postgres,
|
||||
|
@ -16,33 +16,17 @@ import
|
|||
../../common,
|
||||
../../driver,
|
||||
../../../common/databases/db_postgres as waku_postgres,
|
||||
./postgres_healthcheck
|
||||
|
||||
export postgres_driver
|
||||
./postgres_healthcheck,
|
||||
./partitions_manager
|
||||
|
||||
type PostgresDriver* = ref object of ArchiveDriver
|
||||
## Establish a separate pools for read/write operations
|
||||
writeConnPool: PgAsyncPool
|
||||
readConnPool: PgAsyncPool
|
||||
|
||||
proc dropTableQuery(): string =
|
||||
"DROP TABLE messages"
|
||||
|
||||
proc dropVersionTableQuery(): string =
|
||||
"DROP TABLE version"
|
||||
|
||||
proc createTableQuery(): string =
|
||||
"CREATE TABLE IF NOT EXISTS messages (" &
|
||||
" pubsubTopic VARCHAR NOT NULL," &
|
||||
" contentTopic VARCHAR NOT NULL," &
|
||||
" payload VARCHAR," &
|
||||
" version INTEGER NOT NULL," &
|
||||
" timestamp BIGINT NOT NULL," &
|
||||
" id VARCHAR NOT NULL," &
|
||||
" messageHash VARCHAR NOT NULL," &
|
||||
" storedAt BIGINT NOT NULL," &
|
||||
" CONSTRAINT messageIndex PRIMARY KEY (messageHash)" &
|
||||
");"
|
||||
## Partition container
|
||||
partitionMngr: PartitionManager
|
||||
futLoopPartitionFactory: Future[void]
|
||||
|
||||
const InsertRowStmtName = "InsertRow"
|
||||
const InsertRowStmtDefinition =
|
||||
|
@ -111,52 +95,17 @@ proc new*(T: type PostgresDriver,
|
|||
if not isNil(onFatalErrorAction):
|
||||
asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)
|
||||
|
||||
return ok(PostgresDriver(writeConnPool: writeConnPool,
|
||||
readConnPool: readConnPool))
|
||||
|
||||
proc performWriteQuery*(s: PostgresDriver,
|
||||
query: string): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Executes a query that changes the database state
|
||||
## TODO: we can reduce the code a little with this proc
|
||||
(await s.writeConnPool.pgQuery(query)).isOkOr:
|
||||
return err(fmt"error in {query}: {error}")
|
||||
|
||||
return ok()
|
||||
|
||||
proc createMessageTable*(s: PostgresDriver):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
||||
let execRes = await s.writeConnPool.pgQuery(createTableQuery())
|
||||
if execRes.isErr():
|
||||
return err("error in createMessageTable: " & execRes.error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc deleteMessageTable(s: PostgresDriver):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
||||
let execRes = await s.writeConnPool.pgQuery(dropTableQuery())
|
||||
if execRes.isErr():
|
||||
return err("error in deleteMessageTable: " & execRes.error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc deleteVersionTable(s: PostgresDriver):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
||||
let execRes = await s.writeConnPool.pgQuery(dropVersionTableQuery())
|
||||
if execRes.isErr():
|
||||
return err("error in deleteVersionTable: " & execRes.error)
|
||||
|
||||
return ok()
|
||||
let driver = PostgresDriver(writeConnPool: writeConnPool,
|
||||
readConnPool: readConnPool,
|
||||
partitionMngr: PartitionManager.new())
|
||||
return ok(driver)
|
||||
|
||||
proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## This is only used for testing purposes, to set a fresh database at the beginning of each test
|
||||
(await s.deleteMessageTable()).isOkOr:
|
||||
return err("error deleting message table: " & $error)
|
||||
(await s.deleteVersionTable()).isOkOr:
|
||||
return err("error deleting version table: " & $error)
|
||||
return ok()
|
||||
## Clear the database partitions
|
||||
let targetSize = 0
|
||||
let forceRemoval = true
|
||||
let ret = await s.decreaseDatabaseSize(targetSize, forceRemoval)
|
||||
return ret
|
||||
|
||||
proc rowCallbackImpl(pqResult: ptr PGresult,
|
||||
outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) =
|
||||
|
@ -219,6 +168,8 @@ method put*(s: PostgresDriver,
|
|||
let version = $message.version
|
||||
let timestamp = $message.timestamp
|
||||
|
||||
debug "put PostgresDriver", timestamp = timestamp
|
||||
|
||||
return await s.writeConnPool.runStmt(InsertRowStmtName,
|
||||
InsertRowStmtDefinition,
|
||||
@[digest,
|
||||
|
@ -258,6 +209,33 @@ method getAllMessages*(s: PostgresDriver):
|
|||
|
||||
return ok(rows)
|
||||
|
||||
proc getPartitionsList(s: PostgresDriver): Future[ArchiveDriverResult[seq[string]]] {.async.} =
|
||||
## Retrieves the seq of partition table names.
|
||||
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
|
||||
|
||||
var partitions: seq[string]
|
||||
proc rowCallback(pqResult: ptr PGresult) =
|
||||
for iRow in 0..<pqResult.pqNtuples():
|
||||
let partitionName = $(pqgetvalue(pqResult, iRow, 0))
|
||||
partitions.add(partitionName)
|
||||
|
||||
(await s.readConnPool.pgQuery("""
|
||||
SELECT child.relname AS partition_name
|
||||
FROM pg_inherits
|
||||
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
|
||||
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
|
||||
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
|
||||
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
|
||||
WHERE parent.relname='messages'
|
||||
""",
|
||||
newSeq[string](0),
|
||||
rowCallback
|
||||
)).isOkOr:
|
||||
|
||||
return err("getPartitionsList failed in query: " & $error)
|
||||
|
||||
return ok(partitions)
|
||||
|
||||
proc getMessagesArbitraryQuery(s: PostgresDriver,
|
||||
contentTopic: seq[ContentTopic] = @[],
|
||||
pubsubTopic = none(PubsubTopic),
|
||||
|
@ -428,29 +406,41 @@ method getMessages*(s: PostgresDriver,
|
|||
maxPageSize,
|
||||
ascendingOrder)
|
||||
|
||||
proc getStr(s: PostgresDriver,
|
||||
query: string):
|
||||
Future[ArchiveDriverResult[string]] {.async.} =
|
||||
# Performs a query that is expected to return a single string
|
||||
|
||||
var ret: string
|
||||
proc rowCallback(pqResult: ptr PGresult) =
|
||||
if pqResult.pqnfields() != 1:
|
||||
error "Wrong number of fields in getStr"
|
||||
return
|
||||
|
||||
if pqResult.pqNtuples() != 1:
|
||||
error "Wrong number of rows in getStr"
|
||||
return
|
||||
|
||||
ret = $(pqgetvalue(pqResult, 0, 0))
|
||||
|
||||
(await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr:
|
||||
return err("failed in getRow: " & $error)
|
||||
|
||||
return ok(ret)
|
||||
|
||||
proc getInt(s: PostgresDriver,
|
||||
query: string):
|
||||
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||
# Performs a query that is expected to return a single numeric value (int64)
|
||||
|
||||
var retInt = 0'i64
|
||||
proc rowCallback(pqResult: ptr PGresult) =
|
||||
if pqResult.pqnfields() != 1:
|
||||
error "Wrong number of fields in getInt"
|
||||
return
|
||||
let str = (await s.getStr(query)).valueOr:
|
||||
return err("could not get str in getInt: " & $error)
|
||||
|
||||
if pqResult.pqNtuples() != 1:
|
||||
error "Wrong number of rows in getInt"
|
||||
return
|
||||
|
||||
try:
|
||||
retInt = parseInt( $(pqgetvalue(pqResult, 0, 0)) )
|
||||
except ValueError:
|
||||
error "exception in getInt, parseInt", error = getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
(await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr:
|
||||
return err("failed in getRow: " & $error)
|
||||
try:
|
||||
retInt = parseInt( str )
|
||||
except ValueError:
|
||||
return err("exception in getInt, parseInt: " & getCurrentExceptionMsg())
|
||||
|
||||
return ok(retInt)
|
||||
|
||||
|
@ -518,42 +508,11 @@ method deleteOldestMessagesNotWithinLimit*(
|
|||
|
||||
return ok()
|
||||
|
||||
method decreaseDatabaseSize*(driver: PostgresDriver,
|
||||
targetSizeInBytes: int64):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## TODO: refactor this implementation and use partition management instead
|
||||
## To remove 20% of the outdated data from database
|
||||
const DeleteLimit = 0.80
|
||||
|
||||
## when db size overshoots the database limit, shread 20% of outdated messages
|
||||
## get size of database
|
||||
let dbSize = (await driver.getDatabaseSize()).valueOr:
|
||||
return err("failed to get database size: " & $error)
|
||||
|
||||
## database size in bytes
|
||||
let totalSizeOfDB: int64 = int64(dbSize)
|
||||
|
||||
if totalSizeOfDB < targetSizeInBytes:
|
||||
return ok()
|
||||
|
||||
## to shread/delete messsges, get the total row/message count
|
||||
let numMessages = (await driver.getMessagesCount()).valueOr:
|
||||
return err("failed to get messages count: " & error)
|
||||
|
||||
## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
|
||||
## if vacumming is done automatically then we aim to check DB size periodially for efficient
|
||||
## retention policy implementation.
|
||||
|
||||
## 80% of the total messages are to be kept, delete others
|
||||
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
|
||||
|
||||
(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
|
||||
return err("deleting oldest messages failed: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
method close*(s: PostgresDriver):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Cancel the partition factory loop
|
||||
s.futLoopPartitionFactory.cancel()
|
||||
|
||||
## Close the database connection
|
||||
let writeCloseRes = await s.writeConnPool.close()
|
||||
let readCloseRes = await s.readConnPool.close()
|
||||
|
@ -587,6 +546,218 @@ proc sleep*(s: PostgresDriver, seconds: int):
|
|||
|
||||
return ok()
|
||||
|
||||
proc performWriteQuery*(s: PostgresDriver, query: string):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Performs a query that somehow changes the state of the database
|
||||
|
||||
(await s.writeConnPool.pgQuery(query)).isOkOr:
|
||||
return err("error in performWriteQuery: " & $error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc addPartition(self: PostgresDriver,
|
||||
startTime: Timestamp,
|
||||
duration: timer.Duration):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Creates a partition table that will store the messages that fall in the range
|
||||
## `startTime` <= storedAt < `startTime + duration`.
|
||||
## `startTime` is measured in seconds since epoch
|
||||
|
||||
let beginning = startTime
|
||||
let `end` = (startTime + duration.seconds)
|
||||
|
||||
let fromInSec: string = $beginning
|
||||
let untilInSec: string = $`end`
|
||||
|
||||
let fromInNanoSec: string = fromInSec & "000000000"
|
||||
let untilInNanoSec: string = untilInSec & "000000000"
|
||||
|
||||
let partitionName = "messages_" & fromInSec & "_" & untilInSec
|
||||
|
||||
let createPartitionQuery = "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
|
||||
"messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"
|
||||
|
||||
(await self.performWriteQuery(createPartitionQuery)).isOkOr:
|
||||
return err(fmt"error adding partition [{partitionName}]: " & $error)
|
||||
|
||||
debug "new partition added", query = createPartitionQuery
|
||||
|
||||
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
|
||||
return ok()
|
||||
|
||||
proc initializePartitionsInfo(self: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
let partitionNamesRes = await self.getPartitionsList()
|
||||
if not partitionNamesRes.isOk():
|
||||
return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
|
||||
else:
|
||||
let partitionNames = partitionNamesRes.get()
|
||||
for partitionName in partitionNames:
|
||||
## partitionName contains something like 'messages_1708449815_1708449875'
|
||||
let bothTimes = partitionName.replace("messages_", "")
|
||||
let times = bothTimes.split("_")
|
||||
if times.len != 2:
|
||||
return err(fmt"loopPartitionFactory wrong partition name {partitionName}")
|
||||
|
||||
var beginning: int64
|
||||
try:
|
||||
beginning = parseInt(times[0])
|
||||
except ValueError:
|
||||
return err("Could not parse beginning time: " & getCurrentExceptionMsg())
|
||||
|
||||
var `end`: int64
|
||||
try:
|
||||
`end` = parseInt(times[1])
|
||||
except ValueError:
|
||||
return err("Could not parse end time: " & getCurrentExceptionMsg())
|
||||
|
||||
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
|
||||
|
||||
return ok()
|
||||
|
||||
const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10)
|
||||
const PartitionsRangeInterval = timer.hours(1) ## Time range covered by each parition
|
||||
|
||||
proc loopPartitionFactory(self: PostgresDriver,
|
||||
onFatalError: OnFatalErrorHandler) {.async.} =
|
||||
## Loop proc that continuously checks whether we need to create a new partition.
|
||||
## Notice that the deletion of partitions is handled by the retention policy modules.
|
||||
|
||||
debug "starting loopPartitionFactory"
|
||||
|
||||
if PartitionsRangeInterval < DefaultDatabasePartitionCheckTimeInterval:
|
||||
onFatalError("partition factory partition range interval should be bigger than check interval")
|
||||
|
||||
## First of all, let's make the 'partition_manager' aware of the current partitions
|
||||
(await self.initializePartitionsInfo()).isOkOr:
|
||||
onFatalError("issue in loopPartitionFactory: " & $error)
|
||||
|
||||
while true:
|
||||
trace "Check if we need to create a new partition"
|
||||
|
||||
let now = times.now().toTime().toUnix()
|
||||
|
||||
if self.partitionMngr.isEmpty():
|
||||
debug "adding partition because now there aren't more partitions"
|
||||
(await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
|
||||
onFatalError("error when creating a new partition from empty state: " & $error)
|
||||
else:
|
||||
let newestPartitionRes = self.partitionMngr.getNewestPartition()
|
||||
if newestPartitionRes.isErr():
|
||||
onFatalError("could not get newest partition: " & $newestPartitionRes.error)
|
||||
|
||||
let newestPartition = newestPartitionRes.get()
|
||||
if newestPartition.containsMoment(now):
|
||||
debug "creating a new partition for the future"
|
||||
## The current used partition is the last one that was created.
|
||||
## Thus, let's create another partition for the future.
|
||||
(await self.addPartition(newestPartition.getLastMoment(), PartitionsRangeInterval)).isOkOr:
|
||||
onFatalError("could not add the next partition for 'now': " & $error)
|
||||
elif now >= newestPartition.getLastMoment():
|
||||
debug "creating a new partition to contain current messages"
|
||||
## There is no partition to contain the current time.
|
||||
## This happens if the node has been stopped for quite a long time.
|
||||
## Then, let's create the needed partition to contain 'now'.
|
||||
(await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
|
||||
onFatalError("could not add the next partition: " & $error)
|
||||
|
||||
await sleepAsync(DefaultDatabasePartitionCheckTimeInterval)
|
||||
|
||||
proc startPartitionFactory*(self: PostgresDriver,
|
||||
onFatalError: OnFatalErrorHandler) {.async.} =
|
||||
|
||||
self.futLoopPartitionFactory = self.loopPartitionFactory(onFatalError)
|
||||
|
||||
proc getTableSize*(self: PostgresDriver,
|
||||
tableName: string): Future[ArchiveDriverResult[string]] {.async.} =
|
||||
## Returns a human-readable representation of the size for the requested table.
|
||||
## tableName - table of interest.
|
||||
|
||||
let tableSize = (await self.getStr(fmt"""
|
||||
SELECT pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size"
|
||||
FROM pg_class C
|
||||
where relname = '{tableName}'""")).valueOr:
|
||||
return err("error in getDatabaseSize: " & error)
|
||||
|
||||
return ok(tableSize)
|
||||
|
||||
proc removeOldestPartition(self: PostgresDriver,
|
||||
forceRemoval: bool = false, ## To allow cleanup in tests
|
||||
):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Indirectly called from the retention policy
|
||||
|
||||
let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
|
||||
return err("could not remove oldest partition: " & $error)
|
||||
|
||||
if not forceRemoval:
|
||||
let now = times.now().toTime().toUnix()
|
||||
let currentPartitionRes = self.partitionMngr.getPartitionFromDateTime(now)
|
||||
if currentPartitionRes.isOk():
|
||||
## The database contains a partition that would store current messages.
|
||||
|
||||
if currentPartitionRes.get() == oldestPartition:
|
||||
debug "Skipping to remove the current partition"
|
||||
return ok()
|
||||
|
||||
var partSize = ""
|
||||
let partSizeRes = await self.getTableSize(oldestPartition.getName())
|
||||
if partSizeRes.isOk():
|
||||
partSize = partSizeRes.get()
|
||||
|
||||
## In the following lines is where the partition removal happens.
|
||||
## Detach and remove the partition concurrently to not block the parent table (messages)
|
||||
let detachPartitionQuery =
|
||||
"ALTER TABLE messages DETACH PARTITION " & oldestPartition.getName() & " CONCURRENTLY;"
|
||||
debug "removeOldestPartition", query = detachPartitionQuery
|
||||
(await self.performWriteQuery(detachPartitionQuery)).isOkOr:
|
||||
return err(fmt"error in {detachPartitionQuery}: " & $error)
|
||||
|
||||
## Drop the partition
|
||||
let dropPartitionQuery = "DROP TABLE " & oldestPartition.getName()
|
||||
debug "removeOldestPartition drop partition", query = dropPartitionQuery
|
||||
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
|
||||
return err(fmt"error in {dropPartitionQuery}: " & $error)
|
||||
|
||||
debug "removed partition", partition_name = oldestPartition.getName(), partition_size = partSize
|
||||
self.partitionMngr.removeOldestPartitionName()
|
||||
|
||||
return ok()
|
||||
|
||||
proc containsAnyPartition*(self: PostgresDriver): bool =
|
||||
return not self.partitionMngr.isEmpty()
|
||||
|
||||
method decreaseDatabaseSize*(driver: PostgresDriver,
|
||||
targetSizeInBytes: int64,
|
||||
forceRemoval: bool = false):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
var dbSize = (await driver.getDatabaseSize()).valueOr:
|
||||
return err("decreaseDatabaseSize failed to get database size: " & $error)
|
||||
|
||||
## database size in bytes
|
||||
var totalSizeOfDB: int64 = int64(dbSize)
|
||||
|
||||
if totalSizeOfDB <= targetSizeInBytes:
|
||||
return ok()
|
||||
|
||||
debug "start reducing database size", targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB
|
||||
|
||||
while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition():
|
||||
(await driver.removeOldestPartition(forceRemoval)).isOkOr:
|
||||
return err("decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error)
|
||||
|
||||
dbSize = (await driver.getDatabaseSize()).valueOr:
|
||||
return err("decreaseDatabaseSize inside loop failed to get database size: " & $error)
|
||||
|
||||
let newCurrentSize = int64(dbSize)
|
||||
if newCurrentSize == totalSizeOfDB:
|
||||
return err("the previous partition removal didn't clear database size")
|
||||
|
||||
totalSizeOfDB = newCurrentSize
|
||||
|
||||
debug "reducing database size", targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB
|
||||
|
||||
return ok()
|
||||
|
||||
method existsTable*(s: PostgresDriver, tableName: string):
|
||||
Future[ArchiveDriverResult[bool]] {.async.} =
|
||||
let query: string = fmt"""
|
||||
|
@ -628,3 +799,5 @@ proc getCurrentVersion*(s: PostgresDriver):
|
|||
return err("error in getMessagesCount: " & $error)
|
||||
|
||||
return ok(res)
|
||||
|
||||
|
||||
|
|
|
@ -322,7 +322,8 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,
|
|||
return err("interface method not implemented")
|
||||
|
||||
method decreaseDatabaseSize*(driver: QueueDriver,
|
||||
targetSizeInBytes: int64):
|
||||
targetSizeInBytes: int64,
|
||||
forceRemoval: bool = false):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
return err("interface method not implemented")
|
||||
|
||||
|
|
|
@ -147,7 +147,8 @@ method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver,
|
|||
return s.db.deleteOldestMessagesNotWithinLimit(limit)
|
||||
|
||||
method decreaseDatabaseSize*(driver: SqliteDriver,
|
||||
targetSizeInBytes: int64):
|
||||
targetSizeInBytes: int64,
|
||||
forceRemoval: bool = false):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
||||
## To remove 20% of the outdated data from database
|
||||
|
|
Loading…
Reference in New Issue