feat: store v3 return pubsub topics (#2676)

Author: Simon-Pierre Vivier (committed by GitHub)
Date: 2024-05-08 15:35:56 -04:00
parent 6a1af92276
commit d700006a6a
10 changed files with 71 additions and 22 deletions
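With this change, each WakuMessageKeyValue entry in a store v3 response carries the pubsub topic the message was published on, next to the message hash and the optional message itself. A minimal sketch of the new record shape, reusing only identifiers that appear in the diff below (fakeWakuMessage, computeMessageHash, DefaultPubsubTopic and DefaultContentTopic are existing nwaku test helpers/constants):

```nim
# Sketch only: mirrors the updated test fixtures in this commit.
let
  msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
  kv = WakuMessageKeyValue(
    messageHash: computeMessageHash(DefaultPubsubTopic, msg),
    message: some(msg),
    pubsubTopic: some(DefaultPubsubTopic), # new field introduced by this commit
  )
```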

@@ -60,7 +60,9 @@ suite "Waku Store - End to End - Sorted Archive":
         ]
       archiveMessages = messages.mapIt(
         WakuMessageKeyValue(
-          messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+          messageHash: computeMessageHash(pubsubTopic, it),
+          message: some(it),
+          pubsubTopic: some(pubsubTopic),
         )
       )
@@ -337,7 +339,9 @@ suite "Waku Store - End to End - Sorted Archive":
           archiveMessages &
           extraMessages.mapIt(
             WakuMessageKeyValue(
-              messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+              messageHash: computeMessageHash(pubsubTopic, it),
+              message: some(it),
+              pubsubTopic: some(pubsubTopic),
             )
           )
@@ -392,7 +396,9 @@ suite "Waku Store - End to End - Sorted Archive":
           archiveMessages &
           extraMessages.mapIt(
             WakuMessageKeyValue(
-              messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+              messageHash: computeMessageHash(pubsubTopic, it),
+              message: some(it),
+              pubsubTopic: some(pubsubTopic),
             )
           )
@@ -558,7 +564,9 @@ suite "Waku Store - End to End - Unsorted Archive":
         ]
       unsortedArchiveMessages = messages.mapIt(
         WakuMessageKeyValue(
-          messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+          messageHash: computeMessageHash(pubsubTopic, it),
+          message: some(it),
+          pubsubTopic: some(pubsubTopic),
         )
       )
@@ -773,7 +781,9 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
         ]
       unsortedArchiveMessages = messages.mapIt(
         WakuMessageKeyValue(
-          messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+          messageHash: computeMessageHash(pubsubTopic, it),
+          message: some(it),
+          pubsubTopic: some(pubsubTopic),
         )
       )
@@ -915,7 +925,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
       archiveMessages = messages.mapIt(
         WakuMessageKeyValue(
-          messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+          messageHash: computeMessageHash(pubsubTopic, it),
+          message: some(it),
+          pubsubTopic: some(pubsubTopic),
         )
       )
@@ -923,6 +935,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
         archiveMessages[i].messagehash =
           computeMessageHash(pubsubTopicB, archiveMessages[i].message.get())
+        archiveMessages[i].pubsubTopic = some(pubsubTopicB)
     let
       serverKey = generateSecp256k1Key()
       clientKey = generateSecp256k1Key()
@@ -1274,7 +1288,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
     let voluminousArchiveMessages = messages.mapIt(
       WakuMessageKeyValue(
-        messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
+        messageHash: computeMessageHash(pubsubTopic, it),
+        message: some(it),
+        pubsubTopic: some(pubsubTopic),
       )
     )

@@ -38,9 +38,21 @@ suite "Store Client":
       hash3 = computeMessageHash(DefaultPubsubTopic, message3)
       messageSeq =
         @[
-          WakuMessageKeyValue(messageHash: hash1, message: some(message1)),
-          WakuMessageKeyValue(messageHash: hash2, message: some(message2)),
-          WakuMessageKeyValue(messageHash: hash3, message: some(message3)),
+          WakuMessageKeyValue(
+            messageHash: hash1,
+            message: some(message1),
+            pubsubTopic: some(DefaultPubsubTopic),
+          ),
+          WakuMessageKeyValue(
+            messageHash: hash2,
+            message: some(message2),
+            pubsubTopic: some(DefaultPubsubTopic),
+          ),
+          WakuMessageKeyValue(
+            messageHash: hash3,
+            message: some(message3),
+            pubsubTopic: some(DefaultPubsubTopic),
+          ),
         ]
       handlerFuture = newHistoryFuture()
       handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} =

@@ -58,7 +58,9 @@ procSuite "Waku Store - RPC codec":
     let
       message = fakeWakuMessage()
       hash = computeMessageHash(DefaultPubsubTopic, message)
-      keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message))
+      keyValue = WakuMessageKeyValue(
+        messageHash: hash, message: some(message), pubsubTopic: some(DefaultPubsubTopic)
+      )
       res = StoreQueryResponse(
         requestId: "1",
         statusCode: 200,

@@ -29,7 +29,9 @@ suite "Waku Store - query handler":
     let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
     let hash = computeMessageHash(DefaultPubsubTopic, msg)
-    let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg))
+    let kv = WakuMessageKeyValue(
+      messageHash: hash, message: some(msg), pubsubTopic: some(DefaultPubsubTopic)
+    )
     var queryHandlerFut = newFuture[(StoreQueryRequest)]()

@@ -50,7 +50,9 @@ procSuite "WakuNode - Store":
   let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it))
   let kvs = zip(hashes, msgListA).mapIt(
-    WakuMessageKeyValue(messageHash: it[0], message: some(it[1]))
+    WakuMessageKeyValue(
+      messageHash: it[0], message: some(it[1]), pubsubTopic: some(DefaultPubsubTopic)
+    )
   )
   let archiveA = block:
@@ -276,7 +278,11 @@ procSuite "WakuNode - Store":
     check:
       response.messages.len == 1
       response.messages[0] ==
-        WakuMessageKeyValue(messageHash: hash, message: some(message))
+        WakuMessageKeyValue(
+          messageHash: hash,
+          message: some(message),
+          pubsubTopic: some(DefaultPubSubTopic),
+        )
     let (handledPubsubTopic, handledMsg) = filterFut.read()
     check:

@@ -838,13 +838,13 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult =
   for i in 0 ..< response.hashes.len:
     let hash = response.hashes[i]
-    let kv =
-      store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage))
+    let kv = store_common.WakuMessageKeyValue(messageHash: hash)
     res.messages.add(kv)
   for i in 0 ..< response.messages.len:
     res.messages[i].message = some(response.messages[i])
+    res.messages[i].pubsubTopic = some(response.topics[i])
   if response.cursor.isSome():
     res.paginationCursor = some(response.cursor.get().hash)

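One apparent benefit of returning the topic (a reading of this change, not stated in the commit message): since computeMessageHash takes the pubsub topic as input, a client that receives both the message and its topic can recompute the hash locally and compare it against messageHash. A hypothetical client-side check; verifyEntry is an illustrative name, not an API added by this commit:

```nim
# Hypothetical verification helper; not part of this commit.
proc verifyEntry(kv: WakuMessageKeyValue): bool =
  # Both the message payload and its pubsub topic are needed to recompute the hash.
  if kv.message.isNone() or kv.pubsubTopic.isNone():
    return false
  computeMessageHash(kv.pubsubTopic.get(), kv.message.get()) == kv.messageHash
```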
@@ -169,6 +169,7 @@ proc findMessages*(
   var hashes = newSeq[WakuMessageHash]()
   var messages = newSeq[WakuMessage]()
+  var topics = newSeq[PubsubTopic]()
   var cursor = none(ArchiveCursor)
   if rows.len == 0:
@@ -180,6 +181,7 @@ proc findMessages*(
   #TODO once store v2 is removed, unzip instead of 2x map
   #TODO once store v2 is removed, update driver to not return messages when not needed
   if query.includeData:
+    topics = rows[0 ..< pageSize].mapIt(it[0])
     messages = rows[0 ..< pageSize].mapIt(it[1])
     hashes = rows[0 ..< pageSize].mapIt(it[4])
@@ -206,10 +208,13 @@ proc findMessages*(
   # All messages MUST be returned in chronological order
   if not isAscendingOrder:
-    reverse(messages)
     reverse(hashes)
+    reverse(messages)
+    reverse(topics)
-  return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))
+  return ok(
+    ArchiveResponse(hashes: hashes, messages: messages, topics: topics, cursor: cursor)
+  )
 proc findMessagesV2*(
     self: WakuArchive, query: ArchiveQuery

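The archive now keeps a third sequence, topics, in lockstep with hashes and messages, and reverses all three together so the index alignment survives reordering. A small consumption sketch under that assumption; entriesOf is an illustrative helper name, not part of this commit:

```nim
# Sketch: with includeData set, ArchiveResponse keeps hashes, messages and
# topics index-aligned; this zips them back into (hash, message, topic) tuples,
# which is also how toStoreResult above pairs them up.
proc entriesOf(resp: ArchiveResponse): seq[(WakuMessageHash, WakuMessage, PubsubTopic)] =
  for i in 0 ..< resp.topics.len:
    result.add((resp.hashes[i], resp.messages[i], resp.topics[i]))
```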
@@ -56,6 +56,7 @@ type
   ArchiveResponse* = object
     hashes*: seq[WakuMessageHash]
     messages*: seq[WakuMessage]
+    topics*: seq[PubsubTopic]
     cursor*: Option[ArchiveCursor]
   ArchiveErrorKind* {.pure.} = enum

@@ -38,6 +38,7 @@ type
   WakuMessageKeyValue* = object
     messageHash*: WakuMessageHash
     message*: Option[WakuMessage]
+    pubsubTopic*: Option[PubsubTopic]
   StoreQueryResponse* = object
     requestId*: string

@@ -125,8 +125,9 @@ proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer =
   pb.write3(1, keyValue.messageHash)
-  if keyValue.message.isSome():
+  if keyValue.message.isSome() and keyValue.pubsubTopic.isSome():
     pb.write3(2, keyValue.message.get().encode())
+    pb.write3(3, keyValue.pubsubTopic.get())
   pb.finish3()
@@ -164,10 +165,13 @@ proc decode*(
   keyValue.messagehash = hash
   var proto: ProtoBuffer
-  if not ?pb.getField(2, proto):
-    keyValue.message = none(WakuMessage)
-  else:
+  var topic: string
+  if ?pb.getField(2, proto) and ?pb.getField(3, topic):
     keyValue.message = some(?WakuMessage.decode(proto.buffer))
+    keyValue.pubsubTopic = some(topic)
+  else:
+    keyValue.message = none(WakuMessage)
+    keyValue.pubsubTopic = none(string)
   return ok(keyValue)
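On the wire, the key-value record now uses protobuf field 3 for the pubsub topic: encode writes it only when both the message and the topic are present, and decode populates either both fields or neither. A rough round-trip sketch, assuming the codec exposes the same `T.decode(buffer)` / Result convention used for WakuMessage in the hunk above (the exact decode signature is not shown in this diff):

```nim
# Round-trip sketch; the decode call shape is an assumption, not source text.
let
  msg = fakeWakuMessage()
  original = WakuMessageKeyValue(
    messageHash: computeMessageHash(DefaultPubsubTopic, msg),
    message: some(msg),
    pubsubTopic: some(DefaultPubsubTopic),
  )
  decoded = WakuMessageKeyValue.decode(original.encode().buffer)

assert decoded.isOk()
assert decoded.get().pubsubTopic == some(DefaultPubsubTopic)
```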