diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index aae8d80bc..6ed26d928 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -230,7 +230,7 @@ proc dbConnQuery*( return err("error in dbConnQuery calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime - query_time_secs.set(sendDuration, [querySummary, "sendQuery"]) + query_time_secs.set(sendDuration, [querySummary, "sendToDBQuery"]) queryStartTime = getTime().toUnixFloat() @@ -246,9 +246,10 @@ proc dbConnQuery*( debug "dbConnQuery", requestId, query = $query, + args, querySummary, - waitDurationSecs = waitDuration, - sendDurationSecs = sendDuration + waitDbQueryDurationSecs = waitDuration, + sendToDBDurationSecs = sendDuration return ok() @@ -270,7 +271,7 @@ proc dbConnQueryPrepared*( return err("error in dbConnQueryPrepared calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime - query_time_secs.set(sendDuration, [stmtName, "sendQuery"]) + query_time_secs.set(sendDuration, [stmtName, "sendToDBQuery"]) queryStartTime = getTime().toUnixFloat() @@ -286,7 +287,8 @@ proc dbConnQueryPrepared*( debug "dbConnQueryPrepared", requestId, stmtName, - waitDurationSecs = waitDuration, - sendDurationSecs = sendDuration + paramValues, + waitDbQueryDurationSecs = waitDuration, + sendToDBDurationSecs = sendDuration return ok() diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 5a8a81c13..5f986983e 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -91,6 +91,7 @@ proc initProtocolHandler(self: WakuStore) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = var successfulQuery = false ## only consider the correct queries in metrics var resBuf: StoreResp + var queryDuration: float self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn): let readRes = catch: await conn.readLp(DefaultMaxRpcSize.int) @@ -107,7 +108,7 @@ proc initProtocolHandler(self: WakuStore) = resBuf = await self.handleQueryRequest(conn.peerId, reqBuf) - let queryDuration = getTime().toUnixFloat() - queryStartTime + queryDuration = getTime().toUnixFloat() - queryStartTime waku_store_time_seconds.set(queryDuration, ["query-db-time"]) successfulQuery = true do: @@ -124,10 +125,13 @@ proc initProtocolHandler(self: WakuStore) = error "Connection write error", error = writeRes.error.msg return - debug "after sending response", requestId = resBuf.requestId if successfulQuery: let writeDuration = getTime().toUnixFloat() - writeRespStartTime waku_store_time_seconds.set(writeDuration, ["send-store-resp-time"]) + debug "after sending response", + requestId = resBuf.requestId, + queryDurationSecs = queryDuration, + writeStreamDurationSecs = writeDuration waku_service_network_bytes.inc( amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"] diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index 4e06419b4..d72308e63 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -37,23 +37,23 @@ type WakuStore* = ref object of LPProtocol ## Protocol +type StoreResp = tuple[resp: seq[byte], requestId: string] + proc handleLegacyQueryRequest( self: WakuStore, requestor: PeerId, raw_request: seq[byte] -): Future[seq[byte]] {.async.} = +): Future[StoreResp] {.async.} = let decodeRes = HistoryRPC.decode(raw_request) if decodeRes.isErr(): - error "failed to decode rpc", peerId = requestor + error "failed to decode rpc", peerId = requestor, error = $decodeRes.error waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure]) - # TODO: Return (BAD_REQUEST, cause: "decode rpc failed") - return + return (newSeq[byte](), "failed to decode rpc") let reqRpc = decodeRes.value if reqRpc.query.isNone(): error "empty query rpc", peerId = requestor, requestId = reqRpc.requestId waku_legacy_store_errors.inc(labelValues = [emptyRpcQueryFailure]) - # TODO: Return (BAD_REQUEST, cause: "empty query") - return + return (newSeq[byte](), "empty query rpc") let requestId = reqRpc.requestId var request = reqRpc.query.get().toAPI() @@ -72,21 +72,30 @@ proc handleLegacyQueryRequest( let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC() let response = HistoryResponseRPC(error: error) - return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer + return ( + HistoryRPC(requestId: requestId, response: some(response)).encode().buffer, + requestId, + ) if responseRes.isErr(): error "history query failed", peerId = requestor, requestId = requestId, error = responseRes.error let response = responseRes.toRPC() - return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer + return ( + HistoryRPC(requestId: requestId, response: some(response)).encode().buffer, + requestId, + ) let response = responseRes.toRPC() info "sending history response", peerId = requestor, requestId = requestId, messages = response.messages.len - return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer + return ( + HistoryRPC(requestId: requestId, response: some(response)).encode().buffer, + requestId, + ) proc initProtocolHandler(ws: WakuStore) = let rejectResponseBuf = HistoryRPC( @@ -103,7 +112,8 @@ proc initProtocolHandler(ws: WakuStore) = proc handler(conn: Connection, proto: string) {.async, closure.} = var successfulQuery = false ## only consider the correct queries in metrics - var resBuf: seq[byte] + var resBuf: StoreResp + var queryDuration: float ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn): let readRes = catch: await conn.readLp(DefaultMaxRpcSize.int) @@ -118,17 +128,17 @@ proc initProtocolHandler(ws: WakuStore) = let queryStartTime = getTime().toUnixFloat() resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf) - let queryDuration = getTime().toUnixFloat() - queryStartTime + queryDuration = getTime().toUnixFloat() - queryStartTime waku_legacy_store_time_seconds.set(queryDuration, ["query-db-time"]) successfulQuery = true do: debug "Legacy store query request rejected due rate limit exceeded", peerId = conn.peerId, limit = $ws.requestRateLimiter.setting - resBuf = rejectResponseBuf + resBuf = (rejectResponseBuf, "rejected") let writeRespStartTime = getTime().toUnixFloat() let writeRes = catch: - await conn.writeLp(resBuf) + await conn.writeLp(resBuf.resp) if writeRes.isErr(): error "Connection write error", error = writeRes.error.msg @@ -137,9 +147,13 @@ proc initProtocolHandler(ws: WakuStore) = if successfulQuery: let writeDuration = getTime().toUnixFloat() - writeRespStartTime waku_legacy_store_time_seconds.set(writeDuration, ["send-store-resp-time"]) + debug "after sending response", + requestId = resBuf.requestId, + queryDurationSecs = queryDuration, + writeStreamDurationSecs = writeDuration waku_service_network_bytes.inc( - amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "out"] + amount = resBuf.resp.len().int64, labelValues = [WakuLegacyStoreCodec, "out"] ) ws.handler = handler