Mirror of https://github.com/logos-messaging/logos-messaging-nim.git (synced 2026-01-04 15:03:08 +00:00)
feat: Add new DB column messageHash (#2202)
* feat: added DB column messageHash
* feat: minor change
* feat: minor merge conflict fix
* Update test_resume.nim
* Update test_resume.nim
* randomblob() func used to populate attribute
* PRIMARY key updated - SQLite and Postgres
parent a22ee60494
commit aeb77a3e75
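For orientation, the sketch below shows the call shape this commit introduces: every archive write now carries a deterministic message hash alongside the legacy digest, and that hash becomes the primary key of the message table. This is a minimal illustration only, not part of the commit; it assumes the helper names and module paths that appear in the diff hunks below (nwaku in-repo layout, chronos for async) and may need path adjustments outside the tests/ tree.

# Minimal sketch of the new ArchiveDriver.put arity introduced by this commit.
# Import paths are taken from the diff below and are assumptions about the repo layout.
import chronos
import
  ../../waku/waku_core,                 # WakuMessage, PubsubTopic
  ../../waku/waku_core/message/digest,  # computeMessageHash
  ../../waku/waku_archive               # ArchiveDriver, computeDigest

proc archiveOne(driver: ArchiveDriver,
                pubsubTopic: PubsubTopic,
                msg: WakuMessage) {.async.} =
  let
    msgDigest = computeDigest(msg)                   # legacy digest, still stored
    msgHash = computeMessageHash(pubsubTopic, msg)   # new column, now the primary key
  # put() now takes the message hash between the digest and the received timestamp
  let res = await driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
  assert res.isOk(), res.error

The SQLite migration that follows backfills the new column for pre-existing rows with randomblob(32), so the new PRIMARY KEY constraint on messageHash can be satisfied for rows that predate the change.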
@@ -0,0 +1,28 @@
ALTER TABLE message RENAME TO message_backup;

CREATE TABLE IF NOT EXISTS message (
    pubsubTopic BLOB NOT NULL,
    contentTopic BLOB NOT NULL,
    payload BLOB,
    version INTEGER NOT NULL,
    timestamp INTEGER NOT NULL,
    id BLOB,
    messageHash BLOB, -- newly added; existing rows are backfilled with a 32-byte random blob below
    storedAt INTEGER NOT NULL,
    CONSTRAINT messageIndex PRIMARY KEY (messageHash)
) WITHOUT ROWID;

INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt)
SELECT
    mb.pubsubTopic,
    mb.contentTopic,
    mb.payload,
    mb.version,
    mb.timestamp,
    mb.id,
    randomblob(32), -- populate with a 32-byte random blob
    mb.storedAt
FROM message_backup AS mb;

DROP TABLE message_backup;
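To sanity-check the result of this migration on an existing SQLite store, something like the following can be run against the database file (the file name and location are deployment-specific, so this is only an illustrative check, not part of the commit):

-- Confirm the new column and the messageHash primary key are in place.
PRAGMA table_info(message);
-- Every pre-existing row should now carry a 32-byte placeholder hash.
SELECT count(*) FROM message WHERE length(messageHash) = 32;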
@@ -8,6 +8,7 @@ import
  ../../../waku/waku_archive,
  ../../../waku/waku_archive/driver/postgres_driver,
  ../../../waku/waku_core,
+ ../../../waku/waku_core/message/digest,
  ../testlib/wakucore

proc now():int64 = getTime().toUnix()

@@ -80,7 +81,7 @@ suite "Postgres driver":

let computedDigest = computeDigest(msg)

- let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
+ let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()

@@ -113,12 +114,12 @@ suite "Postgres driver":

let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

- var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)
+ var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp)
assert putRes.isOk(), putRes.error

let msg2 = fakeWakuMessage(contentTopic=contentTopic2)

- putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)
+ putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), computeMessageHash(pubsubTopic2, msg2), msg2.timestamp)
assert putRes.isOk(), putRes.error

let countMessagesRes = await driver.getMessagesCount()

@@ -197,11 +198,11 @@ suite "Postgres driver":
let msg2 = fakeWakuMessage(ts = now)

var putRes = await driver.put(DefaultPubsubTopic,
-   msg1, computeDigest(msg1), msg1.timestamp)
+   msg1, computeDigest(msg1), computeMessageHash(DefaultPubsubTopic, msg1), msg1.timestamp)
assert putRes.isOk(), putRes.error

putRes = await driver.put(DefaultPubsubTopic,
-   msg2, computeDigest(msg2), msg2.timestamp)
+   msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()

(await driver.close()).expect("driver to close")
@@ -9,12 +9,15 @@ import
  ../../../waku/waku_archive,
  ../../../waku/waku_archive/driver/postgres_driver,
  ../../../waku/waku_core,
+ ../../../waku/waku_core/message/digest,
  ../testlib/common,
  ../testlib/wakucore

logScope:
  topics = "test archive postgres driver"

## This whole file is copied from the 'test_driver_sqlite_query.nim' file
## and it tests the same use cases but using the postgres driver.

@@ -65,7 +68,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -107,7 +110,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -150,7 +153,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -195,7 +198,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -233,7 +236,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -260,7 +263,7 @@ suite "Postgres driver - query by content topic":

for t in 0..<40:
  let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
-  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -306,7 +309,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
  let (topic, msg) = row
-  require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -351,7 +354,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
  let (topic, msg) = row
-  require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(

@@ -396,7 +399,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
  let (topic, msg) = row
-  require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
+  require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
@ -443,7 +446,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -488,7 +491,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -531,7 +534,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -575,7 +578,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -626,7 +629,7 @@ suite "Postgres driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[5][0], expected[5][1])
|
||||
|
||||
@ -678,7 +681,7 @@ suite "Postgres driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[6][0], expected[6][1])
|
||||
|
||||
@ -726,7 +729,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -768,7 +771,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -816,7 +819,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -861,7 +864,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -904,7 +907,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -949,7 +952,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -994,7 +997,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
|
||||
|
||||
@ -1042,7 +1045,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -1093,7 +1096,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
|
||||
|
||||
@ -1147,7 +1150,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[7][0], expected[7][1])
|
||||
|
||||
@ -1201,7 +1204,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -1256,7 +1259,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -1306,7 +1309,7 @@ suite "Postgres driver - retention policy":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
var res = await driver.getOldestMessageTimestamp()
|
||||
assert res.isOk(), res.error
|
||||
@ -1341,7 +1344,7 @@ suite "Postgres driver - retention policy":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
var res = await driver.getMessagesCount()
|
||||
assert res.isOk(), res.error
|
||||
@ -1378,7 +1381,7 @@ suite "Postgres driver - retention policy":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
var res = await driver.getMessagesCount()
|
||||
assert res.isOk(), res.error
|
||||
|
||||
@@ -9,6 +9,7 @@ import
  ../../../waku/waku_archive,
  ../../../waku/waku_archive/driver/queue_driver,
  ../../../waku/waku_core,
+ ../../../waku/waku_core/message/digest,
  ../testlib/common,
  ../testlib/wakucore

@@ -58,7 +59,7 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -102,7 +103,7 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -147,7 +148,7 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -194,7 +195,7 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -234,7 +235,7 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
-  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -263,7 +264,7 @@ suite "Queue driver - query by content topic":

for t in 0..<40:
  let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
-  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -312,7 +313,7 @@ suite "SQLite driver - query by pubsub topic":

for row in messages:
  let (topic, msg) = row
-  let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -359,7 +360,7 @@ suite "SQLite driver - query by pubsub topic":

for row in messages:
  let (topic, msg) = row
-  let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
  require retFut.isOk()

## When

@@ -406,7 +407,7 @@ suite "SQLite driver - query by pubsub topic":

for row in messages:
  let (topic, msg) = row
-  let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
+  let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
  require retFut.isOk()

## When
@ -456,7 +457,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
@ -503,7 +504,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
@ -548,7 +549,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
@ -594,7 +595,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
@ -647,7 +648,7 @@ suite "Queue driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[5][0], expected[5][1])
|
||||
@ -701,7 +702,7 @@ suite "Queue driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[6][0], expected[6][1])
|
||||
@ -752,7 +753,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -796,7 +797,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg),computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -846,7 +847,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -893,7 +894,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -938,7 +939,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -985,7 +986,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -1032,7 +1033,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
|
||||
@ -1082,7 +1083,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
@ -1135,7 +1136,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
|
||||
@ -1191,7 +1192,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[7][0], expected[7][1])
|
||||
@ -1247,7 +1248,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
@ -1304,7 +1305,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -50,7 +50,7 @@ suite "SQLite driver":
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
|
||||
## When
|
||||
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
|
||||
|
||||
## Then
|
||||
check:
|
||||
|
||||
@ -10,6 +10,7 @@ import
|
||||
../../../waku/waku_archive,
|
||||
../../../waku/waku_archive/driver/sqlite_driver,
|
||||
../../../waku/waku_core,
|
||||
../../../waku/waku_core/message/digest,
|
||||
../testlib/common,
|
||||
../testlib/wakucore
|
||||
|
||||
@ -62,7 +63,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -105,7 +106,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -149,7 +150,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -195,7 +196,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -234,7 +235,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -262,7 +263,7 @@ suite "SQLite driver - query by content topic":
|
||||
|
||||
for t in 0..<40:
|
||||
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -310,7 +311,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -356,7 +357,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -402,7 +403,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -451,7 +452,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -497,7 +498,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -541,7 +542,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -586,7 +587,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -638,7 +639,7 @@ suite "SQLite driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[5][0], expected[5][1])
|
||||
|
||||
@ -691,7 +692,7 @@ suite "SQLite driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[6][0], expected[6][1])
|
||||
|
||||
@ -741,7 +742,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -784,7 +785,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -833,7 +834,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -879,7 +880,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -923,7 +924,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -969,7 +970,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -1015,7 +1016,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
|
||||
|
||||
@ -1064,7 +1065,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -1116,7 +1117,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
|
||||
|
||||
@ -1171,7 +1172,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[7][0], expected[7][1])
|
||||
|
||||
@ -1226,7 +1227,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -1282,7 +1283,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
|
||||
@ -8,6 +8,7 @@ import
|
||||
import
|
||||
../../../waku/common/databases/db_sqlite,
|
||||
../../../waku/waku_core,
|
||||
../../../waku/waku_core/message/digest,
|
||||
../../../waku/waku_archive,
|
||||
../../../waku/waku_archive/driver/sqlite_driver,
|
||||
../../../waku/waku_archive/retention_policy,
|
||||
@ -41,7 +42,7 @@ suite "Waku Archive - Retention policy":
|
||||
## When
|
||||
for i in 1..capacity+excess:
|
||||
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp))
|
||||
|
||||
discard waitFor allFinished(putFutures)
|
||||
|
||||
@ -79,11 +80,12 @@ suite "Waku Archive - Retention policy":
|
||||
require (waitFor driver.performVacuum()).isOk()
|
||||
|
||||
## When
|
||||
##
|
||||
|
||||
# create a number of messages so that the size of the DB overshoots
|
||||
for i in 1..excess:
|
||||
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp))
|
||||
|
||||
# waitFor is used to synchronously wait for the futures to complete.
|
||||
discard waitFor allFinished(putFutures)
|
||||
@ -137,7 +139,7 @@ suite "Waku Archive - Retention policy":
|
||||
|
||||
## When
|
||||
for msg in messages:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
require (waitFor retentionPolicy.execute(driver)).isOk()
|
||||
|
||||
## Then
|
||||
|
||||
@ -9,6 +9,7 @@ import
|
||||
import
|
||||
../../../waku/common/databases/db_sqlite,
|
||||
../../../waku/waku_core,
|
||||
../../../waku/waku_core/message/digest,
|
||||
../../../waku/waku_archive/driver/sqlite_driver,
|
||||
../../../waku/waku_archive,
|
||||
../testlib/common,
|
||||
@ -152,7 +153,7 @@ procSuite "Waku Archive - find messages":
|
||||
archive = newTestWakuArchive(driver)
|
||||
|
||||
for msg in msgListA:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
archive
|
||||
|
||||
@ -446,7 +447,7 @@ procSuite "Waku Archive - find messages":
|
||||
]
|
||||
|
||||
for msg in msgList:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()
|
||||
|
||||
## Given
|
||||
let req = ArchiveQuery(contentTopics: @[DefaultContentTopic])
|
||||
|
||||
@ -32,7 +32,7 @@ suite "Waku Message - Deterministic hashing":
|
||||
)
|
||||
|
||||
## When
|
||||
let messageHash = digest(pubsubTopic, message)
|
||||
let messageHash = computeMessageHash(pubsubTopic, message)
|
||||
|
||||
## Then
|
||||
check:
|
||||
@ -64,7 +64,7 @@ suite "Waku Message - Deterministic hashing":
|
||||
)
|
||||
|
||||
## When
|
||||
let messageHash = digest(pubsubTopic, message)
|
||||
let messageHash = computeMessageHash(pubsubTopic, message)
|
||||
|
||||
## Then
|
||||
check:
|
||||
@ -96,7 +96,7 @@ suite "Waku Message - Deterministic hashing":
|
||||
)
|
||||
|
||||
## When
|
||||
let messageHash = digest(pubsubTopic, message)
|
||||
let messageHash = computeMessageHash(pubsubTopic, message)
|
||||
|
||||
## Then
|
||||
check:
|
||||
@ -128,7 +128,7 @@ suite "Waku Message - Deterministic hashing":
|
||||
)
|
||||
|
||||
## When
|
||||
let messageHash = digest(pubsubTopic, message)
|
||||
let messageHash = computeMessageHash(pubsubTopic, message)
|
||||
|
||||
## Then
|
||||
check:
|
||||
|
||||
@@ -8,9 +8,11 @@ import
  libp2p/crypto/crypto
import
  ../../waku/common/databases/db_sqlite,
  ../../waku/node/message_store/sqlite_store,
  ../../waku/waku_archive/driver,
  ../../waku/waku_archive/driver/sqlite_driver/sqlite_driver,
  ../../waku/node/peer_manager,
  ../../waku/waku_core,
  ../../waku/waku_core/message/digest,
  ../../waku/waku_store,
  ./testlib/common,
  ./testlib/switch

@@ -19,12 +21,12 @@ import
proc newTestDatabase(): SqliteDatabase =
  SqliteDatabase.new("memory:").tryGet()

- proc newTestArchiveDriver(): ArchiveDriver =
+ proc newTestArchiveDriver(): ArchiveDriverResult =
  let database = SqliteDatabase.new(":memory:").tryGet()
  SqliteDriver.init(database).tryGet()

- proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} =
+ proc newTestWakuStore(switch: Switch, store: MessageStore = nil): Future[WakuStore] {.async.} =
  let
    peerManager = PeerManager.new(switch)
    proto = WakuStore.init(peerManager, rng, store)

@@ -58,7 +60,7 @@ procSuite "Waku Store - resume store":
]

for msg in msgList:
-  require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
+  require store.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp).isOk()

store

@@ -76,7 +78,7 @@ procSuite "Waku Store - resume store":
]

for msg in msgList2:
-  require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
+  require store.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp).isOk()

store
@@ -15,6 +15,7 @@ import
import
  ../../../waku/common/databases/db_sqlite,
  ../../../waku/waku_core,
+ ../../../waku/waku_core/message/digest,
  ../../../waku/node/peer_manager,
  ../../../waku/waku_archive,
  ../../../waku/waku_archive/driver/sqlite_driver,

@@ -58,7 +59,8 @@ procSuite "WakuNode - Store":

for msg in msgListA:
  let msg_digest = waku_archive.computeDigest(msg)
-  require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk()
+  let msg_hash = computeMessageHash(DefaultPubsubTopic, msg)
+  require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg_hash, msg.timestamp)).isOk()

driver

@@ -9,6 +9,7 @@ import
  json_rpc/[rpcserver, rpcclient]
import
  ../../../waku/waku_core,
+ ../../../waku/waku_core/message/digest,
  ../../../waku/node/peer_manager,
  ../../../waku/waku_node,
  ../../../waku/waku_api/jsonrpc/store/handlers as store_api,

@@ -25,10 +26,11 @@ import
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
  let
    digest = waku_archive.computeDigest(message)
+   msgHash = computeMessageHash(pubsubTopic, message)
    receivedTime = if message.timestamp > 0: message.timestamp
                   else: getNanosecondTime(getTime().toUnixFloat())

- store.put(pubsubTopic, message, digest, receivedTime)
+ store.put(pubsubTopic, message, digest, msgHash, receivedTime)

procSuite "Waku v2 JSON-RPC API - Store":

@@ -10,6 +10,7 @@ import
  libp2p/crypto/crypto
import
  ../../../waku/waku_core/message,
+ ../../../waku/waku_core/message/digest,
  ../../../waku/waku_core/topics,
  ../../../waku/waku_core/time,
  ../../../waku/waku_node,

@@ -33,10 +34,11 @@ logScope:
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
  let
    digest = waku_archive.computeDigest(message)
+   msgHash = computeMessageHash(pubsubTopic, message)
    receivedTime = if message.timestamp > 0: message.timestamp
                   else: getNanosecondTime(getTime().toUnixFloat())

- store.put(pubsubTopic, message, digest, receivedTime)
+ store.put(pubsubTopic, message, digest, msgHash, receivedTime)

# Creates a new WakuNode
proc testWakuNode(): WakuNode =
@@ -210,7 +210,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
trace "waku.relay received",
  peerId=node.peerId,
  pubsubTopic=topic,
-  hash=topic.digest(msg).to0xHex(),
+  hash=topic.computeMessageHash(msg).to0xHex(),
  receivedTime=getNowInNanosecondTime(),
  payloadSizeBytes=msg.payload.len

@@ -339,7 +339,7 @@ proc publish*(
trace "waku.relay published",
  peerId=node.peerId,
  pubsubTopic=pubsubTopic,
-  hash=pubsubTopic.digest(message).to0xHex(),
+  hash=pubsubTopic.computeMessageHash(message).to0xHex(),
  publishTime=getNowInNanosecondTime()

proc startRelay*(node: WakuNode) {.async.} =

@@ -18,6 +18,7 @@ import
  ./retention_policy/retention_policy_capacity,
  ./retention_policy/retention_policy_time,
  ../waku_core,
+ ../waku_core/message/digest,
  ./common,
  ./archive_metrics
@@ -101,12 +102,13 @@ proc handleMessage*(w: WakuArchive,
block:
  let
    msgDigest = computeDigest(msg)
+   msgHash = computeMessageHash(pubsubTopic, msg)
    msgReceivedTime = if msg.timestamp > 0: msg.timestamp
                      else: getNanosecondTime(getTime().toUnixFloat())

-  trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
+  trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest, messageHash=msgHash

-  let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
+  let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgReceivedTime)
  if putRes.isErr():
    error "failed to insert message", err=putRes.error
    waku_archive_errors.inc(labelValues = [insertFailure])

@@ -26,6 +26,7 @@ method put*(driver: ArchiveDriver,
  pubsubTopic: PubsubTopic,
  message: WakuMessage,
  digest: MessageDigest,
+ messageHash: WakuMessageHash,
  receivedTime: Timestamp):
  Future[ArchiveDriverResult[void]] {.base, async.} = discard
@@ -35,15 +35,16 @@ proc createTableQuery(): string =
  " version INTEGER NOT NULL," &
  " timestamp BIGINT NOT NULL," &
  " id VARCHAR NOT NULL," &
+ " messageHash VARCHAR NOT NULL," &
  " storedAt BIGINT NOT NULL," &
- " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
+ " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" &
  ");"

const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition =
  # TODO: get the sql queries from a file
- """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
-   version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""
+ """INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
+   version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);"""

const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =

@@ -186,10 +187,12 @@ method put*(s: PostgresDriver,
  pubsubTopic: PubsubTopic,
  message: WakuMessage,
  digest: MessageDigest,
+ messageHash: WakuMessageHash,
  receivedTime: Timestamp):
  Future[ArchiveDriverResult[void]] {.async.} =

  let digest = toHex(digest.data)
+ let messageHash = toHex(messageHash)
  let rxTime = $receivedTime
  let contentTopic = message.contentTopic
  let payload = toHex(message.payload)

@@ -199,6 +202,7 @@ method put*(s: PostgresDriver,
  return await s.writeConnPool.runStmt(InsertRowStmtName,
    InsertRowStmtDefinition,
    @[digest,
+     messageHash,
      rxTime,
      contentTopic,
      payload,

@@ -206,6 +210,7 @@ method put*(s: PostgresDriver,
      version,
      timestamp],
    @[int32(digest.len),
+     int32(messageHash.len),
      int32(rxTime.len),
      int32(contentTopic.len),
      int32(payload.len),

@@ -213,7 +218,7 @@ method put*(s: PostgresDriver,
      int32(version.len),
      int32(timestamp.len)],
    @[int32(0), int32(0), int32(0), int32(0),
-     int32(0), int32(0), int32(0)])
+     int32(0), int32(0), int32(0), int32(0)])

method getAllMessages*(s: PostgresDriver):
  Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
@ -228,6 +228,7 @@ method put*(driver: QueueDriver,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
digest: MessageDigest,
|
||||
messageHash: WakuMessageHash,
|
||||
receivedTime: Timestamp):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
|
||||
|
||||
@ -14,7 +14,7 @@ logScope:
|
||||
topics = "waku archive migration"
|
||||
|
||||
|
||||
const SchemaVersion* = 7 # increase this when there is an update in the database schema
|
||||
const SchemaVersion* = 8 # increase this when there is an update in the database schema
|
||||
|
||||
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."
|
||||
const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store"
|
||||
|
||||
@ -71,8 +71,9 @@ proc createTableQuery(table: string): SqlQueryStr =
|
||||
" version INTEGER NOT NULL," &
|
||||
" timestamp INTEGER NOT NULL," &
|
||||
" id BLOB," &
|
||||
" messageHash BLOB," &
|
||||
" storedAt INTEGER NOT NULL," &
|
||||
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
|
||||
" CONSTRAINT messageIndex PRIMARY KEY (messageHash)" &
|
||||
") WITHOUT ROWID;"
|
||||
|
||||
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
@ -102,17 +103,16 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
|
||||
|
||||
|
||||
## Insert message
|
||||
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
||||
type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
||||
|
||||
proc insertMessageQuery(table: string): SqlQueryStr =
|
||||
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
|
||||
" VALUES (?, ?, ?, ?, ?, ?, ?);"
|
||||
"INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
|
||||
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
|
||||
|
||||
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
|
||||
let query = insertMessageQuery(DbTable)
|
||||
db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement")
|
||||
|
||||
|
||||
## Count table messages
|
||||
|
||||
proc countMessagesQuery(table: string): SqlQueryStr =
|
||||
|
||||
@ -13,6 +13,7 @@ import
|
||||
import
|
||||
../../../common/databases/db_sqlite,
|
||||
../../../waku_core,
|
||||
../../../waku_core/message/digest,
|
||||
../../common,
|
||||
../../driver,
|
||||
./cursor,
|
||||
@ -61,11 +62,13 @@ method put*(s: SqliteDriver,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
digest: MessageDigest,
|
||||
messageHash: WakuMessageHash,
|
||||
receivedTime: Timestamp):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Inserts a message into the store
|
||||
let res = s.insertStmt.exec((
|
||||
@(digest.data), # id
|
||||
@(messageHash), # messageHash
|
||||
receivedTime, # storedAt
|
||||
toBytes(message.contentTopic), # contentTopic
|
||||
message.payload, # payload
|
||||
|
||||
@ -17,17 +17,16 @@ import
|
||||
## 14/WAKU2-MESSAGE: Deterministic message hashing
|
||||
## https://rfc.vac.dev/spec/14/#deterministic-message-hashing
|
||||
|
||||
type WakuMessageDigest* = array[32, byte]
|
||||
type WakuMessageHash* = array[32, byte]
|
||||
|
||||
|
||||
converter toBytesArray*(digest: MDigest[256]): WakuMessageDigest =
|
||||
converter toBytesArray*(digest: MDigest[256]): WakuMessageHash =
|
||||
digest.data
|
||||
|
||||
converter toBytes*(digest: MDigest[256]): seq[byte] =
|
||||
toSeq(digest.data)
|
||||
|
||||
|
||||
proc digest*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessageDigest =
|
||||
proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessageHash =
|
||||
var ctx: sha256
|
||||
ctx.init()
|
||||
defer: ctx.clear()
|
||||
|
||||
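As a quick usage sketch of the renamed helper, with import paths and the topic/payload values below assumed for illustration only:

import stew/byteutils                                    # to0xHex, toBytes
import ../../waku_core, ../../waku_core/message/digest   # assumed paths

let msg = WakuMessage(payload: toBytes("hello"),
                      contentTopic: "/waku/2/default-content/proto")
let hash: WakuMessageHash = computeMessageHash("/waku/2/default-waku/proto", msg)
echo hash.to0xHex()   # same 0x-prefixed hex form logged by relay and filter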
@ -163,7 +163,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
await conn.get().writeLp(buffer)

proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.digest(messagePush.wakuMessage).to0xHex()
debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()

let bufferToPublish = messagePush.encode().buffer

@ -210,10 +210,10 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
wakuMessage: message)

if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.digest(message).to0xHex()
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex()
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
else:
debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.digest(message).to0xHex()
debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex()

let

@ -21,6 +21,7 @@ import
when defined(waku_exp_store_resume):
import std/[sequtils, times]
import ../waku_archive
import ../waku_core/message/digest

logScope:

@ -154,10 +155,11 @@ when defined(waku_exp_store_resume):
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
digest = waku_archive.computeDigest(message)
messageHash = computeMessageHash(pubsubTopic, message)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())

store.put(pubsubTopic, message, digest, receivedTime)
store.put(pubsubTopic, message, digest, messageHash, receivedTime)

proc resume*(w: WakuStoreClient,
peerList = none(seq[RemotePeerInfo]),