feat(postgresql): 1st commit to async sql (waku_archive/driver...) (#1755)

This commit is contained in:
Ivan Folgueira Bande 2023-05-25 17:34:34 +02:00 committed by GitHub
parent 3c2d2891e5
commit 59ca03a875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 515 additions and 475 deletions

View File

@ -552,7 +552,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec)
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

View File

@ -54,7 +54,7 @@ suite "Waku Filter":
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
let pushHandlerFuture = newFuture[(string, WakuMessage)]()
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} =
pushHandlerFuture.complete((pubsubTopic, message))
let
@ -97,7 +97,7 @@ suite "Waku Filter":
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} =
pushHandlerFuture.complete()
let
@ -149,7 +149,7 @@ suite "Waku Filter":
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} =
pushHandlerFuture.complete()
let
@ -214,7 +214,7 @@ suite "Waku Filter":
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
var pushHandlerFuture = newFuture[void]()
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} =
pushHandlerFuture.complete()
let

View File

@ -25,10 +25,10 @@ suite "WakuNode - Filter":
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(server.start(), client.start())
waitFor allFutures(server.start(), client.start())
await server.mountFilter()
await client.mountFilterClient()
waitFor server.mountFilter()
waitFor client.mountFilterClient()
## Given
let serverPeerInfo = server.peerInfo.toRemotePeerInfo()
@ -39,18 +39,18 @@ suite "WakuNode - Filter":
message = fakeWakuMessage(contentTopic=contentTopic)
var filterPushHandlerFut = newFuture[(PubsubTopic, WakuMessage)]()
proc filterPushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
proc filterPushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterPushHandlerFut.complete((pubsubTopic, msg))
## When
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
# Wait for subscription to take effect
await sleepAsync(100.millis)
waitFor sleepAsync(100.millis)
await server.filterHandleMessage(pubSubTopic, message)
waitFor server.filterHandleMessage(pubSubTopic, message)
require await filterPushHandlerFut.withTimeout(5.seconds)
require waitFor filterPushHandlerFut.withTimeout(5.seconds)
## Then
check filterPushHandlerFut.completed()
@ -60,4 +60,4 @@ suite "WakuNode - Filter":
filterMessage == message
## Cleanup
await allFutures(client.stop(), server.stop())
waitFor allFutures(client.stop(), server.stop())

View File

@ -58,10 +58,11 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
maxPageSize=5,
ascendingOrder=true
)
@ -75,7 +76,7 @@ suite "Queue driver - query by content topic":
filteredMessages == expected[0..4]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "single content topic":
## Given
@ -101,10 +102,11 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
maxPageSize=2,
ascendingOrder=true
@ -119,7 +121,7 @@ suite "Queue driver - query by content topic":
filteredMessages == expected[2..3]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "single content topic - descending order":
## Given
@ -145,10 +147,11 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
maxPageSize=2,
ascendingOrder=false
@ -163,7 +166,7 @@ suite "Queue driver - query by content topic":
filteredMessages == expected[6..7].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "multiple content topic":
## Given
@ -191,10 +194,11 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic1, contentTopic2],
maxPageSize=2,
ascendingOrder=true
@ -209,7 +213,7 @@ suite "Queue driver - query by content topic":
filteredMessages == expected[2..3]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "single content topic - no results":
## Given
@ -230,10 +234,11 @@ suite "Queue driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
maxPageSize=2,
ascendingOrder=true
@ -248,7 +253,7 @@ suite "Queue driver - query by content topic":
filteredMessages.len == 0
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "content topic and max page size - not enough messages stored":
## Given
@ -258,10 +263,11 @@ suite "Queue driver - query by content topic":
for t in 0..<40:
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[DefaultContentTopic],
maxPageSize=pageSize,
ascendingOrder=true
@ -276,7 +282,7 @@ suite "Queue driver - query by content topic":
filteredMessages.len == 40
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
suite "SQLite driver - query by pubsub topic":
@ -306,10 +312,11 @@ suite "SQLite driver - query by pubsub topic":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
pubsubTopic=some(pubsubTopic),
maxPageSize=2,
ascendingOrder=true
@ -325,7 +332,7 @@ suite "SQLite driver - query by pubsub topic":
filteredMessages == expectedMessages[4..5]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "no pubsub topic":
## Given
@ -352,10 +359,11 @@ suite "SQLite driver - query by pubsub topic":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
maxPageSize=2,
ascendingOrder=true
)
@ -370,7 +378,7 @@ suite "SQLite driver - query by pubsub topic":
filteredMessages == expectedMessages[0..1]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "content topic and pubsub topic":
## Given
@ -398,10 +406,11 @@ suite "SQLite driver - query by pubsub topic":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
maxPageSize=2,
@ -418,7 +427,7 @@ suite "SQLite driver - query by pubsub topic":
filteredMessages == expectedMessages[4..5]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
suite "Queue driver - query by cursor":
@ -447,12 +456,13 @@ suite "Queue driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=true
@ -467,7 +477,7 @@ suite "Queue driver - query by cursor":
filteredMessages == expected[5..6]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "only cursor - descending order":
## Given
@ -493,12 +503,13 @@ suite "Queue driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=false
@ -513,7 +524,7 @@ suite "Queue driver - query by cursor":
filteredMessages == expected[2..3].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "content topic and cursor":
## Given
@ -537,12 +548,13 @@ suite "Queue driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
maxPageSize=10,
@ -558,7 +570,7 @@ suite "Queue driver - query by cursor":
filteredMessages == expected[5..6]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "content topic and cursor - descending order":
## Given
@ -582,12 +594,13 @@ suite "Queue driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
maxPageSize=10,
@ -603,7 +616,7 @@ suite "Queue driver - query by cursor":
filteredMessages == expected[2..5].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "pubsub topic and cursor":
## Given
@ -634,12 +647,13 @@ suite "Queue driver - query by cursor":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(expected[5][0], expected[5][1])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
maxPageSize=10,
@ -656,7 +670,7 @@ suite "Queue driver - query by cursor":
filteredMessages == expectedMessages[6..7]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "pubsub topic and cursor - descending order":
## Given
@ -687,12 +701,13 @@ suite "Queue driver - query by cursor":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(expected[6][0], expected[6][1])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
maxPageSize=10,
@ -709,7 +724,7 @@ suite "Queue driver - query by cursor":
filteredMessages == expectedMessages[4..5].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
suite "Queue driver - query by time range":
@ -737,10 +752,11 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
startTime=some(ts(15, timeOrigin)),
maxPageSize=10,
ascendingOrder=true
@ -755,7 +771,7 @@ suite "Queue driver - query by time range":
filteredMessages == expected[2..6]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "end time only":
## Given
@ -780,10 +796,11 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
endTime=some(ts(45, timeOrigin)),
maxPageSize=10,
ascendingOrder=true
@ -798,7 +815,7 @@ suite "Queue driver - query by time range":
filteredMessages == expected[0..4]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "start time and end time":
## Given
@ -829,10 +846,11 @@ suite "Queue driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
startTime=some(ts(15, timeOrigin)),
endTime=some(ts(45, timeOrigin)),
maxPageSize=10,
@ -849,7 +867,7 @@ suite "Queue driver - query by time range":
filteredMessages == expectedMessages[2..4]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "invalid time range - no results":
## Given
@ -875,10 +893,11 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
startTime=some(ts(45, timeOrigin)),
endTime=some(ts(15, timeOrigin)),
@ -894,9 +913,9 @@ suite "Queue driver - query by time range":
filteredMessages.len == 0
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range start and content topic":
asynctest "time range start and content topic":
## Given
const contentTopic = "test-content-topic"
@ -919,10 +938,11 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
startTime=some(ts(15, timeOrigin)),
maxPageSize=10,
@ -937,7 +957,7 @@ suite "Queue driver - query by time range":
filteredMessages == expected[2..6]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range start and content topic - descending order":
## Given
@ -965,10 +985,11 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
startTime=some(ts(15, timeOrigin)),
maxPageSize=10,
@ -983,9 +1004,9 @@ suite "Queue driver - query by time range":
filteredMessages == expected[2..6].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range start, single content topic and cursor":
asynctest "time range start, single content topic and cursor":
## Given
const contentTopic = "test-content-topic"
@ -1011,12 +1032,13 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
startTime=some(ts(15, timeOrigin)),
@ -1032,9 +1054,9 @@ suite "Queue driver - query by time range":
filteredMessages == expected[4..9]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range start, single content topic and cursor - descending order":
asynctest "time range start, single content topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
@ -1060,12 +1082,13 @@ suite "Queue driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
startTime=some(ts(15, timeOrigin)),
@ -1081,7 +1104,7 @@ suite "Queue driver - query by time range":
filteredMessages == expected[3..4].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor":
## Given
@ -1112,12 +1135,13 @@ suite "Queue driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1136,7 +1160,7 @@ suite "Queue driver - query by time range":
filteredMessages == expectedMessages[3..4]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor - descending order":
## Given
@ -1167,12 +1191,13 @@ suite "Queue driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(expected[7][0], expected[7][1])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1191,7 +1216,7 @@ suite "Queue driver - query by time range":
filteredMessages == expectedMessages[4..5].reversed()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range":
## Given
@ -1222,12 +1247,13 @@ suite "Queue driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(expected[1][0], expected[1][1])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1247,7 +1273,7 @@ suite "Queue driver - query by time range":
filteredMessages == expectedMessages[4..5]
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order":
## Given
@ -1278,12 +1304,13 @@ suite "Queue driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp)
require retFut.isOk()
let cursor = computeTestCursor(expected[1][0], expected[1][1])
## When
let res = driver.getMessages(
let res = waitFor driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1302,4 +1329,4 @@ suite "Queue driver - query by time range":
filteredMessages.len == 0
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")

View File

@ -39,7 +39,7 @@ suite "SQLite driver":
not driver.isNil()
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "insert a message":
## Given
@ -50,13 +50,13 @@ suite "SQLite driver":
let msg = fakeWakuMessage(contentTopic=contentTopic)
## When
let putRes = driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)
## Then
check:
putRes.isOk()
let storedMsg = driver.getAllMessages().tryGet()
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
@ -65,4 +65,4 @@ suite "SQLite driver":
pubsubTopic == DefaultPubsubTopic
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")

