mirror of https://github.com/waku-org/nwaku.git
chore: enhance libpq management (#3015)
* db_postgres: register pg socket fd to chronos better data available awareness * waku_store protocol: better logs to track time and new store metrics
This commit is contained in:
parent
01fe4d1847
commit
45319f09c9
|
@ -1,4 +1,4 @@
|
|||
import std/[times, strutils], results, chronos
|
||||
import std/[times, strutils, asyncnet, os, sequtils], results, chronos
|
||||
|
||||
include db_connector/db_postgres
|
||||
|
||||
|
@ -33,7 +33,17 @@ proc open*(connString: string): Result[DbConn, string] =
|
|||
|
||||
return err("unknown reason")
|
||||
|
||||
ok(conn)
|
||||
## Register the socket fd with chronos to wait for incoming data more efficiently
|
||||
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(conn))
|
||||
asyncengine.register(asyncFd)
|
||||
|
||||
return ok(conn)
|
||||
|
||||
proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
## Closes the given Postgres connection.
## The connection's socket fd is read through `pqsocket`; when it is not -1
## it is unregistered from the chronos async engine before `close` is called.
## Declared as possibly raising `OSError`.
|
||||
let fd = db.pqsocket()
|
||||
# -1 is treated as "no socket fd", so there is nothing to unregister
if fd != -1:
|
||||
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
|
||||
db.close()
|
||||
|
||||
proc sendQuery(
|
||||
db: DbConn, query: SqlQuery, args: seq[string]
|
||||
|
@ -112,23 +122,17 @@ proc waitQueryToFinish(
|
|||
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
|
||||
## For other queries, like "INSERT", 'rowCallback' should be nil.
|
||||
|
||||
while db.pqisBusy() == 1:
|
||||
## TODO: Enhance performance in concurrent queries.
|
||||
## The connection stays busy for quite a long time when performing intense concurrent queries.
|
||||
## For example, a given query can last 11 milliseconds from the database point of view
|
||||
## but, on the other hand, the connection remains in "db.pqisBusy() == 1" for 100ms more.
|
||||
## I think this is because `nwaku` is single-threaded and it has to handle many connections (20)
|
||||
## simultaneously. Therefore, there is an underlying resource sharing (cpu) that causes
|
||||
## this. Notice that the _Postgres_ database spawns one process per connection.
|
||||
let success = db.pqconsumeInput()
|
||||
var dataAvailable = false
|
||||
proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
|
||||
dataAvailable = true
|
||||
|
||||
if success != 1:
|
||||
db.check().isOkOr:
|
||||
return err("failed pqconsumeInput: " & $error)
|
||||
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db))
|
||||
|
||||
return err("failed pqconsumeInput: unknown reason")
|
||||
asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
|
||||
return err("failed to add event reader in waitQueryToFinish: " & $error)
|
||||
|
||||
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
|
||||
while not dataAvailable:
|
||||
await sleepAsync(timer.milliseconds(1))
|
||||
|
||||
## Now retrieve the result
|
||||
while true:
|
||||
|
|
|
@ -10,7 +10,7 @@ type PgAsyncPoolState {.pure.} = enum
|
|||
Live
|
||||
Closing
|
||||
|
||||
type PgDbConn = object
|
||||
type PgDbConn = ref object
|
||||
dbConn: DbConn
|
||||
open: bool
|
||||
busy: bool
|
||||
|
@ -76,14 +76,11 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
|
|||
if pool.conns[i].busy:
|
||||
continue
|
||||
|
||||
if pool.conns[i].open:
|
||||
pool.conns[i].dbConn.close()
|
||||
pool.conns[i].busy = false
|
||||
pool.conns[i].open = false
|
||||
|
||||
for i in 0 ..< pool.conns.len:
|
||||
if pool.conns[i].open:
|
||||
pool.conns[i].dbConn.close()
|
||||
pool.conns[i].dbConn.closeDbConn()
|
||||
pool.conns[i].busy = false
|
||||
pool.conns[i].open = false
|
||||
|
||||
pool.conns.setLen(0)
|
||||
pool.state = PgAsyncPoolState.Closed
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
std/[options, times],
|
||||
results,
|
||||
chronicles,
|
||||
chronos,
|
||||
|
@ -36,9 +36,11 @@ type WakuStore* = ref object of LPProtocol
|
|||
|
||||
## Protocol
|
||||
|
||||
## Encoded store response bytes (`resp`) paired with a request id string
## (`requestId`); callers fill `requestId` with a placeholder such as
## "not_parsed_requestId" or "rejected" when no parsed id is available.
type StoreResp = tuple[resp: seq[byte], requestId: string]
|
||||
|
||||
proc handleQueryRequest(
|
||||
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
|
||||
): Future[seq[byte]] {.async.} =
|
||||
): Future[StoreResp] {.async.} =
|
||||
var res = StoreQueryResponse()
|
||||
|
||||
let req = StoreQueryRequest.decode(raw_request).valueOr:
|
||||
|
@ -48,7 +50,7 @@ proc handleQueryRequest(
|
|||
res.statusCode = uint32(ErrorCode.BAD_REQUEST)
|
||||
res.statusDesc = "decoding rpc failed: " & $error
|
||||
|
||||
return res.encode().buffer
|
||||
return (res.encode().buffer, "not_parsed_requestId")
|
||||
|
||||
let requestId = req.requestId
|
||||
|
||||
|
@ -65,7 +67,7 @@ proc handleQueryRequest(
|
|||
res.statusCode = uint32(error.kind)
|
||||
res.statusDesc = $error
|
||||
|
||||
return res.encode().buffer
|
||||
return (res.encode().buffer, "not_parsed_requestId")
|
||||
|
||||
res.requestId = requestId
|
||||
res.statusCode = 200
|
||||
|
@ -74,7 +76,7 @@ proc handleQueryRequest(
|
|||
info "sending store query response",
|
||||
peerId = requestor, requestId = requestId, messages = res.messages.len
|
||||
|
||||
return res.encode().buffer
|
||||
return (res.encode().buffer, requestId)
|
||||
|
||||
proc initProtocolHandler(self: WakuStore) =
|
||||
let rejectReposnseBuffer = StoreQueryResponse(
|
||||
|
@ -87,7 +89,8 @@ proc initProtocolHandler(self: WakuStore) =
|
|||
).encode().buffer
|
||||
|
||||
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var resBuf: seq[byte]
|
||||
var successfulQuery = false ## only consider the correct queries in metrics
|
||||
var resBuf: StoreResp
|
||||
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
|
||||
let readRes = catch:
|
||||
await conn.readLp(DefaultMaxRpcSize.int)
|
||||
|
@ -100,21 +103,34 @@ proc initProtocolHandler(self: WakuStore) =
|
|||
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
|
||||
)
|
||||
|
||||
let queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
|
||||
|
||||
let queryDuration = getTime().toUnixFloat() - queryStartTime
|
||||
waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"])
|
||||
successfulQuery = true
|
||||
do:
|
||||
debug "store query request rejected due rate limit exceeded",
|
||||
peerId = conn.peerId, limit = $self.requestRateLimiter.setting
|
||||
resBuf = rejectReposnseBuffer
|
||||
resBuf = (rejectReposnseBuffer, "rejected")
|
||||
|
||||
let writeRespStartTime = getTime().toUnixFloat()
|
||||
|
||||
let writeRes = catch:
|
||||
await conn.writeLp(resBuf)
|
||||
await conn.writeLp(resBuf.resp)
|
||||
|
||||
if writeRes.isErr():
|
||||
error "Connection write error", error = writeRes.error.msg
|
||||
return
|
||||
|
||||
debug "after sending response", requestId = resBuf.requestId
|
||||
if successfulQuery:
|
||||
let writeDuration = getTime().toUnixFloat() - writeRespStartTime
|
||||
waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"])
|
||||
|
||||
waku_service_network_bytes.inc(
|
||||
amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
|
||||
amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
|
||||
)
|
||||
|
||||
self.handler = handler
|
||||
|
|
|
@ -5,6 +5,11 @@ import metrics
|
|||
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
||||
declarePublicGauge waku_store_queries, "number of store queries received"
|
||||
|
||||
## e.g., we have the "query-db" phase, where the node performs the query to the database,
|
||||
## and the "send-resp" phase, where the node writes the store response to the libp2p stream.
|
||||
declarePublicGauge waku_store_time_seconds,
|
||||
"Time in seconds spent by each store phase", labels = ["phase"]
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure* = "dial_failure"
|
||||
|
|
Loading…
Reference in New Issue