feat: supporting meta field in store (#2609)

This commit is contained in:
gabrielmer 2024-05-06 10:20:21 +02:00 committed by GitHub
parent f8184a6de8
commit a46d4451eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 286 additions and 36 deletions

View File

@ -0,0 +1,71 @@
const ContentScriptVersion_4* =
"""
ALTER TABLE IF EXISTS messages_backup RENAME TO messages;
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
meta VARCHAR,
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
) PARTITION BY RANGE (storedAt);
DO $$
DECLARE
min_storedAt numeric;
max_storedAt numeric;
min_storedAtSeconds integer = 0;
max_storedAtSeconds integer = 0;
partition_name TEXT;
create_partition_stmt TEXT;
BEGIN
SELECT MIN(storedAt) into min_storedAt
FROM messages_backup;
SELECT MAX(storedAt) into max_storedAt
FROM messages_backup;
min_storedAtSeconds := min_storedAt / 1000000000;
max_storedAtSeconds := max_storedAt / 1000000000;
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
' PARTITION OF messages FOR VALUES FROM (' ||
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
END IF;
END $$;
INSERT INTO messages (
pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
)
SELECT pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
FROM messages_backup;
DROP TABLE messages_backup;
UPDATE version SET version = 4 WHERE version = 3;
"""

View File

@ -1,4 +1,6 @@
import content_script_version_1, content_script_version_2, content_script_version_3
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4
type MigrationScript* = object
version*: int
@ -12,6 +14,7 @@ const PgMigrationScripts* =
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
]
proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =

View File