View File

@ -39,7 +39,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
suite "SQLite driver - query by content topic":
test "no content topic":
asyncTest "no content topic":
## Given
const contentTopic = "test-content-topic"
@ -62,10 +62,10 @@ suite "SQLite driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
maxPageSize=5,
ascendingOrder=true
)
@ -79,9 +79,9 @@ suite "SQLite driver - query by content topic":
filteredMessages == expected[0..4]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "single content topic":
asyncTest "single content topic":
## Given
const contentTopic = "test-content-topic"
@ -105,10 +105,10 @@ suite "SQLite driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
maxPageSize=2,
ascendingOrder=true
@ -123,9 +123,9 @@ suite "SQLite driver - query by content topic":
filteredMessages == expected[2..3]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "single content topic - descending order":
asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"
@ -149,10 +149,10 @@ suite "SQLite driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
maxPageSize=2,
ascendingOrder=false
@ -167,9 +167,9 @@ suite "SQLite driver - query by content topic":
filteredMessages == expected[6..7].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "multiple content topic":
asyncTest "multiple content topic":
## Given
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
@ -195,10 +195,10 @@ suite "SQLite driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic1, contentTopic2],
maxPageSize=2,
ascendingOrder=true
@ -213,9 +213,9 @@ suite "SQLite driver - query by content topic":
filteredMessages == expected[2..3]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "single content topic - no results":
asyncTest "single content topic - no results":
## Given
const contentTopic = "test-content-topic"
@ -234,10 +234,10 @@ suite "SQLite driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
maxPageSize=2,
ascendingOrder=true
@ -252,9 +252,9 @@ suite "SQLite driver - query by content topic":
filteredMessages.len == 0
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "content topic and max page size - not enough messages stored":
asyncTest "content topic and max page size - not enough messages stored":
## Given
const pageSize: uint = 50
@ -262,10 +262,10 @@ suite "SQLite driver - query by content topic":
for t in 0..<40:
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[DefaultContentTopic],
maxPageSize=pageSize,
ascendingOrder=true
@ -280,12 +280,12 @@ suite "SQLite driver - query by content topic":
filteredMessages.len == 40
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
suite "SQLite driver - query by pubsub topic":
test "pubsub topic":
asyncTest "pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -310,10 +310,10 @@ suite "SQLite driver - query by pubsub topic":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
pubsubTopic=some(pubsubTopic),
maxPageSize=2,
ascendingOrder=true
@ -329,9 +329,9 @@ suite "SQLite driver - query by pubsub topic":
filteredMessages == expectedMessages[4..5]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "no pubsub topic":
asyncTest "no pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -356,10 +356,10 @@ suite "SQLite driver - query by pubsub topic":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
maxPageSize=2,
ascendingOrder=true
)
@ -374,9 +374,9 @@ suite "SQLite driver - query by pubsub topic":
filteredMessages == expectedMessages[0..1]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "content topic and pubsub topic":
asyncTest "content topic and pubsub topic":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -402,10 +402,10 @@ suite "SQLite driver - query by pubsub topic":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
maxPageSize=2,
@ -422,12 +422,12 @@ suite "SQLite driver - query by pubsub topic":
filteredMessages == expectedMessages[4..5]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
suite "SQLite driver - query by cursor":
test "only cursor":
asyncTest "only cursor":
## Given
const contentTopic = "test-content-topic"
@ -451,12 +451,12 @@ suite "SQLite driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=true
@ -471,9 +471,9 @@ suite "SQLite driver - query by cursor":
filteredMessages == expected[5..6]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "only cursor - descending order":
asyncTest "only cursor - descending order":
## Given
const contentTopic = "test-content-topic"
@ -497,12 +497,12 @@ suite "SQLite driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=false
@ -517,9 +517,9 @@ suite "SQLite driver - query by cursor":
filteredMessages == expected[2..3].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "content topic and cursor":
asyncTest "content topic and cursor":
## Given
const contentTopic = "test-content-topic"
@ -541,12 +541,12 @@ suite "SQLite driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
maxPageSize=10,
@ -562,9 +562,9 @@ suite "SQLite driver - query by cursor":
filteredMessages == expected[5..6]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "content topic and cursor - descending order":
asyncTest "content topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
@ -586,12 +586,12 @@ suite "SQLite driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
maxPageSize=10,
@ -607,9 +607,9 @@ suite "SQLite driver - query by cursor":
filteredMessages == expected[2..5].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "pubsub topic and cursor":
asyncTest "pubsub topic and cursor":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -638,12 +638,12 @@ suite "SQLite driver - query by cursor":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(expected[5][0], expected[5][1])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
maxPageSize=10,
@ -660,9 +660,9 @@ suite "SQLite driver - query by cursor":
filteredMessages == expectedMessages[6..7]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "pubsub topic and cursor - descending order":
asyncTest "pubsub topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -691,12 +691,12 @@ suite "SQLite driver - query by cursor":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(expected[6][0], expected[6][1])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
maxPageSize=10,
@ -713,12 +713,12 @@ suite "SQLite driver - query by cursor":
filteredMessages == expectedMessages[4..5].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
suite "SQLite driver - query by time range":
test "start time only":
asyncTest "start time only":
## Given
const contentTopic = "test-content-topic"
@ -741,10 +741,10 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
startTime=some(ts(15, timeOrigin)),
maxPageSize=10,
ascendingOrder=true
@ -759,9 +759,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expected[2..6]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "end time only":
asyncTest "end time only":
## Given
const contentTopic = "test-content-topic"
@ -784,10 +784,10 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
endTime=some(ts(45, timeOrigin)),
maxPageSize=10,
ascendingOrder=true
@ -802,9 +802,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expected[0..4]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "start time and end time":
asyncTest "start time and end time":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -833,10 +833,10 @@ suite "SQLite driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
startTime=some(ts(15, timeOrigin)),
endTime=some(ts(45, timeOrigin)),
maxPageSize=10,
@ -853,9 +853,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expectedMessages[2..4]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "invalid time range - no results":
asyncTest "invalid time range - no results":
## Given
const contentTopic = "test-content-topic"
@ -879,10 +879,10 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
startTime=some(ts(45, timeOrigin)),
endTime=some(ts(15, timeOrigin)),
@ -898,9 +898,9 @@ suite "SQLite driver - query by time range":
filteredMessages.len == 0
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range start and content topic":
asyncTest "time range start and content topic":
## Given
const contentTopic = "test-content-topic"
@ -923,10 +923,10 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
startTime=some(ts(15, timeOrigin)),
maxPageSize=10,
@ -941,9 +941,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expected[2..6]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range start and content topic - descending order":
asyncTest "time range start and content topic - descending order":
## Given
const contentTopic = "test-content-topic"
@ -969,10 +969,10 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
startTime=some(ts(15, timeOrigin)),
maxPageSize=10,
@ -987,9 +987,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expected[2..6].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range start, single content topic and cursor":
asyncTest "time range start, single content topic and cursor":
## Given
const contentTopic = "test-content-topic"
@ -1015,12 +1015,12 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
startTime=some(ts(15, timeOrigin)),
@ -1036,9 +1036,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expected[4..9]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range start, single content topic and cursor - descending order":
asyncTest "time range start, single content topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
@ -1064,12 +1064,12 @@ suite "SQLite driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
cursor=some(cursor),
startTime=some(ts(15, timeOrigin)),
@ -1085,9 +1085,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expected[3..4].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor":
asyncTest "time range, content topic, pubsub topic and cursor":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -1116,12 +1116,12 @@ suite "SQLite driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1140,9 +1140,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expectedMessages[3..4]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor - descending order":
asyncTest "time range, content topic, pubsub topic and cursor - descending order":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -1171,12 +1171,12 @@ suite "SQLite driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(expected[7][0], expected[7][1])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1195,9 +1195,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expectedMessages[4..5].reversed()
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range":
asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -1226,12 +1226,12 @@ suite "SQLite driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(expected[1][0], expected[1][1])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1251,9 +1251,9 @@ suite "SQLite driver - query by time range":
filteredMessages == expectedMessages[4..5]
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")
test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order":
asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order":
## Given
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
@ -1282,12 +1282,12 @@ suite "SQLite driver - query by time range":
for row in messages:
let (topic, msg) = row
require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk()
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
let cursor = computeTestCursor(expected[1][0], expected[1][1])
## When
let res = driver.getMessages(
let res = await driver.getMessages(
contentTopic= @[contentTopic],
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
@ -1306,4 +1306,4 @@ suite "SQLite driver - query by time range":
filteredMessages.len == 0
## Cleanup
driver.close().expect("driver to close")
(await driver.close()).expect("driver to close")

View File

@ -40,11 +40,11 @@ suite "Waku Archive - Retention policy":
for i in 1..capacity+excess:
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(driver).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (waitFor retentionPolicy.execute(driver)).isOk()
## Then
let numMessages = driver.getMessagesCount().tryGet()
let numMessages = (waitFor driver.getMessagesCount()).tryGet()
check:
# Expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
@ -52,7 +52,7 @@ suite "Waku Archive - Retention policy":
numMessages == 120
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")
test "store capacity should be limited":
## Given
@ -76,11 +76,11 @@ suite "Waku Archive - Retention policy":
## When
for msg in messages:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(driver).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (waitFor retentionPolicy.execute(driver)).isOk()
## Then
let storedMsg = driver.getAllMessages().tryGet()
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
check:
storedMsg.len == capacity
storedMsg.all do (item: auto) -> bool:
@ -89,4 +89,4 @@ suite "Waku Archive - Retention policy":
pubsubTopic == DefaultPubsubTopic
## Cleanup
driver.close().expect("driver to close")
(waitFor driver.close()).expect("driver to close")

View File

@ -4,6 +4,7 @@ import
std/[options, sequtils],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto
import
../../../waku/common/sqlite,
@ -47,11 +48,11 @@ suite "Waku Archive - message handling":
let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime)
## When
archive.handleMessage(DefaultPubSubTopic, message)
waitFor archive.handleMessage(DefaultPubSubTopic, message)
## Then
check:
driver.getMessagesCount().tryGet() == 1
(waitFor driver.getMessagesCount()).tryGet() == 1
test "it should not driver an ephemeral message":
## Setup
@ -69,11 +70,11 @@ suite "Waku Archive - message handling":
## When
for msg in msgList:
archive.handleMessage(DefaultPubsubTopic, msg)
waitFor archive.handleMessage(DefaultPubsubTopic, msg)
## Then
check:
driver.getMessagesCount().tryGet() == 2
(waitFor driver.getMessagesCount()).tryGet() == 2
test "it should driver a message with no sender timestamp":
## Setup
@ -85,11 +86,11 @@ suite "Waku Archive - message handling":
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
archive.handleMessage(DefaultPubSubTopic, message)
waitFor archive.handleMessage(DefaultPubSubTopic, message)
## Then
check:
driver.getMessagesCount().tryGet() == 1
(waitFor driver.getMessagesCount()).tryGet() == 1
test "it should not driver a message with a sender time variance greater than max time variance (future)":
## Setup
@ -104,11 +105,11 @@ suite "Waku Archive - message handling":
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
archive.handleMessage(DefaultPubSubTopic, message)
waitFor archive.handleMessage(DefaultPubSubTopic, message)
## Then
check:
driver.getMessagesCount().tryGet() == 0
(waitFor driver.getMessagesCount()).tryGet() == 0
test "it should not driver a message with a sender time variance greater than max time variance (past)":
## Setup
@ -123,11 +124,11 @@ suite "Waku Archive - message handling":
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
archive.handleMessage(DefaultPubSubTopic, message)
waitFor archive.handleMessage(DefaultPubSubTopic, message)
## Then
check:
driver.getMessagesCount().tryGet() == 0
(waitFor driver.getMessagesCount()).tryGet() == 0
procSuite "Waku Archive - find messages":
@ -147,14 +148,14 @@ procSuite "Waku Archive - find messages":
]
let archiveA = block:
let
driver = newTestArchiveDriver()
archive = newTestWakuArchive(driver)
let
driver = newTestArchiveDriver()
archive = newTestWakuArchive(driver)
for msg in msgListA:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
for msg in msgListA:
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
archive
archive
test "handle query":
## Setup
@ -167,14 +168,14 @@ procSuite "Waku Archive - find messages":
msg1 = fakeWakuMessage(contentTopic=topic)
msg2 = fakeWakuMessage()
archive.handleMessage("foo", msg1)
archive.handleMessage("foo", msg2)
waitFor archive.handleMessage("foo", msg1)
waitFor archive.handleMessage("foo", msg2)
## Given
let req = ArchiveQuery(contentTopics: @[topic])
## When
let queryRes = archive.findMessages(req)
let queryRes = waitFor archive.findMessages(req)
## Then
check:
@ -201,15 +202,15 @@ procSuite "Waku Archive - find messages":
msg2 = fakeWakuMessage(contentTopic=topic2)
msg3 = fakeWakuMessage(contentTopic=topic3)
archive.handleMessage("foo", msg1)
archive.handleMessage("foo", msg2)
archive.handleMessage("foo", msg3)
waitFor archive.handleMessage("foo", msg1)
waitFor archive.handleMessage("foo", msg2)
waitFor archive.handleMessage("foo", msg3)
## Given
let req = ArchiveQuery(contentTopics: @[topic1, topic3])
## When
let queryRes = archive.findMessages(req)
let queryRes = waitFor archive.findMessages(req)
## Then
check:
@ -233,7 +234,7 @@ procSuite "Waku Archive - find messages":
let req = ArchiveQuery(contentTopics: queryTopics)
## When
let queryRes = archive.findMessages(req)
let queryRes = waitFor archive.findMessages(req)
## Then
check:
@ -264,9 +265,9 @@ procSuite "Waku Archive - find messages":
msg2 = fakeWakuMessage(contentTopic=contentTopic2)
msg3 = fakeWakuMessage(contentTopic=contentTopic3)
archive.handleMessage(pubsubtopic1, msg1)
archive.handleMessage(pubsubtopic2, msg2)
archive.handleMessage(pubsubtopic2, msg3)
waitFor archive.handleMessage(pubsubtopic1, msg1)
waitFor archive.handleMessage(pubsubtopic2, msg2)
waitFor archive.handleMessage(pubsubtopic2, msg3)
## Given
# This query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
@ -276,7 +277,7 @@ procSuite "Waku Archive - find messages":
)
## When
let queryRes = archive.findMessages(req)
let queryRes = waitFor archive.findMessages(req)
## Then
check:
@ -302,15 +303,15 @@ procSuite "Waku Archive - find messages":
msg2 = fakeWakuMessage()
msg3 = fakeWakuMessage()
archive.handleMessage(pubsubtopic2, msg1)
archive.handleMessage(pubsubtopic2, msg2)
archive.handleMessage(pubsubtopic2, msg3)
waitFor archive.handleMessage(pubsubtopic2, msg1)
waitFor archive.handleMessage(pubsubtopic2, msg2)
waitFor archive.handleMessage(pubsubtopic2, msg3)
## Given
let req = ArchiveQuery(pubsubTopic: some(pubsubTopic1))
## When
let res = archive.findMessages(req)
let res = waitFor archive.findMessages(req)
## Then
check:
@ -333,15 +334,15 @@ procSuite "Waku Archive - find messages":
msg2 = fakeWakuMessage(payload="TEST-2")
msg3 = fakeWakuMessage(payload="TEST-3")
archive.handleMessage(pubsubTopic, msg1)
archive.handleMessage(pubsubTopic, msg2)
archive.handleMessage(pubsubTopic, msg3)
waitFor archive.handleMessage(pubsubTopic, msg1)
waitFor archive.handleMessage(pubsubTopic, msg2)
waitFor archive.handleMessage(pubsubTopic, msg3)
## Given
let req = ArchiveQuery(pubsubTopic: some(pubsubTopic))
## When
let res = archive.findMessages(req)
let res = waitFor archive.findMessages(req)
## Then
check:
@ -368,7 +369,7 @@ procSuite "Waku Archive - find messages":
var cursors = newSeq[Option[ArchiveCursor]](3)
for i in 0..<3:
let res = archiveA.findMessages(nextReq)
let res = waitFor archiveA.findMessages(nextReq)
require res.isOk()
# Keep query response content
@ -404,7 +405,7 @@ procSuite "Waku Archive - find messages":
var cursors = newSeq[Option[ArchiveCursor]](3)
for i in 0..<3:
let res = archiveA.findMessages(nextReq)
let res = waitFor archiveA.findMessages(nextReq)
require res.isOk()
# Keep query response content
@ -446,13 +447,13 @@ procSuite "Waku Archive - find messages":
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
## Given
let req = ArchiveQuery(contentTopics: @[DefaultContentTopic])
## When
let res = archive.findMessages(req)
let res = waitFor archive.findMessages(req)
## Then
check:
@ -475,7 +476,7 @@ procSuite "Waku Archive - find messages":
)
## When
let res = archiveA.findMessages(req)
let res = waitFor archiveA.findMessages(req)
## Then
check res.isOk()
@ -495,7 +496,7 @@ procSuite "Waku Archive - find messages":
)
## When
let res = archiveA.findMessages(req)
let res = waitFor archiveA.findMessages(req)
## Then
check res.isOk()
@ -514,7 +515,7 @@ procSuite "Waku Archive - find messages":
)
## When
let res = archiveA.findMessages(req)
let res = waitFor archiveA.findMessages(req)
## Then
check res.isOk()

