From e1e05afb02b319b5cbacea57dba122ededc619b0 Mon Sep 17 00:00:00 2001
From: Simon-Pierre Vivier
Date: Wed, 4 Sep 2024 10:17:28 -0400
Subject: [PATCH] chore: per limit split of PostgreSQL queries (#3008)

---
 .../postgres_driver/postgres_driver.nim       | 133 +++++++++++-------
 1 file changed, 83 insertions(+), 50 deletions(-)

diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
index 6c0160805..6c8ba28d6 100644
--- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
+++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -1,7 +1,7 @@
 {.push raises: [].}
 
 import
-  std/[nre, options, sequtils, strutils, strformat, times],
+  std/[nre, options, sequtils, strutils, strformat, times, sugar],
   stew/[byteutils, arrayops],
   results,
   chronos,
@@ -128,7 +128,9 @@ const SelectCursorByHashDef =
   """SELECT timestamp FROM messages WHERE messageHash = $1"""
 
-const DefaultMaxNumConns = 50
+const
+  DefaultMaxNumConns = 50
+  MaxHashesPerQuery = 100
 
 proc new*(
     T: type PostgresDriver,
@@ -815,6 +817,70 @@ proc getMessagesByMessageHashes(
   debug "end of getMessagesByMessageHashes"
   return ok(rows)
 
+proc getMessagesWithinLimits(
+    self: PostgresDriver,
+    includeData: bool,
+    contentTopics: seq[ContentTopic],
+    pubsubTopic: Option[PubsubTopic],
+    cursor: Option[ArchiveCursor],
+    startTime: Option[Timestamp],
+    endTime: Option[Timestamp],
+    hashes: seq[WakuMessageHash],
+    maxPageSize: uint,
+    ascendingOrder: bool,
+    requestId: string,
+): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
+  if hashes.len > MaxHashesPerQuery:
+    return err(fmt"cannot handle queries with more than {MaxHashesPerQuery} hashes")
+
+  let hexHashes = hashes.mapIt(toHex(it))
+
+  if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
+      startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
+    return await self.getMessagesByMessageHashes(
+      "'" & hexHashes.join("','") & "'", maxPageSize, requestId
+    )
+
+  if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
+      startTime.isSome() and endTime.isSome():
+    ## Considered the most common query. Therefore, we use prepared statements to optimize it.
+    if includeData:
+      return await self.getMessagesPreparedStmt(
+        contentTopics.join(","),
+        PubsubTopic(pubsubTopic.get()),
+        cursor,
+        startTime.get(),
+        endTime.get(),
+        hexHashes.join(","),
+        maxPageSize,
+        ascendingOrder,
+        requestId,
+      )
+    else:
+      return await self.getMessageHashesPreparedStmt(
+        contentTopics.join(","),
+        PubsubTopic(pubsubTopic.get()),
+        cursor,
+        startTime.get(),
+        endTime.get(),
+        hexHashes.join(","),
+        maxPageSize,
+        ascendingOrder,
+        requestId,
+      )
+  else:
+    if includeData:
+      ## We will run an atypical query; in this case we don't use prepared statements.
+      return await self.getMessagesArbitraryQuery(
+        contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
+        ascendingOrder, requestId,
+      )
+    else:
+      return await self.getMessageHashesArbitraryQuery(
+        contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
+        ascendingOrder, requestId,
+      )
+
 method getMessages*(
     s: PostgresDriver,
     includeData = true,
@@ -830,57 +896,24 @@ method getMessages*(
     contentTopics = newSeq[ContentTopic](0),
     pubsubTopic = none(PubsubTopic),
     cursor = none(ArchiveCursor),
     startTime = none(Timestamp),
     endTime = none(Timestamp),
     hashes = newSeq[WakuMessageHash](0),
     maxPageSize = DefaultPageSize,
     ascendingOrder = false,
     requestId = "",
 ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
   debug "beginning of getMessages"
 
-  const MAX_ALLOWED_HASHES = 100
-  if hashes.len > MAX_ALLOWED_HASHES:
-    return err(fmt"can not attend queries with more than {MAX_ALLOWED_HASHES} hashes")
+  # Iterate only up to hashes.len - 1 so that a length that is an exact
+  # multiple of MaxHashesPerQuery does not produce a trailing empty chunk;
+  # max(0, ...) still yields the single iteration needed when no hashes
+  # are given at all.
+  let rows = collect(newSeq):
+    for i in countup(0, max(0, hashes.len - 1), MaxHashesPerQuery):
+      let stop = min(i + MaxHashesPerQuery, hashes.len)
 
-  let hexHashes = hashes.mapIt(toHex(it))
+      let splitHashes = hashes[i ..< stop]
 
-  if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
-      startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
-    return await s.getMessagesByMessageHashes(
-      "'" & hexHashes.join("','") & "'", maxPageSize, requestId
-    )
+      let subRows =
+        ?await s.getMessagesWithinLimits(
+          includeData, contentTopics, pubsubTopic, cursor, startTime, endTime,
+          splitHashes, maxPageSize, ascendingOrder, requestId,
+        )
 
-  if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
-      startTime.isSome() and endTime.isSome():
-    ## Considered the most common query. Therefore, we use prepared statements to optimize it.
-    if includeData:
-      return await s.getMessagesPreparedStmt(
-        contentTopics.join(","),
-        PubsubTopic(pubsubTopic.get()),
-        cursor,
-        startTime.get(),
-        endTime.get(),
-        hexHashes.join(","),
-        maxPageSize,
-        ascendingOrder,
-        requestId,
-      )
-    else:
-      return await s.getMessageHashesPreparedStmt(
-        contentTopics.join(","),
-        PubsubTopic(pubsubTopic.get()),
-        cursor,
-        startTime.get(),
-        endTime.get(),
-        hexHashes.join(","),
-        maxPageSize,
-        ascendingOrder,
-        requestId,
-      )
-  else:
-    if includeData:
-      ## We will run atypical query. In this case we don't use prepared statemets
-      return await s.getMessagesArbitraryQuery(
-        contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
-        ascendingOrder, requestId,
-      )
-    else:
-      return await s.getMessageHashesArbitraryQuery(
-        contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize,
-        ascendingOrder, requestId,
-      )
+      for row in subRows:
+        row
+
+  debug "end of getMessages"
+
+  return ok(rows)
 
 proc getStr(
     s: PostgresDriver, query: string
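
Illustration (not part of the patch): the new getMessages body is an instance of a generic "process in chunks of at most MaxHashesPerQuery" pattern: slice the hashes, query per slice, and concatenate the rows. A minimal standalone Nim sketch of that pattern, assuming int elements in place of WakuMessageHash and a hypothetical chunks helper that is not part of the driver:

  import std/[sequtils, sugar]

  const MaxHashesPerQuery = 100

  # Split xs into consecutive slices of at most `size` elements. The loop
  # bound xs.len - 1 avoids a trailing empty slice when xs.len is an exact
  # multiple of `size`; an empty input still yields one empty slice, which
  # mirrors the driver running a single query with no hash filter.
  proc chunks[T](xs: seq[T], size: int): seq[seq[T]] =
    collect(newSeq):
      for i in countup(0, max(0, xs.len - 1), size):
        xs[i ..< min(i + size, xs.len)]

  when isMainModule:
    assert chunks(toSeq(1 .. 250), MaxHashesPerQuery).mapIt(it.len) == @[100, 100, 50]
    assert chunks(toSeq(1 .. 200), MaxHashesPerQuery).mapIt(it.len) == @[100, 100]

Capping each query at MaxHashesPerQuery hashes bounds the size of the generated SQL (the quoted hash list) and the work done per round trip, trading one unbounded query for several smaller ones.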