chore: Optimize hash queries with lookup table (#2933)

* Upgrade Postgres schema to add messages_lookup table
* Perform optimized query for messageHash-only queries
This commit is contained in:
Ivan FB 2024-08-08 21:46:08 +02:00 committed by GitHub
parent 3e6b2ea683
commit 696587fdac
8 changed files with 298 additions and 37 deletions

View File

@ -0,0 +1,30 @@
# Postgres migration script that moves the schema from version 6 to version 7.
# In order, it:
#   1. creates the `messages_lookup` table (timestamp, messageHash);
#   2. back-fills it with every (messageHash, timestamp) pair found in `messages`
#      and then adds the composite primary key on (messageHash, timestamp);
#   3. creates single-column indexes on `messages` and `messages_lookup`;
#   4. drops the `i_query_storedat` and `i_query` indexes and creates
#      `idx_query_pubsubtopic` / `idx_query_contenttopic` on `messages`;
#   5. bumps the row in `version` from 6 to 7.
# NOTE(review): the back-fill INSERT and the ADD CONSTRAINT statement carry no
# IF NOT EXISTS / ON CONFLICT guard, so replaying this script on an already
# migrated database would presumably fail - confirm the runner never replays it.
const ContentScriptVersion_7* =
"""
-- Create lookup table
CREATE TABLE IF NOT EXISTS messages_lookup (
timestamp BIGINT NOT NULL,
messageHash VARCHAR NOT NULL
);
-- Put data into lookup table
INSERT INTO messages_lookup (messageHash, timestamp) SELECT messageHash, timestamp from messages;
ALTER TABLE messages_lookup ADD CONSTRAINT messageIndexLookupTable PRIMARY KEY (messageHash, timestamp);
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_messages_messagehash ON messages (messagehash);
CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp);
CREATE INDEX IF NOT EXISTS idx_messages_lookup_messagehash ON messages_lookup (messagehash);
CREATE INDEX IF NOT EXISTS idx_messages_lookup_timestamp ON messages_lookup (timestamp);
DROP INDEX IF EXISTS i_query_storedat;
DROP INDEX IF EXISTS i_query;
CREATE INDEX IF NOT EXISTS idx_query_pubsubtopic ON messages (pubsubTopic);
CREATE INDEX IF NOT EXISTS idx_query_contenttopic ON messages (contentTopic);
-- Update to new version
UPDATE version SET version = 7 WHERE version = 6;
"""

View File

@ -1,6 +1,7 @@
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4, content_script_version_5, content_script_version_6
content_script_version_4, content_script_version_5, content_script_version_6,
content_script_version_7
type MigrationScript* = object
version*: int
@ -17,6 +18,7 @@ const PgMigrationScripts* =
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
MigrationScript(version: 5, scriptContent: ContentScriptVersion_5),
MigrationScript(version: 6, scriptContent: ContentScriptVersion_6),
MigrationScript(version: 7, scriptContent: ContentScriptVersion_7),
]
proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =

View File

@ -1788,3 +1788,32 @@ suite "Postgres driver - queries":
var existsRes = await driver.existsTable("version")
assert existsRes.isOk(), existsRes.error
check existsRes.get() == true
asyncTest "Query by message hash only":
  # Stores three messages under DefaultPubsubTopic and reads them back
  # passing nothing but their hashes to getMessages.
  const contentTopic = "test-content-topic"

  let
    timeOrigin = now()
    storedMsgs =
      @[
        fakeWakuMessage(@[byte 0], contentTopic = contentTopic, ts = ts(00, timeOrigin)),
        fakeWakuMessage(@[byte 1], contentTopic = contentTopic, ts = ts(10, timeOrigin)),
        fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20, timeOrigin)),
      ]

  var hashes: seq[WakuMessageHash]
  for msg in storedMsgs:
    let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
    hashes.add(msgHash)

    let putRes = await driver.put(msgHash, DefaultPubsubTopic, msg)
    assert putRes.isOk(), putRes.error

  let rows = (await driver.getMessages(hashes = hashes)).valueOr:
    assert false, $error
    return

  # The first tuple field of each row is compared against the hashes; the rows
  # are expected in the reverse of the insertion order.
  check:
    rows.len == 3
    rows[0][0] == hashes[2]
    rows[1][0] == hashes[1]
    rows[2][0] == hashes[0]

View File