View File

@ -14,8 +14,6 @@ import
../testlib/common,
../testlib/wakucore
proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[WakuStore] {.async.} =
let
peerManager = PeerManager.new(switch)
@ -27,8 +25,7 @@ proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[Waku
return proto
proc newTestWakuStoreClient(switch: Switch): WakuStoreClient =
let
peerManager = PeerManager.new(switch)
let peerManager = PeerManager.new(switch)
WakuStoreClient.new(peerManager, rng)
@ -48,7 +45,8 @@ suite "Waku Store - query handler":
let msg = fakeWakuMessage(contentTopic=DefaultContentTopic)
var queryHandlerFut = newFuture[(HistoryQuery)]()
let queryHandler = proc(req: HistoryQuery): HistoryResult =
let queryHandler = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.} =
queryHandlerFut.complete(req)
return ok(HistoryResponse(messages: @[msg]))
@ -90,7 +88,7 @@ suite "Waku Store - query handler":
let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
var queryHandlerFut = newFuture[(HistoryQuery)]()
let queryHandler = proc(req: HistoryQuery): HistoryResult =
let queryHandler = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.} =
queryHandlerFut.complete(req)
return err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST))

View File

@ -25,7 +25,6 @@ import
../testlib/wakucore,
../testlib/wakunode
proc newTestArchiveDriver(): ArchiveDriver =
let database = SqliteDatabase.new(":memory:").tryGet()
SqliteDriver.new(database).tryGet()
@ -55,15 +54,15 @@ procSuite "WakuNode - Store":
]
let archiveA = block:
let driver = newTestArchiveDriver()
let driver = newTestArchiveDriver()
for msg in msgListA:
let msg_digest = waku_archive.computeDigest(msg)
require driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp).isOk()
for msg in msgListA:
let msg_digest = waku_archive.computeDigest(msg)
require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk()
driver
driver
asyncTest "Store protocol returns expected messages":
test "Store protocol returns expected messages":
## Setup
let
serverKey = generateSecp256k1Key()
@ -71,10 +70,10 @@ procSuite "WakuNode - Store":
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(client.start(), server.start())
waitFor allFutures(client.start(), server.start())
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
waitFor server.mountStore()
client.mountStoreClient()
@ -83,7 +82,7 @@ procSuite "WakuNode - Store":
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
let queryRes = await client.query(req, peer=serverPeer)
let queryRes = waitFor client.query(req, peer=serverPeer)
## Then
check queryRes.isOk()
@ -93,9 +92,9 @@ procSuite "WakuNode - Store":
response.messages == msgListA
# Cleanup
await allFutures(client.stop(), server.stop())
waitFor allFutures(client.stop(), server.stop())
asyncTest "Store node history response - forward pagination":
test "Store node history response - forward pagination":
## Setup
let
serverKey = generateSecp256k1Key()
@ -103,10 +102,10 @@ procSuite "WakuNode - Store":
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(client.start(), server.start())
waitFor allFutures(client.start(), server.start())
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
waitFor server.mountStore()
client.mountStoreClient()
@ -121,7 +120,7 @@ procSuite "WakuNode - Store":
var cursors = newSeq[Option[HistoryCursor]](2)
for i in 0..<2:
let res = await client.query(nextReq, peer=serverPeer)
let res = waitFor client.query(nextReq, peer=serverPeer)
require res.isOk()
# Keep query response content
@ -142,9 +141,9 @@ procSuite "WakuNode - Store":
pages[1] == msgListA[7..9]
# Cleanup
await allFutures(client.stop(), server.stop())
waitFor allFutures(client.stop(), server.stop())
asyncTest "Store node history response - backward pagination":
test "Store node history response - backward pagination":
## Setup
let
serverKey = generateSecp256k1Key()
@ -152,10 +151,10 @@ procSuite "WakuNode - Store":
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(client.start(), server.start())
waitFor allFutures(client.start(), server.start())
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
waitFor server.mountStore()
client.mountStoreClient()
@ -170,7 +169,7 @@ procSuite "WakuNode - Store":
var cursors = newSeq[Option[HistoryCursor]](2)
for i in 0..<2:
let res = await client.query(nextReq, peer=serverPeer)
let res = waitFor client.query(nextReq, peer=serverPeer)
require res.isOk()
# Keep query response content
@ -191,9 +190,9 @@ procSuite "WakuNode - Store":
pages[1] == msgListA[0..2]
# Cleanup
await allFutures(client.stop(), server.stop())
waitFor allFutures(client.stop(), server.stop())
asyncTest "Store protocol returns expected message when relay is disabled and filter enabled":
test "Store protocol returns expected message when relay is disabled and filter enabled":
## See nwaku issue #937: 'Store: ability to decouple store from relay'
## Setup
let
@ -204,13 +203,13 @@ procSuite "WakuNode - Store":
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
await allFutures(client.start(), server.start(), filterSource.start())
waitFor allFutures(client.start(), server.start(), filterSource.start())
await filterSource.mountFilter()
waitFor filterSource.mountFilter()
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
await server.mountStore()
await server.mountFilterClient()
waitFor server.mountStore()
waitFor server.mountFilterClient()
client.mountStoreClient()
## Given
@ -221,20 +220,20 @@ procSuite "WakuNode - Store":
## Then
let filterFut = newFuture[(PubsubTopic, WakuMessage)]()
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg))
await server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
await sleepAsync(100.millis)
waitFor sleepAsync(100.millis)
# Send filter push message to server from source node
await filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message)
waitFor filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message)
# Wait for the server filter to receive the push message
require await filterFut.withTimeout(5.seconds)
require waitFor filterFut.withTimeout(5.seconds)
let res = await client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=serverPeer)
let res = waitFor client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=serverPeer)
## Then
check res.isOk()
@ -250,4 +249,4 @@ procSuite "WakuNode - Store":
handledMsg == message
## Cleanup
await allFutures(client.stop(), server.stop(), filterSource.stop())
waitFor allFutures(client.stop(), server.stop(), filterSource.stop())

