From e8a49b76b27ccbc9ea891341ddab771c92651816 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Thu, 29 Aug 2024 22:56:14 +0200 Subject: [PATCH] chore: Better timing and requestId detail for slower store db queries (#2994) * Better timing and requestId detail for store db queries slower than two seconds * Adapt tests and client to allow sending custom store requestId --- tests/waku_store_legacy/test_client.nim | 7 ++++ tests/waku_store_legacy/test_waku_store.nim | 8 +++-- .../databases/db_postgres/pgasyncpool.nim | 21 +++++++++-- waku/node/waku_node.nim | 2 ++ waku/waku_archive/archive.nim | 1 + waku/waku_archive/common.nim | 1 + waku/waku_archive/driver.nim | 1 + .../postgres_driver/postgres_driver.nim | 32 ++++++++++++----- .../driver/queue_driver/queue_driver.nim | 1 + .../driver/sqlite_driver/sqlite_driver.nim | 1 + waku/waku_archive_legacy/archive.nim | 2 ++ waku/waku_archive_legacy/common.nim | 1 + waku/waku_archive_legacy/driver.nim | 2 ++ .../postgres_driver/postgres_driver.nim | 35 ++++++++++++++----- .../driver/queue_driver/queue_driver.nim | 1 + .../driver/sqlite_driver/sqlite_driver.nim | 4 +-- waku/waku_store/client.nim | 3 +- waku/waku_store_legacy/client.nim | 8 ++++- waku/waku_store_legacy/common.nim | 1 + waku/waku_store_legacy/protocol.nim | 6 ++-- 20 files changed, 111 insertions(+), 27 deletions(-) diff --git a/tests/waku_store_legacy/test_client.nim b/tests/waku_store_legacy/test_client.nim index 57099e995..9e403dc21 100644 --- a/tests/waku_store_legacy/test_client.nim +++ b/tests/waku_store_legacy/test_client.nim @@ -44,6 +44,7 @@ suite "Store Client": pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD, + requestId: "customRequestId", ) serverSwitch = newTestSwitch() @@ -93,33 +94,39 @@ suite "Store Client": pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[], direction: PagingDirection.FORWARD, + requestId: "reqId1", ) invalidQuery2 = HistoryQuery( pubsubTopic: PubsubTopic.none(), contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD, + requestId: "reqId2", ) invalidQuery3 = HistoryQuery( pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[DefaultContentTopic], pageSize: 0, + requestId: "reqId3", ) invalidQuery4 = HistoryQuery( pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[DefaultContentTopic], pageSize: 0, + requestId: "reqId4", ) invalidQuery5 = HistoryQuery( pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[DefaultContentTopic], startTime: some(0.Timestamp), endTime: some(0.Timestamp), + requestId: "reqId5", ) invalidQuery6 = HistoryQuery( pubsubTopic: some(DefaultPubsubTopic), contentTopics: @[DefaultContentTopic], startTime: some(0.Timestamp), endTime: some(-1.Timestamp), + requestId: "reqId6", ) # When the query is sent to the server diff --git a/tests/waku_store_legacy/test_waku_store.nim b/tests/waku_store_legacy/test_waku_store.nim index a134645a9..8ff4eaf09 100644 --- a/tests/waku_store_legacy/test_waku_store.nim +++ b/tests/waku_store_legacy/test_waku_store.nim @@ -44,7 +44,9 @@ suite "Waku Store - query handler legacy": client = newTestWakuStoreClient(clientSwitch) let req = HistoryQuery( - contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD + contentTopics: @[DefaultContentTopic], + direction: PagingDirection.FORWARD, + requestId: "reqId", ) ## When @@ -96,7 +98,9 @@ suite "Waku Store - query handler legacy": client = newTestWakuStoreClient(clientSwitch) let req = HistoryQuery( - contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD + contentTopics: @[DefaultContentTopic], + direction: PagingDirection.FORWARD, + requestId: "reqId", ) info "check point" # log added to track flaky test diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index e9124f4fb..fb8bb3fff 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -2,8 +2,8 @@ # Inspired by: https://github.com/treeform/pg/ {.push raises: [].} -import std/[sequtils, nre, strformat, sets], results, chronos -import ./dbconn, ../common +import std/[sequtils, nre, strformat, sets], results, chronos, chronicles +import ./dbconn, ../common, ../../../waku_core/time type PgAsyncPoolState {.pure.} = enum Closed @@ -149,18 +149,26 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) = if pool.conns[i].dbConn == conn: pool.conns[i].busy = false +const SlowQueryThresholdInNanoSeconds = 2_000_000_000 + proc pgQuery*( pool: PgAsyncPool, query: string, args: seq[string] = newSeq[string](0), rowCallback: DataProc = nil, + requestId: string = "", ): Future[DatabaseResult[void]] {.async.} = let connIndex = (await pool.getConnIndex()).valueOr: return err("connRes.isErr in query: " & $error) + let queryStartTime = getNowInNanosecondTime() let conn = pool.conns[connIndex].dbConn defer: pool.releaseConn(conn) + let queryDuration = getNowInNanosecondTime() - queryStartTime + if queryDuration > SlowQueryThresholdInNanoSeconds: + debug "pgQuery slow query", + query_duration_secs = (queryDuration / 1_000_000_000), query, requestId (await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr: return err("error in asyncpool query: " & $error) @@ -175,6 +183,7 @@ proc runStmt*( paramLengths: seq[int32], paramFormats: seq[int32], rowCallback: DataProc = nil, + requestId: string = "", ): Future[DatabaseResult[void]] {.async.} = ## Runs a stored statement, for performance purposes. ## The stored statements are connection specific and is a technique of caching a very common @@ -187,8 +196,16 @@ proc runStmt*( return err("Error in runStmt: " & $error) let conn = pool.conns[connIndex].dbConn + let queryStartTime = getNowInNanosecondTime() + defer: pool.releaseConn(conn) + let queryDuration = getNowInNanosecondTime() - queryStartTime + if queryDuration > SlowQueryThresholdInNanoSeconds: + debug "runStmt slow query", + query_duration = queryDuration / 1_000_000_000, + query = stmtDefinition, + requestId if not pool.conns[connIndex].preparedStmts.contains(stmtName): # The connection doesn't have that statement yet. Let's create it. diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index b3354c7c2..e347f38ac 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -769,6 +769,7 @@ proc toArchiveQuery( endTime: request.endTime, pageSize: request.pageSize.uint, direction: request.direction, + requestId: request.requestId, ) # TODO: Review this mapping logic. Maybe, move it to the appplication code @@ -910,6 +911,7 @@ proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery = query.hashes = request.messageHashes query.cursor = request.paginationCursor query.direction = request.paginationForward + query.requestId = request.requestId if request.paginationLimit.isSome(): query.pageSize = uint(request.paginationLimit.get()) diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index f95865ebd..7e1cce9dd 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -145,6 +145,7 @@ proc findMessages*( hashes = query.hashes, maxPageSize = maxPageSize + 1, ascendingOrder = isAscendingOrder, + requestId = query.requestId, ) ).valueOr: return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error)) diff --git a/waku/waku_archive/common.nim b/waku/waku_archive/common.nim index b88b70f05..731dc11c4 100644 --- a/waku/waku_archive/common.nim +++ b/waku/waku_archive/common.nim @@ -18,6 +18,7 @@ type hashes*: seq[WakuMessageHash] pageSize*: uint direction*: PagingDirection + requestId*: string ArchiveResponse* = object hashes*: seq[WakuMessageHash] diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 49174b571..4d5cedd66 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -37,6 +37,7 @@ method getMessages*( hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index c15941b21..6c0160805 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -425,6 +425,7 @@ proc getMessagesArbitraryQuery( hexHashes: seq[string] = @[], maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## This proc allows to handle atypical queries. We don't use prepared statements for those. @@ -489,7 +490,7 @@ proc getMessagesArbitraryQuery( proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) - (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr: + (await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr: return err("failed to run query: " & $error) return ok(rows) @@ -504,6 +505,7 @@ proc getMessageHashesArbitraryQuery( hexHashes: seq[string] = @[], maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[(WakuMessageHash, PubsubTopic, WakuMessage)]]] {. async .} = @@ -571,7 +573,7 @@ proc getMessageHashesArbitraryQuery( proc rowCallback(pqResult: ptr PGresult) = hashCallbackImpl(pqResult, rows) - (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr: + (await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr: return err("failed to run query: " & $error) return ok(rows) @@ -586,6 +588,7 @@ proc getMessagesPreparedStmt( hashes: string, maxPageSize = DefaultPageSize, ascOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## This proc aims to run the most typical queries in a more performant way, ## i.e. by means of prepared statements. @@ -619,6 +622,7 @@ proc getMessagesPreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err(stmtName & ": " & $error) @@ -659,6 +663,7 @@ proc getMessagesPreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err(stmtName & ": " & $error) @@ -675,6 +680,7 @@ proc getMessageHashesPreparedStmt( hashes: string, maxPageSize = DefaultPageSize, ascOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## This proc aims to run the most typical queries in a more performant way, ## i.e. by means of prepared statements. @@ -710,6 +716,7 @@ proc getMessageHashesPreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err(stmtName & ": " & $error) @@ -753,6 +760,7 @@ proc getMessageHashesPreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err(stmtName & ": " & $error) @@ -760,7 +768,7 @@ proc getMessageHashesPreparedStmt( return ok(rows) proc getMessagesByMessageHashes( - s: PostgresDriver, hashes: string, maxPageSize: uint + s: PostgresDriver, hashes: string, maxPageSize: uint, requestId: string ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieves information only filtering by a given messageHashes list. ## This proc levarages on the messages_lookup table to have better query performance @@ -797,7 +805,11 @@ proc getMessagesByMessageHashes( proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) - (await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr: + ( + await s.readConnPool.pgQuery( + query = query, rowCallback = rowCallback, requestId = requestId + ) + ).isOkOr: return err("failed to run query: " & $error) debug "end of getMessagesByMessageHashes" @@ -814,6 +826,7 @@ method getMessages*( hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = debug "beginning of getMessages" @@ -825,8 +838,9 @@ method getMessages*( if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and startTime.isNone() and endTime.isNone() and hexHashes.len > 0: - return - await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize) + return await s.getMessagesByMessageHashes( + "'" & hexHashes.join("','") & "'", maxPageSize, requestId + ) if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and startTime.isSome() and endTime.isSome(): @@ -841,6 +855,7 @@ method getMessages*( hexHashes.join(","), maxPageSize, ascendingOrder, + requestId, ) else: return await s.getMessageHashesPreparedStmt( @@ -852,18 +867,19 @@ method getMessages*( hexHashes.join(","), maxPageSize, ascendingOrder, + requestId, ) else: if includeData: ## We will run atypical query. In this case we don't use prepared statemets return await s.getMessagesArbitraryQuery( contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize, - ascendingOrder, + ascendingOrder, requestId, ) else: return await s.getMessageHashesArbitraryQuery( contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize, - ascendingOrder, + ascendingOrder, requestId, ) proc getStr( diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index df21cf8f4..9dbf3c112 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -256,6 +256,7 @@ method getMessages*( hashes: seq[WakuMessageHash] = @[], maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = var index = none(Index) diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index d872b9f15..173dd3e81 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -83,6 +83,7 @@ method getMessages*( hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = if not includeData: return s.db.selectMessageHashesByStoreQueryWithLimit( diff --git a/waku/waku_archive_legacy/archive.nim b/waku/waku_archive_legacy/archive.nim index 690257ff5..f130ffc5f 100644 --- a/waku/waku_archive_legacy/archive.nim +++ b/waku/waku_archive_legacy/archive.nim @@ -151,6 +151,7 @@ proc findMessages*( hashes = query.hashes, maxPageSize = maxPageSize + 1, ascendingOrder = isAscendingOrder, + requestId = query.requestId, ) ).valueOr: return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error)) @@ -230,6 +231,7 @@ proc findMessagesV2*( endTime = query.endTime, maxPageSize = maxPageSize + 1, ascendingOrder = isAscendingOrder, + requestId = query.requestId, ) ).valueOr: return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error)) diff --git a/waku/waku_archive_legacy/common.nim b/waku/waku_archive_legacy/common.nim index 9ef67178f..e068e0f0c 100644 --- a/waku/waku_archive_legacy/common.nim +++ b/waku/waku_archive_legacy/common.nim @@ -52,6 +52,7 @@ type hashes*: seq[WakuMessageHash] pageSize*: uint direction*: PagingDirection + requestId*: string ArchiveResponse* = object hashes*: seq[WakuMessageHash] diff --git a/waku/waku_archive_legacy/driver.nim b/waku/waku_archive_legacy/driver.nim index 98dccdf0a..8ff8df029 100644 --- a/waku/waku_archive_legacy/driver.nim +++ b/waku/waku_archive_legacy/driver.nim @@ -41,6 +41,7 @@ method getMessagesV2*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, deprecated, async.} = discard @@ -55,6 +56,7 @@ method getMessages*( hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard diff --git a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim index f9dd05a61..c2f83e36a 100644 --- a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim @@ -281,6 +281,7 @@ proc getMessagesArbitraryQuery( hexHashes: seq[string] = @[], maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## This proc allows to handle atypical queries. We don't use prepared statements for those. @@ -320,6 +321,7 @@ proc getMessagesArbitraryQuery( @[int32(hashHex.len)], @[int32(0)], entreeCallback, + requestId, ) ).isOkOr: return err("failed to run query with cursor: " & $error) @@ -360,7 +362,7 @@ proc getMessagesArbitraryQuery( proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) - (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr: + (await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr: return err("failed to run query: " & $error) return ok(rows) @@ -374,6 +376,7 @@ proc getMessagesV2ArbitraryQuery( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = ## This proc allows to handle atypical queries. We don't use prepared statements for those. @@ -424,7 +427,7 @@ proc getMessagesV2ArbitraryQuery( proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) - (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr: + (await s.readConnPool.pgQuery(query, args, rowCallback, requestId)).isOkOr: return err("failed to run query: " & $error) return ok(rows) @@ -439,6 +442,7 @@ proc getMessagesPreparedStmt( hashes: string, maxPageSize = DefaultPageSize, ascOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## This proc aims to run the most typical queries in a more performant way, i.e. by means of ## prepared statements. @@ -469,6 +473,7 @@ proc getMessagesPreparedStmt( @[int32(hash.len)], @[int32(0)], entreeCallback, + requestId, ) ).isOkOr: return err("failed to run query with cursor: " & $error) @@ -505,6 +510,7 @@ proc getMessagesPreparedStmt( int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0) ], rowCallback, + requestId, ) ).isOkOr: return err("failed to run query with cursor: " & $error) @@ -528,6 +534,7 @@ proc getMessagesPreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err("failed to run query without cursor: " & $error) @@ -543,6 +550,7 @@ proc getMessagesV2PreparedStmt( endTime: Timestamp, maxPageSize = DefaultPageSize, ascOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = ## This proc aims to run the most typical queries in a more performant way, i.e. by means of ## prepared statements. @@ -582,6 +590,7 @@ proc getMessagesV2PreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err("failed to run query with cursor: " & $error) @@ -605,6 +614,7 @@ proc getMessagesV2PreparedStmt( ], @[int32(0), int32(0), int32(0), int32(0), int32(0)], rowCallback, + requestId, ) ).isOkOr: return err("failed to run query without cursor: " & $error) @@ -612,7 +622,7 @@ proc getMessagesV2PreparedStmt( return ok(rows) proc getMessagesByMessageHashes( - s: PostgresDriver, hashes: string, maxPageSize: uint + s: PostgresDriver, hashes: string, maxPageSize: uint, requestId: string ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieves information only filtering by a given messageHashes list. ## This proc levarages on the messages_lookup table to have better query performance @@ -649,7 +659,11 @@ proc getMessagesByMessageHashes( proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) - (await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr: + ( + await s.readConnPool.pgQuery( + query = query, rowCallback = rowCallback, requestId = requestId + ) + ).isOkOr: return err("failed to run query: " & $error) debug "end of getMessagesByMessageHashes" @@ -666,6 +680,7 @@ method getMessages*( hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = debug "beginning of getMessages" @@ -673,8 +688,9 @@ method getMessages*( if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and startTime.isNone() and endTime.isNone() and hexHashes.len > 0: - return - await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize) + return await s.getMessagesByMessageHashes( + "'" & hexHashes.join("','") & "'", maxPageSize, requestId + ) if contentTopicSeq.len == 1 and hexHashes.len == 1 and pubsubTopic.isSome() and startTime.isSome() and endTime.isSome(): @@ -688,12 +704,13 @@ method getMessages*( hexHashes.join(","), maxPageSize, ascendingOrder, + requestId, ) else: ## We will run atypical query. In this case we don't use prepared statemets return await s.getMessagesArbitraryQuery( contentTopicSeq, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize, - ascendingOrder, + ascendingOrder, requestId, ) method getMessagesV2*( @@ -705,6 +722,7 @@ method getMessagesV2*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = if contentTopicSeq.len == 1 and pubsubTopic.isSome() and startTime.isSome() and endTime.isSome(): @@ -717,12 +735,13 @@ method getMessagesV2*( endTime.get(), maxPageSize, ascendingOrder, + requestId, ) else: ## We will run atypical query. In this case we don't use prepared statemets return await s.getMessagesV2ArbitraryQuery( contentTopicSeq, pubsubTopic, cursor, startTime, endTime, maxPageSize, - ascendingOrder, + ascendingOrder, requestId, ) proc getStr( diff --git a/waku/waku_archive_legacy/driver/queue_driver/queue_driver.nim b/waku/waku_archive_legacy/driver/queue_driver/queue_driver.nim index 85c30823a..942a720df 100644 --- a/waku/waku_archive_legacy/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive_legacy/driver/queue_driver/queue_driver.nim @@ -268,6 +268,7 @@ method getMessages*( hashes: seq[WakuMessageHash] = @[], maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = let cursor = cursor.map(toIndex) diff --git a/waku/waku_archive_legacy/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive_legacy/driver/sqlite_driver/sqlite_driver.nim index 4e0450aab..5a6c12b05 100644 --- a/waku/waku_archive_legacy/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive_legacy/driver/sqlite_driver/sqlite_driver.nim @@ -93,9 +93,8 @@ method getMessagesV2*( endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} = - echo "here" - let cursor = cursor.map(toDbCursor) let rowsRes = s.db.selectMessagesByHistoryQueryWithLimit( @@ -121,6 +120,7 @@ method getMessages*( hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, + requestId = "", ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = let cursor = if cursor.isSome(): diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 4b05249e3..61229576a 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -24,7 +24,8 @@ proc sendStoreRequest( ): Future[StoreQueryResult] {.async, gcsafe.} = var req = request - req.requestId = generateRequestId(self.rng) + if req.requestId == "": + req.requestId = generateRequestId(self.rng) let writeRes = catch: await connection.writeLP(req.encode().buffer) diff --git a/waku/waku_store_legacy/client.nim b/waku/waku_store_legacy/client.nim index 2e87c4ca4..f26906e9e 100644 --- a/waku/waku_store_legacy/client.nim +++ b/waku/waku_store_legacy/client.nim @@ -43,7 +43,13 @@ proc sendHistoryQueryRPC( let connection = connOpt.get() - let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC())) + let requestId = + if req.requestId != "": + req.requestId + else: + generateRequestId(w.rng) + + let reqRpc = HistoryRPC(requestId: requestId, query: some(req.toRPC())) await connection.writeLP(reqRpc.encode().buffer) #TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail. diff --git a/waku/waku_store_legacy/common.nim b/waku/waku_store_legacy/common.nim index 3f88f9509..ddbe676d5 100644 --- a/waku/waku_store_legacy/common.nim +++ b/waku/waku_store_legacy/common.nim @@ -45,6 +45,7 @@ type endTime*: Option[Timestamp] pageSize*: uint64 direction*: PagingDirection + requestId*: string HistoryResponse* = object messages*: seq[WakuMessage] diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index a4e5c9246..11e3e9be9 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -55,9 +55,9 @@ proc handleLegacyQueryRequest( # TODO: Return (BAD_REQUEST, cause: "empty query") return - let - requestId = reqRpc.requestId - request = reqRpc.query.get().toAPI() + let requestId = reqRpc.requestId + var request = reqRpc.query.get().toAPI() + request.requestId = requestId info "received history query", peerId = requestor, requestId = requestId, query = request