From e492c590e9ec06b08ca4ea75a7daf1fa6bbce01c Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 20 Sep 2024 17:43:56 +0200
Subject: [PATCH] chore: Better database query logs and logarithmic scale in grafana store panels (#3048)

---
 metrics/waku-fleet-dashboard.json            | 67 +++++++++++++------
 waku/common/databases/db_postgres/dbconn.nim | 37 +++++++---
 .../postgres_driver/postgres_driver.nim      | 48 +------------
 .../postgres_driver/postgres_driver.nim      |  6 --
 4 files changed, 73 insertions(+), 85 deletions(-)

diff --git a/metrics/waku-fleet-dashboard.json b/metrics/waku-fleet-dashboard.json
index 23efbd139..8c7cdd174 100644
--- a/metrics/waku-fleet-dashboard.json
+++ b/metrics/waku-fleet-dashboard.json
@@ -3948,7 +3948,8 @@
             "lineWidth": 2,
             "pointSize": 10,
             "scaleDistribution": {
-              "type": "linear"
+              "log": 10,
+              "type": "log"
             },
             "showPoints": "auto",
             "spanNulls": false,
@@ -3973,7 +3974,32 @@
           },
          "unit": "s"
        },
-        "overrides": []
+        "overrides": [
+          {
+            "__systemRef": "hideSeriesFrom",
+            "matcher": {
+              "id": "byNames",
+              "options": {
+                "mode": "exclude",
+                "names": [
+                  "query_tag_ANALYZEmessages"
+                ],
+                "prefix": "All except:",
+                "readOnly": true
+              }
+            },
+            "properties": [
+              {
+                "id": "custom.hideFrom",
+                "value": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": true
+                }
+              }
+            ]
+          }
+        ]
       },
       "gridPos": {
         "h": 11,
@@ -4048,7 +4074,8 @@
             "lineWidth": 2,
             "pointSize": 10,
             "scaleDistribution": {
-              "type": "linear"
+              "log": 10,
+              "type": "log"
             },
             "showPoints": "auto",
             "spanNulls": false,
@@ -4109,6 +4136,7 @@
           "editorMode": "code",
           "exemplar": true,
           "expr": "query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"} and deriv(query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"}[45s]) != 0",
+          "hide": false,
           "interval": "",
           "legendFormat": "{{query}}",
           "range": true,
@@ -4147,7 +4175,8 @@
             "lineWidth": 2,
             "pointSize": 10,
             "scaleDistribution": {
-              "type": "linear"
+              "log": 10,
+              "type": "log"
             },
             "showPoints": "auto",
             "spanNulls": false,
@@ -4246,7 +4275,8 @@
             "lineWidth": 2,
             "pointSize": 10,
             "scaleDistribution": {
-              "type": "linear"
+              "log": 10,
+              "type": "log"
             },
             "showPoints": "auto",
             "spanNulls": false,
@@ -5456,8 +5486,7 @@
             "mode": "absolute",
             "steps": [
               {
-                "color": "green",
-                "value": null
+                "color": "green"
               },
               {
                 "color": "red",
@@ -5551,8 +5580,7 @@
             "mode": "absolute",
             "steps": [
               {
-                "color": "green",
-                "value": null
+                "color": "green"
               },
               {
                 "color": "red",
@@ -5643,8 +5671,7 @@
             "mode": "absolute",
             "steps": [
               {
-                "color": "green",
-                "value": null
+                "color": "green"
               },
               {
                 "color": "red",
@@ -5897,8 +5924,7 @@
             "mode": "absolute",
             "steps": [
               {
-                "color": "green",
-                "value": null
+                "color": "green"
               },
               {
                 "color": "red",
@@ -6022,8 +6048,7 @@
             "mode": "absolute",
             "steps": [
               {
-                "color": "green",
-                "value": null
+                "color": "green"
               },
               {
                 "color": "red",
@@ -7346,7 +7371,7 @@
       "type": "row"
     }
   ],
-  "refresh": "30s",
+  "refresh": false,
   "schemaVersion": 37,
   "style": "dark",
   "tags": [],
@@ -7428,10 +7453,10 @@
         "current": {
           "selected": true,
           "text": [
-            "All"
+            "ac-cn-hongkong-c"
           ],
           "value": [
-            "$__all"
+            "ac-cn-hongkong-c"
          ]
         },
         "datasource": {
@@ -7461,8 +7486,8 @@
     ]
   },
   "time": {
-    "from": "now-24h",
-    "to": "now"
+    "from": "2024-09-20T12:34:03.849Z",
+    "to": "2024-09-20T13:52:30.721Z"
   },
   "timepicker": {
     "refresh_intervals": [
@@ -7480,6 +7505,6 @@
   "timezone": "browser",
   "title": "Nim-Waku V2",
   "uid": "qrp_ZCTGz",
-  "version": 150,
+  "version": 151,
   "weekStart": ""
 }
\ No newline at end of file
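A note on the panel changes above: switching `scaleDistribution` from `linear` to log-10 keeps slow outliers (e.g. the `query_tag_ANALYZEmessages` series) and fast sub-second queries readable on the same store panel. The series these panels graph come from the `query_time_secs` gauge and `query_count` counter set in `dbconn.nim` below; their definitions presumably live in the imported `query_metrics` module. A minimal sketch of what such declarations look like with nim-metrics, where the label names ("query", "phase") are inferred from the PromQL expressions (`phase="waitFinish"`, legend `{{query}}`) and the `.set(...)` call sites in this patch:

```nim
# Hedged sketch: assumed declarations backing the dashboard series above.
import metrics

declarePublicGauge query_time_secs,
  "query time measured in seconds",
  labels = ["query", "phase"] # phase is "sendQuery" or "waitFinish"

declarePublicCounter query_count,
  "number of times a query is being performed",
  labels = ["query"]
```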
diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim
index 54fbc27a1..bc5da4ee6 100644
--- a/waku/common/databases/db_postgres/dbconn.nim
+++ b/waku/common/databases/db_postgres/dbconn.nim
@@ -1,4 +1,10 @@
-import std/[times, strutils, asyncnet, os, sequtils], results, chronos, metrics, re
+import
+  std/[times, strutils, asyncnet, os, sequtils],
+  results,
+  chronos,
+  metrics,
+  re,
+  chronicles
 import ./query_metrics

 include db_connector/db_postgres
@@ -167,21 +173,26 @@ proc dbConnQuery*(
   (await db.sendQuery(query, args)).isOkOr:
     return err("error in dbConnQuery calling sendQuery: " & $error)

-  query_time_secs.set(
-    getTime().toUnixFloat() - queryStartTime, [querySummary, "sendQuery"]
-  )
+  let sendDuration = getTime().toUnixFloat() - queryStartTime
+  query_time_secs.set(sendDuration, [querySummary, "sendQuery"])

   queryStartTime = getTime().toUnixFloat()

   (await db.waitQueryToFinish(rowCallback)).isOkOr:
     return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

-  query_time_secs.set(
-    getTime().toUnixFloat() - queryStartTime, [querySummary, "waitFinish"]
-  )
+  let waitDuration = getTime().toUnixFloat() - queryStartTime
+  query_time_secs.set(waitDuration, [querySummary, "waitFinish"])

   query_count.inc(labelValues = [querySummary])

+  if "insert" notin ($query).toLower():
+    debug "dbConnQuery",
+      query = $query,
+      querySummary,
+      waitDurationSecs = waitDuration,
+      sendDurationSecs = sendDuration
+
   return ok()

 proc dbConnQueryPrepared*(
@@ -196,17 +207,21 @@
   db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
     return err("error in dbConnQueryPrepared calling sendQuery: " & $error)

-  query_time_secs.set(getTime().toUnixFloat() - queryStartTime, [stmtName, "sendQuery"])
+  let sendDuration = getTime().toUnixFloat() - queryStartTime
+  query_time_secs.set(sendDuration, [stmtName, "sendQuery"])

   queryStartTime = getTime().toUnixFloat()

   (await db.waitQueryToFinish(rowCallback)).isOkOr:
     return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)

-  query_time_secs.set(
-    getTime().toUnixFloat() - queryStartTime, [stmtName, "waitFinish"]
-  )
+  let waitDuration = getTime().toUnixFloat() - queryStartTime
+  query_time_secs.set(waitDuration, [stmtName, "waitFinish"])

   query_count.inc(labelValues = [stmtName])

+  if "insert" notin stmtName.toLower():
+    debug "dbConnQueryPrepared",
+      stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration
+
   return ok()
diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
index 6c8ba28d6..a69fdb9f4 100644
--- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
+++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -346,8 +346,6 @@ method getAllMessages*(
     s: PostgresDriver
 ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
   ## Retrieve all messages from the store.
-  debug "beginning of getAllMessages"
-
   var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
   proc rowCallback(pqResult: ptr PGresult) =
     rowCallbackImpl(pqResult, rows)
@@ -370,8 +368,6 @@ proc getPartitionsList(
 ): Future[ArchiveDriverResult[seq[string]]] {.async.} =
   ## Retrieves the seq of partition table names.
   ## e.g.: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
-
-  debug "beginning getPartitionsList"
   var partitions: seq[string]
   proc rowCallback(pqResult: ptr PGresult) =
     for iRow in 0 ..< pqResult.pqNtuples():
@@ -431,7 +427,6 @@ proc getMessagesArbitraryQuery(
 ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
   ## This proc allows handling atypical queries. We don't use prepared statements for those.
-  debug "beginning getMessagesArbitraryQuery"

   var query = SelectClause
   var statements: seq[string]
   var args: seq[string]
@@ -512,8 +507,6 @@
     async
 .} =
   ## This proc allows handling atypical queries. We don't use prepared statements for those.
-
-  debug "beginning of getMessageHashesArbitraryQuery"
   var query = """SELECT messageHash FROM messages"""

   var statements: seq[string]
@@ -597,7 +590,6 @@ proc getMessagesPreparedStmt(

   var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]

-  debug "beginning of getMessagesPreparedStmt"
   proc rowCallback(pqResult: ptr PGresult) =
     rowCallbackImpl(pqResult, rows)

@@ -689,7 +681,6 @@ proc getMessageHashesPreparedStmt(

   var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]

-  debug "beginning of getMessageHashesPreparedStmt"
   proc rowCallback(pqResult: ptr PGresult) =
     hashCallbackImpl(pqResult, rows)

@@ -775,7 +766,6 @@ proc getMessagesByMessageHashes(
   ## Retrieves information, filtering only by a given messageHashes list.
   ## This proc leverages the messages_lookup table to get better query performance
   ## and only queries the desired partitions in the partitioned messages table
-  debug "beginning of getMessagesByMessageHashes"
   var query =
     fmt"""
     WITH min_timestamp AS (
@@ -814,7 +804,6 @@ proc getMessagesByMessageHashes(
   ).isOkOr:
     return err("failed to run query: " & $error)

-  debug "end of getMessagesByMessageHashes"
   return ok(rows)

 proc getMessagesWithinLimits(
@@ -894,8 +883,6 @@ method getMessages*(
     ascendingOrder = true,
     requestId = "",
 ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
-  debug "beginning of getMessages"
-
   let rows = collect(newSeq):
     for i in countup(0, hashes.len, MaxHashesPerQuery):
       let stop = min(i + MaxHashesPerQuery, hashes.len)
@@ -911,8 +898,6 @@ method getMessages*(
       for row in subRows:
         row

-  debug "end of getMessages"
-
   return ok(rows)

 proc getStr(
@@ -920,8 +905,6 @@ proc getStr(
 ): Future[ArchiveDriverResult[string]] {.async.} =
   # Performs a query that is expected to return a single string

-  debug "beginning of getStr"
-
   var ret: string
   proc rowCallback(pqResult: ptr PGresult) =
     if pqResult.pqnfields() != 1:
@@ -944,7 +927,6 @@ proc getInt(
 ): Future[ArchiveDriverResult[int64]] {.async.} =
   # Performs a query that is expected to return a single numeric value (int64)

-  debug "beginning of getInt"
   var retInt = 0'i64
   let str = (await s.getStr(query)).valueOr:
     return err("could not get str in getInt: " & $error)
@@ -962,8 +944,6 @@ proc getInt(

 method getDatabaseSize*(
     s: PostgresDriver
 ): Future[ArchiveDriverResult[int64]] {.async.} =
-  debug "beginning of getDatabaseSize"
-
   let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
     return err("error in getDatabaseSize: " & error)
@@ -973,8 +953,6 @@ method getMessagesCount*(
     s: PostgresDriver
 ): Future[ArchiveDriverResult[int64]] {.async.} =
-  debug "beginning of getMessagesCount"
-
   let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
   if intRes.isErr():
     return err("error in getMessagesCount: " & intRes.error)
@@ -987,8 +965,6 @@ method getOldestMessageTimestamp*(
   ## In some cases it could happen that we have
   ## empty partitions which are older than the current stored rows.
   ## In those cases we want to consider those older partitions as the oldest considered timestamp.
-  debug "beginning of getOldestMessageTimestamp"
-
   let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
     return err("could not get oldest partition: " & $error)
@@ -1004,7 +980,6 @@ method getOldestMessageTimestamp*(
 method getNewestMessageTimestamp*(
     s: PostgresDriver
 ): Future[ArchiveDriverResult[Timestamp]] {.async.} =
-  debug "beginning of getNewestMessageTimestamp"

   let intRes = await s.getInt("SELECT MAX(timestamp) FROM messages")
   if intRes.isErr():
@@ -1015,8 +990,6 @@ method deleteOldestMessagesNotWithinLimit*(
     s: PostgresDriver, limit: int
 ): Future[ArchiveDriverResult[void]] {.async.} =
-  debug "beginning of deleteOldestMessagesNotWithinLimit"
-
   var execRes = await s.writeConnPool.pgQuery(
     """DELETE FROM messages
     WHERE messageHash NOT IN (
@@ -1039,12 +1012,9 @@ method deleteOldestMessagesNotWithinLimit*(
       "error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
     )

-  debug "end of deleteOldestMessagesNotWithinLimit"
   return ok()

 method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
-  debug "beginning of postgres close"
-
   ## Cancel the partition factory loop
   s.futLoopPartitionFactory.cancelSoon()

@@ -1096,8 +1066,6 @@ proc acquireDatabaseLock*(
   ## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
   ## scripts.

-  debug "beginning of acquireDatabaseLock", lockId
-
   let locked = (
     await s.getStr(
       fmt"""
@@ -1116,7 +1084,6 @@ proc releaseDatabaseLock*(
     s: PostgresDriver, lockId: int = 841886
 ): Future[ArchiveDriverResult[void]] {.async.} =
   ## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
-  debug "beginning of releaseDatabaseLock", lockId
   let unlocked = (
     await s.getStr(
       fmt"""
@@ -1143,11 +1110,10 @@ proc performWriteQuery*(

 const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"

-proc performWriteQueryWithLock*(
+proc performWriteQueryWithLock(
     self: PostgresDriver, queryToProtect: string
 ): Future[ArchiveDriverResult[void]] {.async.} =
   ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
-  debug "performWriteQueryWithLock", queryToProtect
   let query =
     fmt"""
     DO $$
@@ -1210,8 +1176,6 @@ proc addPartition(
   ## Creates a partition table that will store the messages that fall in the range
   ## `startTime` <= timestamp < `startTime + duration`.
   ## `startTime` is measured in seconds since epoch
-  debug "beginning of addPartition"
-
   let beginning = startTime
   let `end` = partitions_manager.calcEndPartitionTime(startTime)

@@ -1353,8 +1317,6 @@ proc getTableSize*(
 ): Future[ArchiveDriverResult[string]] {.async.} =
   ## Returns a human-readable representation of the size for the requested table.
   ## tableName - table of interest.
- debug "beginning of getTableSize" - let tableSize = ( await self.getStr( fmt""" @@ -1442,8 +1404,6 @@ proc removeOldestPartition( self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests ): Future[ArchiveDriverResult[void]] {.async.} = ## Indirectly called from the retention policy - debug "beginning of removeOldestPartition" - let oldestPartition = self.partitionMngr.getOldestPartition().valueOr: return err("could not remove oldest partition: " & $error) @@ -1465,8 +1425,6 @@ proc containsAnyPartition*(self: PostgresDriver): bool = method decreaseDatabaseSize*( driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false ): Future[ArchiveDriverResult[void]] {.async.} = - debug "beginning of decreaseDatabaseSize" - var dbSize = (await driver.getDatabaseSize()).valueOr: return err("decreaseDatabaseSize failed to get database size: " & $error) @@ -1533,8 +1491,6 @@ method existsTable*( proc getCurrentVersion*( s: PostgresDriver ): Future[ArchiveDriverResult[int64]] {.async.} = - debug "beginning of getCurrentVersion" - let existsVersionTable = (await s.existsTable("version")).valueOr: return err("error in getCurrentVersion-existsTable: " & $error) @@ -1551,8 +1507,6 @@ method deleteMessagesOlderThanTimestamp*( ): Future[ArchiveDriverResult[void]] {.async.} = ## First of all, let's remove the older partitions so that we can reduce ## the database size. - debug "beginning of deleteMessagesOlderThanTimestamp" - (await s.removePartitionsOlderThan(tsNanoSec)).isOkOr: return err("error while removing older partitions: " & $error) diff --git a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim index 4f3532622..e8eed4238 100644 --- a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim @@ -259,8 +259,6 @@ method getAllMessages*( s: PostgresDriver ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieve all messages from the store. - debug "beginning of getAllMessages" - var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) @@ -634,7 +632,6 @@ proc getMessagesByMessageHashes( ## Retrieves information only filtering by a given messageHashes list. ## This proc levarages on the messages_lookup table to have better query performance ## and only query the desired partitions in the partitioned messages table - debug "beginning of getMessagesByMessageHashes" var query = fmt""" WITH min_timestamp AS ( @@ -673,7 +670,6 @@ proc getMessagesByMessageHashes( ).isOkOr: return err("failed to run query: " & $error) - debug "end of getMessagesByMessageHashes" return ok(rows) method getMessages*( @@ -689,8 +685,6 @@ method getMessages*( ascendingOrder = true, requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = - debug "beginning of getMessages" - let hexHashes = hashes.mapIt(toHex(it)) if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and