fix: store v3 validate cursor & remove messages (#2636)

This commit is contained in:
Simon-Pierre Vivier 2024-05-01 14:47:06 -04:00 committed by GitHub
parent 44703f2608
commit db72e2b823
21 changed files with 220 additions and 154 deletions

View File

@ -59,10 +59,13 @@ suite "Waku Store - End to End - Sorted Archive":
fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)),
]
archiveMessages = messages.mapIt(
WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it)
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)
storeQuery = StoreQueryRequest(
includeData: true,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
paginationForward: PagingDirection.Forward,
@ -102,6 +105,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query
var otherHistoryQuery = StoreQueryRequest(
includeData: true,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
paginationCursor: queryResponse.get().paginationCursor,
@ -130,6 +134,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query
var nextHistoryQuery = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -159,6 +164,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (2/5)
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse1.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -175,6 +181,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (3/5)
let historyQuery3 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse2.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -191,6 +198,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (4/5)
let historyQuery4 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse3.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -207,6 +215,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (5/5)
let historyQuery5 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse4.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -234,6 +243,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (2/2)
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse1.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -272,6 +282,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (2/3)
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse1.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -288,6 +299,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (3/3)
let historyQuery3 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse2.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -310,7 +322,7 @@ suite "Waku Store - End to End - Sorted Archive":
let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5
let lastMessageTimestamp =
archiveMessages[archiveMessages.len - 1].message.timestamp
archiveMessages[archiveMessages.len - 1].message.get().timestamp
var extraMessages: seq[WakuMessage] = @[]
for i in 0 ..< missingMessagesAmount:
let
@ -325,7 +337,7 @@ suite "Waku Store - End to End - Sorted Archive":
archiveMessages &
extraMessages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: it
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)
@ -341,6 +353,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (2/2)
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse1.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -364,7 +377,7 @@ suite "Waku Store - End to End - Sorted Archive":
let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5
let lastMessageTimestamp =
archiveMessages[archiveMessages.len - 1].message.timestamp
archiveMessages[archiveMessages.len - 1].message.get().timestamp
var extraMessages: seq[WakuMessage] = @[]
for i in 0 ..< missingMessagesAmount:
let
@ -379,12 +392,13 @@ suite "Waku Store - End to End - Sorted Archive":
archiveMessages &
extraMessages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: it
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)
# Given a query with default page size (1/2)
storeQuery = StoreQueryRequest(
includeData: true,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
paginationForward: PagingDirection.FORWARD,
@ -399,6 +413,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Given the next query (2/2)
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -457,8 +472,9 @@ suite "Waku Store - End to End - Sorted Archive":
asyncTest "Cursor Reusability Across Nodes":
# Given a different server node with the same archive
let
otherArchiveDriverWithMessages =
newArchiveDriverWithMessages(pubsubTopic, archiveMessages.mapIt(it.message))
otherArchiveDriverWithMessages = newArchiveDriverWithMessages(
pubsubTopic, archiveMessages.mapIt(it.message.get())
)
otherServerKey = generateSecp256k1Key()
otherServer =
newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
@ -483,6 +499,7 @@ suite "Waku Store - End to End - Sorted Archive":
# When making a history query to the second server node
let otherHistoryQuery = StoreQueryRequest(
includeData: true,
paginationCursor: paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -518,6 +535,7 @@ suite "Waku Store - End to End - Unsorted Archive":
contentTopicSeq = @[contentTopic]
storeQuery = StoreQueryRequest(
includeData: true,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
paginationForward: PagingDirection.FORWARD,
@ -539,7 +557,9 @@ suite "Waku Store - End to End - Unsorted Archive":
fakeWakuMessage(@[byte 05], ts = ts(20, timeOrigin)),
]
unsortedArchiveMessages = messages.mapIt(
WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it)
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)
let
@ -575,17 +595,17 @@ suite "Waku Store - End to End - Unsorted Archive":
check:
queryResponse.get().messages.len == 5
queryResponse.get().messages[0].message.timestamp ==
queryResponse.get().messages[1].message.timestamp
queryResponse.get().messages[0].message.get().timestamp ==
queryResponse.get().messages[1].message.get().timestamp
queryResponse.get().messages[1].message.timestamp ==
queryResponse.get().messages[2].message.timestamp
queryResponse.get().messages[1].message.get().timestamp ==
queryResponse.get().messages[2].message.get().timestamp
queryResponse.get().messages[2].message.timestamp <
queryResponse.get().messages[3].message.timestamp
queryResponse.get().messages[2].message.get().timestamp <
queryResponse.get().messages[3].message.get().timestamp
queryResponse.get().messages[3].message.timestamp ==
queryResponse.get().messages[4].message.timestamp
queryResponse.get().messages[3].message.get().timestamp ==
queryResponse.get().messages[4].message.get().timestamp
toHex(queryResponse.get().messages[0].messageHash) <
toHex(queryResponse.get().messages[1].messageHash)
@ -598,6 +618,7 @@ suite "Waku Store - End to End - Unsorted Archive":
# Given the next query
var historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -610,17 +631,17 @@ suite "Waku Store - End to End - Unsorted Archive":
# Check the ordering
check:
queryResponse2.get().messages[0].message.timestamp <
queryResponse2.get().messages[1].message.timestamp
queryResponse2.get().messages[0].message.get().timestamp <
queryResponse2.get().messages[1].message.get().timestamp
queryResponse2.get().messages[1].message.timestamp ==
queryResponse2.get().messages[2].message.timestamp
queryResponse2.get().messages[1].message.get().timestamp ==
queryResponse2.get().messages[2].message.get().timestamp
queryResponse2.get().messages[2].message.timestamp ==
queryResponse2.get().messages[3].message.timestamp
queryResponse2.get().messages[2].message.get().timestamp ==
queryResponse2.get().messages[3].message.get().timestamp
queryResponse2.get().messages[3].message.timestamp ==
queryResponse2.get().messages[4].message.timestamp
queryResponse2.get().messages[3].message.get().timestamp ==
queryResponse2.get().messages[4].message.get().timestamp
toHex(queryResponse2.get().messages[1].messageHash) <
toHex(queryResponse2.get().messages[2].messageHash)
@ -651,11 +672,11 @@ suite "Waku Store - End to End - Unsorted Archive":
check:
queryResponse.get().messages.len == 3
queryResponse.get().messages[0].message.timestamp ==
queryResponse.get().messages[1].message.timestamp
queryResponse.get().messages[0].message.get().timestamp ==
queryResponse.get().messages[1].message.get().timestamp
queryResponse.get().messages[1].message.timestamp ==
queryResponse.get().messages[2].message.timestamp
queryResponse.get().messages[1].message.get().timestamp ==
queryResponse.get().messages[2].message.get().timestamp
toHex(queryResponse.get().messages[0].messageHash) <
toHex(queryResponse.get().messages[1].messageHash)
@ -684,20 +705,20 @@ suite "Waku Store - End to End - Unsorted Archive":
check:
queryResponse.get().messages.len == 6
queryResponse.get().messages[0].message.timestamp ==
queryResponse.get().messages[1].message.timestamp
queryResponse.get().messages[0].message.get().timestamp ==
queryResponse.get().messages[1].message.get().timestamp
queryResponse.get().messages[1].message.timestamp <
queryResponse.get().messages[2].message.timestamp
queryResponse.get().messages[1].message.get().timestamp <
queryResponse.get().messages[2].message.get().timestamp
queryResponse.get().messages[2].message.timestamp ==
queryResponse.get().messages[3].message.timestamp
queryResponse.get().messages[2].message.get().timestamp ==
queryResponse.get().messages[3].message.get().timestamp
queryResponse.get().messages[3].message.timestamp ==
queryResponse.get().messages[4].message.timestamp
queryResponse.get().messages[3].message.get().timestamp ==
queryResponse.get().messages[4].message.get().timestamp
queryResponse.get().messages[4].message.timestamp ==
queryResponse.get().messages[5].message.timestamp
queryResponse.get().messages[4].message.get().timestamp ==
queryResponse.get().messages[5].message.get().timestamp
toHex(queryResponse.get().messages[0].messageHash) <
toHex(queryResponse.get().messages[1].messageHash)
@ -730,6 +751,7 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
contentTopicSeq = @[contentTopic]
storeQuery = StoreQueryRequest(
includeData: true,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
paginationForward: PagingDirection.FORWARD,
@ -750,7 +772,9 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
fakeWakuMessage(@[byte 08]),
]
unsortedArchiveMessages = messages.mapIt(
WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it)
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)
let
@ -785,20 +809,21 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
check:
queryResponse.get().messages.len == 5
queryResponse.get().messages[0].message.timestamp <=
queryResponse.get().messages[1].message.timestamp
queryResponse.get().messages[0].message.get().timestamp <=
queryResponse.get().messages[1].message.get().timestamp
queryResponse.get().messages[1].message.timestamp <=
queryResponse.get().messages[2].message.timestamp
queryResponse.get().messages[1].message.get().timestamp <=
queryResponse.get().messages[2].message.get().timestamp
queryResponse.get().messages[2].message.timestamp <=
queryResponse.get().messages[3].message.timestamp
queryResponse.get().messages[2].message.get().timestamp <=
queryResponse.get().messages[3].message.get().timestamp
queryResponse.get().messages[3].message.timestamp <=
queryResponse.get().messages[4].message.timestamp
queryResponse.get().messages[3].message.get().timestamp <=
queryResponse.get().messages[4].message.get().timestamp
# Given the next query
var historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse.get().paginationCursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
@ -820,17 +845,17 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
queryResponse2.get().messages.len == 5
queryResponse2.get().messages[0].message.timestamp <=
queryResponse2.get().messages[1].message.timestamp
queryResponse2.get().messages[0].message.get().timestamp <=
queryResponse2.get().messages[1].message.get().timestamp
queryResponse2.get().messages[1].message.timestamp <=
queryResponse2.get().messages[2].message.timestamp
queryResponse2.get().messages[1].message.get().timestamp <=
queryResponse2.get().messages[2].message.get().timestamp
queryResponse2.get().messages[2].message.timestamp <=
queryResponse2.get().messages[3].message.timestamp
queryResponse2.get().messages[2].message.get().timestamp <=
queryResponse2.get().messages[3].message.get().timestamp
queryResponse2.get().messages[3].message.timestamp <=
queryResponse2.get().messages[4].message.timestamp
queryResponse2.get().messages[3].message.get().timestamp <=
queryResponse2.get().messages[4].message.get().timestamp
suite "Waku Store - End to End - Archive with Multiple Topics":
var pubsubTopic {.threadvar.}: PubsubTopic
@ -861,6 +886,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
@[contentTopic, contentTopicB, contentTopicC, contentTopicSpecials]
storeQuery = StoreQueryRequest(
includeData: true,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
paginationForward: PagingDirection.FORWARD,
@ -888,12 +914,14 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
]
archiveMessages = messages.mapIt(
WakuMessageKeyValue(messageHash: computeMessageHash(pubsubTopic, it), message: it)
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)
for i in 6 ..< 10:
archiveMessages[i].messagehash =
computeMessageHash(pubsubTopicB, archiveMessages[i].message)
computeMessageHash(pubsubTopicB, archiveMessages[i].message.get())
let
serverKey = generateSecp256k1Key()
@ -961,6 +989,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
# Given the next query
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse.get().paginationCursor,
pubsubTopic: none(PubsubTopic),
contentTopics: contentTopicSeq,
@ -1028,6 +1057,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
# Given the next query
let historyQuery2 = StoreQueryRequest(
includeData: true,
paginationCursor: queryResponse.get().paginationCursor,
pubsubTopic: none(PubsubTopic),
contentTopics: contentTopicSeq,
@ -1244,7 +1274,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
let voluminousArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: it
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
)
)

