mirror of https://github.com/waku-org/nwaku.git
chore: Better database query logs and logarithmic scale in grafana store panels (#3048)
This commit is contained in:
parent
256b7853a6
commit
d68b06f145
|
@ -3948,7 +3948,8 @@
|
||||||
"lineWidth": 2,
|
"lineWidth": 2,
|
||||||
"pointSize": 10,
|
"pointSize": 10,
|
||||||
"scaleDistribution": {
|
"scaleDistribution": {
|
||||||
"type": "linear"
|
"log": 10,
|
||||||
|
"type": "log"
|
||||||
},
|
},
|
||||||
"showPoints": "auto",
|
"showPoints": "auto",
|
||||||
"spanNulls": false,
|
"spanNulls": false,
|
||||||
|
@ -3973,7 +3974,32 @@
|
||||||
},
|
},
|
||||||
"unit": "s"
|
"unit": "s"
|
||||||
},
|
},
|
||||||
"overrides": []
|
"overrides": [
|
||||||
|
{
|
||||||
|
"__systemRef": "hideSeriesFrom",
|
||||||
|
"matcher": {
|
||||||
|
"id": "byNames",
|
||||||
|
"options": {
|
||||||
|
"mode": "exclude",
|
||||||
|
"names": [
|
||||||
|
"query_tag_ANALYZEmessages"
|
||||||
|
],
|
||||||
|
"prefix": "All except:",
|
||||||
|
"readOnly": true
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"properties": [
|
||||||
|
{
|
||||||
|
"id": "custom.hideFrom",
|
||||||
|
"value": {
|
||||||
|
"legend": false,
|
||||||
|
"tooltip": false,
|
||||||
|
"viz": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
},
|
},
|
||||||
"gridPos": {
|
"gridPos": {
|
||||||
"h": 11,
|
"h": 11,
|
||||||
|
@ -4048,7 +4074,8 @@
|
||||||
"lineWidth": 2,
|
"lineWidth": 2,
|
||||||
"pointSize": 10,
|
"pointSize": 10,
|
||||||
"scaleDistribution": {
|
"scaleDistribution": {
|
||||||
"type": "linear"
|
"log": 10,
|
||||||
|
"type": "log"
|
||||||
},
|
},
|
||||||
"showPoints": "auto",
|
"showPoints": "auto",
|
||||||
"spanNulls": false,
|
"spanNulls": false,
|
||||||
|
@ -4109,6 +4136,7 @@
|
||||||
"editorMode": "code",
|
"editorMode": "code",
|
||||||
"exemplar": true,
|
"exemplar": true,
|
||||||
"expr": "query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"} and deriv(query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"}[45s]) != 0",
|
"expr": "query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"} and deriv(query_time_secs{instance=~\"[[host]].([[dc:pipe]]).([[fleet:pipe]])\", phase=\"waitFinish\"}[45s]) != 0",
|
||||||
|
"hide": false,
|
||||||
"interval": "",
|
"interval": "",
|
||||||
"legendFormat": "{{query}}",
|
"legendFormat": "{{query}}",
|
||||||
"range": true,
|
"range": true,
|
||||||
|
@ -4147,7 +4175,8 @@
|
||||||
"lineWidth": 2,
|
"lineWidth": 2,
|
||||||
"pointSize": 10,
|
"pointSize": 10,
|
||||||
"scaleDistribution": {
|
"scaleDistribution": {
|
||||||
"type": "linear"
|
"log": 10,
|
||||||
|
"type": "log"
|
||||||
},
|
},
|
||||||
"showPoints": "auto",
|
"showPoints": "auto",
|
||||||
"spanNulls": false,
|
"spanNulls": false,
|
||||||
|
@ -4246,7 +4275,8 @@
|
||||||
"lineWidth": 2,
|
"lineWidth": 2,
|
||||||
"pointSize": 10,
|
"pointSize": 10,
|
||||||
"scaleDistribution": {
|
"scaleDistribution": {
|
||||||
"type": "linear"
|
"log": 10,
|
||||||
|
"type": "log"
|
||||||
},
|
},
|
||||||
"showPoints": "auto",
|
"showPoints": "auto",
|
||||||
"spanNulls": false,
|
"spanNulls": false,
|
||||||
|
@ -5456,8 +5486,7 @@
|
||||||
"mode": "absolute",
|
"mode": "absolute",
|
||||||
"steps": [
|
"steps": [
|
||||||
{
|
{
|
||||||
"color": "green",
|
"color": "green"
|
||||||
"value": null
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"color": "red",
|
"color": "red",
|
||||||
|
@ -5551,8 +5580,7 @@
|
||||||
"mode": "absolute",
|
"mode": "absolute",
|
||||||
"steps": [
|
"steps": [
|
||||||
{
|
{
|
||||||
"color": "green",
|
"color": "green"
|
||||||
"value": null
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"color": "red",
|
"color": "red",
|
||||||
|
@ -5643,8 +5671,7 @@
|
||||||
"mode": "absolute",
|
"mode": "absolute",
|
||||||
"steps": [
|
"steps": [
|
||||||
{
|
{
|
||||||
"color": "green",
|
"color": "green"
|
||||||
"value": null
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"color": "red",
|
"color": "red",
|
||||||
|
@ -5897,8 +5924,7 @@
|
||||||
"mode": "absolute",
|
"mode": "absolute",
|
||||||
"steps": [
|
"steps": [
|
||||||
{
|
{
|
||||||
"color": "green",
|
"color": "green"
|
||||||
"value": null
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"color": "red",
|
"color": "red",
|
||||||
|
@ -6022,8 +6048,7 @@
|
||||||
"mode": "absolute",
|
"mode": "absolute",
|
||||||
"steps": [
|
"steps": [
|
||||||
{
|
{
|
||||||
"color": "green",
|
"color": "green"
|
||||||
"value": null
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"color": "red",
|
"color": "red",
|
||||||
|
@ -7346,7 +7371,7 @@
|
||||||
"type": "row"
|
"type": "row"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"refresh": "30s",
|
"refresh": false,
|
||||||
"schemaVersion": 37,
|
"schemaVersion": 37,
|
||||||
"style": "dark",
|
"style": "dark",
|
||||||
"tags": [],
|
"tags": [],
|
||||||
|
@ -7428,10 +7453,10 @@
|
||||||
"current": {
|
"current": {
|
||||||
"selected": true,
|
"selected": true,
|
||||||
"text": [
|
"text": [
|
||||||
"All"
|
"ac-cn-hongkong-c"
|
||||||
],
|
],
|
||||||
"value": [
|
"value": [
|
||||||
"$__all"
|
"ac-cn-hongkong-c"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"datasource": {
|
"datasource": {
|
||||||
|
@ -7461,8 +7486,8 @@
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"time": {
|
"time": {
|
||||||
"from": "now-24h",
|
"from": "2024-09-20T12:34:03.849Z",
|
||||||
"to": "now"
|
"to": "2024-09-20T13:52:30.721Z"
|
||||||
},
|
},
|
||||||
"timepicker": {
|
"timepicker": {
|
||||||
"refresh_intervals": [
|
"refresh_intervals": [
|
||||||
|
@ -7480,6 +7505,6 @@
|
||||||
"timezone": "browser",
|
"timezone": "browser",
|
||||||
"title": "Nim-Waku V2",
|
"title": "Nim-Waku V2",
|
||||||
"uid": "qrp_ZCTGz",
|
"uid": "qrp_ZCTGz",
|
||||||
"version": 150,
|
"version": 151,
|
||||||
"weekStart": ""
|
"weekStart": ""
|
||||||
}
|
}
|
|
@ -1,4 +1,10 @@
|
||||||
import std/[times, strutils, asyncnet, os, sequtils], results, chronos, metrics, re
|
import
|
||||||
|
std/[times, strutils, asyncnet, os, sequtils],
|
||||||
|
results,
|
||||||
|
chronos,
|
||||||
|
metrics,
|
||||||
|
re,
|
||||||
|
chronicles
|
||||||
import ./query_metrics
|
import ./query_metrics
|
||||||
|
|
||||||
include db_connector/db_postgres
|
include db_connector/db_postgres
|
||||||
|
@ -167,21 +173,26 @@ proc dbConnQuery*(
|
||||||
(await db.sendQuery(query, args)).isOkOr:
|
(await db.sendQuery(query, args)).isOkOr:
|
||||||
return err("error in dbConnQuery calling sendQuery: " & $error)
|
return err("error in dbConnQuery calling sendQuery: " & $error)
|
||||||
|
|
||||||
query_time_secs.set(
|
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
getTime().toUnixFloat() - queryStartTime, [querySummary, "sendQuery"]
|
query_time_secs.set(sendDuration, [querySummary, "sendQuery"])
|
||||||
)
|
|
||||||
|
|
||||||
queryStartTime = getTime().toUnixFloat()
|
queryStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await db.waitQueryToFinish(rowCallback)).isOkOr:
|
(await db.waitQueryToFinish(rowCallback)).isOkOr:
|
||||||
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
|
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
|
||||||
|
|
||||||
query_time_secs.set(
|
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
getTime().toUnixFloat() - queryStartTime, [querySummary, "waitFinish"]
|
query_time_secs.set(waitDuration, [querySummary, "waitFinish"])
|
||||||
)
|
|
||||||
|
|
||||||
query_count.inc(labelValues = [querySummary])
|
query_count.inc(labelValues = [querySummary])
|
||||||
|
|
||||||
|
if "insert" notin ($query).toLower():
|
||||||
|
debug "dbConnQuery",
|
||||||
|
query = $query,
|
||||||
|
querySummary,
|
||||||
|
waitDurationSecs = waitDuration,
|
||||||
|
sendDurationSecs = sendDuration
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc dbConnQueryPrepared*(
|
proc dbConnQueryPrepared*(
|
||||||
|
@ -196,17 +207,21 @@ proc dbConnQueryPrepared*(
|
||||||
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
|
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
|
||||||
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
|
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
|
||||||
|
|
||||||
query_time_secs.set(getTime().toUnixFloat() - queryStartTime, [stmtName, "sendQuery"])
|
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
|
query_time_secs.set(sendDuration, [stmtName, "sendQuery"])
|
||||||
|
|
||||||
queryStartTime = getTime().toUnixFloat()
|
queryStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await db.waitQueryToFinish(rowCallback)).isOkOr:
|
(await db.waitQueryToFinish(rowCallback)).isOkOr:
|
||||||
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
|
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
|
||||||
|
|
||||||
query_time_secs.set(
|
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
getTime().toUnixFloat() - queryStartTime, [stmtName, "waitFinish"]
|
query_time_secs.set(waitDuration, [stmtName, "waitFinish"])
|
||||||
)
|
|
||||||
|
|
||||||
query_count.inc(labelValues = [stmtName])
|
query_count.inc(labelValues = [stmtName])
|
||||||
|
|
||||||
|
if "insert" notin stmtName.toLower():
|
||||||
|
debug "dbConnQueryPrepared",
|
||||||
|
stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
|
@ -346,8 +346,6 @@ method getAllMessages*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
## Retrieve all messages from the store.
|
## Retrieve all messages from the store.
|
||||||
debug "beginning of getAllMessages"
|
|
||||||
|
|
||||||
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
||||||
proc rowCallback(pqResult: ptr PGresult) =
|
proc rowCallback(pqResult: ptr PGresult) =
|
||||||
rowCallbackImpl(pqResult, rows)
|
rowCallbackImpl(pqResult, rows)
|
||||||
|
@ -370,8 +368,6 @@ proc getPartitionsList(
|
||||||
): Future[ArchiveDriverResult[seq[string]]] {.async.} =
|
): Future[ArchiveDriverResult[seq[string]]] {.async.} =
|
||||||
## Retrieves the seq of partition table names.
|
## Retrieves the seq of partition table names.
|
||||||
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
|
## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
|
||||||
|
|
||||||
debug "beginning getPartitionsList"
|
|
||||||
var partitions: seq[string]
|
var partitions: seq[string]
|
||||||
proc rowCallback(pqResult: ptr PGresult) =
|
proc rowCallback(pqResult: ptr PGresult) =
|
||||||
for iRow in 0 ..< pqResult.pqNtuples():
|
for iRow in 0 ..< pqResult.pqNtuples():
|
||||||
|
@ -431,7 +427,6 @@ proc getMessagesArbitraryQuery(
|
||||||
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
## This proc allows to handle atypical queries. We don't use prepared statements for those.
|
## This proc allows to handle atypical queries. We don't use prepared statements for those.
|
||||||
|
|
||||||
debug "beginning getMessagesArbitraryQuery"
|
|
||||||
var query = SelectClause
|
var query = SelectClause
|
||||||
var statements: seq[string]
|
var statements: seq[string]
|
||||||
var args: seq[string]
|
var args: seq[string]
|
||||||
|
@ -512,8 +507,6 @@ proc getMessageHashesArbitraryQuery(
|
||||||
async
|
async
|
||||||
.} =
|
.} =
|
||||||
## This proc allows to handle atypical queries. We don't use prepared statements for those.
|
## This proc allows to handle atypical queries. We don't use prepared statements for those.
|
||||||
|
|
||||||
debug "beginning of getMessageHashesArbitraryQuery"
|
|
||||||
var query = """SELECT messageHash FROM messages"""
|
var query = """SELECT messageHash FROM messages"""
|
||||||
|
|
||||||
var statements: seq[string]
|
var statements: seq[string]
|
||||||
|
@ -597,7 +590,6 @@ proc getMessagesPreparedStmt(
|
||||||
|
|
||||||
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
||||||
|
|
||||||
debug "beginning of getMessagesPreparedStmt"
|
|
||||||
proc rowCallback(pqResult: ptr PGresult) =
|
proc rowCallback(pqResult: ptr PGresult) =
|
||||||
rowCallbackImpl(pqResult, rows)
|
rowCallbackImpl(pqResult, rows)
|
||||||
|
|
||||||
|
@ -689,7 +681,6 @@ proc getMessageHashesPreparedStmt(
|
||||||
|
|
||||||
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
|
||||||
|
|
||||||
debug "beginning of getMessageHashesPreparedStmt"
|
|
||||||
proc rowCallback(pqResult: ptr PGresult) =
|
proc rowCallback(pqResult: ptr PGresult) =
|
||||||
hashCallbackImpl(pqResult, rows)
|
hashCallbackImpl(pqResult, rows)
|
||||||
|
|
||||||
|
@ -775,7 +766,6 @@ proc getMessagesByMessageHashes(
|
||||||
## Retrieves information only filtering by a given messageHashes list.
|
## Retrieves information only filtering by a given messageHashes list.
|
||||||
## This proc levarages on the messages_lookup table to have better query performance
|
## This proc levarages on the messages_lookup table to have better query performance
|
||||||
## and only query the desired partitions in the partitioned messages table
|
## and only query the desired partitions in the partitioned messages table
|
||||||
debug "beginning of getMessagesByMessageHashes"
|
|
||||||
var query =
|
var query =
|
||||||
fmt"""
|
fmt"""
|
||||||
WITH min_timestamp AS (
|
WITH min_timestamp AS (
|
||||||
|
@ -814,7 +804,6 @@ proc getMessagesByMessageHashes(
|
||||||
).isOkOr:
|
).isOkOr:
|
||||||
return err("failed to run query: " & $error)
|
return err("failed to run query: " & $error)
|
||||||
|
|
||||||
debug "end of getMessagesByMessageHashes"
|
|
||||||
return ok(rows)
|
return ok(rows)
|
||||||
|
|
||||||
proc getMessagesWithinLimits(
|
proc getMessagesWithinLimits(
|
||||||
|
@ -894,8 +883,6 @@ method getMessages*(
|
||||||
ascendingOrder = true,
|
ascendingOrder = true,
|
||||||
requestId = "",
|
requestId = "",
|
||||||
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
debug "beginning of getMessages"
|
|
||||||
|
|
||||||
let rows = collect(newSeq):
|
let rows = collect(newSeq):
|
||||||
for i in countup(0, hashes.len, MaxHashesPerQuery):
|
for i in countup(0, hashes.len, MaxHashesPerQuery):
|
||||||
let stop = min(i + MaxHashesPerQuery, hashes.len)
|
let stop = min(i + MaxHashesPerQuery, hashes.len)
|
||||||
|
@ -911,8 +898,6 @@ method getMessages*(
|
||||||
for row in subRows:
|
for row in subRows:
|
||||||
row
|
row
|
||||||
|
|
||||||
debug "end of getMessages"
|
|
||||||
|
|
||||||
return ok(rows)
|
return ok(rows)
|
||||||
|
|
||||||
proc getStr(
|
proc getStr(
|
||||||
|
@ -920,8 +905,6 @@ proc getStr(
|
||||||
): Future[ArchiveDriverResult[string]] {.async.} =
|
): Future[ArchiveDriverResult[string]] {.async.} =
|
||||||
# Performs a query that is expected to return a single string
|
# Performs a query that is expected to return a single string
|
||||||
|
|
||||||
debug "beginning of getStr"
|
|
||||||
|
|
||||||
var ret: string
|
var ret: string
|
||||||
proc rowCallback(pqResult: ptr PGresult) =
|
proc rowCallback(pqResult: ptr PGresult) =
|
||||||
if pqResult.pqnfields() != 1:
|
if pqResult.pqnfields() != 1:
|
||||||
|
@ -944,7 +927,6 @@ proc getInt(
|
||||||
): Future[ArchiveDriverResult[int64]] {.async.} =
|
): Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
# Performs a query that is expected to return a single numeric value (int64)
|
# Performs a query that is expected to return a single numeric value (int64)
|
||||||
|
|
||||||
debug "beginning of getInt"
|
|
||||||
var retInt = 0'i64
|
var retInt = 0'i64
|
||||||
let str = (await s.getStr(query)).valueOr:
|
let str = (await s.getStr(query)).valueOr:
|
||||||
return err("could not get str in getInt: " & $error)
|
return err("could not get str in getInt: " & $error)
|
||||||
|
@ -962,8 +944,6 @@ proc getInt(
|
||||||
method getDatabaseSize*(
|
method getDatabaseSize*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[int64]] {.async.} =
|
): Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
debug "beginning of getDatabaseSize"
|
|
||||||
|
|
||||||
let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
|
let intRes = (await s.getInt("SELECT pg_database_size(current_database())")).valueOr:
|
||||||
return err("error in getDatabaseSize: " & error)
|
return err("error in getDatabaseSize: " & error)
|
||||||
|
|
||||||
|
@ -973,8 +953,6 @@ method getDatabaseSize*(
|
||||||
method getMessagesCount*(
|
method getMessagesCount*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[int64]] {.async.} =
|
): Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
debug "beginning of getMessagesCount"
|
|
||||||
|
|
||||||
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
|
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
return err("error in getMessagesCount: " & intRes.error)
|
return err("error in getMessagesCount: " & intRes.error)
|
||||||
|
@ -987,8 +965,6 @@ method getOldestMessageTimestamp*(
|
||||||
## In some cases it could happen that we have
|
## In some cases it could happen that we have
|
||||||
## empty partitions which are older than the current stored rows.
|
## empty partitions which are older than the current stored rows.
|
||||||
## In those cases we want to consider those older partitions as the oldest considered timestamp.
|
## In those cases we want to consider those older partitions as the oldest considered timestamp.
|
||||||
debug "beginning of getOldestMessageTimestamp"
|
|
||||||
|
|
||||||
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
|
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
|
||||||
return err("could not get oldest partition: " & $error)
|
return err("could not get oldest partition: " & $error)
|
||||||
|
|
||||||
|
@ -1004,7 +980,6 @@ method getOldestMessageTimestamp*(
|
||||||
method getNewestMessageTimestamp*(
|
method getNewestMessageTimestamp*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
||||||
debug "beginning of getNewestMessageTimestamp"
|
|
||||||
let intRes = await s.getInt("SELECT MAX(timestamp) FROM messages")
|
let intRes = await s.getInt("SELECT MAX(timestamp) FROM messages")
|
||||||
|
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
|
@ -1015,8 +990,6 @@ method getNewestMessageTimestamp*(
|
||||||
method deleteOldestMessagesNotWithinLimit*(
|
method deleteOldestMessagesNotWithinLimit*(
|
||||||
s: PostgresDriver, limit: int
|
s: PostgresDriver, limit: int
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
debug "beginning of deleteOldestMessagesNotWithinLimit"
|
|
||||||
|
|
||||||
var execRes = await s.writeConnPool.pgQuery(
|
var execRes = await s.writeConnPool.pgQuery(
|
||||||
"""DELETE FROM messages WHERE messageHash NOT IN
|
"""DELETE FROM messages WHERE messageHash NOT IN
|
||||||
(
|
(
|
||||||
|
@ -1039,12 +1012,9 @@ method deleteOldestMessagesNotWithinLimit*(
|
||||||
"error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
|
"error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
|
||||||
)
|
)
|
||||||
|
|
||||||
debug "end of deleteOldestMessagesNotWithinLimit"
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
|
method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
debug "beginning of postgres close"
|
|
||||||
|
|
||||||
## Cancel the partition factory loop
|
## Cancel the partition factory loop
|
||||||
s.futLoopPartitionFactory.cancelSoon()
|
s.futLoopPartitionFactory.cancelSoon()
|
||||||
|
|
||||||
|
@ -1096,8 +1066,6 @@ proc acquireDatabaseLock*(
|
||||||
## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
|
## "performWriteQueryWithLock" in the migrations process because we can't nest two PL/SQL
|
||||||
## scripts.
|
## scripts.
|
||||||
|
|
||||||
debug "beginning of acquireDatabaseLock", lockId
|
|
||||||
|
|
||||||
let locked = (
|
let locked = (
|
||||||
await s.getStr(
|
await s.getStr(
|
||||||
fmt"""
|
fmt"""
|
||||||
|
@ -1116,7 +1084,6 @@ proc releaseDatabaseLock*(
|
||||||
s: PostgresDriver, lockId: int = 841886
|
s: PostgresDriver, lockId: int = 841886
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
|
## Release an advisory lock (useful to avoid more than one application running migrations at the same time)
|
||||||
debug "beginning of releaseDatabaseLock", lockId
|
|
||||||
let unlocked = (
|
let unlocked = (
|
||||||
await s.getStr(
|
await s.getStr(
|
||||||
fmt"""
|
fmt"""
|
||||||
|
@ -1143,11 +1110,10 @@ proc performWriteQuery*(
|
||||||
|
|
||||||
const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
|
const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
|
||||||
|
|
||||||
proc performWriteQueryWithLock*(
|
proc performWriteQueryWithLock(
|
||||||
self: PostgresDriver, queryToProtect: string
|
self: PostgresDriver, queryToProtect: string
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
|
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
|
||||||
debug "performWriteQueryWithLock", queryToProtect
|
|
||||||
let query =
|
let query =
|
||||||
fmt"""
|
fmt"""
|
||||||
DO $$
|
DO $$
|
||||||
|
@ -1210,8 +1176,6 @@ proc addPartition(
|
||||||
## Creates a partition table that will store the messages that fall in the range
|
## Creates a partition table that will store the messages that fall in the range
|
||||||
## `startTime` <= timestamp < `startTime + duration`.
|
## `startTime` <= timestamp < `startTime + duration`.
|
||||||
## `startTime` is measured in seconds since epoch
|
## `startTime` is measured in seconds since epoch
|
||||||
debug "beginning of addPartition"
|
|
||||||
|
|
||||||
let beginning = startTime
|
let beginning = startTime
|
||||||
let `end` = partitions_manager.calcEndPartitionTime(startTime)
|
let `end` = partitions_manager.calcEndPartitionTime(startTime)
|
||||||
|
|
||||||
|
@ -1353,8 +1317,6 @@ proc getTableSize*(
|
||||||
): Future[ArchiveDriverResult[string]] {.async.} =
|
): Future[ArchiveDriverResult[string]] {.async.} =
|
||||||
## Returns a human-readable representation of the size for the requested table.
|
## Returns a human-readable representation of the size for the requested table.
|
||||||
## tableName - table of interest.
|
## tableName - table of interest.
|
||||||
debug "beginning of getTableSize"
|
|
||||||
|
|
||||||
let tableSize = (
|
let tableSize = (
|
||||||
await self.getStr(
|
await self.getStr(
|
||||||
fmt"""
|
fmt"""
|
||||||
|
@ -1442,8 +1404,6 @@ proc removeOldestPartition(
|
||||||
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
|
self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
## Indirectly called from the retention policy
|
## Indirectly called from the retention policy
|
||||||
debug "beginning of removeOldestPartition"
|
|
||||||
|
|
||||||
let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
|
let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
|
||||||
return err("could not remove oldest partition: " & $error)
|
return err("could not remove oldest partition: " & $error)
|
||||||
|
|
||||||
|
@ -1465,8 +1425,6 @@ proc containsAnyPartition*(self: PostgresDriver): bool =
|
||||||
method decreaseDatabaseSize*(
|
method decreaseDatabaseSize*(
|
||||||
driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
|
driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
debug "beginning of decreaseDatabaseSize"
|
|
||||||
|
|
||||||
var dbSize = (await driver.getDatabaseSize()).valueOr:
|
var dbSize = (await driver.getDatabaseSize()).valueOr:
|
||||||
return err("decreaseDatabaseSize failed to get database size: " & $error)
|
return err("decreaseDatabaseSize failed to get database size: " & $error)
|
||||||
|
|
||||||
|
@ -1533,8 +1491,6 @@ method existsTable*(
|
||||||
proc getCurrentVersion*(
|
proc getCurrentVersion*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[int64]] {.async.} =
|
): Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
debug "beginning of getCurrentVersion"
|
|
||||||
|
|
||||||
let existsVersionTable = (await s.existsTable("version")).valueOr:
|
let existsVersionTable = (await s.existsTable("version")).valueOr:
|
||||||
return err("error in getCurrentVersion-existsTable: " & $error)
|
return err("error in getCurrentVersion-existsTable: " & $error)
|
||||||
|
|
||||||
|
@ -1551,8 +1507,6 @@ method deleteMessagesOlderThanTimestamp*(
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
## First of all, let's remove the older partitions so that we can reduce
|
## First of all, let's remove the older partitions so that we can reduce
|
||||||
## the database size.
|
## the database size.
|
||||||
debug "beginning of deleteMessagesOlderThanTimestamp"
|
|
||||||
|
|
||||||
(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
|
(await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
|
||||||
return err("error while removing older partitions: " & $error)
|
return err("error while removing older partitions: " & $error)
|
||||||
|
|
||||||
|
|
|
@ -259,8 +259,6 @@ method getAllMessages*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
## Retrieve all messages from the store.
|
## Retrieve all messages from the store.
|
||||||
debug "beginning of getAllMessages"
|
|
||||||
|
|
||||||
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
|
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
|
||||||
proc rowCallback(pqResult: ptr PGresult) =
|
proc rowCallback(pqResult: ptr PGresult) =
|
||||||
rowCallbackImpl(pqResult, rows)
|
rowCallbackImpl(pqResult, rows)
|
||||||
|
@ -634,7 +632,6 @@ proc getMessagesByMessageHashes(
|
||||||
## Retrieves information only filtering by a given messageHashes list.
|
## Retrieves information only filtering by a given messageHashes list.
|
||||||
## This proc levarages on the messages_lookup table to have better query performance
|
## This proc levarages on the messages_lookup table to have better query performance
|
||||||
## and only query the desired partitions in the partitioned messages table
|
## and only query the desired partitions in the partitioned messages table
|
||||||
debug "beginning of getMessagesByMessageHashes"
|
|
||||||
var query =
|
var query =
|
||||||
fmt"""
|
fmt"""
|
||||||
WITH min_timestamp AS (
|
WITH min_timestamp AS (
|
||||||
|
@ -673,7 +670,6 @@ proc getMessagesByMessageHashes(
|
||||||
).isOkOr:
|
).isOkOr:
|
||||||
return err("failed to run query: " & $error)
|
return err("failed to run query: " & $error)
|
||||||
|
|
||||||
debug "end of getMessagesByMessageHashes"
|
|
||||||
return ok(rows)
|
return ok(rows)
|
||||||
|
|
||||||
method getMessages*(
|
method getMessages*(
|
||||||
|
@ -689,8 +685,6 @@ method getMessages*(
|
||||||
ascendingOrder = true,
|
ascendingOrder = true,
|
||||||
requestId = "",
|
requestId = "",
|
||||||
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
debug "beginning of getMessages"
|
|
||||||
|
|
||||||
let hexHashes = hashes.mapIt(toHex(it))
|
let hexHashes = hashes.mapIt(toHex(it))
|
||||||
|
|
||||||
if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and
|
if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and
|
||||||
|
|
Loading…
Reference in New Issue