mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: more efficient metrics usage (#3298)
* Enhance metrics labels * Bound the metrics-label-values in arbitrary queries * The metrics-label-values for prepared statements are kept as they already represent a fixed set
This commit is contained in:
parent
57514f5c9e
commit
f90baa1d2f
@ -235,6 +235,13 @@ proc isSecureString(input: string): bool =
|
||||
|
||||
return true
|
||||
|
||||
proc convertQueryToMetricLabel(query: string): string =
  ## Simple query categorization. The output label is the one that should be
  ## used in query metrics.
  ##
  ## Returns the label of the first `QueriesToMetricMap` entry whose query
  ## fragment occurs in `query`, or "unknown_query_metric" when no entry
  ## matches, so the set of possible label values stays bounded.
  # `QueriesToMetricMap` is an array of (fragment, label) tuples. Iterate its
  # items and unpack each tuple; `pairs()` on an array yields (index, tuple),
  # which would match against the index and return the stringified tuple.
  for (snippetQuery, metric) in QueriesToMetricMap:
    if query.contains(snippetQuery):
      return metric
  return "unknown_query_metric"
|
||||
|
||||
proc dbConnQuery*(
|
||||
dbConnWrapper: DbConnWrapper,
|
||||
query: SqlQuery,
|
||||
@ -247,11 +254,7 @@ proc dbConnQuery*(
|
||||
|
||||
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery")
|
||||
|
||||
let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
|
||||
## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
|
||||
var querySummary = cleanedQuery.replace(re2("""(['"]).*?\\1"""), "")
|
||||
querySummary = querySummary.replace(re2"\d+", "")
|
||||
querySummary = "query_tag_" & querySummary[0 ..< min(querySummary.len, 128)]
|
||||
let metricLabel = convertQueryToMetricLabel($query)
|
||||
|
||||
var queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
@ -262,7 +265,7 @@ proc dbConnQuery*(
|
||||
return err("error in dbConnQuery calling sendQuery: " & $error)
|
||||
|
||||
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
||||
query_time_secs.set(sendDuration, [querySummary, "sendToDBQuery"])
|
||||
query_time_secs.set(sendDuration, [metricLabel, "sendToDBQuery"])
|
||||
|
||||
queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
@ -270,16 +273,16 @@ proc dbConnQuery*(
|
||||
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
|
||||
|
||||
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
||||
query_time_secs.set(waitDuration, [querySummary, "waitFinish"])
|
||||
query_time_secs.set(waitDuration, [metricLabel, "waitFinish"])
|
||||
|
||||
query_count.inc(labelValues = [querySummary])
|
||||
query_count.inc(labelValues = [metricLabel])
|
||||
|
||||
if "insert" notin ($query).toLower():
|
||||
debug "dbConnQuery",
|
||||
requestId,
|
||||
query = $query,
|
||||
args,
|
||||
querySummary,
|
||||
metricLabel,
|
||||
waitDbQueryDurationSecs = waitDuration,
|
||||
sendToDBDurationSecs = sendDuration
|
||||
|
||||
@ -302,9 +305,8 @@ proc dbConnQueryPrepared*(
|
||||
error "error in dbConnQueryPrepared", error = $error
|
||||
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
|
||||
|
||||
let stmtNameSummary = stmtName[0 ..< min(stmtName.len, 128)]
|
||||
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
||||
query_time_secs.set(sendDuration, [stmtNameSummary, "sendToDBQuery"])
|
||||
query_time_secs.set(sendDuration, [stmtName, "sendToDBQuery"])
|
||||
|
||||
queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
@ -312,9 +314,9 @@ proc dbConnQueryPrepared*(
|
||||
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
|
||||
|
||||
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
||||
query_time_secs.set(waitDuration, [stmtNameSummary, "waitFinish"])
|
||||
query_time_secs.set(waitDuration, [stmtName, "waitFinish"])
|
||||
|
||||
query_count.inc(labelValues = [stmtNameSummary])
|
||||
query_count.inc(labelValues = [stmtName])
|
||||
|
||||
if "insert" notin stmtName.toLower():
|
||||
debug "dbConnQueryPrepared",
|
||||
|
||||
@ -5,3 +5,27 @@ declarePublicGauge query_time_secs,
|
||||
|
||||
declarePublicCounter query_count,
|
||||
"number of times a query is being performed", labels = ["query"]
|
||||
|
||||
## Maps distinctive fragments of the known queries to a fixed and shorter query label.
## Each entry has the form `queryFragment: metricLabel`.
## NOTE(review): without `toTable`, this `{...}` literal is an array of
## (fragment, label) tuples kept in declaration order, not a `Table`; iterating
## it with `pairs()` yields (index, tuple) rather than (fragment, label).
|
||||
const QueriesToMetricMap* = {
|
||||
"contentTopic IN": "content_topic",
|
||||
"SELECT version()": "select_version",
|
||||
"WITH min_timestamp": "messages_lookup",
|
||||
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
|
||||
"msg_hash_no_ctopic",
|
||||
"AS partition_name": "get_partitions_list",
|
||||
"SELECT COUNT(1) FROM messages": "count_msgs",
|
||||
"SELECT messageHash FROM messages WHERE (timestamp, messageHash) < (?,?) AND pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
|
||||
"msg_hash_with_cursor",
|
||||
"SELECT pg_database_size(current_database())": "get_database_size",
|
||||
"DELETE FROM messages_lookup WHERE timestamp": "delete_from_msgs_lookup",
|
||||
"DROP TABLE messages_": "drop_partition_table",
|
||||
"ALTER TABLE messages DETACH PARTITION": "detach_partition",
|
||||
"SELECT pg_size_pretty(pg_total_relation_size(C.oid))": "get_partition_size",
|
||||
"pg_try_advisory_lock": "try_advisory_lock",
|
||||
"SELECT messageHash FROM messages ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
|
||||
"get_all_msg_hash",
|
||||
"SELECT pg_advisory_unlock": "advisory_unlock",
|
||||
"ANALYZE messages": "analyze_messages",
|
||||
"SELECT EXISTS": "check_version_table_exists",
|
||||
}
|
||||
|
||||
@ -39,11 +39,11 @@ proc sendStoreRequest(
|
||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: error.msg))
|
||||
|
||||
let res = StoreQueryResponse.decode(buf).valueOr:
|
||||
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: decodeRpcFailure))
|
||||
waku_store_errors.inc(labelValues = [DecodeRpcFailure])
|
||||
return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: DecodeRpcFailure))
|
||||
|
||||
if res.statusCode != uint32(StatusCode.SUCCESS):
|
||||
waku_store_errors.inc(labelValues = [res.statusDesc])
|
||||
waku_store_errors.inc(labelValues = [NoSuccessStatusCode])
|
||||
return err(StoreError.new(res.statusCode, res.statusDesc))
|
||||
|
||||
return ok(res)
|
||||
@ -55,7 +55,7 @@ proc query*(
|
||||
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))
|
||||
|
||||
let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
|
||||
waku_store_errors.inc(labelValues = [dialFailure])
|
||||
waku_store_errors.inc(labelValues = [DialFailure])
|
||||
|
||||
return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))
|
||||
|
||||
@ -74,7 +74,7 @@ proc queryToAny*(
|
||||
return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))
|
||||
|
||||
let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
|
||||
waku_store_errors.inc(labelValues = [dialFailure])
|
||||
waku_store_errors.inc(labelValues = [DialFailure])
|
||||
|
||||
return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))
|
||||
|
||||
|
||||
@ -45,7 +45,7 @@ proc handleQueryRequest(
|
||||
|
||||
let req = StoreQueryRequest.decode(raw_request).valueOr:
|
||||
error "failed to decode rpc", peerId = requestor, error = $error
|
||||
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
||||
waku_store_errors.inc(labelValues = [DecodeRpcFailure])
|
||||
|
||||
res.statusCode = uint32(ErrorCode.BAD_REQUEST)
|
||||
res.statusDesc = "decoding rpc failed: " & $error
|
||||
|
||||
@ -12,8 +12,9 @@ declarePublicGauge waku_store_time_seconds,
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure* = "dial_failure"
|
||||
decodeRpcFailure* = "decode_rpc_failure"
|
||||
peerNotFoundFailure* = "peer_not_found_failure"
|
||||
emptyRpcQueryFailure* = "empty_rpc_query_failure"
|
||||
emptyRpcResponseFailure* = "empty_rpc_response_failure"
|
||||
DialFailure* = "dial_failure"
|
||||
DecodeRpcFailure* = "decode_rpc_failure"
|
||||
PeerNotFoundFailure* = "peer_not_found_failure"
|
||||
EmptyRpcQueryFailure* = "empty_rpc_query_failure"
|
||||
EmptyRpcResponseFailure* = "empty_rpc_response_failure"
|
||||
NoSuccessStatusCode* = "status_code_no_success"
|
||||
|
||||
@ -13,8 +13,8 @@ declarePublicGauge waku_legacy_store_time_seconds,
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure* = "dial_failure"
|
||||
decodeRpcFailure* = "decode_rpc_failure"
|
||||
peerNotFoundFailure* = "peer_not_found_failure"
|
||||
emptyRpcQueryFailure* = "empty_rpc_query_failure"
|
||||
emptyRpcResponseFailure* = "empty_rpc_response_failure"
|
||||
dialFailure* = "dial_failure_legacy"
|
||||
decodeRpcFailure* = "decode_rpc_failure_legacy"
|
||||
peerNotFoundFailure* = "peer_not_found_failure_legacy"
|
||||
emptyRpcQueryFailure* = "empty_rpc_query_failure_legacy"
|
||||
emptyRpcResponseFailure* = "empty_rpc_response_failure_legacy"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user