View File

@ -183,7 +183,7 @@ procSuite "Waku Archive - find messages":
waitFor archive.handleMessage("foo", msg2)
## Given
let req = ArchiveQuery(contentTopics: @[topic])
let req = ArchiveQuery(includeData: true, contentTopics: @[topic])
## When
let queryRes = waitFor archive.findMessages(req)
@ -218,7 +218,7 @@ procSuite "Waku Archive - find messages":
waitFor archive.handleMessage("foo", msg3)
## Given
let req = ArchiveQuery(contentTopics: @[topic1, topic3])
let req = ArchiveQuery(includeData: true, contentTopics: @[topic1, topic3])
## When
let queryRes = waitFor archive.findMessages(req)
@ -283,7 +283,9 @@ procSuite "Waku Archive - find messages":
## Given
# This query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
let req = ArchiveQuery(
pubsubTopic: some(pubsubTopic1), contentTopics: @[contentTopic1, contentTopic3]
includeData: true,
pubsubTopic: some(pubsubTopic1),
contentTopics: @[contentTopic1, contentTopic3],
)
## When
@ -349,7 +351,7 @@ procSuite "Waku Archive - find messages":
waitFor archive.handleMessage(pubsubTopic, msg3)
## Given
let req = ArchiveQuery(pubsubTopic: some(pubsubTopic))
let req = ArchiveQuery(includeData: true, pubsubTopic: some(pubsubTopic))
## When
let res = waitFor archive.findMessages(req)
@ -367,7 +369,8 @@ procSuite "Waku Archive - find messages":
test "handle query with forward pagination":
## Given
let req = ArchiveQuery(pageSize: 4, direction: PagingDirection.FORWARD)
let req =
ArchiveQuery(includeData: true, pageSize: 4, direction: PagingDirection.FORWARD)
## When
var nextReq = req # copy
@ -400,7 +403,8 @@ procSuite "Waku Archive - find messages":
test "handle query with backward pagination":
## Given
let req = ArchiveQuery(pageSize: 4, direction: PagingDirection.BACKWARD)
let req =
ArchiveQuery(includeData: true, pageSize: 4, direction: PagingDirection.BACKWARD)
## When
var nextReq = req # copy
@ -463,7 +467,7 @@ procSuite "Waku Archive - find messages":
).isOk()
## Given
let req = ArchiveQuery(contentTopics: @[DefaultContentTopic])
let req = ArchiveQuery(includeData: true, contentTopics: @[DefaultContentTopic])
## When
let res = waitFor archive.findMessages(req)
@ -482,6 +486,7 @@ procSuite "Waku Archive - find messages":
test "handle temporal history query with a valid time window":
## Given
let req = ArchiveQuery(
includeData: true,
contentTopics: @[ContentTopic("1")],
startTime: some(ts(15, timeOrigin)),
endTime: some(ts(55, timeOrigin)),

View File

@ -38,9 +38,9 @@ suite "Store Client":
hash3 = computeMessageHash(DefaultPubsubTopic, message3)
messageSeq =
@[
WakuMessageKeyValue(messageHash: hash1, message: message1),
WakuMessageKeyValue(messageHash: hash2, message: message2),
WakuMessageKeyValue(messageHash: hash3, message: message3),
WakuMessageKeyValue(messageHash: hash1, message: some(message1)),
WakuMessageKeyValue(messageHash: hash2, message: some(message2)),
WakuMessageKeyValue(messageHash: hash3, message: some(message3)),
]
handlerFuture = newHistoryFuture()
handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} =