View File

@ -22,7 +22,7 @@ import
../../v2/testlib/wakunode
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
let
digest = waku_archive.computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
@ -83,7 +83,7 @@ procSuite "Waku v2 JSON-RPC API - Store":
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
@ -133,7 +133,7 @@ procSuite "Waku v2 JSON-RPC API - Store":
fakeWakuMessage(@[byte 9], ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

View File

@ -28,7 +28,7 @@ import
logScope:
topics = "waku node rest store_api test"
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
let
digest = waku_archive.computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp
@ -107,7 +107,7 @@ procSuite "Waku v2 Rest API - Store":
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("c2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
@ -178,7 +178,7 @@ procSuite "Waku v2 Rest API - Store":
fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin))
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
@ -266,7 +266,7 @@ procSuite "Waku v2 Rest API - Store":
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
@ -338,7 +338,7 @@ procSuite "Waku v2 Rest API - Store":
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
@ -427,7 +427,7 @@ procSuite "Waku v2 Rest API - Store":
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
@ -482,7 +482,7 @@ procSuite "Waku v2 Rest API - Store":
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
]
for msg in msgList:
require driver.put(DefaultPubsubTopic, msg).isOk()
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))

View File

@ -44,7 +44,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get())

View File

@ -264,7 +264,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuArchive.isNil():
return
node.wakuArchive.handleMessage(topic, msg)
await node.wakuArchive.handleMessage(topic, msg)
let defaultHandler = proc(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
@ -455,11 +455,11 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
# Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled
# TODO: Move this logic to wakunode2 app
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.raises: [Exception].} =
let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} =
if node.wakuRelay.isNil() and not node.wakuStore.isNil():
node.wakuArchive.handleMessage(pubSubTopic, message)
await node.wakuArchive.handleMessage(pubSubTopic, message)
handler(pubsubTopic, message)
await handler(pubsubTopic, message)
let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer)
if subRes.isOk():
@ -521,7 +521,6 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get())
## Waku archive
proc mountArchive*(node: WakuNode,
@ -544,8 +543,11 @@ proc executeMessageRetentionPolicy*(node: WakuNode) =
debug "executing message retention policy"
node.wakuArchive.executeMessageRetentionPolicy()
node.wakuArchive.reportStoredMessagesMetric()
try:
waitFor node.wakuArchive.executeMessageRetentionPolicy()
waitFor node.wakuArchive.reportStoredMessagesMetric()
except CatchableError:
debug "Error executing retention policy " & getCurrentExceptionMsg()
proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) =
if node.wakuArchive.isNil():
@ -602,14 +604,13 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
return
# TODO: Review this handler logic. Maybe, move it to the appplication code
let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): HistoryResult =
let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): Future[HistoryResult] {.async.} =
let request = request.toArchiveQuery()
let response = node.wakuArchive.findMessages(request)
response.toHistoryResult()
let response = await node.wakuArchive.findMessages(request)
return response.toHistoryResult()
node.wakuStore = WakuStore.new(node.peerManager, node.rng, queryHandler)
if node.started:
# Node has started already. Let's start store too.
await node.wakuStore.start()

