chore: Better database query logs and logarithmic scale in grafana store panels (#3048)

This commit is contained in:
Ivan FB 2024-09-20 17:43:56 +02:00 committed by Gabriel Mermelstein
parent bbebbf6d4f
commit e492c590e9
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
4 changed files with 73 additions and 85 deletions

View File

@ -3948,7 +3948,8 @@
"lineWidth": 2,
"pointSize": 10,
"scaleDistribution": {
"type": "linear"
"log": 10,
"type": "log"
},
"showPoints": "auto",
"spanNulls": false,
@ -3973,7 +3974,32 @@
},
"unit": "s"
},
"overrides": []
"overrides": [
{
"__systemRef": "hideSeriesFrom",
"matcher": {
"id": "byNames",
"options": {
"mode": "exclude",
"names": [
"query_tag_ANALYZEmessages"
],
"prefix": "All except:",
"readOnly": true
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": false,
"tooltip": false,
"viz": true
}
}
]
}
]
},
"gridPos": {
"h": 11,
@ -4048,7 +4074,8 @@
"lineWidth": 2,
"pointSize": 10,
"scaleDistribution": {
"type": "linear"
"log": 10,
"type": "log"
},
"showPoints": "auto",
"spanNulls": false,
@ -4109,6 +4136,7 @@
"editorMode": "code",
"exemplar": true,
"expr": "query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"} and deriv(query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"}[45s]) != 0",
"hide": false,
"interval": "",
"legendFormat": "{{query}}",
"range": true,
@ -4147,7 +4175,8 @@
"lineWidth": 2,
"pointSize": 10,
"scaleDistribution": {
"type": "linear"
"log": 10,
"type": "log"
},
"showPoints": "auto",
"spanNulls": false,
@ -4246,7 +4275,8 @@
"lineWidth": 2,
"pointSize": 10,
"scaleDistribution": {
"type": "linear"
"log": 10,
"type": "log"
},
"showPoints": "auto",
"spanNulls": false,
@ -5456,8 +5486,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -5551,8 +5580,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -5643,8 +5671,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -5897,8 +5924,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -6022,8 +6048,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -7346,7 +7371,7 @@
"type": "row"
}
],
"refresh": "30s",
"refresh": false,
"schemaVersion": 37,
"style": "dark",
"tags": [],
@ -7428,10 +7453,10 @@
"current": {
"selected": true,
"text": [
"All"
"ac-cn-hongkong-c"
],
"value": [
"$__all"
"ac-cn-hongkong-c"
]
},
"datasource": {
@ -7461,8 +7486,8 @@
]
},
"time": {
"from": "now-24h",
"to": "now"
"from": "2024-09-20T12:34:03.849Z",
"to": "2024-09-20T13:52:30.721Z"
},
"timepicker": {
"refresh_intervals": [
@ -7480,6 +7505,6 @@
"timezone": "browser",
"title": "Nim-Waku V2",
"uid": "qrp_ZCTGz",
"version": 150,
"version": 151,
"weekStart": ""
}

View File

@ -1,4 +1,10 @@
import std/[times, strutils, asyncnet, os, sequtils], results, chronos, metrics, re
import
std/[times, strutils, asyncnet, os, sequtils],
results,
chronos,
metrics,
re,
chronicles
import ./query_metrics
include db_connector/db_postgres
@ -167,21 +173,26 @@ proc dbConnQuery*(
(await db.sendQuery(query, args)).isOkOr:
return err("error in dbConnQuery calling sendQuery: " & $error)
query_time_secs.set(
getTime().toUnixFloat() - queryStartTime, [querySummary, "sendQuery"]
)
let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [querySummary, "sendQuery"])
queryStartTime = getTime().toUnixFloat()
(await db.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
query_time_secs.set(
getTime().toUnixFloat() - queryStartTime, [querySummary, "waitFinish"]
)
let waitDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(waitDuration, [querySummary, "waitFinish"])
query_count.inc(labelValues = [querySummary])
if "insert" notin ($query).toLower():
debug "dbConnQuery",
query = $query,
querySummary,
waitDurationSecs = waitDuration,
sendDurationSecs = sendDuration
return ok()
proc dbConnQueryPrepared*(
@ -196,17 +207,21 @@ proc dbConnQueryPrepared*(
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
query_time_secs.set(getTime().toUnixFloat() - queryStartTime, [stmtName, "sendQuery"])
let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [stmtName, "sendQuery"])
queryStartTime = getTime().toUnixFloat()
(await db.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
query_time_secs.set(
getTime().toUnixFloat() - queryStartTime, [stmtName, "waitFinish"]
)
let waitDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(waitDuration, [stmtName, "waitFinish"])
query_count.inc(labelValues = [stmtName])
if "insert" notin stmtName.toLower():
debug "dbConnQueryPrepared",
stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration
return ok()

View File

@ -346,8 +346,6 @@ method getAllMessages*(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
debug "beginning of getAllMessages"
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
@ -370,8 +368,6 @@ proc getPartitionsList(
): Future[ArchiveDriverResult[seq[string]]] {.async.} =
## Retrieves the seq of partition table names.
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
debug "beginning getPartitionsList"
var partitions: seq[string]
proc rowCallback(pqResult: ptr PGresult) =
for iRow in 0 ..< pqResult.pqNtuples():
@ -431,7 +427,6 @@ proc getMessagesArbitraryQuery(
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc allows handling of atypical queries. We don't use prepared statements for those.
debug "beginning getMessagesArbitraryQuery"
var query = SelectClause
var statements: seq[string]
var args: seq[string]
@ -512,8 +507,6 @@ proc getMessageHashesArbitraryQuery(
async
.} =
## This proc allows handling of atypical queries. We don't use prepared statements for those.
debug "beginning of getMessageHashesArbitraryQuery"
var query = """SELECT messageHash FROM messages"""
var statements: seq[string]
@ -597,7 +590,6 @@ proc getMessagesPreparedStmt(
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
debug "beginning of getMessagesPreparedStmt"
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
@ -689,7 +681,6 @@ proc getMessageHashesPreparedStmt(
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
debug "beginning of getMessageHashesPreparedStmt"
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)
@ -775,7 +766,6 @@ proc getMessagesByMessageHashes(
## Retrieves information only filtering by a given messageHashes list.
## This proc leverages the messages_lookup table to achieve better query performance
## and only queries the desired partitions in the partitioned messages table
debug "beginning of getMessagesByMessageHashes"
var query =
fmt"""
WITH min_timestamp AS (
@ -814,7 +804,6 @@ proc getMessagesByMessageHashes(
).isOkOr:
return err("failed to run query: " & $error)
debug "end of getMessagesByMessageHashes"
return ok(rows)
proc getMessagesWithinLimits(
@ -894,8 +883,6 @@ method getMessages*(
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"
let rows = collect(newSeq):
for i in countup(0, hashes.len, MaxHashesPerQuery):
let stop = min(i + MaxHashesPerQuery, hashes.len)
@ -911,8 +898,6 @@ method getMessages*(
for row in subRows:
row
debug "end of getMessages"
return ok(rows)
proc getStr(
@ -920,8 +905,6 @@ proc getStr(
): Future[ArchiveDriverResult[string]] {.async.} =
# Performs a query that is expected to return a single string
debug "beginning of getStr"
var ret: string
proc rowCallback(pqResult: ptr PGresult) =
if pqResult.pqnfields() != 1:
@ -944,7 +927,6 @@ proc getInt(
): Future[ArchiveDriverResult[int64]] {.async.} =
# Performs a query that is expected to return a single numeric value (int64)
debug "beginning of getInt"
var retInt = 0'i64
let str = (await s.getStr(query)).valueOr:
return err("could not get str in getInt: " & $error)
@ -962,8 +944,6 @@ proc getInt(
method getDatabaseSize*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
debug "beginning of getDatabaseSize"
let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
return err("error in getDatabaseSize: " & error)
@ -973,8 +953,6 @@ method getDatabaseSize*(
method getMessagesCount*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
debug "beginning of getMessagesCount"
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
if intRes.isErr():
return err("error in getMessagesCount: " & intRes.error)
@ -987,8 +965,6 @@ method getOldestMessageTimestamp*(
## In some cases it can happen that we have
## empty partitions which are older than the currently stored rows.
## In those cases we want to treat those older partitions as the oldest timestamp.
debug "beginning of getOldestMessageTimestamp"
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition: " & $error)
@ -1004,7 +980,6 @@ method getOldestMessageTimestamp*(
method getNewestMessageTimestamp*(
s: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
debug "beginning of getNewestMessageTimestamp"
let intRes = await s.getInt("SELECT MAX(timestamp) FROM messages")
if intRes.isErr():
@ -1015,8 +990,6 @@ method getNewestMessageTimestamp*(
method deleteOldestMessagesNotWithinLimit*(
s: PostgresDriver, limit: int
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of deleteOldestMessagesNotWithinLimit"
var execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages WHERE messageHash NOT IN
(
@ -1039,12 +1012,9 @@ method deleteOldestMessagesNotWithinLimit*(
"error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
)
debug "end of deleteOldestMessagesNotWithinLimit"
return ok()
method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of postgres close"
## Cancel the partition factory loop
s.futLoopPartitionFactory.cancelSoon()
@ -1096,8 +1066,6 @@ proc acquireDatabaseLock*(
## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
## scripts.
debug "beginning of acquireDatabaseLock", lockId
let locked = (
await s.getStr(
fmt"""
@ -1116,7 +1084,6 @@ proc releaseDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
debug "beginning of releaseDatabaseLock", lockId
let unlocked = (
await s.getStr(
fmt"""
@ -1143,11 +1110,10 @@ proc performWriteQuery*(
const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
proc performWriteQueryWithLock*(
proc performWriteQueryWithLock(
self: PostgresDriver, queryToProtect: string
): Future[ArchiveDriverResult[void]] {.async.} =
## This wraps the original query in a script to ensure a pg_advisory lock protects it
debug "performWriteQueryWithLock", queryToProtect
let query =
fmt"""
DO $$
@ -1210,8 +1176,6 @@ proc addPartition(
## Creates a partition table that will store the messages that fall in the range
## `startTime` <= timestamp < `startTime + duration`.
## `startTime` is measured in seconds since epoch
debug "beginning of addPartition"
let beginning = startTime
let `end` = partitions_manager.calcEndPartitionTime(startTime)
@ -1353,8 +1317,6 @@ proc getTableSize*(
): Future[ArchiveDriverResult[string]] {.async.} =
## Returns a human-readable representation of the size for the requested table.
## tableName - table of interest.
debug "beginning of getTableSize"
let tableSize = (
await self.getStr(
fmt"""
@ -1442,8 +1404,6 @@ proc removeOldestPartition(
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
): Future[ArchiveDriverResult[void]] {.async.} =
## Indirectly called from the retention policy
debug "beginning of removeOldestPartition"
let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
return err("could not remove oldest partition: " & $error)
@ -1465,8 +1425,6 @@ proc containsAnyPartition*(self: PostgresDriver): bool =
method decreaseDatabaseSize*(
driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of decreaseDatabaseSize"
var dbSize = (await driver.getDatabaseSize()).valueOr:
return err("decreaseDatabaseSize failed to get database size: " & $error)
@ -1533,8 +1491,6 @@ method existsTable*(
proc getCurrentVersion*(
s: PostgresDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
debug "beginning of getCurrentVersion"
let existsVersionTable = (await s.existsTable("version")).valueOr:
return err("error in getCurrentVersion-existsTable: " & $error)
@ -1551,8 +1507,6 @@ method deleteMessagesOlderThanTimestamp*(
): Future[ArchiveDriverResult[void]] {.async.} =
## First of all, let's remove the older partitions so that we can reduce
## the database size.
debug "beginning of deleteMessagesOlderThanTimestamp"
(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
return err("error while removing older partitions: " & $error)

View File

@ -259,8 +259,6 @@ method getAllMessages*(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
debug "beginning of getAllMessages"
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
@ -634,7 +632,6 @@ proc getMessagesByMessageHashes(
## Retrieves information only filtering by a given messageHashes list.
## This proc leverages the messages_lookup table to achieve better query performance
## and only queries the desired partitions in the partitioned messages table
debug "beginning of getMessagesByMessageHashes"
var query =
fmt"""
WITH min_timestamp AS (
@ -673,7 +670,6 @@ proc getMessagesByMessageHashes(
).isOkOr:
return err("failed to run query: " & $error)
debug "end of getMessagesByMessageHashes"
return ok(rows)
method getMessages*(
@ -689,8 +685,6 @@ method getMessages*(
ascendingOrder = true,
requestId = "",
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"
let hexHashes = hashes.mapIt(toHex(it))
if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and