View File

@ -14,7 +14,7 @@ procSuite "Waku Store - RPC codec":
## Given
let query = StoreQueryRequest(
requestId: "0",
includeData: false,
includeData: true,
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
startTime: some(Timestamp(10)),
@ -58,7 +58,7 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
hash = computeMessageHash(DefaultPubsubTopic, message)
keyValue = WakuMessageKeyValue(messageHash: hash, message: message)
keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message))
res = StoreQueryResponse(
requestId: "1",
statusCode: 200,

View File

@ -29,7 +29,7 @@ suite "Waku Store - query handler":
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let kv = WakuMessageKeyValue(messageHash: hash, message: msg)
let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg))
var queryHandlerFut = newFuture[(StoreQueryRequest)]()

View File

@ -49,18 +49,19 @@ procSuite "WakuNode - Store":
let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it))
let kvs =
zip(hashes, msgListA).mapIt(WakuMessageKeyValue(messageHash: it[0], message: it[1]))
let kvs = zip(hashes, msgListA).mapIt(
WakuMessageKeyValue(messageHash: it[0], message: some(it[1]))
)
let archiveA = block:
let driver = newSqliteArchiveDriver()
for kv in kvs:
let msg_digest = computeDigest(kv.message)
let message = kv.message.get()
let msg_digest = computeDigest(message)
require (
waitFor driver.put(
DefaultPubsubTopic, kv.message, msg_digest, kv.messageHash,
kv.message.timestamp,
DefaultPubsubTopic, message, msg_digest, kv.messageHash, message.timestamp
)
).isOk()
@ -84,7 +85,8 @@ procSuite "WakuNode - Store":
client.mountStoreClient()
## Given
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
@ -119,6 +121,7 @@ procSuite "WakuNode - Store":
## Given
let req = StoreQueryRequest(
includeData: true,
contentTopics: @[DefaultContentTopic],
paginationForward: PagingDirection.FORWARD,
paginationLimit: some(uint64(7)),
@ -174,6 +177,7 @@ procSuite "WakuNode - Store":
## Given
let req = StoreQueryRequest(
includeData: true,
contentTopics: @[DefaultContentTopic],
paginationLimit: some(uint64(7)),
paginationForward: PagingDirection.BACKWARD,
@ -261,7 +265,8 @@ procSuite "WakuNode - Store":
# Wait for the server filter to receive the push message
require waitFor filterFut.withTimeout(5.seconds)
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let res = waitFor client.query(req, serverPeer)
## Then
@ -270,7 +275,8 @@ procSuite "WakuNode - Store":
let response = res.get()
check:
response.messages.len == 1
response.messages[0] == WakuMessageKeyValue(messageHash: hash, message: message)
response.messages[0] ==
WakuMessageKeyValue(messageHash: hash, message: some(message))
let (handledPubsubTopic, handledMsg) = filterFut.read()
check:
@ -341,7 +347,8 @@ procSuite "WakuNode - Store":
client.mountStoreClient()
## Given
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()
let requestProc = proc() {.async.} =
@ -351,7 +358,7 @@ procSuite "WakuNode - Store":
let response = queryRes.get()
check:
response.messages.mapIt(it.message) == msgListA
response.messages.mapIt(it.message.get()) == msgListA
for count in 0 ..< 4:
waitFor requestProc()
@ -384,7 +391,8 @@ procSuite "WakuNode - Store":
client.mountStoreClient()
## Given
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let req =
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()
let successProc = proc() {.async.} =
@ -393,7 +401,7 @@ procSuite "WakuNode - Store":
check queryRes.isOk()
let response = queryRes.get()
check:
response.messages.mapIt(it.message) == msgListA
response.messages.mapIt(it.message.get()) == msgListA
let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, times],
std/[options, times, sugar],
stew/shims/net as stewNet,
chronicles,
testutils/unittests,
@ -224,9 +224,10 @@ procSuite "Waku Rest API - Store v3":
"7", # page size. Empty implies default page size.
)
var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].message)
let wakuMessages = collect(newSeq):
for element in response.data.messages:
if element.message.isSome():
element.message.get()
pages[i] = wakuMessages
@ -620,15 +621,16 @@ procSuite "Waku Rest API - Store v3":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# Filtering by a known pubsub topic.
var response =
await client.getStoreMessagesV3(pubsubTopic = encodeUrl(DefaultPubsubTopic))
var response = await client.getStoreMessagesV3(
includeData = "true", pubsubTopic = encodeUrl(DefaultPubsubTopic)
)
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 1
let storeMessage = response.data.messages[0].message
let storeMessage = response.data.messages[0].message.get()
check:
storeMessage.payload == msg.payload
@ -710,9 +712,10 @@ procSuite "Waku Rest API - Store v3":
"3", # page size. Empty implies default page size.
)
var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].message)
let wakuMessages = collect(newSeq):
for element in response.data.messages:
if element.message.isSome():
element.message.get()
pages[i] = wakuMessages
@ -773,9 +776,10 @@ procSuite "Waku Rest API - Store v3":
response.status == 200
$response.contentType == $MIMETYPE_JSON
var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].message)
let wakuMessages = collect(newSeq):
for element in response.data.messages:
if element.message.isSome():
element.message.get()
check wakuMessages == msgList[6 .. 9]

