mirror of https://github.com/waku-org/nwaku.git
chore: extending store metrics (#3042)
* adding query_metrics module
* update fleet-dashboard with new store panels for better timing insight
parent b534a1c257 · commit 26a488d522
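The pattern behind the new panels is the same in every touched code path: take a wall-clock timestamp before a phase, run the phase, and record the elapsed seconds in a gauge labelled by phase name. A minimal, self-contained sketch of that pattern, using the same nim-metrics and chronos calls the diff uses (the metric and proc names below are illustrative, not part of the commit, and nim-metrics only collects values when the build enables -d:metrics):

import std/times
import chronos, metrics

# Illustrative gauge, mirroring the shape of waku_store_time_seconds.
declarePublicGauge example_store_time_seconds,
  "illustrative: time in seconds spent by each store phase", labels = ["phase"]

proc handleRequest() {.async.} =
  # Phase 1: the database query.
  let queryStartTime = getTime().toUnixFloat()
  await sleepAsync(10.milliseconds) # stands in for the actual store query
  example_store_time_seconds.set(
    getTime().toUnixFloat() - queryStartTime, ["query-db-time"]
  )

  # Phase 2: writing the response back to the store client.
  let writeRespStartTime = getTime().toUnixFloat()
  await sleepAsync(5.milliseconds) # stands in for writing the response buffer
  example_store_time_seconds.set(
    getTime().toUnixFloat() - writeRespStartTime, ["send-store-resp-time"]
  )

waitFor handleRequest()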
@@ -4,7 +4,9 @@ type WakuCallBack* = proc(
   callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
 ) {.cdecl, gcsafe, raises: [].}

-template checkLibwakuParams*(ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer) =
+template checkLibwakuParams*(
+    ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
+) =
   ctx[].userData = userData

   if isNil(callback):
@@ -487,9 +487,7 @@ proc waku_get_peerids_from_peerstore(
   let connRes = waku_thread.sendRequestToWakuThread(
     ctx,
     RequestType.PEER_MANAGER,
-    PeerManagementRequest.createShared(
-      PeerManagementMsgType.GET_ALL_PEER_IDS
-    ),
+    PeerManagementRequest.createShared(PeerManagementMsgType.GET_ALL_PEER_IDS),
   )
   if connRes.isErr():
     let msg = $connRes.error
File diff suppressed because it is too large.
@@ -1,4 +1,5 @@
-import std/[times, strutils, asyncnet, os, sequtils], results, chronos
+import std/[times, strutils, asyncnet, os, sequtils], results, chronos, metrics, re
+import ./query_metrics

 include db_connector/db_postgres
@@ -45,6 +46,9 @@ proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
   asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
   db.close()

+proc `$`(self: SqlQuery): string =
+  return cast[string](self)
+
 proc sendQuery(
     db: DbConn, query: SqlQuery, args: seq[string]
 ): Future[Result[void, string]] {.async.} =
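For context on the `$` overload added above: `SqlQuery` in db_connector/db_postgres is a distinct string type, so the query text has no default stringification; the cast-based `$` is what lets the instrumented `dbConnQuery` below turn the query into plain text for label cleaning. A standalone illustration with a stand-in distinct type (the type name here is hypothetical):

# Stand-in for db_postgres' `SqlQuery* = distinct string`.
type SqlQueryLike = distinct string

proc `$`(self: SqlQueryLike): string =
  # A distinct string shares its representation with string, so a cast suffices.
  return cast[string](self)

let q = SqlQueryLike("SELECT version()")
echo $q # prints: SELECT version()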
@@ -152,12 +156,32 @@ proc waitQueryToFinish(
 proc dbConnQuery*(
     db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc
 ): Future[Result[void, string]] {.async, gcsafe.} =
+  let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
+  ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
+  var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
+  querySummary = querySummary.replace(re"\d+", "")
+  querySummary = "query_tag_" & querySummary[0 ..< min(querySummary.len, 200)]
+
+  var queryStartTime = getTime().toUnixFloat()
+
   (await db.sendQuery(query, args)).isOkOr:
     return err("error in dbConnQuery calling sendQuery: " & $error)

+  query_time_secs.set(
+    getTime().toUnixFloat() - queryStartTime, [querySummary, "sendQuery"]
+  )
+
+  queryStartTime = getTime().toUnixFloat()
+
   (await db.waitQueryToFinish(rowCallback)).isOkOr:
     return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

+  query_time_secs.set(
+    getTime().toUnixFloat() - queryStartTime, [querySummary, "waitFinish"]
+  )
+
+  query_count.inc(labelValues = [querySummary])
+
   return ok()

 proc dbConnQueryPrepared*(
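The `querySummary` built above becomes the `query` label value, so it has to be stable and low-cardinality: whitespace is stripped, quoted literals and digit runs are removed (which collapses per-partition table names into one tag), and the result is truncated to 200 characters and prefixed with `query_tag_`. A standalone sketch of that normalization on a made-up query (the SQL text below is illustrative only):

import std/[strutils, re]

# Hypothetical store query; real queries come from the postgres driver.
let query =
  "SELECT messageHash FROM messages_1708300800 WHERE pubsubTopic = '/waku/2/rs/1/0' LIMIT 10"

let cleanedQuery = query.replace(" ", "").replace("\n", "")
# Drop quoted literals and digit runs, then cap the tag length.
var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
querySummary = querySummary.replace(re"\d+", "")
querySummary = "query_tag_" & querySummary[0 ..< min(querySummary.len, 200)]

echo querySummary
# query_tag_SELECTmessageHashFROMmessages_WHEREpubsubTopic=LIMIT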
@@ -168,10 +192,21 @@ proc dbConnQueryPrepared*(
     paramFormats: seq[int32],
     rowCallback: DataProc,
 ): Future[Result[void, string]] {.async, gcsafe.} =
+  var queryStartTime = getTime().toUnixFloat()
   db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
     return err("error in dbConnQueryPrepared calling sendQuery: " & $error)

+  query_time_secs.set(getTime().toUnixFloat() - queryStartTime, [stmtName, "sendQuery"])
+
+  queryStartTime = getTime().toUnixFloat()
+
   (await db.waitQueryToFinish(rowCallback)).isOkOr:
     return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)

+  query_time_secs.set(
+    getTime().toUnixFloat() - queryStartTime, [stmtName, "waitFinish"]
+  )
+
+  query_count.inc(labelValues = [stmtName])
+
   return ok()
@@ -0,0 +1,7 @@
+import metrics
+
+declarePublicGauge query_time_secs,
+  "query time measured in nanoseconds", labels = ["query", "phase"]
+
+declarePublicCounter query_count,
+  "number of times a query is being performed", labels = ["query"]
@@ -108,7 +108,7 @@ proc initProtocolHandler(self: WakuStore) =
        resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)

        let queryDuration = getTime().toUnixFloat() - queryStartTime
-        waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"])
+        waku_store_time_seconds.set(queryDuration, ["query-db-time"])
        successfulQuery = true
      do:
        debug "store query request rejected due rate limit exceeded",
@@ -127,7 +127,7 @@ proc initProtocolHandler(self: WakuStore) =
    debug "after sending response", requestId = resBuf.requestId
    if successfulQuery:
      let writeDuration = getTime().toUnixFloat() - writeRespStartTime
-      waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"])
+      waku_store_time_seconds.set(writeDuration, ["send-store-resp-time"])

    waku_service_network_bytes.inc(
      amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
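Note the semantic shift in the two handler changes above: the old code called `inc(amount = ...)` on the gauge, so durations from successive requests accumulated into a running total, while the new code calls `set(...)`, so each phase label now holds the duration of the most recent request (the labels are also renamed to `query-db-time` and `send-store-resp-time`). A small contrast sketch with an illustrative gauge, not one added by the commit:

import metrics

declarePublicGauge example_phase_seconds,
  "illustrative store phase timing", labels = ["phase"]

let queryDuration = 0.042 # made-up duration of one request, in seconds

# Old behaviour: each request's duration is added on top of the previous ones.
example_phase_seconds.inc(amount = queryDuration, labelValues = ["query-db-time"])

# New behaviour: the gauge is overwritten with the latest request's duration.
example_phase_seconds.set(queryDuration, ["query-db-time"])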
@@ -5,8 +5,8 @@ import metrics
 declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
 declarePublicGauge waku_store_queries, "number of store queries received"

-## f.e., we have the "query" phase, where the node performs the query to the database,
-## and the "libp2p" phase, where the node writes the store response to the libp2p stream.
+## "query-db-time" phase considers the time when node performs the query to the database.
+## "send-store-resp-time" phase is the time when node writes the store response to the store-client.
 declarePublicGauge waku_store_time_seconds,
   "Time in seconds spent by each store phase", labels = ["phase"]
@@ -4,7 +4,7 @@
 {.push raises: [].}

 import
-  std/options,
+  std/[options, times],
   results,
   chronicles,
   chronos,
@@ -102,6 +102,7 @@ proc initProtocolHandler(ws: WakuStore) =
  ).encode().buffer

  proc handler(conn: Connection, proto: string) {.async, closure.} =
+    var successfulQuery = false ## only consider the correct queries in metrics
    var resBuf: seq[byte]
    ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn):
      let readRes = catch:
@@ -115,12 +116,17 @@ proc initProtocolHandler(ws: WakuStore) =
          amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "in"]
        )

+        let queryStartTime = getTime().toUnixFloat()
        resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
+        let queryDuration = getTime().toUnixFloat() - queryStartTime
+        waku_legacy_store_time_seconds.set(queryDuration, ["query-db-time"])
+        successfulQuery = true
      do:
        debug "Legacy store query request rejected due rate limit exceeded",
          peerId = conn.peerId, limit = $ws.requestRateLimiter.setting
        resBuf = rejectResponseBuf

+    let writeRespStartTime = getTime().toUnixFloat()
    let writeRes = catch:
      await conn.writeLp(resBuf)
@@ -128,6 +134,10 @@ proc initProtocolHandler(ws: WakuStore) =
      error "Connection write error", error = writeRes.error.msg
      return

+    if successfulQuery:
+      let writeDuration = getTime().toUnixFloat() - writeRespStartTime
+      waku_legacy_store_time_seconds.set(writeDuration, ["send-store-resp-time"])
+
    waku_service_network_bytes.inc(
      amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "out"]
    )
@@ -6,6 +6,11 @@ declarePublicGauge waku_legacy_store_errors,
   "number of legacy store protocol errors", ["type"]
 declarePublicGauge waku_legacy_store_queries, "number of legacy store queries received"

+## "query-db-time" phase considers the time when node performs the query to the database.
+## "send-store-resp-time" phase is the time when node writes the store response to the store-client.
+declarePublicGauge waku_legacy_store_time_seconds,
+  "Time in seconds spent by each store phase", labels = ["phase"]
+
 # Error types (metric label values)
 const
   dialFailure* = "dial_failure"