View File

@ -80,9 +80,9 @@ proc new*(T: type WakuArchive,
retentionPolicy: retentionPolicy.get(nil)
)
proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) =
proc handleMessage*(w: WakuArchive,
pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async.} =
if msg.ephemeral:
# Ephemeral message, do not store
return
@ -93,7 +93,6 @@ proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage)
waku_archive_errors.inc(labelValues = [validationRes.error])
return
let insertStartTime = getTime().toUnixFloat()
block:
@ -104,7 +103,7 @@ proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage)
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
let putRes = w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
if putRes.isErr():
error "failed to insert message", err=putRes.error
waku_archive_errors.inc(labelValues = [insertFailure])
@ -113,7 +112,7 @@ proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage)
waku_archive_insert_duration_seconds.observe(insertDuration)
proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.} =
proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria
let
qContentTopics = query.contentTopics
@ -128,10 +127,9 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.
if qContentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))
let queryStartTime = getTime().toUnixFloat()
let queryRes = w.driver.getMessages(
let queryRes = await w.driver.getMessages(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
@ -144,16 +142,13 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.
let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_archive_query_duration_seconds.observe(queryDuration)
# Build response
if queryRes.isErr():
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error))
let rows = queryRes.get()
var messages = newSeq[WakuMessage]()
var cursor = none(ArchiveCursor)
if rows.len == 0:
return ok(ArchiveResponse(messages: messages, cursor: cursor))
@ -187,28 +182,27 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.
if not qAscendingOrder:
reverse(messages)
ok(ArchiveResponse(messages: messages, cursor: cursor))
return ok(ArchiveResponse(messages: messages, cursor: cursor))
# Retention policy
proc executeMessageRetentionPolicy*(w: WakuArchive) =
proc executeMessageRetentionPolicy*(w: WakuArchive) {.async.} =
if w.retentionPolicy.isNil():
return
if w.driver.isNil():
return
let retPolicyRes = w.retentionPolicy.execute(w.driver)
let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error=retPolicyRes.error
proc reportStoredMessagesMetric*(w: WakuArchive) =
proc reportStoredMessagesMetric*(w: WakuArchive) {.async.} =
if w.driver.isNil():
return
let resCount = w.driver.getMessagesCount()
let resCount = await w.driver.getMessagesCount()
if resCount.isErr():
error "failed to get messages count", error=resCount.error
return