@ -1952,3 +1952,36 @@ suite "Postgres driver - queries":
var existsRes = await driver.existsTable("version")
assert existsRes.isOk(), existsRes.error
check existsRes.get() == true
asyncTest "Query by message hash only - legacy":
  # Stores three messages through the legacy driver's put and reads them back
  # passing nothing but their hashes to getMessages.
  const contentTopic = "test-content-topic"

  let
    timeOrigin = now()
    storedMsgs =
      @[
        fakeWakuMessage(@[byte 0], contentTopic = contentTopic, ts = ts(00, timeOrigin)),
        fakeWakuMessage(@[byte 1], contentTopic = contentTopic, ts = ts(10, timeOrigin)),
        fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20, timeOrigin)),
      ]

  var hashes: seq[WakuMessageHash]
  for msg in storedMsgs:
    let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
    hashes.add(msgHash)

    let putRes = await driver.put(
      DefaultPubsubTopic, msg, computeDigest(msg), msgHash, msg.timestamp
    )
    require putRes.isOk()

  let rows = (await driver.getMessages(hashes = hashes)).valueOr:
    assert false, $error
    return

  # Row layout: (PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash),
  # hence the hash sits at tuple index 4. Rows are expected in the reverse of
  # the insertion order.
  check:
    rows.len == 3
    rows[0][4] == hashes[2]
    rows[1][4] == hashes[1]
    rows[2][4] == hashes[0]

View File

@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"
const SchemaVersion* = 6 # increase this when there is an update in the database schema
const SchemaVersion* = 7 # increase this when there is an update in the database schema
proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list

View File

@ -15,7 +15,7 @@ logScope:
type TimeRange* = tuple[beginning: int64, `end`: int64]
type
Partition = object
Partition* = object
name: string
timeRange: TimeRange
@ -132,5 +132,8 @@ proc calcEndPartitionTime*(startTime: Timestamp): Timestamp =
proc getName*(partition: Partition): string =
  ## Read accessor for the `name` field of `partition`.
  partition.name
proc getTimeRange*(partition: Partition): TimeRange =
  ## Read accessor for the `timeRange` field of `partition`.
  partition.timeRange
func `==`*(a, b: Partition): bool {.inline.} =
  ## Two partitions compare equal when their names match;
  ## the time range is not part of the comparison.
  a.name == b.name

View File

@ -30,6 +30,10 @@ const InsertRowStmtDefinition =
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;"""
# Statement name and SQL definition handed to `runStmt` to insert a
# (messageHash, timestamp) pair into the messages_lookup table.
# ON CONFLICT DO NOTHING: a row that would violate a constraint is skipped
# instead of raising an error.
const InsertRowInMessagesLookupStmtName = "InsertRowMessagesLookup"
const InsertRowInMessagesLookupStmtDefinition =
"""INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;"""
const SelectClause =
"""SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta FROM messages """
@ -301,27 +305,44 @@ method put*(
## until we completely remove the store/archive-v2 logic
let fakeId = "0"
(
## Add the row to the messages table
await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[
fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
meta,
],
@[
int32(fakeId.len),
int32(messageHash.len),
int32(pubsubTopic.len),
int32(contentTopic.len),
int32(payload.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)
).isOkOr:
return err("could not put msg in messages table: " & $error)
## Now add the row to messages_lookup
return await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta],
@[
int32(fakeId.len),
int32(messageHash.len),
int32(pubsubTopic.len),
int32(contentTopic.len),
int32(payload.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
InsertRowInMessagesLookupStmtName,
InsertRowInMessagesLookupStmtDefinition,
@[messageHash, timestamp],
@[int32(messageHash.len), int32(timestamp.len)],
@[int32(0), int32(0)],
)
method getAllMessages*(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
debug "beginning of getAllMessages"
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
@ -486,7 +507,7 @@ proc getMessageHashesArbitraryQuery(
.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.
debug "beginning of getMessagesV2ArbitraryQuery"
debug "beginning of getMessageHashesArbitraryQuery"
var query = """SELECT messageHash FROM messages"""
var statements: seq[string]
@ -658,7 +679,7 @@ proc getMessageHashesPreparedStmt(
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
debug "beginning of getMessagesV2PreparedStmt"
debug "beginning of getMessageHashesPreparedStmt"
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)
@ -736,6 +757,50 @@ proc getMessageHashesPreparedStmt(
return ok(rows)
proc getMessagesByMessageHashes(
    s: PostgresDriver, hashes: string, maxPageSize: uint
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Retrieves the messages whose hash appears in `hashes`; no other filter is applied.
  ## This proc leverages the messages_lookup table to have better query performance
  ## and only query the desired partitions in the partitioned messages table.
  ##
  ## `hashes` is spliced verbatim into both `IN (...)` clauses, so it must already be
  ## a comma-separated list of quoted SQL literals. No escaping happens here: callers
  ## must only pass values they generated themselves.
  ##
  ## Rows are ordered by timestamp and then message hash, both descending, and at
  ## most `maxPageSize` rows are returned. A failing query yields
  ## `err("failed to run query: ...")`.

  debug "beginning of getMessagesByMessageHashes"

  # The CTE finds the smallest timestamp among the requested hashes in the lookup
  # table; the main query then joins messages with messages_lookup from that
  # timestamp onwards.
  let query =
    fmt"""