View File

@ -45,8 +45,6 @@ proc write3*(proto: var ProtoBuffer, field: int, value: auto) =
when value is Option:
if value.isSome():
proto.write(field, value.get())
elif value is bool:
proto.write(field, zint(value))
else:
proto.write(field, value)

View File

@ -808,6 +808,7 @@ when defined(waku_exp_store_resume):
proc toArchiveQuery(request: StoreQueryRequest): ArchiveQuery =
var query = ArchiveQuery()
query.includeData = request.includeData
query.pubsubTopic = request.pubsubTopic
query.contentTopics = request.contentTopics
query.startTime = request.startTime
@ -834,9 +835,17 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult =
res.statusCode = 200
res.statusDesc = "OK"
res.messages = response.hashes.zip(response.messages).mapIt(
WakuMessageKeyValue(messageHash: it[0], message: it[1])
)
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]
let kv =
store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage))
res.messages.add(kv)
for i in 0 ..< response.messages.len:
res.messages[i].message = some(response.messages[i])
if response.cursor.isSome():
res.paginationCursor = some(response.cursor.get().hash)

View File

@ -186,7 +186,9 @@ proc writeValue*(
writer.beginRecord()
writer.writeField("message_hash", value.messageHash)
writer.writeField("message", value.message)
if value.message.isSome():
writer.writeField("message", value.message.get())
writer.endRecord()
@ -217,10 +219,7 @@ proc readValue*(
if messageHash.isNone():
reader.raiseUnexpectedValue("Field `message_hash` is missing")
if message.isNone():
reader.raiseUnexpectedValue("Field `message` is missing")
value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message.get())
value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message)
## StoreQueryResponse serde

