Ivan FB 282c2e8107
chore: Minor Postgres optimizations (#2166)
* postgres_healthcheck: validate once per minute instead of every 30 seconds
* postgres_driver.nim: change MaxNumConns from 5 to 50
* postgres_driver.nim: split connPool into writeConnPool and readConnPool
  This aims to avoid clashes between insert and select queries, because
  inserts and selects can happen concurrently when handling relay and
  store events, respectively.
2023-10-30 15:16:49 +01:00

when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}

import
  std/[strformat,nre,options,strutils],
  stew/[results,byteutils],
  db_postgres,
  chronos
import
  ../../../waku_core,
  ../../common,
  ../../driver,
  ../../../common/databases/db_postgres as waku_postgres,
  ./postgres_healthcheck

export postgres_driver

type PostgresDriver* = ref object of ArchiveDriver
  ## Keeps separate pools for read and write operations
  writeConnPool: PgAsyncPool
  readConnPool: PgAsyncPool

proc dropTableQuery(): string =
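  ## Returns the SQL query that drops the "messages" table.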
"DROP TABLE messages"
proc createTableQuery(): string =
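  ## Returns the SQL query that creates the "messages" table, if it does not exist.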
"CREATE TABLE IF NOT EXISTS messages (" &
" pubsubTopic VARCHAR NOT NULL," &
" contentTopic VARCHAR NOT NULL," &
" payload VARCHAR," &
" version INTEGER NOT NULL," &
" timestamp BIGINT NOT NULL," &
" id VARCHAR NOT NULL," &
" storedAt BIGINT NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
");"
proc insertRow(): string =
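  ## Returns the parameterized SQL statement that inserts a single message row.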
  # TODO: get the sql queries from a file
  """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
  version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""

const MaxNumConns = 50 # TODO: we may need to set this from app args

proc new*(T: type PostgresDriver,
          dbUrl: string,
          maxConnections: int = MaxNumConns,
          onErrAction: OnErrHandler = nil):
          ArchiveDriverResult[T] =
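  ## Creates a PostgresDriver backed by separate read and write connection
  ## pools; connectivity checks are spawned on both pools when onErrAction is provided.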
  let readConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr:
    return err("error creating read conn pool PgAsyncPool")

  let writeConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr:
    return err("error creating write conn pool PgAsyncPool")

  if not isNil(onErrAction):
    asyncSpawn checkConnectivity(readConnPool, onErrAction)

  if not isNil(onErrAction):
    asyncSpawn checkConnectivity(writeConnPool, onErrAction)

  return ok(PostgresDriver(writeConnPool: writeConnPool,
                           readConnPool: readConnPool))

proc createMessageTable*(s: PostgresDriver):
                         Future[ArchiveDriverResult[void]] {.async.} =
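  ## Creates the "messages" table in the database, if it does not exist yet.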
  let execRes = await s.writeConnPool.exec(createTableQuery())
  if execRes.isErr():
    return err("error in createMessageTable: " & execRes.error)

  return ok()

proc deleteMessageTable*(s: PostgresDriver):
                         Future[ArchiveDriverResult[void]] {.async.} =
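  ## Drops the "messages" table from the database.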
  let execRes = await s.writeConnPool.exec(dropTableQuery())
  if execRes.isErr():
    return err("error in deleteMessageTable: " & execRes.error)

  return ok()

proc init*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
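  ## Initializes the driver by creating the "messages" table.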
  let createMsgRes = await s.createMessageTable()
  if createMsgRes.isErr():
    return err("createMsgRes.isErr in init: " & createMsgRes.error)

  return ok()

proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
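  ## Resets the driver by dropping the "messages" table.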
  let ret = await s.deleteMessageTable()
  return ret

method put*(s: PostgresDriver,
            pubsubTopic: PubsubTopic,
            message: WakuMessage,
            digest: MessageDigest,
            receivedTime: Timestamp):
            Future[ArchiveDriverResult[void]] {.async.} =
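  ## Inserts a single message row through the write connection pool.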
  let ret = await s.writeConnPool.runStmt(insertRow(),
                                          @[toHex(digest.data),
                                            $receivedTime,
                                            message.contentTopic,
                                            toHex(message.payload),
                                            pubsubTopic,
                                            $message.version,
                                            $message.timestamp])
  return ret

proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
  # Converts a postgres row into an ArchiveRow
  var wakuMessage: WakuMessage
  var timestamp: Timestamp
  var version: uint
  var pubSubTopic: string
  var contentTopic: string
  var storedAt: int64
  var digest: string
  var payload: string

  try:
    storedAt = parseInt(r[0])
    contentTopic = r[1]
    payload = parseHexStr(r[2])
    pubSubTopic = r[3]
    version = parseUInt(r[4])
    timestamp = parseInt(r[5])
    digest = parseHexStr(r[6])
  except ValueError:
    return err("could not parse the row: " & getCurrentExceptionMsg())

  wakuMessage.timestamp = timestamp
  wakuMessage.version = uint32(version)
  wakuMessage.contentTopic = contentTopic
  wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))

  return ok((pubSubTopic,
             wakuMessage,
             @(digest.toOpenArrayByte(0, digest.high)),
             storedAt))

method getAllMessages*(s: PostgresDriver):
                       Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Retrieve all messages from the store.
  let rowsRes = await s.readConnPool.query("""SELECT storedAt, contentTopic,
                                               payload, pubsubTopic, version, timestamp,
                                               id FROM messages ORDER BY storedAt ASC""",
                                           newSeq[string](0))
  if rowsRes.isErr():
    return err("failed in query: " & rowsRes.error)

  var results: seq[ArchiveRow]
  for r in rowsRes.value:
    let rowRes = r.toArchiveRow()
    if rowRes.isErr():
      return err("failed to extract row")

    results.add(rowRes.get())

  return ok(results)

method getMessages*(s: PostgresDriver,
                    contentTopic: seq[ContentTopic] = @[],
                    pubsubTopic = none(PubsubTopic),
                    cursor = none(ArchiveCursor),
                    startTime = none(Timestamp),
                    endTime = none(Timestamp),
                    maxPageSize = DefaultPageSize,
                    ascendingOrder = true):
                    Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
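  ## Retrieves the messages matching the given filters, using the read connection pool.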
var query = """SELECT storedAt, contentTopic, payload,
pubsubTopic, version, timestamp, id FROM messages"""
var statements: seq[string]
var args: seq[string]
if contentTopic.len > 0:
let cstmt = "contentTopic IN (" & "?".repeat(contentTopic.len).join(",") & ")"
statements.add(cstmt)
for t in contentTopic:
args.add(t)
if pubsubTopic.isSome():
statements.add("pubsubTopic = ?")
args.add(pubsubTopic.get())
if cursor.isSome():
let comp = if ascendingOrder: ">" else: "<"
statements.add("(storedAt, id) " & comp & " (?,?)")
args.add($cursor.get().storeTime)
args.add(toHex(cursor.get().digest.data))
if startTime.isSome():
statements.add("storedAt >= ?")
args.add($startTime.get())
if endTime.isSome():
statements.add("storedAt <= ?")
args.add($endTime.get())
if statements.len > 0:
query &= " WHERE " & statements.join(" AND ")
var direction: string
if ascendingOrder:
direction = "ASC"
else:
direction = "DESC"
query &= " ORDER BY storedAt " & direction & ", id " & direction
query &= " LIMIT ?"
args.add($maxPageSize)
let rowsRes = await s.readConnPool.query(query, args)
if rowsRes.isErr():
return err("failed to run query: " & rowsRes.error)
var results: seq[ArchiveRow]
for r in rowsRes.value:
let rowRes = r.toArchiveRow()
if rowRes.isErr():
return err("failed to extract row: " & rowRes.error)
results.add(rowRes.get())
return ok(results)
proc getInt(s: PostgresDriver,
            query: string):
            Future[ArchiveDriverResult[int64]] {.async.} =
  # Performs a query that is expected to return a single numeric value (int64)
  let rowsRes = await s.readConnPool.query(query)
  if rowsRes.isErr():
    return err("failed in getRow: " & rowsRes.error)

  let rows = rowsRes.get()
  if rows.len != 1:
    return err("failed in getRow. Expected one row but got " & $rows.len)

  let fields = rows[0]
  if fields.len != 1:
    return err("failed in getRow: Expected one field but got " & $fields.len)

  var retInt = 0'i64
  try:
    if fields[0] != "":
      retInt = parseInt(fields[0])
  except ValueError:
    return err("exception in getRow, parseInt: " & getCurrentExceptionMsg())

  return ok(retInt)

method getMessagesCount*(s: PostgresDriver):
                         Future[ArchiveDriverResult[int64]] {.async.} =
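  ## Returns the total number of rows stored in the "messages" table.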
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
if intRes.isErr():
return err("error in getMessagesCount: " & intRes.error)
return ok(intRes.get())
method getOldestMessageTimestamp*(s: PostgresDriver):
                                  Future[ArchiveDriverResult[Timestamp]] {.async.} =
  let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
  if intRes.isErr():
    return err("error in getOldestMessageTimestamp: " & intRes.error)

  return ok(Timestamp(intRes.get()))

method getNewestMessageTimestamp*(s: PostgresDriver):
                                  Future[ArchiveDriverResult[Timestamp]] {.async.} =
  let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
  if intRes.isErr():
    return err("error in getNewestMessageTimestamp: " & intRes.error)

  return ok(Timestamp(intRes.get()))

method deleteMessagesOlderThanTimestamp*(
    s: PostgresDriver,
    ts: Timestamp):
    Future[ArchiveDriverResult[void]] {.async.} =
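  ## Deletes every message whose storedAt timestamp is older than the given one.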
  let execRes = await s.writeConnPool.exec(
    "DELETE FROM messages WHERE storedAt < " & $ts)
  if execRes.isErr():
    return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)

  return ok()

method deleteOldestMessagesNotWithinLimit*(
    s: PostgresDriver,
    limit: int):
    Future[ArchiveDriverResult[void]] {.async.} =
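  ## Keeps only the newest `limit` messages and deletes the rest.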
  let execRes = await s.writeConnPool.exec(
    """DELETE FROM messages WHERE id NOT IN
         (
           SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?
         );""",
    @[$limit])
  if execRes.isErr():
    return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)

  return ok()

method close*(s: PostgresDriver):
              Future[ArchiveDriverResult[void]] {.async.} =
  ## Closes the read and write connection pools
  let writeCloseRes = await s.writeConnPool.close()
  let readCloseRes = await s.readConnPool.close()

  writeCloseRes.isOkOr:
    return err("error closing write pool: " & $error)

  readCloseRes.isOkOr:
    return err("error closing read pool: " & $error)

  return ok()

proc sleep*(s: PostgresDriver, seconds: int):
            Future[ArchiveDriverResult[void]] {.async.} =
  # This is for testing purposes only. It is aimed at testing the proper
  # implementation of asynchronous requests. It merely triggers a sleep in the
  # database for the given number of seconds.
  try:
    let params = @[$seconds]
    let sleepRes = await s.writeConnPool.query("SELECT pg_sleep(?)", params)
    if sleepRes.isErr():
      return err("error in postgres_driver sleep: " & sleepRes.error)
  except DbError:
    # This always raises an exception although the sleep works
    return err("exception sleeping: " & getCurrentExceptionMsg())

  return ok()