WITH min_timestamp AS (
SELECT MIN(timestamp) AS min_ts
FROM messages_lookup
WHERE messagehash IN (
{hashes}
)
)
SELECT m.messageHash, pubsubTopic, contentTopic, payload, version, m.timestamp, meta
FROM messages m
INNER JOIN
messages_lookup l
ON
m.timestamp = l.timestamp
AND m.messagehash = l.messagehash
WHERE
l.timestamp >= (SELECT min_ts FROM min_timestamp)
AND l.messagehash IN (
{hashes}
)
ORDER BY
m.timestamp DESC,
m.messagehash DESC
LIMIT {maxPageSize};
"""

  # Filled in by the row callback while the query result is being consumed.
  var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
  proc rowCallback(pqResult: ptr PGresult) =
    rowCallbackImpl(pqResult, rows)

  (await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr:
    return err("failed to run query: " & $error)

  debug "end of getMessagesByMessageHashes"
  return ok(rows)
method getMessages*(
s: PostgresDriver,
includeData = true,
@ -752,6 +817,11 @@ method getMessages*(
let hexHashes = hashes.mapIt(toHex(it))
if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return
await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize)
if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.
@ -892,7 +962,7 @@ method deleteOldestMessagesNotWithinLimit*(
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of deleteOldestMessagesNotWithinLimit"
let execRes = await s.writeConnPool.pgQuery(
var execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages WHERE messageHash NOT IN
(
SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?
@ -902,6 +972,18 @@ method deleteOldestMessagesNotWithinLimit*(
if execRes.isErr():
return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages_lookup WHERE messageHash NOT IN
(
SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?
);""",
@[$limit],
)
if execRes.isErr():
return err(
"error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
)
debug "end of deleteOldestMessagesNotWithinLimit"
return ok()
@ -1226,8 +1308,12 @@ proc getTableSize*(
return ok(tableSize)
proc removePartition(
self: PostgresDriver, partitionName: string
self: PostgresDriver, partition: Partition
): Future[ArchiveDriverResult[void]] {.async.} =
## Removes the desired partition and also removes the rows from messages_lookup table
## whose rows belong to the partition time range
let partitionName = partition.getName()
debug "beginning of removePartition", partitionName
var partSize = ""
@ -1251,6 +1337,13 @@ proc removePartition(
debug "removed partition", partition_name = partitionName, partition_size = partSize
self.partitionMngr.removeOldestPartitionName()
## Now delete rows from the messages_lookup table
let timeRange = partition.getTimeRange()
let `end` = timeRange.`end` * 1_000_000_000
let deleteRowsQuery = "DELETE FROM messages_lookup WHERE timestamp < " & $`end`
(await self.performWriteQuery(deleteRowsQuery)).isOkOr:
return err(fmt"error in {deleteRowsQuery}: " & $error)
return ok()
proc removePartitionsOlderThan(
@ -1265,7 +1358,7 @@ proc removePartitionsOlderThan(
return err("could not get oldest partition in removePartitionOlderThan: " & $error)
while not oldestPartition.containsMoment(tsInSec):
(await self.removePartition(oldestPartition.getName())).isOkOr:
(await self.removePartition(oldestPartition)).isOkOr:
return err("issue in removePartitionsOlderThan: " & $error)
oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
@ -1295,7 +1388,7 @@ proc removeOldestPartition(
debug "Skipping to remove the current partition"
return ok()
return await self.removePartition(oldestPartition.getName())
return await self.removePartition(oldestPartition)
proc containsAnyPartition*(self: PostgresDriver): bool =
return not self.partitionMngr.isEmpty()

View File

@ -27,6 +27,10 @@ const InsertRowStmtDefinition = # TODO: get the sql queries from a file
"""INSERT INTO messages (id, messageHash, contentTopic, payload, pubsubTopic,
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;"""
# Statement name and SQL definition handed to `runStmt` to insert a
# (messageHash, timestamp) pair into the messages_lookup table.
# ON CONFLICT DO NOTHING: a row that would violate a constraint is skipped
# instead of raising an error.
const InsertRowInMessagesLookupStmtName = "InsertRowMessagesLookup"
const InsertRowInMessagesLookupStmtDefinition =
"""INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;"""
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
"""SELECT contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
@ -212,27 +216,43 @@ method put*(
trace "put PostgresDriver", timestamp = timestamp
(
await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[
digest, messageHash, contentTopic, payload, pubsubTopic, version, timestamp,
meta,
],
@[
int32(digest.len),
int32(messageHash.len),
int32(contentTopic.len),
int32(payload.len),
int32(pubsubTopic.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)
).isOkOr:
return err("could not put msg in messages table: " & $error)
## Now add the row to messages_lookup
return await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[digest, messageHash, contentTopic, payload, pubsubTopic, version, timestamp, meta],
@[
int32(digest.len),
int32(messageHash.len),
int32(contentTopic.len),
int32(payload.len),
int32(pubsubTopic.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
InsertRowInMessagesLookupStmtName,
InsertRowInMessagesLookupStmtDefinition,
@[messageHash, timestamp],
@[int32(messageHash.len), int32(timestamp.len)],
@[int32(0), int32(0)],
)
method getAllMessages*(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
debug "beginning of getAllMessages"
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc rowCallback(pqResult: ptr PGresult) =
@ -591,6 +611,50 @@ proc getMessagesV2PreparedStmt(
return ok(rows)
proc getMessagesByMessageHashes(
    s: PostgresDriver, hashes: string, maxPageSize: uint
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Retrieves the messages whose hash appears in `hashes`; no other filter is applied.
  ## This proc leverages the messages_lookup table to have better query performance
  ## and only query the desired partitions in the partitioned messages table.
  ##
  ## `hashes` is spliced verbatim into both `IN (...)` clauses, so it must already be
  ## a comma-separated list of quoted SQL literals. No escaping happens here: callers
  ## must only pass values they generated themselves.
  ##
  ## Rows are ordered by timestamp and then message hash, both descending, and at
  ## most `maxPageSize` rows are returned. A failing query yields
  ## `err("failed to run query: ...")`.

  debug "beginning of getMessagesByMessageHashes"

  # The CTE finds the smallest timestamp among the requested hashes in the lookup
  # table; the main query then joins messages with messages_lookup from that
  # timestamp onwards.
  let query =
    fmt"""
WITH min_timestamp AS (
SELECT MIN(timestamp) AS min_ts
FROM messages_lookup
WHERE messagehash IN (
{hashes}
)
)
SELECT contentTopic, payload, pubsubTopic, version, m.timestamp, id, m.messageHash, meta
FROM messages m
INNER JOIN
messages_lookup l
ON
m.timestamp = l.timestamp
AND m.messagehash = l.messagehash
WHERE
l.timestamp >= (SELECT min_ts FROM min_timestamp)
AND l.messagehash IN (
{hashes}
)
ORDER BY
m.timestamp DESC,
m.messagehash DESC
LIMIT {maxPageSize};
"""

  # Filled in by the row callback while the query result is being consumed.
  var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
  proc rowCallback(pqResult: ptr PGresult) =
    rowCallbackImpl(pqResult, rows)

  (await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr:
    return err("failed to run query: " & $error)

  debug "end of getMessagesByMessageHashes"
  return ok(rows)
method getMessages*(
s: PostgresDriver,
includeData = true,
@ -603,8 +667,15 @@ method getMessages*(
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
debug "beginning of getMessages"
let hexHashes = hashes.mapIt(toHex(it))
if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return
await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize)
if contentTopicSeq.len == 1 and hexHashes.len == 1 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.