View File

@ -144,10 +144,14 @@ proc findMessages*(
if query.contentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))
if query.cursor.isSome() and query.cursor.get().hash.len != 32:
return err(ArchiveError.invalidQuery("invalid cursor hash length"))
let queryStartTime = getTime().toUnixFloat()
let rows = (
await self.driver.getMessages(
includeData = query.includeData,
contentTopic = query.contentTopics,
pubsubTopic = query.pubsubTopic,
cursor = query.cursor,
@ -174,7 +178,10 @@ proc findMessages*(
let pageSize = min(rows.len, int(maxPageSize))
#TODO once store v2 is removed, unzip instead of 2x map
messages = rows[0 ..< pageSize].mapIt(it[1])
#TODO once store v2 is removed, update driver to not return messages when not needed
if query.includeData:
messages = rows[0 ..< pageSize].mapIt(it[1])
hashes = rows[0 ..< pageSize].mapIt(it[4])
## Cursor
@ -206,7 +213,7 @@ proc findMessages*(
proc findMessagesV2*(
self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
): Future[ArchiveResult] {.async, deprecated, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria
let maxPageSize =

View File

@ -43,6 +43,7 @@ type
hash*: WakuMessageHash
ArchiveQuery* = object
includeData*: bool # indicate if messages should be returned in addition to hashes.
pubsubTopic*: Option[PubsubTopic]
contentTopics*: seq[ContentTopic]
cursor*: Option[ArchiveCursor]

View File

@ -41,11 +41,12 @@ method getMessagesV2*(
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} =
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, deprecated, async.} =
discard
method getMessages*(
driver: ArchiveDriver,
includeData = false,
contentTopic = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),

View File

@ -377,7 +377,7 @@ proc getMessagesV2ArbitraryQuery(
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.
var query =
@ -521,7 +521,7 @@ proc getMessagesV2PreparedStmt(
endTime: Timestamp,
maxPageSize = DefaultPageSize,
ascOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} =
## This proc aims to run the most typical queries in a more performant way, i.e. by means of
## prepared statements.
##
@ -591,6 +591,7 @@ proc getMessagesV2PreparedStmt(
method getMessages*(
s: PostgresDriver,
includeData = false,
contentTopicSeq = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
@ -631,7 +632,7 @@ method getMessagesV2*(
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} =
if contentTopicSeq.len == 1 and pubsubTopic.isSome() and startTime.isSome() and
endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.

View File

@ -258,6 +258,7 @@ method existsTable*(
method getMessages*(
driver: QueueDriver,
includeData = false,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),

View File

@ -292,7 +292,7 @@ proc whereClausev2(
startTime: Option[Timestamp],
endTime: Option[Timestamp],
ascending: bool,
): Option[string] =
): Option[string] {.deprecated.} =
let cursorClause =
if cursor:
let comp = if ascending: ">" else: "<"
@ -336,7 +336,7 @@ proc whereClausev2(
proc selectMessagesWithLimitQueryv2(
table: string, where: Option[string], limit: uint, ascending = true, v3 = false
): SqlQueryStr =
): SqlQueryStr {.deprecated.} =
let order = if ascending: "ASC" else: "DESC"
var query: string
@ -369,7 +369,7 @@ proc execSelectMessagesV2WithLimitStmt(
startTime: Option[Timestamp],
endTime: Option[Timestamp],
onRowCallback: DataProc,
): DatabaseResult[void] =
): DatabaseResult[void] {.deprecated.} =
let s = RawStmtPtr(s)
# Bind params
@ -416,29 +416,6 @@ proc execSelectMessagesV2WithLimitStmt(
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
proc execSelectMessageByHash(
s: SqliteStmt, hash: WakuMessageHash, onRowCallback: DataProc
): DatabaseResult[void] =
let s = RawStmtPtr(s)
checkErr bindParam(s, 1, toSeq(hash))
try:
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onRowCallback(s)
of SQLITE_DONE:
return ok()
else:
return err($sqlite3_errstr(v))
finally:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s)
# no errors possible
proc selectMessagesByHistoryQueryWithLimit*(
db: SqliteDatabase,
contentTopic: seq[ContentTopic],
@ -450,7 +427,7 @@ proc selectMessagesByHistoryQueryWithLimit*(
ascending: bool,
): DatabaseResult[
seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
] =
] {.deprecated.} =
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] =
@[]
@ -483,6 +460,28 @@ proc selectMessagesByHistoryQueryWithLimit*(
### Store v3 ###
proc execSelectMessageByHash(
s: SqliteStmt, hash: WakuMessageHash, onRowCallback: DataProc
): DatabaseResult[void] =
let s = RawStmtPtr(s)
checkErr bindParam(s, 1, toSeq(hash))
try:
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onRowCallback(s)
of SQLITE_DONE:
return ok()
else:
return err($sqlite3_errstr(v))
finally:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
proc selectMessageByHashQuery(): SqlQueryStr =
var query: string

View File

@ -92,7 +92,7 @@ method getMessagesV2*(
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async, deprecated.} =
echo "here"
let cursor = cursor.map(toDbCursor)
@ -111,6 +111,7 @@ method getMessagesV2*(
method getMessages*(
s: SqliteDriver,
includeData = false,
contentTopic = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),

View File

@ -18,7 +18,7 @@ proc encode*(message: WakuMessage): ProtoBuffer =
buf.write3(10, zint64(message.timestamp))
buf.write3(11, message.meta)
buf.write3(21, message.proof)
buf.write3(31, message.ephemeral)
buf.write3(31, uint32(message.ephemeral))
buf.finish3()
buf
@ -67,7 +67,7 @@ proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtobufResult[T] =
else:
msg.proof = proof
var ephemeral: uint
var ephemeral: uint32
if not ?pb.getField(31, ephemeral):
msg.ephemeral = false
else:

View File

@ -37,7 +37,7 @@ type
WakuMessageKeyValue* = object
messageHash*: WakuMessageHash
message*: WakuMessage
message*: Option[WakuMessage]
StoreQueryResponse* = object
requestId*: string

View File

@ -49,11 +49,11 @@ proc handleQueryRequest*(
var res = StoreQueryResponse()
let req = StoreQueryRequest.decode(raw_request).valueOr:
error "failed to decode rpc", peerId = requestor
error "failed to decode rpc", peerId = requestor, error = $error
waku_store_errors.inc(labelValues = [decodeRpcFailure])
res.statusCode = uint32(ErrorCode.BAD_REQUEST)
res.statusDesc = "decode rpc failed"
res.statusDesc = "decoding rpc failed: " & $error
return res.encode().buffer
@ -82,10 +82,10 @@ proc handleQueryRequest*(
res = queryResult.valueOr:
error "store query failed",
peerId = requestor, requestId = requestId, error = queryResult.error
peerId = requestor, requestId = requestId, error = $error
res.statusCode = uint32(queryResult.error.kind)
res.statusDesc = $queryResult.error
res.statusCode = uint32(error.kind)
res.statusDesc = $error
return res.encode().buffer

View File

@ -14,7 +14,7 @@ proc encode*(req: StoreQueryRequest): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, req.requestId)
pb.write3(2, req.includeData)
pb.write3(2, uint32(req.includeData))
pb.write3(10, req.pubsubTopic)
@ -56,11 +56,11 @@ proc decode*(
if not ?pb.getField(1, req.requestId):
return err(ProtobufError.missingRequiredField("request_id"))
var inclData: uint
var inclData: uint32
if not ?pb.getField(2, inclData):
req.includeData = false
else:
req.includeData = inclData == 1
req.includeData = inclData > 0
var pubsubTopic: string
if not ?pb.getField(10, pubsubTopic):
@ -124,7 +124,9 @@ proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer =
var pb = initProtoBuffer()
pb.write3(1, keyValue.messageHash)
pb.write3(2, keyValue.message.encode())
if keyValue.message.isSome():
pb.write3(2, keyValue.message.get().encode())
pb.finish3()
@ -163,9 +165,9 @@ proc decode*(
var proto: ProtoBuffer
if not ?pb.getField(2, proto):
return err(ProtobufError.missingRequiredField("message"))
keyValue.message = none(WakuMessage)
else:
keyValue.message = ?WakuMessage.decode(proto.buffer)
keyValue.message = some(?WakuMessage.decode(proto.buffer))
return ok(keyValue)