@ -47,20 +47,27 @@ export waku_core.DefaultPubsubTopic, waku_core.DefaultContentTopic
proc fakeWakuMessage*(
payload: string | seq[byte] = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
meta = newSeq[byte](),
meta: string | seq[byte] = newSeq[byte](),
ts = now(),
ephemeral = false,
): WakuMessage =
var payloadBytes: seq[byte]
var metaBytes: seq[byte]
when payload is string:
payloadBytes = toBytes(payload)
else:
payloadBytes = payload
when meta is string:
metaBytes = toBytes(meta)
else:
metaBytes = meta
WakuMessage(
payload: payloadBytes,
contentTopic: contentTopic,
meta: meta,
meta: metaBytes,
version: 2,
timestamp: ts,
ephemeral: ephemeral,

View File

@ -54,8 +54,9 @@ suite "Postgres driver":
asyncTest "Insert a message":
const contentTopic = "test-content-topic"
const meta = "test meta"
let msg = fakeWakuMessage(contentTopic = contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
@ -75,6 +76,7 @@ suite "Postgres driver":
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)
assert toHex(actualMsg.meta) == toHex(msg.meta)
asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"

View File

@ -133,6 +133,61 @@ suite "Postgres driver - queries":
check:
filteredMessages == expected[2 .. 3]
asyncTest "single content topic with meta field":
## Given
const contentTopic = "test-content-topic"
let expected =
@[
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
fakeWakuMessage(
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
),
fakeWakuMessage(
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
),
fakeWakuMessage(
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
),
fakeWakuMessage(
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
),
fakeWakuMessage(
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
),
fakeWakuMessage(
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
),
]
var messages = expected
shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()
## When
let res = await driver.getMessages(
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
)
## Then
assert res.isOk(), res.error
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages == expected[2 .. 3]
asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"

View File

@ -32,10 +32,11 @@ suite "SQLite driver":
test "insert a message":
## Given
const contentTopic = "test-content-topic"
const meta = "test meta"
let driver = newSqliteArchiveDriver()
let msg = fakeWakuMessage(contentTopic = contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
## When
@ -51,9 +52,9 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do(item: auto) -> bool:
let (pubsubTopic, msg, _, _, hash) = item
msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
hash == msgHash
let (pubsubTopic, actualMsg, _, _, hash) = item
actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
hash == msgHash and msg.meta == actualMsg.meta
## Cleanup
(waitFor driver.close()).expect("driver to close")

View File

@ -116,6 +116,67 @@ suite "SQLite driver - query by content topic":
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "single content topic with meta field":
## Given
const contentTopic = "test-content-topic"
let driver = newSqliteArchiveDriver()
let expected =
@[
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
fakeWakuMessage(
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
),
fakeWakuMessage(
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
),
fakeWakuMessage(
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
),
fakeWakuMessage(
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
),
fakeWakuMessage(
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
),
fakeWakuMessage(
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
),
]
var messages = expected
shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()
## When
let res = await driver.getMessages(
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
)
## Then
check:
res.isOk()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages == expected[2 .. 3]
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"

View File

@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
# ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field
ctx.update(
if msg.ephemeral:
@[1.byte]

View File

@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"
const SchemaVersion* = 3 # increase this when there is an update in the database schema
const SchemaVersion* = 4 # increase this when there is an update in the database schema
proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list

View File

@ -31,11 +31,11 @@ type PostgresDriver* = ref object of ArchiveDriver
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
"""INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;"""
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, CASE WHEN $9 = '' THEN NULL ELSE $9 END) ON CONFLICT DO NOTHING;"""
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
messageHash IN ($2) AND
pubsubTopic = $3 AND
@ -86,7 +86,7 @@ const SelectNoCursorV2AscStmtDef =
const SelectNoCursorV2DescStmtName = "SelectWithoutCursorV2Desc"
const SelectNoCursorV2DescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
@ -95,7 +95,7 @@ const SelectNoCursorV2DescStmtDef =
const SelectWithCursorV2DescStmtName = "SelectWithCursorV2Desc"
const SelectWithCursorV2DescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) < ($3,$4) AND
@ -105,7 +105,7 @@ const SelectWithCursorV2DescStmtDef =
const SelectWithCursorV2AscStmtName = "SelectWithCursorV2Asc"
const SelectWithCursorV2AscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) > ($3,$4) AND
@ -161,7 +161,7 @@ proc rowCallbackImpl(
## outRows - seq of Store-rows. This is populated from the info contained in pqResult
let numFields = pqResult.pqnfields()
if numFields != 8:
if numFields != 9:
error "Wrong number of fields"
return
@ -176,6 +176,7 @@ proc rowCallbackImpl(
var payload: string
var hashHex: string
var msgHash: WakuMessageHash
var meta: string
try:
storedAt = parseInt($(pqgetvalue(pqResult, iRow, 0)))
@ -186,6 +187,7 @@ proc rowCallbackImpl(
timestamp = parseInt($(pqgetvalue(pqResult, iRow, 5)))
digest = parseHexStr($(pqgetvalue(pqResult, iRow, 6)))
hashHex = parseHexStr($(pqgetvalue(pqResult, iRow, 7)))
meta = parseHexStr($(pqgetvalue(pqResult, iRow, 8)))
msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
except ValueError:
error "could not parse correctly", error = getCurrentExceptionMsg()
@ -194,6 +196,7 @@ proc rowCallbackImpl(
wakuMessage.version = uint32(version)
wakuMessage.contentTopic = contentTopic
wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))
wakuMessage.meta = @(meta.toOpenArrayByte(0, meta.high))
outRows.add(
(
@ -220,6 +223,7 @@ method put*(
let payload = toHex(message.payload)
let version = $message.version
let timestamp = $message.timestamp
let meta = toHex(message.meta)
trace "put PostgresDriver", timestamp = timestamp
@ -228,7 +232,7 @@ method put*(
InsertRowStmtDefinition,
@[
digest, messageHash, rxTime, contentTopic, payload, pubsubTopic, version,
timestamp,
timestamp, meta,
],
@[
int32(digest.len),
@ -239,8 +243,19 @@ method put*(
int32(pubsubTopic.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)
method getAllMessages*(
@ -256,7 +271,7 @@ method getAllMessages*(
await s.readConnPool.pgQuery(
"""SELECT storedAt, contentTopic,
payload, pubsubTopic, version, timestamp,
id, messageHash FROM messages ORDER BY storedAt ASC""",
id, messageHash, meta FROM messages ORDER BY storedAt ASC""",
newSeq[string](0),
rowCallback,
)
@ -311,7 +326,7 @@ proc getMessagesArbitraryQuery(
## This proc allows to handle atypical queries. We don't use prepared statements for those.
var query =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages"""
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages"""
var statements: seq[string]
var args: seq[string]

View File

@ -18,7 +18,7 @@ type SqlQueryStr = string
proc queryRowWakuMessageCallback(
s: ptr sqlite3_stmt,
contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint,
contentTopicCol, payloadCol, versionCol, senderTimestampCol, metaCol: cint,
): WakuMessage =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
@ -26,17 +26,21 @@ proc queryRowWakuMessageCallback(
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength - 1)))
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
m = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, metaCol))
length = sqlite3_column_bytes(s, payloadCol)
payload = @(toOpenArray(p, 0, length - 1))
payloadLength = sqlite3_column_bytes(s, payloadCol)
metaLength = sqlite3_column_bytes(s, metaCol)
payload = @(toOpenArray(p, 0, payloadLength - 1))
version = sqlite3_column_int64(s, versionCol)
senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
meta = @(toOpenArray(m, 0, metaLength - 1))
return WakuMessage(
contentTopic: ContentTopic(contentTopic),
payload: payload,
version: uint32(version),
timestamp: Timestamp(senderTimestamp),
meta: meta,
)
proc queryRowReceiverTimestampCallback(
@ -83,8 +87,8 @@ proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" & " pubsubTopic BLOB NOT NULL," &
" contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," &
" timestamp INTEGER NOT NULL," & " id BLOB," & " messageHash BLOB," &
" storedAt INTEGER NOT NULL," & " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" &
") WITHOUT ROWID;"
" storedAt INTEGER NOT NULL," & " meta BLOB," &
" CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & ") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
let query = createTableQuery(DbTable)
@ -129,14 +133,23 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
return ok()
## Insert message
type InsertMessageParams* =
(seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
type InsertMessageParams* = (
seq[byte],
seq[byte],
Timestamp,
seq[byte],
seq[byte],
seq[byte],
int64,
Timestamp,
seq[byte],
)
proc insertMessageQuery(table: string): SqlQueryStr =
return
"INSERT INTO " & table &
"(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
"(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp, meta)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(
db: SqliteDatabase
@ -244,7 +257,7 @@ proc deleteOldestMessagesNotWithinLimit*(
proc selectAllMessagesQuery(table: string): SqlQueryStr =
return
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta" &
" FROM " & table & " ORDER BY storedAt ASC"
proc selectAllMessages*(
@ -258,7 +271,12 @@ proc selectAllMessages*(
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
wakuMessage = queryRowWakuMessageCallback(
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
s,
contentTopicCol = 1,
payloadCol = 2,
versionCol = 4,
senderTimestampCol = 5,
metaCol = 8,
)
digest = queryRowDigestCallback(s, digestCol = 6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
@ -342,7 +360,7 @@ proc selectMessagesWithLimitQueryv2(
var query: string
query =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta"
query &= " FROM " & table
if where.isSome():
@ -435,7 +453,12 @@ proc selectMessagesByHistoryQueryWithLimit*(
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
message = queryRowWakuMessageCallback(
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
s,
contentTopicCol = 1,
payloadCol = 2,
versionCol = 4,
senderTimestampCol = 5,
metaCol = 8,
)
digest = queryRowDigestCallback(s, digestCol = 6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
@ -485,7 +508,7 @@ proc execSelectMessageByHash(
proc selectMessageByHashQuery(): SqlQueryStr =
var query: string
query = "SELECT contentTopic, payload, version, timestamp, messageHash"
query = "SELECT contentTopic, payload, version, timestamp, meta, messageHash"
query &= " FROM " & DbTable
query &= " WHERE messageHash = (?)"
@ -621,7 +644,7 @@ proc selectMessagesWithLimitQuery(
var query: string
query =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta"
query &= " FROM " & table
if where.isSome():
@ -655,7 +678,12 @@ proc selectMessagesByStoreQueryWithLimit*(
proc queryRowCallback(s: ptr sqlite3_stmt) =
wakuMessage = queryRowWakuMessageCallback(
s, contentTopicCol = 0, payloadCol = 1, versionCol = 2, senderTimestampCol = 3
s,
contentTopicCol = 0,
payloadCol = 1,
versionCol = 2,
senderTimestampCol = 3,
metaCol = 4,
)
let query = selectMessageByHashQuery()
@ -676,7 +704,12 @@ proc selectMessagesByStoreQueryWithLimit*(
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
message = queryRowWakuMessageCallback(
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
s,
contentTopicCol = 1,
payloadCol = 2,
versionCol = 4,
senderTimestampCol = 5,
metaCol = 8,
)
digest = queryRowDigestCallback(s, digestCol = 6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)

View File

@ -72,6 +72,7 @@ method put*(
toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version
message.timestamp, # senderTimestamp
message.meta, # meta
)
)