mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 07:06:42 +00:00
feat: amending computeDigest func. + related test cases (#2132)
* feat: amending computeDigest func. + related test cases * minor fixes * minor fixes v1: testcase saga continues --------- Co-authored-by: Vaclav Pavlin <vaclav@status.im>
This commit is contained in:
parent
459331e3a6
commit
1669f710ce
@ -19,7 +19,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: computeDigest(message)
|
||||
digest: computeDigest(message, pubsubTopic)
|
||||
)
|
||||
|
||||
suite "Postgres driver":
|
||||
@ -78,7 +78,7 @@ suite "Postgres driver":
|
||||
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
|
||||
let computedDigest = computeDigest(msg)
|
||||
let computedDigest = computeDigest(msg, DefaultPubsubTopic)
|
||||
|
||||
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
|
||||
assert putRes.isOk(), putRes.error
|
||||
@ -113,12 +113,12 @@ suite "Postgres driver":
|
||||
|
||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic1)
|
||||
|
||||
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)
|
||||
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1,pubsubTopic1), msg1.timestamp)
|
||||
assert putRes.isOk(), putRes.error
|
||||
|
||||
let msg2 = fakeWakuMessage(contentTopic=contentTopic2)
|
||||
|
||||
putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)
|
||||
putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp)
|
||||
assert putRes.isOk(), putRes.error
|
||||
|
||||
let countMessagesRes = await driver.getMessagesCount()
|
||||
@ -197,9 +197,9 @@ suite "Postgres driver":
|
||||
let msg2 = fakeWakuMessage(ts = now)
|
||||
|
||||
var putRes = await driver.put(DefaultPubsubTopic,
|
||||
msg1, computeDigest(msg1), msg1.timestamp)
|
||||
msg1, computeDigest(msg1, DefaultPubsubTopic), msg1.timestamp)
|
||||
assert putRes.isOk(), putRes.error
|
||||
|
||||
putRes = await driver.put(DefaultPubsubTopic,
|
||||
msg2, computeDigest(msg2), msg2.timestamp)
|
||||
msg2, computeDigest(msg2, DefaultPubsubTopic), msg2.timestamp)
|
||||
require not putRes.isOk()
|
||||
|
@ -37,7 +37,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: computeDigest(message)
|
||||
digest: computeDigest(message, pubsubTopic)
|
||||
)
|
||||
|
||||
suite "Postgres driver - query by content topic":
|
||||
@ -65,7 +65,7 @@ suite "Postgres driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -107,7 +107,7 @@ suite "Postgres driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -150,7 +150,7 @@ suite "Postgres driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -195,7 +195,7 @@ suite "Postgres driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -233,7 +233,7 @@ suite "Postgres driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -260,7 +260,7 @@ suite "Postgres driver - query by content topic":
|
||||
|
||||
for t in 0..<40:
|
||||
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -306,7 +306,7 @@ suite "Postgres driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -351,7 +351,7 @@ suite "Postgres driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -396,7 +396,7 @@ suite "Postgres driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -443,7 +443,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -488,7 +488,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -531,7 +531,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -575,7 +575,7 @@ suite "Postgres driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -626,7 +626,7 @@ suite "Postgres driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[5][0], expected[5][1])
|
||||
|
||||
@ -678,7 +678,7 @@ suite "Postgres driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[6][0], expected[6][1])
|
||||
|
||||
@ -726,7 +726,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -768,7 +768,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -816,7 +816,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -861,7 +861,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -904,7 +904,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -949,7 +949,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -994,7 +994,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
|
||||
|
||||
@ -1042,7 +1042,7 @@ suite "Postgres driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -1093,7 +1093,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
|
||||
|
||||
@ -1147,7 +1147,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[7][0], expected[7][1])
|
||||
|
||||
@ -1201,7 +1201,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -1256,7 +1256,7 @@ suite "Postgres driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -1306,7 +1306,7 @@ suite "Postgres driver - retention policy":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
var res = await driver.getOldestMessageTimestamp()
|
||||
assert res.isOk(), res.error
|
||||
@ -1341,7 +1341,7 @@ suite "Postgres driver - retention policy":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
var res = await driver.getMessagesCount()
|
||||
assert res.isOk(), res.error
|
||||
@ -1378,7 +1378,7 @@ suite "Postgres driver - retention policy":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
var res = await driver.getMessagesCount()
|
||||
assert res.isOk(), res.error
|
||||
|
@ -156,7 +156,7 @@ procSuite "Queue driver - pagination":
|
||||
pubsubTopic: DefaultPubsubTopic,
|
||||
senderTime: msg.timestamp,
|
||||
storeTime: msg.timestamp,
|
||||
digest: computeDigest(msg)
|
||||
digest: computeDigest(msg, DefaultPubsubTopic)
|
||||
).toIndex()
|
||||
|
||||
let
|
||||
@ -337,7 +337,7 @@ procSuite "Queue driver - pagination":
|
||||
pubsubTopic: DefaultPubsubTopic,
|
||||
senderTime: msg.timestamp,
|
||||
storeTime: msg.timestamp,
|
||||
digest: computeDigest(msg)
|
||||
digest: computeDigest(msg, DefaultPubsubTopic)
|
||||
).toIndex()
|
||||
|
||||
let
|
||||
|
@ -29,7 +29,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: computeDigest(message)
|
||||
digest: computeDigest(message, pubsubTopic)
|
||||
)
|
||||
|
||||
|
||||
@ -58,7 +58,7 @@ suite "Queue driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -102,7 +102,7 @@ suite "Queue driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -147,7 +147,7 @@ suite "Queue driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -194,7 +194,7 @@ suite "Queue driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -234,7 +234,7 @@ suite "Queue driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -263,7 +263,7 @@ suite "Queue driver - query by content topic":
|
||||
|
||||
for t in 0..<40:
|
||||
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -312,7 +312,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -359,7 +359,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -406,7 +406,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -456,7 +456,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
@ -503,7 +503,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
@ -548,7 +548,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
@ -594,7 +594,7 @@ suite "Queue driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
@ -647,7 +647,7 @@ suite "Queue driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[5][0], expected[5][1])
|
||||
@ -701,7 +701,7 @@ suite "Queue driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[6][0], expected[6][1])
|
||||
@ -752,7 +752,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -796,7 +796,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -846,7 +846,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -893,7 +893,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -938,7 +938,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -985,7 +985,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
## When
|
||||
@ -1032,7 +1032,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
|
||||
@ -1082,7 +1082,7 @@ suite "Queue driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
@ -1135,7 +1135,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
|
||||
@ -1191,7 +1191,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[7][0], expected[7][1])
|
||||
@ -1247,7 +1247,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
@ -1304,7 +1304,7 @@ suite "Queue driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
|
||||
let retFut = waitFor driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)
|
||||
require retFut.isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
@ -50,7 +50,7 @@ suite "SQLite driver":
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
|
||||
## When
|
||||
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
|
||||
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)
|
||||
|
||||
## Then
|
||||
check:
|
||||
|
@ -33,7 +33,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: computeDigest(message)
|
||||
digest: computeDigest(message, pubsubTopic)
|
||||
)
|
||||
|
||||
|
||||
@ -62,7 +62,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -105,7 +105,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -149,7 +149,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -195,7 +195,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -234,7 +234,7 @@ suite "SQLite driver - query by content topic":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -262,7 +262,7 @@ suite "SQLite driver - query by content topic":
|
||||
|
||||
for t in 0..<40:
|
||||
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -310,7 +310,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -356,7 +356,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -402,7 +402,7 @@ suite "SQLite driver - query by pubsub topic":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -451,7 +451,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -497,7 +497,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -541,7 +541,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
|
||||
|
||||
@ -586,7 +586,7 @@ suite "SQLite driver - query by cursor":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -638,7 +638,7 @@ suite "SQLite driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[5][0], expected[5][1])
|
||||
|
||||
@ -691,7 +691,7 @@ suite "SQLite driver - query by cursor":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[6][0], expected[6][1])
|
||||
|
||||
@ -741,7 +741,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -784,7 +784,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -833,7 +833,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -879,7 +879,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -923,7 +923,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -969,7 +969,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## When
|
||||
let res = await driver.getMessages(
|
||||
@ -1015,7 +1015,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
|
||||
|
||||
@ -1064,7 +1064,7 @@ suite "SQLite driver - query by time range":
|
||||
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
|
||||
|
||||
for msg in messages:
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
|
||||
|
||||
@ -1116,7 +1116,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
|
||||
|
||||
@ -1171,7 +1171,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[7][0], expected[7][1])
|
||||
|
||||
@ -1226,7 +1226,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
@ -1282,7 +1282,7 @@ suite "SQLite driver - query by time range":
|
||||
|
||||
for row in messages:
|
||||
let (topic, msg) = row
|
||||
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (await driver.put(topic, msg, computeDigest(msg, topic), msg.timestamp)).isOk()
|
||||
|
||||
let cursor = computeTestCursor(expected[1][0], expected[1][1])
|
||||
|
||||
|
@ -41,7 +41,7 @@ suite "Waku Archive - Retention policy":
|
||||
## When
|
||||
for i in 1..capacity+excess:
|
||||
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp))
|
||||
|
||||
discard waitFor allFinished(putFutures)
|
||||
|
||||
@ -88,7 +88,7 @@ suite "Waku Archive - Retention policy":
|
||||
# create a number of messages so that the size of the DB overshoots
|
||||
for i in 1..excess:
|
||||
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))
|
||||
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp))
|
||||
|
||||
# waitFor is used to synchronously wait for the futures to complete.
|
||||
discard waitFor allFinished(putFutures)
|
||||
@ -139,7 +139,7 @@ suite "Waku Archive - Retention policy":
|
||||
|
||||
## When
|
||||
for msg in messages:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
require (waitFor retentionPolicy.execute(driver)).isOk()
|
||||
|
||||
## Then
|
||||
|
@ -30,7 +30,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: computeDigest(message)
|
||||
digest: computeDigest(message, pubsubTopic)
|
||||
)
|
||||
|
||||
|
||||
@ -152,7 +152,7 @@ procSuite "Waku Archive - find messages":
|
||||
archive = newTestWakuArchive(driver)
|
||||
|
||||
for msg in msgListA:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
archive
|
||||
|
||||
@ -446,7 +446,7 @@ procSuite "Waku Archive - find messages":
|
||||
]
|
||||
|
||||
for msg in msgList:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp)).isOk()
|
||||
|
||||
## Given
|
||||
let req = ArchiveQuery(contentTopics: @[DefaultContentTopic])
|
||||
|
@ -34,7 +34,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryC
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: waku_archive.computeDigest(message)
|
||||
digest: waku_archive.computeDigest(message, pubsubTopic)
|
||||
)
|
||||
|
||||
procSuite "WakuNode - Store":
|
||||
@ -57,7 +57,7 @@ procSuite "WakuNode - Store":
|
||||
let driver = newTestArchiveDriver()
|
||||
|
||||
for msg in msgListA:
|
||||
let msg_digest = waku_archive.computeDigest(msg)
|
||||
let msg_digest = waku_archive.computeDigest(msg, DefaultPubsubTopic)
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk()
|
||||
|
||||
driver
|
||||
|
@ -24,7 +24,7 @@ import
|
||||
|
||||
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
|
||||
let
|
||||
digest = waku_archive.computeDigest(message)
|
||||
digest = waku_archive.computeDigest(message, pubsubTopic)
|
||||
receivedTime = if message.timestamp > 0: message.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
|
@ -32,7 +32,7 @@ logScope:
|
||||
|
||||
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
|
||||
let
|
||||
digest = waku_archive.computeDigest(message)
|
||||
digest = waku_archive.computeDigest(message, pubsubTopic)
|
||||
receivedTime = if message.timestamp > 0: message.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
|
@ -100,7 +100,7 @@ proc handleMessage*(w: WakuArchive,
|
||||
|
||||
block:
|
||||
let
|
||||
msgDigest = computeDigest(msg)
|
||||
msgDigest = computeDigest(msg, pubsubTopic)
|
||||
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
|
@ -18,13 +18,15 @@ import
|
||||
|
||||
type MessageDigest* = MDigest[256]
|
||||
|
||||
proc computeDigest*(msg: WakuMessage): MessageDigest =
|
||||
proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageDigest =
|
||||
var ctx: sha256
|
||||
ctx.init()
|
||||
defer: ctx.clear()
|
||||
|
||||
ctx.update(msg.contentTopic.toBytes())
|
||||
ctx.update(pubSubTopic.toBytes())
|
||||
ctx.update(msg.payload)
|
||||
ctx.update(msg.contentTopic.toBytes())
|
||||
ctx.update(msg.meta)
|
||||
|
||||
# Computes the hash
|
||||
return ctx.finish()
|
||||
|
@ -21,7 +21,7 @@ type Index* = object
|
||||
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
|
||||
## Takes a WakuMessage with received timestamp and returns its Index.
|
||||
let
|
||||
digest = computeDigest(msg)
|
||||
digest = computeDigest(msg, pubsubTopic)
|
||||
senderTime = msg.timestamp
|
||||
|
||||
Index(
|
||||
|
@ -153,7 +153,7 @@ when defined(waku_exp_store_resume):
|
||||
|
||||
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
|
||||
let
|
||||
digest = waku_archive.computeDigest(message)
|
||||
digest = waku_archive.computeDigest(message, pubsubTopic)
|
||||
receivedTime = if message.timestamp > 0: message.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user