nwaku/waku/waku_archive/driver/postgres_driver/postgres_driver.nim

1554 lines
50 KiB
Nim

{.push raises: [].}
import
std/[nre, options, sequtils, strutils, strformat, times, sugar],
stew/[byteutils, arrayops],
results,
chronos,
db_connector/[postgres, db_common],
chronicles
import
../../../common/error_handling,
../../../waku_core,
../../common,
../../driver,
../../../common/databases/db_postgres as waku_postgres,
./postgres_healthcheck,
./partitions_manager
type PostgresDriver* = ref object of ArchiveDriver
## Establish a separate pools for read/write operations
writeConnPool: PgAsyncPool
readConnPool: PgAsyncPool
## Partition container
partitionMngr: PartitionManager
futLoopPartitionFactory: Future[void]
futLoopAnalyzeTable: Future[void]
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition =
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;"""
const InsertRowInMessagesLookupStmtName = "InsertRowMessagesLookup"
const InsertRowInMessagesLookupStmtDefinition =
"""INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;"""
const SelectClause =
"""SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta FROM messages """
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
SelectClause &
"""WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
timestamp >= $4 AND
timestamp <= $5
ORDER BY timestamp ASC, messageHash ASC LIMIT $6;"""
const SelectNoCursorNoDataAscStmtName = "SelectWithoutCursorAndDataAsc"
const SelectNoCursorNoDataAscStmtDef =
"""SELECT messageHash FROM messages
WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
timestamp >= $4 AND
timestamp <= $5
ORDER BY timestamp ASC, messageHash ASC LIMIT $6;"""
const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc"
const SelectNoCursorDescStmtDef =
SelectClause &
"""WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
timestamp >= $4 AND
timestamp <= $5
ORDER BY timestamp DESC, messageHash DESC LIMIT $6;"""
const SelectNoCursorNoDataDescStmtName = "SelectWithoutCursorAndDataDesc"
const SelectNoCursorNoDataDescStmtDef =
"""SELECT messageHash FROM messages
WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
timestamp >= $4 AND
timestamp <= $5
ORDER BY timestamp DESC, messageHash DESC LIMIT $6;"""
const SelectWithCursorDescStmtName = "SelectWithCursorDesc"
const SelectWithCursorDescStmtDef =
SelectClause &
"""WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
(timestamp, messageHash) < ($4,$5) AND
timestamp >= $6 AND
timestamp <= $7
ORDER BY timestamp DESC, messageHash DESC LIMIT $8;"""
const SelectWithCursorNoDataDescStmtName = "SelectWithCursorNoDataDesc"
const SelectWithCursorNoDataDescStmtDef =
"""SELECT messageHash FROM messages
WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
(timestamp, messageHash) < ($4,$5) AND
timestamp >= $6 AND
timestamp <= $7
ORDER BY timestamp DESC, messageHash DESC LIMIT $8;"""
const SelectWithCursorAscStmtName = "SelectWithCursorAsc"
const SelectWithCursorAscStmtDef =
SelectClause &
"""WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
(timestamp, messageHash) > ($4,$5) AND
timestamp >= $6 AND
timestamp <= $7
ORDER BY timestamp ASC, messageHash ASC LIMIT $8;"""
const SelectWithCursorNoDataAscStmtName = "SelectWithCursorNoDataAsc"
const SelectWithCursorNoDataAscStmtDef =
"""SELECT messageHash FROM messages
WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
(timestamp, messageHash) > ($4,$5) AND
timestamp >= $6 AND
timestamp <= $7
ORDER BY timestamp ASC, messageHash ASC LIMIT $8;"""
const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup"
const SelectCursorByHashDef =
"""SELECT timestamp FROM messages_lookup
WHERE messageHash = $1"""
const
DefaultMaxNumConns = 50
MaxHashesPerQuery = 100
proc new*(
T: type PostgresDriver,
dbUrl: string,
maxConnections = DefaultMaxNumConns,
onFatalErrorAction: OnFatalErrorHandler = nil,
): ArchiveDriverResult[T] =
## Very simplistic split of max connections
let maxNumConnOnEachPool = int(maxConnections / 2)
let readConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
return err("error creating read conn pool PgAsyncPool")
let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
return err("error creating write conn pool PgAsyncPool")
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)
let driver = PostgresDriver(
writeConnPool: writeConnPool,
readConnPool: readConnPool,
partitionMngr: PartitionManager.new(),
)
return ok(driver)
proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
## Clear the database partitions
let targetSize = 0
let forceRemoval = true
let ret = await s.decreaseDatabaseSize(targetSize, forceRemoval)
return ret
proc timeCursorCallbackImpl(pqResult: ptr PGresult, timeCursor: var Option[Timestamp]) =
## Callback to get a timestamp out of the DB.
## Used to get the cursor timestamp.
let numFields = pqResult.pqnfields()
if numFields != 1:
error "Wrong number of fields"
return
let rawTimestamp = $(pqgetvalue(pqResult, 0, 0))
trace "db output", rawTimestamp
if rawTimestamp.len < 1:
return
let catchable = catch:
parseBiggestInt(rawTimestamp)
if catchable.isErr():
error "could not parse correctly", error = catchable.error.msg
return
timeCursor = some(catchable.get())
proc hashCallbackImpl(
pqResult: ptr PGresult, rows: var seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
) =
## Callback to get a hash out of the DB.
## Used when queries only ask for hashes
let numFields = pqResult.pqnfields()
if numFields != 1:
error "Wrong number of fields"
return
for iRow in 0 ..< pqResult.pqNtuples():
let rawHash = $(pqgetvalue(pqResult, iRow, 0))
trace "db output", rawHash
if rawHash.len < 1:
return
let catchable = catch:
parseHexStr(rawHash)
if catchable.isErr():
error "could not parse correctly", error = catchable.error.msg
return
let hashHex = catchable.get()
let msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
rows.add((msgHash, "", WakuMessage()))
proc rowCallbackImpl(
pqResult: ptr PGresult,
outRows: var seq[(WakuMessageHash, PubsubTopic, WakuMessage)],
) =
## Proc aimed to contain the logic of the callback passed to the `psasyncpool`.
## That callback is used in "SELECT" queries.
##
## pqResult - contains the query results
## outRows - seq of Store-rows. This is populated from the info contained in pqResult
let numFields = pqResult.pqnfields()
if numFields != 7:
error "Wrong number of fields"
return
for iRow in 0 ..< pqResult.pqNtuples():
var
rawHash: string
rawPayload: string
rawVersion: string
rawTimestamp: string
rawMeta: string
hashHex: string
msgHash: WakuMessageHash
pubSubTopic: string
contentTopic: string
payload: string
version: uint
timestamp: Timestamp
meta: string
wakuMessage: WakuMessage
rawHash = $(pqgetvalue(pqResult, iRow, 0))
pubSubTopic = $(pqgetvalue(pqResult, iRow, 1))
contentTopic = $(pqgetvalue(pqResult, iRow, 2))
rawPayload = $(pqgetvalue(pqResult, iRow, 3))
rawVersion = $(pqgetvalue(pqResult, iRow, 4))
rawTimestamp = $(pqgetvalue(pqResult, iRow, 5))
rawMeta = $(pqgetvalue(pqResult, iRow, 6))
trace "db output",
rawHash, pubSubTopic, contentTopic, rawPayload, rawVersion, rawTimestamp, rawMeta
try:
hashHex = parseHexStr(rawHash)
payload = parseHexStr(rawPayload)
version = parseUInt(rawVersion)
timestamp = parseInt(rawTimestamp)
meta = parseHexStr(rawMeta)
except ValueError:
error "could not parse correctly", error = getCurrentExceptionMsg()
msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
wakuMessage.contentTopic = contentTopic
wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))
wakuMessage.version = uint32(version)
wakuMessage.timestamp = timestamp
wakuMessage.meta = @(meta.toOpenArrayByte(0, meta.high))
outRows.add((msgHash, pubSubTopic, wakuMessage))
method put*(
s: PostgresDriver,
messageHash: WakuMessageHash,
pubsubTopic: PubsubTopic,
message: WakuMessage,
): Future[ArchiveDriverResult[void]] {.async.} =
let messageHash = toHex(messageHash)
let contentTopic = message.contentTopic
let payload = toHex(message.payload)
let version = $message.version
let timestamp = $message.timestamp
let meta = toHex(message.meta)
trace "put PostgresDriver",
messageHash, contentTopic, payload, version, timestamp, meta
## this is not needed for store-v3. Nevertheless, we will keep that temporarily
## until we completely remove the store/archive-v2 logic
let fakeId = "0"
(
## Add the row to the messages table
await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[
fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
meta,
],
@[
int32(fakeId.len),
int32(messageHash.len),
int32(pubsubTopic.len),
int32(contentTopic.len),
int32(payload.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)
).isOkOr:
return err("could not put msg in messages table: " & $error)
## Now add the row to messages_lookup
return await s.writeConnPool.runStmt(
InsertRowInMessagesLookupStmtName,
InsertRowInMessagesLookupStmtDefinition,
@[messageHash, timestamp],
@[int32(messageHash.len), int32(timestamp.len)],
@[int32(0), int32(0)],
)
method getAllMessages*(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
(
await s.readConnPool.pgQuery(
"""SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta
FROM messages
ORDER BY timestamp ASC, messageHash ASC""",
newSeq[string](0),
rowCallback,
)
).isOkOr:
return err("failed in query: " & $error)
return ok(rows)
proc getPartitionsList(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[string]]] {.async.} =
## Retrieves the seq of partition table names.
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
var partitions: seq[string]
proc rowCallback(pqResult: ptr PGresult) =
for iRow in 0 ..< pqResult.pqNtuples():
let partitionName = $(pqgetvalue(pqResult, iRow, 0))
partitions.add(partitionName)
(
await s.readConnPool.pgQuery(
"""
SELECT child.relname AS partition_name
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='messages'
ORDER BY partition_name ASC
""",
newSeq[string](0),
rowCallback,
)
).isOkOr:
return err("getPartitionsList failed in query: " & $error)
return ok(partitions)
proc getTimeCursor(
s: PostgresDriver, hashHex: string
): Future[ArchiveDriverResult[Option[Timestamp]]] {.async.} =
var timeCursor: Option[Timestamp]
proc cursorCallback(pqResult: ptr PGresult) =
timeCursorCallbackImpl(pqResult, timeCursor)
?await s.readConnPool.runStmt(
SelectCursorByHashName,
SelectCursorByHashDef,
@[hashHex],
@[int32(hashHex.len)],
@[int32(0)],
cursorCallback,
)
return ok(timeCursor)
proc getMessagesArbitraryQuery(
s: PostgresDriver,
contentTopics: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hexHashes: seq[string] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.
var query = SelectClause
var statements: seq[string]
var args: seq[string]
if cursor.isSome():
let hashHex = toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
if timeCursor.isNone():
return err("cursor not found")
let comp = if ascendingOrder: ">" else: "<"
statements.add("(timestamp, messageHash) " & comp & " (?,?)")
args.add($timeCursor.get())
args.add(hashHex)
if contentTopics.len > 0:
let cstmt = "contentTopic IN (" & "?".repeat(contentTopics.len).join(",") & ")"
statements.add(cstmt)
for t in contentTopics:
args.add(t)
if hexHashes.len > 0:
let cstmt = "messageHash IN (" & "?".repeat(hexHashes.len).join(",") & ")"
statements.add(cstmt)
for t in hexHashes:
args.add(t)
if pubsubTopic.isSome():
statements.add("pubsubTopic = ?")
args.add(pubsubTopic.get())
if startTime.isSome():
statements.add("timestamp >= ?")
args.add($startTime.get())
if endTime.isSome():
statements.add("timestamp <= ?")
args.add($endTime.get())
if statements.len > 0:
query &= " WHERE " & statements.join(" AND ")
var direction: string
if ascendingOrder:
direction = "ASC"
else:
direction = "DESC"
query &= " ORDER BY timestamp " & direction & ", messageHash " & direction
query &= " LIMIT ?"
args.add($maxPageSize)
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
(await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr:
return err("failed to run query: " & $error)
return ok(rows)
proc getMessageHashesArbitraryQuery(
s: PostgresDriver,
contentTopics: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hexHashes: seq[string] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[(WakuMessageHash, PubsubTopic, WakuMessage)]]] {.
async
.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.
var query = """SELECT messageHash FROM messages"""
var statements: seq[string]
var args: seq[string]
if cursor.isSome():
let hashHex = toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
if timeCursor.isNone():
return err("cursor not found")
let comp = if ascendingOrder: ">" else: "<"
statements.add("(timestamp, messageHash) " & comp & " (?,?)")
args.add($timeCursor.get())
args.add(hashHex)
if contentTopics.len > 0:
let cstmt = "contentTopic IN (" & "?".repeat(contentTopics.len).join(",") & ")"
statements.add(cstmt)
for t in contentTopics:
args.add(t)
if hexHashes.len > 0:
let cstmt = "messageHash IN (" & "?".repeat(hexHashes.len).join(",") & ")"
statements.add(cstmt)
for t in hexHashes:
args.add(t)
if pubsubTopic.isSome():
statements.add("pubsubTopic = ?")
args.add(pubsubTopic.get())
if startTime.isSome():
statements.add("timestamp >= ?")
args.add($startTime.get())
if endTime.isSome():
statements.add("timestamp <= ?")
args.add($endTime.get())
if statements.len > 0:
query &= " WHERE " & statements.join(" AND ")
var direction: string
if ascendingOrder:
direction = "ASC"
else:
direction = "DESC"
query &= " ORDER BY timestamp " & direction & ", messageHash " & direction
query &= " LIMIT ?"
args.add($maxPageSize)
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)
(await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr:
return err("failed to run query: " & $error)
return ok(rows)
proc getMessagesPreparedStmt(
s: PostgresDriver,
contentTopic: string,
pubsubTopic: PubsubTopic,
cursor = none(ArchiveCursor),
startTime: Timestamp,
endTime: Timestamp,
hashes: string,
maxPageSize = DefaultPageSize,
ascOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc aims to run the most typical queries in a more performant way,
## i.e. by means of prepared statements.
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
let startTimeStr = $startTime
let endTimeStr = $endTime
let limit = $maxPageSize
if cursor.isNone():
var stmtName =
if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName
var stmtDef = if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef
(
await s.readConnPool.runStmt(
stmtName,
stmtDef,
@[contentTopic, hashes, pubsubTopic, startTimeStr, endTimeStr, limit],
@[
int32(contentTopic.len),
int32(pubsubTopic.len),
int32(startTimeStr.len),
int32(endTimeStr.len),
int32(limit.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
return ok(rows)
let hashHex = toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
if timeCursor.isNone():
return err("cursor not found")
let timeString = $timeCursor.get()
var stmtName =
if ascOrder: SelectWithCursorAscStmtName else: SelectWithCursorDescStmtName
var stmtDef =
if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef
(
await s.readConnPool.runStmt(
stmtName,
stmtDef,
@[
contentTopic, hashes, pubsubTopic, hashHex, timeString, startTimeStr,
endTimeStr, limit,
],
@[
int32(contentTopic.len),
int32(hashes.len),
int32(pubsubTopic.len),
int32(timeString.len),
int32(hashHex.len),
int32(startTimeStr.len),
int32(endTimeStr.len),
int32(limit.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
return ok(rows)
proc getMessageHashesPreparedStmt(
s: PostgresDriver,
contentTopic: string,
pubsubTopic: PubsubTopic,
cursor = none(ArchiveCursor),
startTime: Timestamp,
endTime: Timestamp,
hashes: string,
maxPageSize = DefaultPageSize,
ascOrder = true,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc aims to run the most typical queries in a more performant way,
## i.e. by means of prepared statements.
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)
let startTimeStr = $startTime
let endTimeStr = $endTime
let limit = $maxPageSize
if cursor.isNone():
var stmtName =
if ascOrder: SelectNoCursorNoDataAscStmtName else: SelectNoCursorNoDataDescStmtName
var stmtDef =
if ascOrder: SelectNoCursorNoDataAscStmtDef else: SelectNoCursorNoDataDescStmtDef
(
await s.readConnPool.runStmt(
stmtName,
stmtDef,
@[contentTopic, hashes, pubsubTopic, startTimeStr, endTimeStr, limit],
@[
int32(contentTopic.len),
int32(hashes.len),
int32(pubsubTopic.len),
int32(startTimeStr.len),
int32(endTimeStr.len),
int32(limit.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
return ok(rows)
let hashHex = toHex(cursor.get())
let timeCursor = ?await s.getTimeCursor(hashHex)
if timeCursor.isNone():
return err("cursor not found")
let timeString = $timeCursor.get()
var stmtName =
if ascOrder:
SelectWithCursorNoDataAscStmtName
else:
SelectWithCursorNoDataDescStmtName
var stmtDef =
if ascOrder: SelectWithCursorNoDataAscStmtDef else: SelectWithCursorNoDataDescStmtDef
(
await s.readConnPool.runStmt(
stmtName,
stmtDef,
@[
contentTopic, hashes, pubsubTopic, hashHex, timeString, startTimeStr,
endTimeStr, limit,
],
@[
int32(contentTopic.len),
int32(hashes.len),
int32(pubsubTopic.len),
int32(timeString.len),
int32(hashHex.len),
int32(startTimeStr.len),
int32(endTimeStr.len),
int32(limit.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback,
requestId,
)
).isOkOr:
return err(stmtName & ": " & $error)
return ok(rows)
proc getMessagesByMessageHashes(
s: PostgresDriver, hashes: string, maxPageSize: uint, requestId: string
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieves information only filtering by a given messageHashes list.
## This proc levarages on the messages_lookup table to have better query performance
## and only query the desired partitions in the partitioned messages table
var query =
fmt"""
WITH min_timestamp AS (
SELECT MIN(timestamp) AS min_ts
FROM messages_lookup
WHERE messagehash IN (
{hashes}
)
)
SELECT m.messageHash, pubsubTopic, contentTopic, payload, version, m.timestamp, meta
FROM messages m
INNER JOIN
messages_lookup l
ON
m.timestamp = l.timestamp
AND m.messagehash = l.messagehash
WHERE
l.timestamp >= (SELECT min_ts FROM min_timestamp)
AND l.messagehash IN (
{hashes}
)
ORDER BY
m.timestamp DESC,
m.messagehash DESC
LIMIT {maxPageSize};
"""
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
(
await s.readConnPool.pgQuery(
query = query, rowCallback = rowCallback, requestId = requestId
)
).isOkOr:
return err("failed to run query: " & $error)
return ok(rows)
proc getMessagesWithinLimits(
self: PostgresDriver,
includeData: bool,
contentTopics: seq[ContentTopic],
pubsubTopic: Option[PubsubTopic],
cursor: Option[ArchiveCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
maxPageSize: uint,
ascendingOrder: bool,
requestId: string,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
if hashes.len > MaxHashesPerQuery:
return err(fmt"can not attend queries with more than {MaxHashesPerQuery} hashes")
let hexHashes = hashes.mapIt(toHex(it))
if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return await self.getMessagesByMessageHashes(
"'" & hexHashes.join("','") & "'", maxPageSize, requestId
)
if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.
if includeData:
return await self.getMessagesPreparedStmt(
contentTopics.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
startTime.get(),
endTime.get(),
hexHashes.join(","),
maxPageSize,
ascendingOrder,
requestId,
)
else:
return await self.getMessageHashesPreparedStmt(
contentTopics.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
startTime.get(),
endTime.get(),
hexHashes.join(","),
maxPageSize,
ascendingOrder,
requestId,
)
else:
if includeData:
## We will run atypical query. In this case we don't use prepared statemets
return await self.getMessagesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder, requestId,
)
else:
return await self.getMessageHashesArbitraryQuery(
contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
ascendingOrder, requestId,
)
method getMessages*(
s: PostgresDriver,
includeData = true,
contentTopics = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
let rows = collect(newSeq):
for i in countup(0, hashes.len, MaxHashesPerQuery):
let stop = min(i + MaxHashesPerQuery, hashes.len)
let splittedHashes = hashes[i ..< stop]
let subRows =
?await s.getMessagesWithinLimits(
includeData, contentTopics, pubsubTopic, cursor, startTime, endTime,
splittedHashes, maxPageSize, ascendingOrder, requestId,
)
for row in subRows:
row
return ok(rows)
proc getStr(
s: PostgresDriver, query: string
): Future[ArchiveDriverResult[string]] {.async.} =
# Performs a query that is expected to return a single string
var ret: string
proc rowCallback(pqResult: ptr PGresult) =
if pqResult.pqnfields() != 1:
error "Wrong number of fields in getStr"
return
if pqResult.pqNtuples() != 1:
error "Wrong number of rows in getStr"
return
ret = $(pqgetvalue(pqResult, 0, 0))
(await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr:
return err("failed in getRow: " & $error)
return ok(ret)
proc getInt(
s: PostgresDriver, query: string
): Future[ArchiveDriverResult[int64]] {.async.} =
# Performs a query that is expected to return a single numeric value (int64)
var retInt = 0'i64
let str = (await s.getStr(query)).valueOr:
return err("could not get str in getInt: " & $error)
try:
retInt = parseInt(str)
except ValueError:
return err(
"exception in getInt, parseInt, str: " & str & " query: " & query & " exception: " &
getCurrentExceptionMsg()
)
return ok(retInt)
method getDatabaseSize*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
return err("error in getDatabaseSize: " & error)
let databaseSize: int64 = int64(intRes)
return ok(databaseSize)
method getMessagesCount*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
if intRes.isErr():
return err("error in getMessagesCount: " & intRes.error)
return ok(intRes.get())
method getOldestMessageTimestamp*(
s: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
## In some cases it could happen that we have
## empty partitions which are older than the current stored rows.
## In those cases we want to consider those older partitions as the oldest considered timestamp.
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition: " & $error)
let oldestPartitionTimeNanoSec = oldestPartition.getPartitionStartTimeInNanosec()
let intRes = await s.getInt("SELECT MIN(timestamp) FROM messages")
if intRes.isErr():
## Just return the oldest partition time considering the partitions set
return ok(Timestamp(oldestPartitionTimeNanoSec))
return ok(Timestamp(min(intRes.get(), oldestPartitionTimeNanoSec)))
method getNewestMessageTimestamp*(
s: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
let intRes = await s.getInt("SELECT MAX(timestamp) FROM messages")
if intRes.isErr():
return err("error in getNewestMessageTimestamp: " & intRes.error)
return ok(Timestamp(intRes.get()))
method deleteOldestMessagesNotWithinLimit*(
s: PostgresDriver, limit: int
): Future[ArchiveDriverResult[void]] {.async.} =
var execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages WHERE messageHash NOT IN
(
SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?
);""",
@[$limit],
)
if execRes.isErr():
return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages_lookup WHERE messageHash NOT IN
(
SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?
);""",
@[$limit],
)
if execRes.isErr():
return err(
"error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
)
return ok()
method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
## Cancel the partition factory loop
s.futLoopPartitionFactory.cancelSoon()
## Cancel analyze table loop
if not s.futLoopAnalyzeTable.isNil():
s.futLoopAnalyzeTable.cancelSoon()
## Close the database connection
let writeCloseRes = await s.writeConnPool.close()
let readCloseRes = await s.readConnPool.close()
writeCloseRes.isOkOr:
return err("error closing write pool: " & $error)
readCloseRes.isOkOr:
return err("error closing read pool: " & $error)
return ok()
proc sleep*(
s: PostgresDriver, seconds: int
): Future[ArchiveDriverResult[void]] {.async.} =
# This is for testing purposes only. It is aimed to test the proper
# implementation of asynchronous requests. It merely triggers a sleep in the
# database for the amount of seconds given as a parameter.
proc rowCallback(result: ptr PGresult) =
## We are not interested in any value in this case
discard
try:
let params = @[$seconds]
(await s.writeConnPool.pgQuery("SELECT pg_sleep(?)", params, rowCallback)).isOkOr:
return err("error in postgres_driver sleep: " & $error)
except DbError:
# This always raises an exception although the sleep works
return err("exception sleeping: " & getCurrentExceptionMsg())
return ok()
const EXPECTED_LOCK_ERROR* = "another waku instance is currently executing a migration"
proc acquireDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time)
## This should only be used in the migrations module because this approach doesn't ensure
## that the lock is acquired/released by the same connection. The preferable "lock"
## approach is using the "performWriteQueryWithLock" proc. However, we can't use
## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
## scripts.
let locked = (
await s.getStr(
fmt"""
SELECT pg_try_advisory_lock({lockId})
"""
)
).valueOr:
return err("error acquiring a lock: " & error)
if locked == "f":
return err(EXPECTED_LOCK_ERROR)
return ok()
proc releaseDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
let unlocked = (
await s.getStr(
fmt"""
SELECT pg_advisory_unlock({lockId})
"""
)
).valueOr:
return err("error releasing a lock: " & error)
if unlocked == "f":
return err("could not release advisory lock")
return ok()
proc performWriteQuery*(
s: PostgresDriver, query: string
): Future[ArchiveDriverResult[void]] {.async.} =
## Performs a query that somehow changes the state of the database
(await s.writeConnPool.pgQuery(query)).isOkOr:
return err("error in performWriteQuery: " & $error)
return ok()
const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
proc performWriteQueryWithLock(
self: PostgresDriver, queryToProtect: string
): Future[ArchiveDriverResult[void]] {.async.} =
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
let query =
fmt"""
DO $$
DECLARE
lock_acquired boolean;
BEGIN
-- Try to acquire the advisory lock
lock_acquired := pg_try_advisory_lock(123456789);
IF NOT lock_acquired THEN
RAISE EXCEPTION '{COULD_NOT_ACQUIRE_ADVISORY_LOCK}';
END IF;
-- Perform the query
BEGIN
{queryToProtect}
EXCEPTION WHEN OTHERS THEN
-- Ensure the lock is released if an error occurs
PERFORM pg_advisory_unlock(123456789);
RAISE;
END;
-- Release the advisory lock after the query completes successfully
PERFORM pg_advisory_unlock(123456789);
END $$;
"""
(await self.performWriteQuery(query)).isOkOr:
if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
## We don't consider this as an error. Just someone else acquired the advisory lock
debug "skip performWriteQuery because the advisory lock is acquired by other"
return ok()
if error.contains("already exists"):
## expected to happen when trying to add a partition table constraint that already exists
## e.g., constraint "constraint_name" for relation "messages_1720364735_1720364740" already exists
debug "skip already exists error", error = error
return ok()
if error.contains("is already a partition"):
## expected to happen when a node tries to add a partition that is already attached,
## e.g., "messages_1720364735_1720364740" is already a partition
debug "skip is already a partition error", error = error
return ok()
if error.contains("does not exist"):
## expected to happen when trying to drop a constraint that has already been dropped by other
## constraint "constraint_name" of relation "messages_1720364735_1720364740" does not exist
debug "skip does not exist error", error = error
return ok()
debug "protected query ended with error", error = $error
return err("protected query ended with error:" & $error)
debug "protected query ended correctly"
return ok()
proc addPartition(
self: PostgresDriver, startTime: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
## Creates a partition table that will store the messages that fall in the range
## `startTime` <= timestamp < `startTime + duration`.
## `startTime` is measured in seconds since epoch
let beginning = startTime
let `end` = partitions_manager.calcEndPartitionTime(startTime)
let fromInSec: string = $beginning
let untilInSec: string = $`end`
let fromInNanoSec: string = fromInSec & "000000000"
let untilInNanoSec: string = untilInSec & "000000000"
let partitionName = "messages_" & fromInSec & "_" & untilInSec
## Create the partition table but not attach it yet to the main table
let createPartitionQuery =
"CREATE TABLE IF NOT EXISTS " & partitionName &
" (LIKE messages INCLUDING DEFAULTS INCLUDING CONSTRAINTS);"
(await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr:
return err(fmt"error adding partition [{partitionName}]: " & $error)
## Add constraint to the partition table so that EXCLUSIVE ACCESS is not performed when
## the partition is attached to the main table.
let constraintName = partitionName & "_by_range_check"
let addTimeConstraintQuery =
"ALTER TABLE " & partitionName & " ADD CONSTRAINT " & constraintName &
" CHECK ( timestamp >= " & fromInNanoSec & " AND timestamp < " & untilInNanoSec &
" );"
(await self.performWriteQueryWithLock(addTimeConstraintQuery)).isOkOr:
return err(fmt"error creating constraint [{partitionName}]: " & $error)
## Attaching the new created table as a new partition. That does not require EXCLUSIVE ACCESS.
let attachPartitionQuery =
"ALTER TABLE messages ATTACH PARTITION " & partitionName & " FOR VALUES FROM (" &
fromInNanoSec & ") TO (" & untilInNanoSec & ");"
(await self.performWriteQueryWithLock(attachPartitionQuery)).isOkOr:
return err(fmt"error attaching partition [{partitionName}]: " & $error)
## Dropping the check constraint as it was only necessary to prevent full scan,
## and EXCLUSIVE ACCESS, to the whole messages table, when the new partition was attached.
let dropConstraint =
"ALTER TABLE " & partitionName & " DROP CONSTRAINT " & constraintName & ";"
(await self.performWriteQueryWithLock(dropConstraint)).isOkOr:
return err(fmt"error dropping constraint [{partitionName}]: " & $error)
debug "new partition added", query = createPartitionQuery
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
return ok()
proc refreshPartitionsInfo(
self: PostgresDriver
): Future[ArchiveDriverResult[void]] {.async.} =
debug "refreshPartitionsInfo"
self.partitionMngr.clearPartitionInfo()
let partitionNamesRes = await self.getPartitionsList()
if not partitionNamesRes.isOk():
return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
else:
let partitionNames = partitionNamesRes.get()
for partitionName in partitionNames:
## partitionName contains something like 'messages_1708449815_1708449875'
let bothTimes = partitionName.replace("messages_", "")
let times = bothTimes.split("_")
if times.len != 2:
return err(fmt"loopPartitionFactory wrong partition name {partitionName}")
var beginning: int64
try:
beginning = parseInt(times[0])
except ValueError:
return err("Could not parse beginning time: " & getCurrentExceptionMsg())
var `end`: int64
try:
`end` = parseInt(times[1])
except ValueError:
return err("Could not parse end time: " & getCurrentExceptionMsg())
self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
return ok()
const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10)
proc loopPartitionFactory(
self: PostgresDriver, onFatalError: OnFatalErrorHandler
) {.async.} =
## Loop proc that continuously checks whether we need to create a new partition.
## Notice that the deletion of partitions is handled by the retention policy modules.
debug "starting loopPartitionFactory"
while true:
trace "Check if a new partition is needed"
## Let's make the 'partition_manager' aware of the current partitions
(await self.refreshPartitionsInfo()).isOkOr:
onFatalError("issue in loopPartitionFactory: " & $error)
let now = times.now().toTime().toUnix()
if self.partitionMngr.isEmpty():
debug "adding partition because now there aren't more partitions"
(await self.addPartition(now)).isOkOr:
onFatalError("error when creating a new partition from empty state: " & $error)
else:
let newestPartitionRes = self.partitionMngr.getNewestPartition()
if newestPartitionRes.isErr():
onFatalError("could not get newest partition: " & $newestPartitionRes.error)
let newestPartition = newestPartitionRes.get()
if newestPartition.containsMoment(now):
debug "creating a new partition for the future"
## The current used partition is the last one that was created.
## Thus, let's create another partition for the future.
(await self.addPartition(newestPartition.getLastMoment())).isOkOr:
onFatalError("could not add the next partition for 'now': " & $error)
elif now >= newestPartition.getLastMoment():
debug "creating a new partition to contain current messages"
## There is no partition to contain the current time.
## This happens if the node has been stopped for quite a long time.
## Then, let's create the needed partition to contain 'now'.
(await self.addPartition(now)).isOkOr:
onFatalError("could not add the next partition: " & $error)
await sleepAsync(DefaultDatabasePartitionCheckTimeInterval)
proc startPartitionFactory*(
self: PostgresDriver, onFatalError: OnFatalErrorHandler
) {.async.} =
self.futLoopPartitionFactory = self.loopPartitionFactory(onFatalError)
proc getTableSize*(
self: PostgresDriver, tableName: string
): Future[ArchiveDriverResult[string]] {.async.} =
## Returns a human-readable representation of the size for the requested table.
## tableName - table of interest.
let tableSize = (
await self.getStr(
fmt"""
SELECT pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size"
FROM pg_class C
where relname = '{tableName}'"""
)
).valueOr:
return err("error in getDatabaseSize: " & error)
return ok(tableSize)
proc removePartition(
self: PostgresDriver, partition: Partition
): Future[ArchiveDriverResult[void]] {.async.} =
## Removes the desired partition and also removes the rows from messages_lookup table
## whose rows belong to the partition time range
let partitionName = partition.getName()
debug "beginning of removePartition", partitionName
var partSize = ""
let partSizeRes = await self.getTableSize(partitionName)
if partSizeRes.isOk():
partSize = partSizeRes.get()
## Detach and remove the partition concurrently to not block the parent table (messages)
let detachPartitionQuery =
"ALTER TABLE messages DETACH PARTITION " & partitionName & " CONCURRENTLY;"
debug "removeOldestPartition", query = detachPartitionQuery
(await self.performWriteQuery(detachPartitionQuery)).isOkOr:
if ($error).contains("FINALIZE"):
## We assume the database is suggesting to use FINALIZE when detaching a partition
let detachPartitionFinalizeQuery =
"ALTER TABLE messages DETACH PARTITION " & partitionName & " FINALIZE;"
debug "removeOldestPartition detaching with FINALIZE",
query = detachPartitionFinalizeQuery
(await self.performWriteQuery(detachPartitionFinalizeQuery)).isOkOr:
return err(fmt"error in FINALIZE {detachPartitionFinalizeQuery}: " & $error)
else:
return err(fmt"error in {detachPartitionQuery}: " & $error)
## Drop the partition
let dropPartitionQuery = "DROP TABLE " & partitionName
debug "removeOldestPartition drop partition", query = dropPartitionQuery
(await self.performWriteQuery(dropPartitionQuery)).isOkOr:
return err(fmt"error in {dropPartitionQuery}: " & $error)
debug "removed partition", partition_name = partitionName, partition_size = partSize
self.partitionMngr.removeOldestPartitionName()
## Now delete rows from the messages_lookup table
let timeRange = partition.getTimeRange()
let `end` = timeRange.`end` * 1_000_000_000
let deleteRowsQuery = "DELETE FROM messages_lookup WHERE timestamp < " & $`end`
(await self.performWriteQuery(deleteRowsQuery)).isOkOr:
return err(fmt"error in {deleteRowsQuery}: " & $error)
return ok()
proc removePartitionsOlderThan(
self: PostgresDriver, tsInNanoSec: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
## Removes old partitions that don't contain the specified timestamp
let tsInSec = Timestamp(float(tsInNanoSec) / 1_000_000_000)
debug "beginning of removePartitionsOlderThan", tsInSec
var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition in removePartitionOlderThan: " & $error)
while not oldestPartition.containsMoment(tsInSec):
(await self.removePartition(oldestPartition)).isOkOr:
return err("issue in removePartitionsOlderThan: " & $error)
oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err(
"could not get partition in removePartitionOlderThan in while loop: " & $error
)
## We reached the partition that contains the target timestamp plus don't want to remove it
return ok()
proc removeOldestPartition(
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
): Future[ArchiveDriverResult[void]] {.async.} =
## Indirectly called from the retention policy
let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not remove oldest partition: " & $error)
if not forceRemoval:
let now = times.now().toTime().toUnix()
let currentPartitionRes = self.partitionMngr.getPartitionFromDateTime(now)
if currentPartitionRes.isOk():
## The database contains a partition that would store current messages.
if currentPartitionRes.get() == oldestPartition:
debug "Skipping to remove the current partition"
return ok()
return await self.removePartition(oldestPartition)
proc containsAnyPartition*(self: PostgresDriver): bool =
return not self.partitionMngr.isEmpty()
method decreaseDatabaseSize*(
driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
): Future[ArchiveDriverResult[void]] {.async.} =
var dbSize = (await driver.getDatabaseSize()).valueOr:
return err("decreaseDatabaseSize failed to get database size: " & $error)
## database size in bytes
var totalSizeOfDB: int64 = int64(dbSize)
if totalSizeOfDB <= targetSizeInBytes:
return ok()
debug "start reducing database size",
targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB
while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition():
(await driver.removeOldestPartition(forceRemoval)).isOkOr:
return err(
"decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error
)
dbSize = (await driver.getDatabaseSize()).valueOr:
return
err("decreaseDatabaseSize inside loop failed to get database size: " & $error)
let newCurrentSize = int64(dbSize)
if newCurrentSize == totalSizeOfDB:
return err("the previous partition removal didn't clear database size")
totalSizeOfDB = newCurrentSize
debug "reducing database size",
targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB
return ok()
method existsTable*(
s: PostgresDriver, tableName: string
): Future[ArchiveDriverResult[bool]] {.async.} =
let query: string =
fmt"""
SELECT EXISTS (
SELECT FROM
pg_tables
WHERE
tablename = '{tableName}'
);
"""
var exists: string
proc rowCallback(pqResult: ptr PGresult) =
if pqResult.pqnfields() != 1:
error "Wrong number of fields in existsTable"
return
if pqResult.pqNtuples() != 1:
error "Wrong number of rows in existsTable"
return
exists = $(pqgetvalue(pqResult, 0, 0))
(await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr:
return err("existsTable failed in getRow: " & $error)
return ok(exists == "t")
proc getCurrentVersion*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
let existsVersionTable = (await s.existsTable("version")).valueOr:
return err("error in getCurrentVersion-existsTable: " & $error)
if not existsVersionTable:
return ok(0)
let res = (await s.getInt(fmt"SELECT version FROM version")).valueOr:
return err("error in getMessagesCount: " & $error)
return ok(res)
method deleteMessagesOlderThanTimestamp*(
s: PostgresDriver, tsNanoSec: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
## First of all, let's remove the older partitions so that we can reduce
## the database size.
(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
return err("error while removing older partitions: " & $error)
(
await s.writeConnPool.pgQuery(
"DELETE FROM messages WHERE timestamp < " & $tsNanoSec
)
).isOkOr:
return err("error in deleteMessagesOlderThanTimestamp: " & $error)
return ok()
############################################
## TODO: start splitting code better
const AnalyzeQuery = "ANALYZE messages"
const AnalyzeTableLockId = 111111 ## An arbitrary and different lock id
const RunAnalyzeInterval = timer.days(1)
proc analyzeTableLoop(self: PostgresDriver) {.async.} =
## The database stats should be calculated regularly so that the planner
## picks up the proper indexes and we have better query performance.
while true:
debug "analyzeTableLoop lock db"
(await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr:
if error != EXPECTED_LOCK_ERROR:
error "failed to acquire lock in analyzeTableLoop", error = error
await sleepAsync(RunAnalyzeInterval)
continue
debug "analyzeTableLoop start analysis"
(await self.performWriteQuery(AnalyzeQuery)).isOkOr:
error "failed to run ANALYZE messages", error = error
debug "analyzeTableLoop unlock db"
(await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr:
error "failed to release lock analyzeTableLoop", error = error
debug "analyzeTableLoop analysis completed"
await sleepAsync(RunAnalyzeInterval)
proc startAnalyzeTableLoop*(self: PostgresDriver) =
self.futLoopAnalyzeTable = self.analyzeTableLoop