View File

@ -5,53 +5,58 @@ else:
import
std/options,
stew/results
stew/results,
chronos
import
../waku_core,
./common
const DefaultPageSize*: uint = 25
type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
# ArchiveDriver interface
method put*(driver: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] {.base.} = discard
method put*(driver: ArchiveDriver,
pubsubTopic: PubsubTopic,
message: WakuMessage,
digest: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.base, async.} = discard
method getAllMessages*(driver: ArchiveDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard
method getAllMessages*(driver: ArchiveDriver): ArchiveDriverResult[seq[ArchiveRow]] {.base.} = discard
method getMessages*(driver: ArchiveDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard
method getMessages*(
driver: ArchiveDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): ArchiveDriverResult[seq[ArchiveRow]] {.base.} = discard
method getMessagesCount*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard
method getOldestMessageTimestamp*(driver: ArchiveDriver):
Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard
method getMessagesCount*(driver: ArchiveDriver): ArchiveDriverResult[int64] {.base.} = discard
method getNewestMessageTimestamp*(driver: ArchiveDriver):
Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard
method getOldestMessageTimestamp*(driver: ArchiveDriver): ArchiveDriverResult[Timestamp] {.base.} = discard
method deleteMessagesOlderThanTimestamp*(driver: ArchiveDriver,
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.base, async.} = discard
method getNewestMessageTimestamp*(driver: ArchiveDriver): ArchiveDriverResult[Timestamp] {.base.} = discard
method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.base, async.} = discard
method deleteMessagesOlderThanTimestamp*(driver: ArchiveDriver, ts: Timestamp): ArchiveDriverResult[void] {.base.} = discard
method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver, limit: int): ArchiveDriverResult[void] {.base.} = discard
method close*(driver: ArchiveDriver): ArchiveDriverResult[void] {.base.} =
ok()
method close*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

