fix: store v3 bug fixes (#2718)

This commit is contained in:
Simon-Pierre Vivier 2024-05-23 08:01:52 -04:00 committed by GitHub
parent ebe69be8e5
commit 4a6ec468db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 220 additions and 51 deletions

View File

@ -645,6 +645,49 @@ suite "Postgres driver - queries":
check: check:
filteredMessages == expected[2 .. 3].reversed() filteredMessages == expected[2 .. 3].reversed()
asyncTest "only cursor - invalid":
## Given
const contentTopic = "test-content-topic"
var messages =
@[
fakeWakuMessage(@[byte 0], ts = ts(00)),
fakeWakuMessage(@[byte 1], ts = ts(10)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)),
fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)),
fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)),
fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)),
fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)),
fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)),
]
shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
)
## Then
assert res.isOk(), res.error
check:
res.value.len == 0
asyncTest "content topic and cursor": asyncTest "content topic and cursor":
## Given ## Given
const contentTopic = "test-content-topic" const contentTopic = "test-content-topic"

View File

@ -606,6 +606,52 @@ suite "Queue driver - query by cursor":
## Cleanup ## Cleanup
(waitFor driver.close()).expect("driver to close") (waitFor driver.close()).expect("driver to close")
test "only cursor - invalid":
## Given
const contentTopic = "test-content-topic"
let driver = newTestSqliteDriver()
var messages =
@[
fakeWakuMessage(@[byte 0], ts = ts(00)),
fakeWakuMessage(@[byte 1], ts = ts(10)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)),
fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)),
fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)),
fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)),
fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)),
fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)),
]
shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
for msg in messages:
let retFut = waitFor driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
require retFut.isOk()
let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
## When
let res = waitFor driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
)
## Then
check:
res.isErr()
res.error == "invalid_cursor"
## Cleanup
(waitFor driver.close()).expect("driver to close")
test "content topic and cursor": test "content topic and cursor":
## Given ## Given
const contentTopic = "test-content-topic" const contentTopic = "test-content-topic"

View File

@ -669,6 +669,53 @@ suite "SQLite driver - query by cursor":
## Cleanup ## Cleanup
(await driver.close()).expect("driver to close") (await driver.close()).expect("driver to close")
asyncTest "only cursor - invalid":
## Given
const contentTopic = "test-content-topic"
let driver = newSqliteArchiveDriver()
var messages =
@[
fakeWakuMessage(@[byte 0], ts = ts(00)),
fakeWakuMessage(@[byte 1], ts = ts(10)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)),
fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)),
fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)),
fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)),
fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)),
fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)),
]
shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()
let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage())
## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
)
## Then
check:
res.isOk()
res.value.len == 0
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "content topic and cursor": asyncTest "content topic and cursor":
## Given ## Given
const contentTopic = "test-content-topic" const contentTopic = "test-content-topic"

View File

