mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 17:03:09 +00:00
feat: supporting meta field in store (#2609)
This commit is contained in:
parent
2a7984b951
commit
9be3221b5d
@ -0,0 +1,71 @@
|
|||||||
|
const ContentScriptVersion_4* =
|
||||||
|
"""
|
||||||
|
ALTER TABLE IF EXISTS messages_backup RENAME TO messages;
|
||||||
|
ALTER TABLE messages RENAME TO messages_backup;
|
||||||
|
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS messages (
|
||||||
|
pubsubTopic VARCHAR NOT NULL,
|
||||||
|
contentTopic VARCHAR NOT NULL,
|
||||||
|
payload VARCHAR,
|
||||||
|
version INTEGER NOT NULL,
|
||||||
|
timestamp BIGINT NOT NULL,
|
||||||
|
id VARCHAR NOT NULL,
|
||||||
|
messageHash VARCHAR NOT NULL,
|
||||||
|
storedAt BIGINT NOT NULL,
|
||||||
|
meta VARCHAR,
|
||||||
|
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
|
||||||
|
) PARTITION BY RANGE (storedAt);
|
||||||
|
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
min_storedAt numeric;
|
||||||
|
max_storedAt numeric;
|
||||||
|
min_storedAtSeconds integer = 0;
|
||||||
|
max_storedAtSeconds integer = 0;
|
||||||
|
partition_name TEXT;
|
||||||
|
create_partition_stmt TEXT;
|
||||||
|
BEGIN
|
||||||
|
SELECT MIN(storedAt) into min_storedAt
|
||||||
|
FROM messages_backup;
|
||||||
|
|
||||||
|
SELECT MAX(storedAt) into max_storedAt
|
||||||
|
FROM messages_backup;
|
||||||
|
|
||||||
|
min_storedAtSeconds := min_storedAt / 1000000000;
|
||||||
|
max_storedAtSeconds := max_storedAt / 1000000000;
|
||||||
|
|
||||||
|
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
|
||||||
|
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
|
||||||
|
' PARTITION OF messages FOR VALUES FROM (' ||
|
||||||
|
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
|
||||||
|
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
|
||||||
|
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
|
||||||
|
END IF;
|
||||||
|
END $$;
|
||||||
|
|
||||||
|
INSERT INTO messages (
|
||||||
|
pubsubTopic,
|
||||||
|
contentTopic,
|
||||||
|
payload,
|
||||||
|
version,
|
||||||
|
timestamp,
|
||||||
|
id,
|
||||||
|
messageHash,
|
||||||
|
storedAt
|
||||||
|
)
|
||||||
|
SELECT pubsubTopic,
|
||||||
|
contentTopic,
|
||||||
|
payload,
|
||||||
|
version,
|
||||||
|
timestamp,
|
||||||
|
id,
|
||||||
|
messageHash,
|
||||||
|
storedAt
|
||||||
|
FROM messages_backup;
|
||||||
|
|
||||||
|
DROP TABLE messages_backup;
|
||||||
|
|
||||||
|
UPDATE version SET version = 4 WHERE version = 3;
|
||||||
|
|
||||||
|
"""
|
||||||
@ -1,4 +1,6 @@
|
|||||||
import content_script_version_1, content_script_version_2, content_script_version_3
|
import
|
||||||
|
content_script_version_1, content_script_version_2, content_script_version_3,
|
||||||
|
content_script_version_4
|
||||||
|
|
||||||
type MigrationScript* = object
|
type MigrationScript* = object
|
||||||
version*: int
|
version*: int
|
||||||
@ -12,6 +14,7 @@ const PgMigrationScripts* =
|
|||||||
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
|
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
|
||||||
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
|
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
|
||||||
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
|
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
|
||||||
|
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
|
||||||
]
|
]
|
||||||
|
|
||||||
proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
|
proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
|
||||||
|
|||||||
@ -47,20 +47,27 @@ export waku_core.DefaultPubsubTopic, waku_core.DefaultContentTopic
|
|||||||
proc fakeWakuMessage*(
|
proc fakeWakuMessage*(
|
||||||
payload: string | seq[byte] = "TEST-PAYLOAD",
|
payload: string | seq[byte] = "TEST-PAYLOAD",
|
||||||
contentTopic = DefaultContentTopic,
|
contentTopic = DefaultContentTopic,
|
||||||
meta = newSeq[byte](),
|
meta: string | seq[byte] = newSeq[byte](),
|
||||||
ts = now(),
|
ts = now(),
|
||||||
ephemeral = false,
|
ephemeral = false,
|
||||||
): WakuMessage =
|
): WakuMessage =
|
||||||
var payloadBytes: seq[byte]
|
var payloadBytes: seq[byte]
|
||||||
|
var metaBytes: seq[byte]
|
||||||
|
|
||||||
when payload is string:
|
when payload is string:
|
||||||
payloadBytes = toBytes(payload)
|
payloadBytes = toBytes(payload)
|
||||||
else:
|
else:
|
||||||
payloadBytes = payload
|
payloadBytes = payload
|
||||||
|
|
||||||
|
when meta is string:
|
||||||
|
metaBytes = toBytes(meta)
|
||||||
|
else:
|
||||||
|
metaBytes = meta
|
||||||
|
|
||||||
WakuMessage(
|
WakuMessage(
|
||||||
payload: payloadBytes,
|
payload: payloadBytes,
|
||||||
contentTopic: contentTopic,
|
contentTopic: contentTopic,
|
||||||
meta: meta,
|
meta: metaBytes,
|
||||||
version: 2,
|
version: 2,
|
||||||
timestamp: ts,
|
timestamp: ts,
|
||||||
ephemeral: ephemeral,
|
ephemeral: ephemeral,
|
||||||
|
|||||||
@ -54,8 +54,9 @@ suite "Postgres driver":
|
|||||||
|
|
||||||
asyncTest "Insert a message":
|
asyncTest "Insert a message":
|
||||||
const contentTopic = "test-content-topic"
|
const contentTopic = "test-content-topic"
|
||||||
|
const meta = "test meta"
|
||||||
|
|
||||||
let msg = fakeWakuMessage(contentTopic = contentTopic)
|
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
|
||||||
|
|
||||||
let computedDigest = computeDigest(msg)
|
let computedDigest = computeDigest(msg)
|
||||||
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
|
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||||
@ -75,6 +76,7 @@ suite "Postgres driver":
|
|||||||
assert toHex(computedDigest.data) == toHex(digest)
|
assert toHex(computedDigest.data) == toHex(digest)
|
||||||
assert toHex(actualMsg.payload) == toHex(msg.payload)
|
assert toHex(actualMsg.payload) == toHex(msg.payload)
|
||||||
assert toHex(computedHash) == toHex(hash)
|
assert toHex(computedHash) == toHex(hash)
|
||||||
|
assert toHex(actualMsg.meta) == toHex(msg.meta)
|
||||||
|
|
||||||
asyncTest "Insert and query message":
|
asyncTest "Insert and query message":
|
||||||
const contentTopic1 = "test-content-topic-1"
|
const contentTopic1 = "test-content-topic-1"
|
||||||
|
|||||||
@ -133,6 +133,61 @@ suite "Postgres driver - queries":
|
|||||||
check:
|
check:
|
||||||
filteredMessages == expected[2 .. 3]
|
filteredMessages == expected[2 .. 3]
|
||||||
|
|
||||||
|
asyncTest "single content topic with meta field":
|
||||||
|
## Given
|
||||||
|
const contentTopic = "test-content-topic"
|
||||||
|
|
||||||
|
let expected =
|
||||||
|
@[
|
||||||
|
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
|
||||||
|
),
|
||||||
|
]
|
||||||
|
var messages = expected
|
||||||
|
|
||||||
|
shuffle(messages)
|
||||||
|
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
require (
|
||||||
|
await driver.put(
|
||||||
|
DefaultPubsubTopic,
|
||||||
|
msg,
|
||||||
|
computeDigest(msg),
|
||||||
|
computeMessageHash(DefaultPubsubTopic, msg),
|
||||||
|
msg.timestamp,
|
||||||
|
)
|
||||||
|
).isOk()
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await driver.getMessages(
|
||||||
|
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
|
||||||
|
)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
assert res.isOk(), res.error
|
||||||
|
|
||||||
|
let filteredMessages = res.tryGet().mapIt(it[1])
|
||||||
|
check:
|
||||||
|
filteredMessages == expected[2 .. 3]
|
||||||
|
|
||||||
asyncTest "single content topic - descending order":
|
asyncTest "single content topic - descending order":
|
||||||
## Given
|
## Given
|
||||||
const contentTopic = "test-content-topic"
|
const contentTopic = "test-content-topic"
|
||||||
|
|||||||
@ -32,10 +32,11 @@ suite "SQLite driver":
|
|||||||
test "insert a message":
|
test "insert a message":
|
||||||
## Given
|
## Given
|
||||||
const contentTopic = "test-content-topic"
|
const contentTopic = "test-content-topic"
|
||||||
|
const meta = "test meta"
|
||||||
|
|
||||||
let driver = newSqliteArchiveDriver()
|
let driver = newSqliteArchiveDriver()
|
||||||
|
|
||||||
let msg = fakeWakuMessage(contentTopic = contentTopic)
|
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
|
||||||
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
|
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
@ -51,9 +52,9 @@ suite "SQLite driver":
|
|||||||
check:
|
check:
|
||||||
storedMsg.len == 1
|
storedMsg.len == 1
|
||||||
storedMsg.all do(item: auto) -> bool:
|
storedMsg.all do(item: auto) -> bool:
|
||||||
let (pubsubTopic, msg, _, _, hash) = item
|
let (pubsubTopic, actualMsg, _, _, hash) = item
|
||||||
msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
|
actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
|
||||||
hash == msgHash
|
hash == msgHash and msg.meta == actualMsg.meta
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
(waitFor driver.close()).expect("driver to close")
|
(waitFor driver.close()).expect("driver to close")
|
||||||
|
|||||||
@ -116,6 +116,67 @@ suite "SQLite driver - query by content topic":
|
|||||||
## Cleanup
|
## Cleanup
|
||||||
(await driver.close()).expect("driver to close")
|
(await driver.close()).expect("driver to close")
|
||||||
|
|
||||||
|
asyncTest "single content topic with meta field":
|
||||||
|
## Given
|
||||||
|
const contentTopic = "test-content-topic"
|
||||||
|
|
||||||
|
let driver = newSqliteArchiveDriver()
|
||||||
|
|
||||||
|
let expected =
|
||||||
|
@[
|
||||||
|
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
|
||||||
|
),
|
||||||
|
fakeWakuMessage(
|
||||||
|
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
|
||||||
|
),
|
||||||
|
]
|
||||||
|
var messages = expected
|
||||||
|
|
||||||
|
shuffle(messages)
|
||||||
|
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
require (
|
||||||
|
await driver.put(
|
||||||
|
DefaultPubsubTopic,
|
||||||
|
msg,
|
||||||
|
computeDigest(msg),
|
||||||
|
computeMessageHash(DefaultPubsubTopic, msg),
|
||||||
|
msg.timestamp,
|
||||||
|
)
|
||||||
|
).isOk()
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await driver.getMessages(
|
||||||
|
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
|
||||||
|
)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isOk()
|
||||||
|
|
||||||
|
let filteredMessages = res.tryGet().mapIt(it[1])
|
||||||
|
check:
|
||||||
|
filteredMessages == expected[2 .. 3]
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
(await driver.close()).expect("driver to close")
|
||||||
|
|
||||||
asyncTest "single content topic - descending order":
|
asyncTest "single content topic - descending order":
|
||||||
## Given
|
## Given
|
||||||
const contentTopic = "test-content-topic"
|
const contentTopic = "test-content-topic"
|
||||||
|
|||||||
@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
|
|||||||
ctx.update(msg.payload)
|
ctx.update(msg.payload)
|
||||||
ctx.update(msg.contentTopic.toBytes())
|
ctx.update(msg.contentTopic.toBytes())
|
||||||
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
|
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
|
||||||
|
# ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field
|
||||||
ctx.update(
|
ctx.update(
|
||||||
if msg.ephemeral:
|
if msg.ephemeral:
|
||||||
@[1.byte]
|
@[1.byte]
|
||||||
|
|||||||
@ -9,7 +9,7 @@ import
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "waku archive migration"
|
topics = "waku archive migration"
|
||||||
|
|
||||||
const SchemaVersion* = 3 # increase this when there is an update in the database schema
|
const SchemaVersion* = 4 # increase this when there is an update in the database schema
|
||||||
|
|
||||||
proc breakIntoStatements*(script: string): seq[string] =
|
proc breakIntoStatements*(script: string): seq[string] =
|
||||||
## Given a full migration script, that can potentially contain a list
|
## Given a full migration script, that can potentially contain a list
|
||||||
|
|||||||
@ -31,11 +31,11 @@ type PostgresDriver* = ref object of ArchiveDriver
|
|||||||
const InsertRowStmtName = "InsertRow"
|
const InsertRowStmtName = "InsertRow"
|
||||||
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
|
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
|
||||||
"""INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
|
"""INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
|
||||||
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;"""
|
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, CASE WHEN $9 = '' THEN NULL ELSE $9 END) ON CONFLICT DO NOTHING;"""
|
||||||
|
|
||||||
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
|
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
|
||||||
const SelectNoCursorAscStmtDef =
|
const SelectNoCursorAscStmtDef =
|
||||||
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
|
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
|
||||||
WHERE contentTopic IN ($1) AND
|
WHERE contentTopic IN ($1) AND
|
||||||
messageHash IN ($2) AND
|
messageHash IN ($2) AND
|
||||||
pubsubTopic = $3 AND
|
pubsubTopic = $3 AND
|
||||||
@ -86,7 +86,7 @@ const SelectNoCursorV2AscStmtDef =
|
|||||||
|
|
||||||
const SelectNoCursorV2DescStmtName = "SelectWithoutCursorV2Desc"
|
const SelectNoCursorV2DescStmtName = "SelectWithoutCursorV2Desc"
|
||||||
const SelectNoCursorV2DescStmtDef =
|
const SelectNoCursorV2DescStmtDef =
|
||||||
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
|
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
|
||||||
WHERE contentTopic IN ($1) AND
|
WHERE contentTopic IN ($1) AND
|
||||||
pubsubTopic = $2 AND
|
pubsubTopic = $2 AND
|
||||||
storedAt >= $3 AND
|
storedAt >= $3 AND
|
||||||
@ -95,7 +95,7 @@ const SelectNoCursorV2DescStmtDef =
|
|||||||
|
|
||||||
const SelectWithCursorV2DescStmtName = "SelectWithCursorV2Desc"
|
const SelectWithCursorV2DescStmtName = "SelectWithCursorV2Desc"
|
||||||
const SelectWithCursorV2DescStmtDef =
|
const SelectWithCursorV2DescStmtDef =
|
||||||
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
|
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
|
||||||
WHERE contentTopic IN ($1) AND
|
WHERE contentTopic IN ($1) AND
|
||||||
pubsubTopic = $2 AND
|
pubsubTopic = $2 AND
|
||||||
(storedAt, id) < ($3,$4) AND
|
(storedAt, id) < ($3,$4) AND
|
||||||
@ -105,7 +105,7 @@ const SelectWithCursorV2DescStmtDef =
|
|||||||
|
|
||||||
const SelectWithCursorV2AscStmtName = "SelectWithCursorV2Asc"
|
const SelectWithCursorV2AscStmtName = "SelectWithCursorV2Asc"
|
||||||
const SelectWithCursorV2AscStmtDef =
|
const SelectWithCursorV2AscStmtDef =
|
||||||
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
|
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
|
||||||
WHERE contentTopic IN ($1) AND
|
WHERE contentTopic IN ($1) AND
|
||||||
pubsubTopic = $2 AND
|
pubsubTopic = $2 AND
|
||||||
(storedAt, id) > ($3,$4) AND
|
(storedAt, id) > ($3,$4) AND
|
||||||
@ -161,7 +161,7 @@ proc rowCallbackImpl(
|
|||||||
## outRows - seq of Store-rows. This is populated from the info contained in pqResult
|
## outRows - seq of Store-rows. This is populated from the info contained in pqResult
|
||||||
|
|
||||||
let numFields = pqResult.pqnfields()
|
let numFields = pqResult.pqnfields()
|
||||||
if numFields != 8:
|
if numFields != 9:
|
||||||
error "Wrong number of fields"
|
error "Wrong number of fields"
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -176,6 +176,7 @@ proc rowCallbackImpl(
|
|||||||
var payload: string
|
var payload: string
|
||||||
var hashHex: string
|
var hashHex: string
|
||||||
var msgHash: WakuMessageHash
|
var msgHash: WakuMessageHash
|
||||||
|
var meta: string
|
||||||
|
|
||||||
try:
|
try:
|
||||||
storedAt = parseInt($(pqgetvalue(pqResult, iRow, 0)))
|
storedAt = parseInt($(pqgetvalue(pqResult, iRow, 0)))
|
||||||
@ -186,6 +187,7 @@ proc rowCallbackImpl(
|
|||||||
timestamp = parseInt($(pqgetvalue(pqResult, iRow, 5)))
|
timestamp = parseInt($(pqgetvalue(pqResult, iRow, 5)))
|
||||||
digest = parseHexStr($(pqgetvalue(pqResult, iRow, 6)))
|
digest = parseHexStr($(pqgetvalue(pqResult, iRow, 6)))
|
||||||
hashHex = parseHexStr($(pqgetvalue(pqResult, iRow, 7)))
|
hashHex = parseHexStr($(pqgetvalue(pqResult, iRow, 7)))
|
||||||
|
meta = parseHexStr($(pqgetvalue(pqResult, iRow, 8)))
|
||||||
msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
|
msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
|
||||||
except ValueError:
|
except ValueError:
|
||||||
error "could not parse correctly", error = getCurrentExceptionMsg()
|
error "could not parse correctly", error = getCurrentExceptionMsg()
|
||||||
@ -194,6 +196,7 @@ proc rowCallbackImpl(
|
|||||||
wakuMessage.version = uint32(version)
|
wakuMessage.version = uint32(version)
|
||||||
wakuMessage.contentTopic = contentTopic
|
wakuMessage.contentTopic = contentTopic
|
||||||
wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))
|
wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))
|
||||||
|
wakuMessage.meta = @(meta.toOpenArrayByte(0, meta.high))
|
||||||
|
|
||||||
outRows.add(
|
outRows.add(
|
||||||
(
|
(
|
||||||
@ -220,6 +223,7 @@ method put*(
|
|||||||
let payload = toHex(message.payload)
|
let payload = toHex(message.payload)
|
||||||
let version = $message.version
|
let version = $message.version
|
||||||
let timestamp = $message.timestamp
|
let timestamp = $message.timestamp
|
||||||
|
let meta = toHex(message.meta)
|
||||||
|
|
||||||
trace "put PostgresDriver", timestamp = timestamp
|
trace "put PostgresDriver", timestamp = timestamp
|
||||||
|
|
||||||
@ -228,7 +232,7 @@ method put*(
|
|||||||
InsertRowStmtDefinition,
|
InsertRowStmtDefinition,
|
||||||
@[
|
@[
|
||||||
digest, messageHash, rxTime, contentTopic, payload, pubsubTopic, version,
|
digest, messageHash, rxTime, contentTopic, payload, pubsubTopic, version,
|
||||||
timestamp,
|
timestamp, meta,
|
||||||
],
|
],
|
||||||
@[
|
@[
|
||||||
int32(digest.len),
|
int32(digest.len),
|
||||||
@ -239,8 +243,19 @@ method put*(
|
|||||||
int32(pubsubTopic.len),
|
int32(pubsubTopic.len),
|
||||||
int32(version.len),
|
int32(version.len),
|
||||||
int32(timestamp.len),
|
int32(timestamp.len),
|
||||||
|
int32(meta.len),
|
||||||
|
],
|
||||||
|
@[
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
|
int32(0),
|
||||||
],
|
],
|
||||||
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
method getAllMessages*(
|
method getAllMessages*(
|
||||||
@ -256,7 +271,7 @@ method getAllMessages*(
|
|||||||
await s.readConnPool.pgQuery(
|
await s.readConnPool.pgQuery(
|
||||||
"""SELECT storedAt, contentTopic,
|
"""SELECT storedAt, contentTopic,
|
||||||
payload, pubsubTopic, version, timestamp,
|
payload, pubsubTopic, version, timestamp,
|
||||||
id, messageHash FROM messages ORDER BY storedAt ASC""",
|
id, messageHash, meta FROM messages ORDER BY storedAt ASC""",
|
||||||
newSeq[string](0),
|
newSeq[string](0),
|
||||||
rowCallback,
|
rowCallback,
|
||||||
)
|
)
|
||||||
@ -311,7 +326,7 @@ proc getMessagesArbitraryQuery(
|
|||||||
## This proc allows to handle atypical queries. We don't use prepared statements for those.
|
## This proc allows to handle atypical queries. We don't use prepared statements for those.
|
||||||
|
|
||||||
var query =
|
var query =
|
||||||
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages"""
|
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages"""
|
||||||
var statements: seq[string]
|
var statements: seq[string]
|
||||||
var args: seq[string]
|
var args: seq[string]
|
||||||
|
|
||||||
|
|||||||
@ -18,7 +18,7 @@ type SqlQueryStr = string
|
|||||||
|
|
||||||
proc queryRowWakuMessageCallback(
|
proc queryRowWakuMessageCallback(
|
||||||
s: ptr sqlite3_stmt,
|
s: ptr sqlite3_stmt,
|
||||||
contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint,
|
contentTopicCol, payloadCol, versionCol, senderTimestampCol, metaCol: cint,
|
||||||
): WakuMessage =
|
): WakuMessage =
|
||||||
let
|
let
|
||||||
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
|
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
|
||||||
@ -26,17 +26,21 @@ proc queryRowWakuMessageCallback(
|
|||||||
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength - 1)))
|
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength - 1)))
|
||||||
|
|
||||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
|
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
|
||||||
|
m = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, metaCol))
|
||||||
|
|
||||||
length = sqlite3_column_bytes(s, payloadCol)
|
payloadLength = sqlite3_column_bytes(s, payloadCol)
|
||||||
payload = @(toOpenArray(p, 0, length - 1))
|
metaLength = sqlite3_column_bytes(s, metaCol)
|
||||||
|
payload = @(toOpenArray(p, 0, payloadLength - 1))
|
||||||
version = sqlite3_column_int64(s, versionCol)
|
version = sqlite3_column_int64(s, versionCol)
|
||||||
senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
|
senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
|
||||||
|
meta = @(toOpenArray(m, 0, metaLength - 1))
|
||||||
|
|
||||||
return WakuMessage(
|
return WakuMessage(
|
||||||
contentTopic: ContentTopic(contentTopic),
|
contentTopic: ContentTopic(contentTopic),
|
||||||
payload: payload,
|
payload: payload,
|
||||||
version: uint32(version),
|
version: uint32(version),
|
||||||
timestamp: Timestamp(senderTimestamp),
|
timestamp: Timestamp(senderTimestamp),
|
||||||
|
meta: meta,
|
||||||
)
|
)
|
||||||
|
|
||||||
proc queryRowReceiverTimestampCallback(
|
proc queryRowReceiverTimestampCallback(
|
||||||
@ -83,8 +87,8 @@ proc createTableQuery(table: string): SqlQueryStr =
|
|||||||
"CREATE TABLE IF NOT EXISTS " & table & " (" & " pubsubTopic BLOB NOT NULL," &
|
"CREATE TABLE IF NOT EXISTS " & table & " (" & " pubsubTopic BLOB NOT NULL," &
|
||||||
" contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," &
|
" contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," &
|
||||||
" timestamp INTEGER NOT NULL," & " id BLOB," & " messageHash BLOB," &
|
" timestamp INTEGER NOT NULL," & " id BLOB," & " messageHash BLOB," &
|
||||||
" storedAt INTEGER NOT NULL," & " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" &
|
" storedAt INTEGER NOT NULL," & " meta BLOB," &
|
||||||
") WITHOUT ROWID;"
|
" CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & ") WITHOUT ROWID;"
|
||||||
|
|
||||||
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
|
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
let query = createTableQuery(DbTable)
|
let query = createTableQuery(DbTable)
|
||||||
@ -129,14 +133,23 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
|
|||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
## Insert message
|
## Insert message
|
||||||
type InsertMessageParams* =
|
type InsertMessageParams* = (
|
||||||
(seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
seq[byte],
|
||||||
|
seq[byte],
|
||||||
|
Timestamp,
|
||||||
|
seq[byte],
|
||||||
|
seq[byte],
|
||||||
|
seq[byte],
|
||||||
|
int64,
|
||||||
|
Timestamp,
|
||||||
|
seq[byte],
|
||||||
|
)
|
||||||
|
|
||||||
proc insertMessageQuery(table: string): SqlQueryStr =
|
proc insertMessageQuery(table: string): SqlQueryStr =
|
||||||
return
|
return
|
||||||
"INSERT INTO " & table &
|
"INSERT INTO " & table &
|
||||||
"(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
|
"(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp, meta)" &
|
||||||
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
|
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);"
|
||||||
|
|
||||||
proc prepareInsertMessageStmt*(
|
proc prepareInsertMessageStmt*(
|
||||||
db: SqliteDatabase
|
db: SqliteDatabase
|
||||||
@ -244,7 +257,7 @@ proc deleteOldestMessagesNotWithinLimit*(
|
|||||||
|
|
||||||
proc selectAllMessagesQuery(table: string): SqlQueryStr =
|
proc selectAllMessagesQuery(table: string): SqlQueryStr =
|
||||||
return
|
return
|
||||||
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" &
|
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta" &
|
||||||
" FROM " & table & " ORDER BY storedAt ASC"
|
" FROM " & table & " ORDER BY storedAt ASC"
|
||||||
|
|
||||||
proc selectAllMessages*(
|
proc selectAllMessages*(
|
||||||
@ -258,7 +271,12 @@ proc selectAllMessages*(
|
|||||||
let
|
let
|
||||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
|
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
|
||||||
wakuMessage = queryRowWakuMessageCallback(
|
wakuMessage = queryRowWakuMessageCallback(
|
||||||
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
|
s,
|
||||||
|
contentTopicCol = 1,
|
||||||
|
payloadCol = 2,
|
||||||
|
versionCol = 4,
|
||||||
|
senderTimestampCol = 5,
|
||||||
|
metaCol = 8,
|
||||||
)
|
)
|
||||||
digest = queryRowDigestCallback(s, digestCol = 6)
|
digest = queryRowDigestCallback(s, digestCol = 6)
|
||||||
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
|
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
|
||||||
@ -342,7 +360,7 @@ proc selectMessagesWithLimitQueryv2(
|
|||||||
var query: string
|
var query: string
|
||||||
|
|
||||||
query =
|
query =
|
||||||
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
|
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta"
|
||||||
query &= " FROM " & table
|
query &= " FROM " & table
|
||||||
|
|
||||||
if where.isSome():
|
if where.isSome():
|
||||||
@ -435,7 +453,12 @@ proc selectMessagesByHistoryQueryWithLimit*(
|
|||||||
let
|
let
|
||||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
|
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
|
||||||
message = queryRowWakuMessageCallback(
|
message = queryRowWakuMessageCallback(
|
||||||
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
|
s,
|
||||||
|
contentTopicCol = 1,
|
||||||
|
payloadCol = 2,
|
||||||
|
versionCol = 4,
|
||||||
|
senderTimestampCol = 5,
|
||||||
|
metaCol = 8,
|
||||||
)
|
)
|
||||||
digest = queryRowDigestCallback(s, digestCol = 6)
|
digest = queryRowDigestCallback(s, digestCol = 6)
|
||||||
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
|
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
|
||||||
@ -485,7 +508,7 @@ proc execSelectMessageByHash(
|
|||||||
proc selectMessageByHashQuery(): SqlQueryStr =
|
proc selectMessageByHashQuery(): SqlQueryStr =
|
||||||
var query: string
|
var query: string
|
||||||
|
|
||||||
query = "SELECT contentTopic, payload, version, timestamp, messageHash"
|
query = "SELECT contentTopic, payload, version, timestamp, meta, messageHash"
|
||||||
query &= " FROM " & DbTable
|
query &= " FROM " & DbTable
|
||||||
query &= " WHERE messageHash = (?)"
|
query &= " WHERE messageHash = (?)"
|
||||||
|
|
||||||
@ -621,7 +644,7 @@ proc selectMessagesWithLimitQuery(
|
|||||||
var query: string
|
var query: string
|
||||||
|
|
||||||
query =
|
query =
|
||||||
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
|
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta"
|
||||||
query &= " FROM " & table
|
query &= " FROM " & table
|
||||||
|
|
||||||
if where.isSome():
|
if where.isSome():
|
||||||
@ -655,7 +678,12 @@ proc selectMessagesByStoreQueryWithLimit*(
|
|||||||
|
|
||||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||||
wakuMessage = queryRowWakuMessageCallback(
|
wakuMessage = queryRowWakuMessageCallback(
|
||||||
s, contentTopicCol = 0, payloadCol = 1, versionCol = 2, senderTimestampCol = 3
|
s,
|
||||||
|
contentTopicCol = 0,
|
||||||
|
payloadCol = 1,
|
||||||
|
versionCol = 2,
|
||||||
|
senderTimestampCol = 3,
|
||||||
|
metaCol = 4,
|
||||||
)
|
)
|
||||||
|
|
||||||
let query = selectMessageByHashQuery()
|
let query = selectMessageByHashQuery()
|
||||||
@ -676,7 +704,12 @@ proc selectMessagesByStoreQueryWithLimit*(
|
|||||||
let
|
let
|
||||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
|
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
|
||||||
message = queryRowWakuMessageCallback(
|
message = queryRowWakuMessageCallback(
|
||||||
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
|
s,
|
||||||
|
contentTopicCol = 1,
|
||||||
|
payloadCol = 2,
|
||||||
|
versionCol = 4,
|
||||||
|
senderTimestampCol = 5,
|
||||||
|
metaCol = 8,
|
||||||
)
|
)
|
||||||
digest = queryRowDigestCallback(s, digestCol = 6)
|
digest = queryRowDigestCallback(s, digestCol = 6)
|
||||||
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
|
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
|
||||||
|
|||||||
@ -72,6 +72,7 @@ method put*(
|
|||||||
toBytes(pubsubTopic), # pubsubTopic
|
toBytes(pubsubTopic), # pubsubTopic
|
||||||
int64(message.version), # version
|
int64(message.version), # version
|
||||||
message.timestamp, # senderTimestamp
|
message.timestamp, # senderTimestamp
|
||||||
|
message.meta, # meta
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user