View File

@ -7,21 +7,19 @@ import
std/options,
stew/results,
stew/sorted_set,
chronicles
chronicles,
chronos
import
../../../waku_core,
../../common,
../../driver,
./index
logScope:
topics = "waku archive queue_store"
const QueueDriverDefaultMaxCapacity* = 25_000
type
IndexedWakuMessage = object
# TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message
@ -43,7 +41,6 @@ proc `$`(error: QueueDriverErrorKind): string =
of INVALID_CURSOR:
"invalid_cursor"
type QueueDriver* = ref object of ArchiveDriver
## Bounded repository for indexed messages
##
@ -59,7 +56,6 @@ type QueueDriver* = ref object of ArchiveDriver
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
### Helpers
proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
@ -82,14 +78,12 @@ proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
return nextItem
#### API
proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
return QueueDriver(items: items, capacity: capacity)
proc contains*(driver: QueueDriver, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
driver.items.eq(index).isOk()
@ -202,7 +196,6 @@ proc last*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] =
return ok(res.value.data)
## --- Queue API ---
proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[void] =
@ -231,27 +224,30 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi
return ok()
method put*(driver: QueueDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] =
method put*(driver: QueueDriver,
pubsubTopic: PubsubTopic,
message: WakuMessage,
digest: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
driver.add(message)
return driver.add(message)
method getAllMessages*(driver: QueueDriver): ArchiveDriverResult[seq[ArchiveRow]] =
method getAllMessages*(driver: QueueDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
# TODO: Implement this message_store method
err("interface method not implemented")
return err("interface method not implemented")
method getMessages*(
driver: QueueDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): ArchiveDriverResult[seq[ArchiveRow]] =
method getMessages*(driver: QueueDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.}=
let cursor = cursor.map(toIndex)
let matchesQuery: QueryFilterMatcher = func(row: IndexedWakuMessage): bool =
@ -272,29 +268,38 @@ method getMessages*(
var pageRes: QueueDriverGetPageResult
try:
pageRes = driver.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery)
except: # TODO: Fix "BareExcept" warning
except CatchableError, Exception:
return err(getCurrentExceptionMsg())
if pageRes.isErr():
return err($pageRes.error)
ok(pageRes.value)
return ok(pageRes.value)
method getMessagesCount*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
method getMessagesCount*(driver: QueueDriver): ArchiveDriverResult[int64] =
ok(int64(driver.len()))
method getOldestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getOldestMessageTimestamp*(driver: QueueDriver): ArchiveDriverResult[Timestamp] =
driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getNewestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getNewestMessageTimestamp*(driver: QueueDriver): ArchiveDriverResult[Timestamp] =
driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method deleteMessagesOlderThanTimestamp*(driver: QueueDriver, ts: Timestamp): ArchiveDriverResult[void] =
method deleteMessagesOlderThanTimestamp*(driver: QueueDriver,
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
# TODO: Implement this message_store method
err("interface method not implemented")
return err("interface method not implemented")
method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, limit: int): ArchiveDriverResult[void] =
method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =
# TODO: Implement this message_store method
err("interface method not implemented")
return err("interface method not implemented")
method close*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return ok()

View File

@ -8,7 +8,8 @@ else:
import
std/options,
stew/[byteutils, results],
chronicles
chronicles,
chronos
import
../../../../common/sqlite,
../../../waku_core,
@ -20,8 +21,6 @@ import
logScope:
topics = "waku archive sqlite"
proc init(db: SqliteDatabase): ArchiveDriverResult[void] =
## Misconfiguration can lead to nil DB
if db.isNil():
@ -43,7 +42,6 @@ proc init(db: SqliteDatabase): ArchiveDriverResult[void] =
ok()
type SqliteDriver* = ref object of ArchiveDriver
db: SqliteDatabase
insertStmt: SqliteStmt[InsertMessageParams, void]
@ -59,21 +57,13 @@ proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =
let insertStmt = db.prepareInsertMessageStmt()
ok(SqliteDriver(db: db, insertStmt: insertStmt))
method close*(s: SqliteDriver): ArchiveDriverResult[void] =
## Close the database connection
# Dispose statements
s.insertStmt.dispose()
# Close connection
s.db.close()
ok()
method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] =
method put*(s: SqliteDriver,
pubsubTopic: PubsubTopic,
message: WakuMessage,
digest: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
## Inserts a message into the store
let res = s.insertStmt.exec((
@(digest.data), # id
receivedTime, # storedAt
@ -83,30 +73,27 @@ method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, dig
int64(message.version), # version
message.timestamp # senderTimestamp
))
if res.isErr():
return err("message insert failed: " & res.error)
ok()
return res
method getAllMessages*(s: SqliteDriver): ArchiveDriverResult[seq[ArchiveRow]] =
method getAllMessages*(s: SqliteDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
s.db.selectAllMessages()
return s.db.selectAllMessages()
method getMessages*(s: SqliteDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
method getMessages*(
s: SqliteDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): ArchiveDriverResult[seq[ArchiveRow]] =
let cursor = cursor.map(toDbCursor)
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
let rowsRes = s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,
pubsubTopic,
cursor,
@ -116,21 +103,35 @@ method getMessages*(
ascending=ascendingOrder
)
ok(rows)
return rowsRes
method getMessagesCount*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getMessageCount()
method getMessagesCount*(s: SqliteDriver): ArchiveDriverResult[int64] =
s.db.getMessageCount()
method getOldestMessageTimestamp*(s: SqliteDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return s.db.selectOldestReceiverTimestamp()
method getOldestMessageTimestamp*(s: SqliteDriver): ArchiveDriverResult[Timestamp] =
s.db.selectOldestReceiverTimestamp()
method getNewestMessageTimestamp*(s: SqliteDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return s.db.selectnewestReceiverTimestamp()
method getNewestMessageTimestamp*(s: SqliteDriver): ArchiveDriverResult[Timestamp] =
s.db.selectnewestReceiverTimestamp()
method deleteMessagesOlderThanTimestamp*(s: SqliteDriver,
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.deleteMessagesOlderThanTimestamp(ts)
method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.deleteOldestMessagesNotWithinLimit(limit)
method deleteMessagesOlderThanTimestamp*(s: SqliteDriver, ts: Timestamp): ArchiveDriverResult[void] =
s.db.deleteMessagesOlderThanTimestamp(ts)
method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver, limit: int): ArchiveDriverResult[void] =
s.db.deleteOldestMessagesNotWithinLimit(limit)
method close*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
# Dispose statements
s.insertStmt.dispose()
# Close connection
s.db.close()
return ok()

View File

@ -4,7 +4,8 @@ else:
{.push raises: [].}
import
stew/results
stew/results,
chronos
import
./driver
@ -12,5 +13,5 @@ type RetentionPolicyResult*[T] = Result[T, string]
type RetentionPolicy* = ref object of RootObj
method execute*(p: RetentionPolicy, store: ArchiveDriver): RetentionPolicyResult[void] {.base.} = discard
method execute*(p: RetentionPolicy, store: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.base, async.} = discard

View File

@ -5,7 +5,8 @@ else:
import
stew/results,
chronicles
chronicles,
chronos
import
../driver,
../retention_policy
@ -13,7 +14,6 @@ import
logScope:
topics = "waku archive retention_policy"
const DefaultCapacity*: int = 25_000
const MaxOverflow = 1.3
@ -38,7 +38,6 @@ type
totalCapacity: int # = capacity * MaxOverflow
deleteWindow: int # = capacity * (MaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
proc calculateTotalCapacity(capacity: int, overflow: float): int =
int(float(capacity) * overflow)
@ -48,7 +47,6 @@ proc calculateOverflowWindow(capacity: int, overflow: float): int =
proc calculateDeleteWindow(capacity: int, overflow: float): int =
calculateOverflowWindow(capacity, overflow) div 2
proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
let
totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
@ -60,14 +58,21 @@ proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
deleteWindow: deleteWindow
)
method execute*(p: CapacityRetentionPolicy, driver: ArchiveDriver): RetentionPolicyResult[void] =
let numMessages = ?driver.getMessagesCount().mapErr(proc(err: string): string = "failed to get messages count: " & err)
method execute*(p: CapacityRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
let numMessagesRes = await driver.getMessagesCount()
if numMessagesRes.isErr():
return err("failed to get messages count: " & numMessagesRes.error)
let numMessages = numMessagesRes.value
if numMessages < p.totalCapacity:
return ok()
let res = driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)
let res = await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)
if res.isErr():
return err("deleting oldest messages failed: " & res.error())
return err("deleting oldest messages failed: " & res.error)
ok()
return ok()

View File

@ -29,21 +29,24 @@ proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
retentionTime: retentionTime.seconds
)
method execute*(p: TimeRetentionPolicy, driver: ArchiveDriver): RetentionPolicyResult[void] =
method execute*(p: TimeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
let oldestReceiverTimestamp = ?driver.getOldestMessageTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err)
let omtRes = await driver.getOldestMessageTimestamp()
if omtRes.isErr():
return err("failed to get oldest message timestamp: " & omtRes.error)
let now = getNanosecondTime(getTime().toUnixFloat())
let retentionTimestamp = now - p.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
if thresholdTimestamp <= oldestReceiverTimestamp:
if thresholdTimestamp <= omtRes.value:
return ok()
let res = driver.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
let res = await driver.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
if res.isErr():
return err("failed to delete oldest messages: " & res.error())
return err("failed to delete oldest messages: " & res.error)
ok()
return ok()

View File

@ -30,7 +30,7 @@ const Defaultstring = "/waku/2/default-waku/proto"
### Client, filter subscripton manager
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.}
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.}
## Subscription manager
@ -59,7 +59,7 @@ proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic,
try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
handler(pubsubTopic, message)
asyncSpawn handler(pubsubTopic, message)
except: # TODO: Fix "BareExcept" warning
discard

View File

@ -34,7 +34,7 @@ const
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
type HistoryQueryHandler* = proc(req: HistoryQuery): HistoryResult {.gcsafe.}
type HistoryQueryHandler* = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}
type
WakuStore* = ref object of LPProtocol
@ -74,7 +74,7 @@ proc initProtocolHandler(ws: WakuStore) =
var responseRes: HistoryResult
try:
responseRes = ws.queryHandler(request)
responseRes = await ws.queryHandler(request)
except Exception:
error "history query failed", peerId= $conn.peerId, requestId=requestId, error=getCurrentExceptionMsg()