@ -4,8 +4,8 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/[sets, strformat, uri, options], std/[sets, strformat, uri, options, sequtils],
stew/[byteutils, arrayops], stew/byteutils,
chronicles, chronicles,
json_serialization, json_serialization,
json_serialization/std/options, json_serialization/std/options,
@ -30,7 +30,7 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string]
if base64UrlEncoded == "": if base64UrlEncoded == "":
return ok(none(WakuMessageHash)) return ok(none(WakuMessageHash))
let base64Encoded = decodeUrl(base64UrlEncoded) let base64Encoded = decodeUrl(base64UrlEncoded, false)
let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr: let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
return err("waku message hash parsing error: " & error) return err("waku message hash parsing error: " & error)
@ -45,7 +45,7 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] =
if not input.isSome() or input.get() == "": if not input.isSome() or input.get() == "":
return ok(hashes) return ok(hashes)
let decodedUrl = decodeUrl(input.get()) let decodedUrl = decodeUrl(input.get(), false)
if decodedUrl != "": if decodedUrl != "":
for subString in decodedUrl.split(','): for subString in decodedUrl.split(','):
@ -62,7 +62,7 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] =
# and this result is URL-encoded. # and this result is URL-encoded.
proc toRestStringWakuMessageHash*(self: WakuMessageHash): string = proc toRestStringWakuMessageHash*(self: WakuMessageHash): string =
let base64Encoded = base64.encode(self) let base64Encoded = base64.encode(self)
encodeUrl($base64Encoded) encodeUrl($base64Encoded, false)
## WakuMessage serde ## WakuMessage serde
@ -146,38 +146,6 @@ proc readValue*(
proof: proof, proof: proof,
) )
## WakuMessageHash serde
proc writeValue*(
writer: var JsonWriter, value: WakuMessageHash
) {.gcsafe, raises: [IOError].} =
writer.beginRecord()
writer.writeField("data", base64.encode(value))
writer.endRecord()
proc readValue*(
reader: var JsonReader, value: var WakuMessageHash
) {.gcsafe, raises: [SerializationError, IOError].} =
var data = none(seq[byte])
for fieldName in readObjectFields(reader):
case fieldName
of "data":
if data.isSome():
reader.raiseUnexpectedField("Multiple `data` fields found", "WakuMessageHash")
let decoded = base64.decode(reader.readValue(Base64String))
if not decoded.isOk():
reader.raiseUnexpectedField("Failed decoding data", "WakuMessageHash")
data = some(decoded.get())
else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
if data.isNone():
reader.raiseUnexpectedValue("Field `data` is missing")
for i in 0 ..< 32:
value[i] = data.get()[i]
## WakuMessageKeyValue serde ## WakuMessageKeyValue serde
proc writeValue*( proc writeValue*(
@ -185,11 +153,14 @@ proc writeValue*(
) {.gcsafe, raises: [IOError].} = ) {.gcsafe, raises: [IOError].} =
writer.beginRecord() writer.beginRecord()
writer.writeField("messageHash", value.messageHash) writer.writeField("messageHash", base64.encode(value.messageHash))
if value.message.isSome(): if value.message.isSome():
writer.writeField("message", value.message.get()) writer.writeField("message", value.message.get())
if value.pubsubTopic.isSome():
writer.writeField("pubsubTopic", value.pubsubTopic.get())
writer.endRecord() writer.endRecord()
proc readValue*( proc readValue*(
@ -198,6 +169,7 @@ proc readValue*(
var var
messageHash = none(WakuMessageHash) messageHash = none(WakuMessageHash)
message = none(WakuMessage) message = none(WakuMessage)
pubsubTopic = none(PubsubTopic)
for fieldName in readObjectFields(reader): for fieldName in readObjectFields(reader):
case fieldName case fieldName
@ -206,20 +178,31 @@ proc readValue*(
reader.raiseUnexpectedField( reader.raiseUnexpectedField(
"Multiple `messageHash` fields found", "WakuMessageKeyValue" "Multiple `messageHash` fields found", "WakuMessageKeyValue"
) )
messageHash = some(reader.readValue(WakuMessageHash)) let base64String = reader.readValue(Base64String)
let bytes = base64.decode(base64String).valueOr:
reader.raiseUnexpectedField("Failed decoding data", "messageHash")
messageHash = some(fromBytes(bytes))
of "message": of "message":
if message.isSome(): if message.isSome():
reader.raiseUnexpectedField( reader.raiseUnexpectedField(
"Multiple `message` fields found", "WakuMessageKeyValue" "Multiple `message` fields found", "WakuMessageKeyValue"
) )
message = some(reader.readValue(WakuMessage)) message = some(reader.readValue(WakuMessage))
of "pubsubTopic":
if pubsubTopic.isSome():
reader.raiseUnexpectedField(
"Multiple `pubsubTopic` fields found", "WakuMessageKeyValue"
)
pubsubTopic = some(reader.readValue(string))
else: else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
if messageHash.isNone(): if messageHash.isNone():
reader.raiseUnexpectedValue("Field `messageHash` is missing") reader.raiseUnexpectedValue("Field `messageHash` is missing")
value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message) value = WakuMessageKeyValue(
messageHash: messageHash.get(), message: message, pubsubTopic: pubsubTopic
)
## StoreQueryResponse serde ## StoreQueryResponse serde
@ -234,7 +217,7 @@ proc writeValue*(
writer.writeField("messages", value.messages) writer.writeField("messages", value.messages)
if value.paginationCursor.isSome(): if value.paginationCursor.isSome():
writer.writeField("paginationCursor", value.paginationCursor.get()) writer.writeField("paginationCursor", base64.encode(value.paginationCursor.get()))
writer.endRecord() writer.endRecord()
@ -279,7 +262,10 @@ proc readValue*(
reader.raiseUnexpectedField( reader.raiseUnexpectedField(
"Multiple `paginationCursor` fields found", "StoreQueryResponse" "Multiple `paginationCursor` fields found", "StoreQueryResponse"
) )
cursor = some(reader.readValue(WakuMessageHash)) let base64String = reader.readValue(Base64String)
let bytes = base64.decode(base64String).valueOr:
reader.raiseUnexpectedField("Failed decoding data", "paginationCursor")
cursor = some(fromBytes(bytes))
else: else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
@ -324,10 +310,10 @@ proc writeValue*(
if req.endTime.isSome(): if req.endTime.isSome():
writer.writeField("endTime", req.endTime.get()) writer.writeField("endTime", req.endTime.get())
writer.writeField("messageHashes", req.messageHashes) writer.writeField("messageHashes", req.messageHashes.mapIt(base64.encode(it)))
if req.paginationCursor.isSome(): if req.paginationCursor.isSome():
writer.writeField("paginationCursor", req.paginationCursor.get()) writer.writeField("paginationCursor", base64.encode(req.paginationCursor.get()))
writer.writeField("paginationForward", req.paginationForward) writer.writeField("paginationForward", req.paginationForward)

View File

@ -75,6 +75,10 @@ const SelectWithCursorAscStmtDef =
storedAt <= $7 storedAt <= $7
ORDER BY storedAt ASC, messageHash ASC LIMIT $8;""" ORDER BY storedAt ASC, messageHash ASC LIMIT $8;"""
const SelectMessageByHashName = "SelectMessageByHash"
const SelectMessageByHashDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages WHERE messageHash = $1"""
const SelectNoCursorV2AscStmtName = "SelectWithoutCursorV2Asc" const SelectNoCursorV2AscStmtName = "SelectWithoutCursorV2Asc"
const SelectNoCursorV2AscStmtDef = const SelectNoCursorV2AscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
@ -347,10 +351,33 @@ proc getMessagesArbitraryQuery(
args.add(pubsubTopic.get()) args.add(pubsubTopic.get())
if cursor.isSome(): if cursor.isSome():
let hashHex = toHex(cursor.get().hash)
var entree: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc entreeCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, entree)
(
await s.readConnPool.runStmt(
SelectMessageByHashName,
SelectMessageByHashDef,
@[hashHex],
@[int32(hashHex.len)],
@[int32(0)],
entreeCallback,
)
).isOkOr:
return err("failed to run query with cursor: " & $error)
if entree.len == 0:
return ok(entree)
let storetime = entree[0][3]
let comp = if ascendingOrder: ">" else: "<" let comp = if ascendingOrder: ">" else: "<"
statements.add("(storedAt, messageHash) " & comp & " (?,?)") statements.add("(storedAt, messageHash) " & comp & " (?,?)")
args.add($cursor.get().storeTime) args.add($storetime)
args.add(toHex(cursor.get().hash)) args.add(hashHex)
if startTime.isSome(): if startTime.isSome():
statements.add("storedAt >= ?") statements.add("storedAt >= ?")
@ -472,14 +499,35 @@ proc getMessagesPreparedStmt(
let limit = $maxPageSize let limit = $maxPageSize
if cursor.isSome(): if cursor.isSome():
let hash = toHex(cursor.get().hash)
var entree: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc entreeCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, entree)
(
await s.readConnPool.runStmt(
SelectMessageByHashName,
SelectMessageByHashDef,
@[hash],
@[int32(hash.len)],
@[int32(0)],
entreeCallback,
)
).isOkOr:
return err("failed to run query with cursor: " & $error)
if entree.len == 0:
return ok(entree)
let storeTime = $entree[0][3]
var stmtName = var stmtName =
if ascOrder: SelectWithCursorAscStmtName else: SelectWithCursorDescStmtName if ascOrder: SelectWithCursorAscStmtName else: SelectWithCursorDescStmtName
var stmtDef = var stmtDef =
if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef
let hash = toHex(cursor.get().hash)
let storeTime = $cursor.get().storeTime
( (
await s.readConnPool.runStmt( await s.readConnPool.runStmt(
stmtName, stmtName,

View File

@ -18,8 +18,7 @@ const EmptyWakuMessageHash*: WakuMessageHash = [
converter fromBytes*(array: openArray[byte]): WakuMessageHash = converter fromBytes*(array: openArray[byte]): WakuMessageHash =
var hash: WakuMessageHash var hash: WakuMessageHash
let copiedBytes = copyFrom(hash, array) discard copyFrom(hash, array)
assert copiedBytes == 32, "Waku message hash is 32 bytes"
hash hash
converter toBytesArray*(digest: MDigest[256]): WakuMessageHash = converter toBytesArray*(digest: MDigest[256]): WakuMessageHash =