diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index 94b8131ef..80e0a231a 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -645,6 +645,49 @@ suite "Postgres driver - queries": check: filteredMessages == expected[2 .. 3].reversed() + asyncTest "only cursor - invalid": + ## Given + const contentTopic = "test-content-topic" + + var messages = + @[ + fakeWakuMessage(@[byte 0], ts = ts(00)), + fakeWakuMessage(@[byte 1], ts = ts(10)), + fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)), + fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)), + fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)), + fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)), + fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)), + fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)), + ] + + shuffle(messages) + debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload) + + for msg in messages: + require ( + await driver.put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + ).isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage()) + + ## When + let res = await driver.getMessages( + cursor = some(cursor), maxPageSize = 2, ascendingOrder = false + ) + + ## Then + assert res.isOk(), res.error + + check: + res.value.len == 0 + asyncTest "content topic and cursor": ## Given const contentTopic = "test-content-topic" diff --git a/tests/waku_archive/test_driver_queue_query.nim b/tests/waku_archive/test_driver_queue_query.nim index cfdf78df5..4b0993264 100644 --- a/tests/waku_archive/test_driver_queue_query.nim +++ b/tests/waku_archive/test_driver_queue_query.nim @@ -606,6 +606,52 @@ suite "Queue driver - query by cursor": ## Cleanup (waitFor driver.close()).expect("driver to close") + test "only cursor - invalid": + ## Given + const contentTopic = "test-content-topic" + + let driver = newTestSqliteDriver() + + var messages = + @[ + fakeWakuMessage(@[byte 0], ts = ts(00)), + fakeWakuMessage(@[byte 1], ts = ts(10)), + fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)), + fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)), + fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)), + fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)), + fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)), + fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)), + ] + + shuffle(messages) + debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload) + + for msg in messages: + let retFut = waitFor driver.put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + require retFut.isOk() + + let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage()) + + ## When + let res = waitFor driver.getMessages( + cursor = some(cursor), maxPageSize = 2, ascendingOrder = false + ) + + ## Then + check: + res.isErr() + res.error == "invalid_cursor" + + ## Cleanup + (waitFor driver.close()).expect("driver to close") + test "content topic and cursor": ## Given const contentTopic = "test-content-topic" diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index 6ae2ce414..873107f3b 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -669,6 +669,53 @@ suite "SQLite driver - query by cursor": ## Cleanup (await driver.close()).expect("driver to close") + asyncTest "only cursor - invalid": + ## Given + const contentTopic = "test-content-topic" + + let driver = newSqliteArchiveDriver() + + var messages = + @[ + fakeWakuMessage(@[byte 0], ts = ts(00)), + fakeWakuMessage(@[byte 1], ts = ts(10)), + fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)), + fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)), + fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)), + fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)), + fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)), + fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)), + ] + + shuffle(messages) + debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload) + + for msg in messages: + require ( + await driver.put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + ).isOk() + + let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage()) + + ## When + let res = await driver.getMessages( + cursor = some(cursor), maxPageSize = 2, ascendingOrder = false + ) + + ## Then + check: + res.isOk() + res.value.len == 0 + + ## Cleanup + (await driver.close()).expect("driver to close") + asyncTest "content topic and cursor": ## Given const contentTopic = "test-content-topic" diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index 0a9a79d88..b899dc0d0 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -4,8 +4,8 @@ else: {.push raises: [].} import - std/[sets, strformat, uri, options], - stew/[byteutils, arrayops], + std/[sets, strformat, uri, options, sequtils], + stew/byteutils, chronicles, json_serialization, json_serialization/std/options, @@ -30,7 +30,7 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string] if base64UrlEncoded == "": return ok(none(WakuMessageHash)) - let base64Encoded = decodeUrl(base64UrlEncoded) + let base64Encoded = decodeUrl(base64UrlEncoded, false) let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr: return err("waku message hash parsing error: " & error) @@ -45,7 +45,7 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] = if not input.isSome() or input.get() == "": return ok(hashes) - let decodedUrl = decodeUrl(input.get()) + let decodedUrl = decodeUrl(input.get(), false) if decodedUrl != "": for subString in decodedUrl.split(','): @@ -62,7 +62,7 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] = # and this result is URL-encoded. proc toRestStringWakuMessageHash*(self: WakuMessageHash): string = let base64Encoded = base64.encode(self) - encodeUrl($base64Encoded) + encodeUrl($base64Encoded, false) ## WakuMessage serde @@ -146,38 +146,6 @@ proc readValue*( proof: proof, ) -## WakuMessageHash serde - -proc writeValue*( - writer: var JsonWriter, value: WakuMessageHash -) {.gcsafe, raises: [IOError].} = - writer.beginRecord() - writer.writeField("data", base64.encode(value)) - writer.endRecord() - -proc readValue*( - reader: var JsonReader, value: var WakuMessageHash -) {.gcsafe, raises: [SerializationError, IOError].} = - var data = none(seq[byte]) - - for fieldName in readObjectFields(reader): - case fieldName - of "data": - if data.isSome(): - reader.raiseUnexpectedField("Multiple `data` fields found", "WakuMessageHash") - let decoded = base64.decode(reader.readValue(Base64String)) - if not decoded.isOk(): - reader.raiseUnexpectedField("Failed decoding data", "WakuMessageHash") - data = some(decoded.get()) - else: - reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) - - if data.isNone(): - reader.raiseUnexpectedValue("Field `data` is missing") - - for i in 0 ..< 32: - value[i] = data.get()[i] - ## WakuMessageKeyValue serde proc writeValue*( @@ -185,11 +153,14 @@ proc writeValue*( ) {.gcsafe, raises: [IOError].} = writer.beginRecord() - writer.writeField("messageHash", value.messageHash) + writer.writeField("messageHash", base64.encode(value.messageHash)) if value.message.isSome(): writer.writeField("message", value.message.get()) + if value.pubsubTopic.isSome(): + writer.writeField("pubsubTopic", value.pubsubTopic.get()) + writer.endRecord() proc readValue*( @@ -198,6 +169,7 @@ proc readValue*( var messageHash = none(WakuMessageHash) message = none(WakuMessage) + pubsubTopic = none(PubsubTopic) for fieldName in readObjectFields(reader): case fieldName @@ -206,20 +178,31 @@ proc readValue*( reader.raiseUnexpectedField( "Multiple `messageHash` fields found", "WakuMessageKeyValue" ) - messageHash = some(reader.readValue(WakuMessageHash)) + let base64String = reader.readValue(Base64String) + let bytes = base64.decode(base64String).valueOr: + reader.raiseUnexpectedField("Failed decoding data", "messageHash") + messageHash = some(fromBytes(bytes)) of "message": if message.isSome(): reader.raiseUnexpectedField( "Multiple `message` fields found", "WakuMessageKeyValue" ) message = some(reader.readValue(WakuMessage)) + of "pubsubTopic": + if pubsubTopic.isSome(): + reader.raiseUnexpectedField( + "Multiple `pubsubTopic` fields found", "WakuMessageKeyValue" + ) + pubsubTopic = some(reader.readValue(string)) else: reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) if messageHash.isNone(): reader.raiseUnexpectedValue("Field `messageHash` is missing") - value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message) + value = WakuMessageKeyValue( + messageHash: messageHash.get(), message: message, pubsubTopic: pubsubTopic + ) ## StoreQueryResponse serde @@ -234,7 +217,7 @@ proc writeValue*( writer.writeField("messages", value.messages) if value.paginationCursor.isSome(): - writer.writeField("paginationCursor", value.paginationCursor.get()) + writer.writeField("paginationCursor", base64.encode(value.paginationCursor.get())) writer.endRecord() @@ -279,7 +262,10 @@ proc readValue*( reader.raiseUnexpectedField( "Multiple `paginationCursor` fields found", "StoreQueryResponse" ) - cursor = some(reader.readValue(WakuMessageHash)) + let base64String = reader.readValue(Base64String) + let bytes = base64.decode(base64String).valueOr: + reader.raiseUnexpectedField("Failed decoding data", "paginationCursor") + cursor = some(fromBytes(bytes)) else: reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName)) @@ -324,10 +310,10 @@ proc writeValue*( if req.endTime.isSome(): writer.writeField("endTime", req.endTime.get()) - writer.writeField("messageHashes", req.messageHashes) + writer.writeField("messageHashes", req.messageHashes.mapIt(base64.encode(it))) if req.paginationCursor.isSome(): - writer.writeField("paginationCursor", req.paginationCursor.get()) + writer.writeField("paginationCursor", base64.encode(req.paginationCursor.get())) writer.writeField("paginationForward", req.paginationForward) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 51464ce05..efd43ba4c 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -75,6 +75,10 @@ const SelectWithCursorAscStmtDef = storedAt <= $7 ORDER BY storedAt ASC, messageHash ASC LIMIT $8;""" +const SelectMessageByHashName = "SelectMessageByHash" +const SelectMessageByHashDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages WHERE messageHash = $1""" + const SelectNoCursorV2AscStmtName = "SelectWithoutCursorV2Asc" const SelectNoCursorV2AscStmtDef = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages @@ -347,10 +351,33 @@ proc getMessagesArbitraryQuery( args.add(pubsubTopic.get()) if cursor.isSome(): + let hashHex = toHex(cursor.get().hash) + + var entree: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] + proc entreeCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, entree) + + ( + await s.readConnPool.runStmt( + SelectMessageByHashName, + SelectMessageByHashDef, + @[hashHex], + @[int32(hashHex.len)], + @[int32(0)], + entreeCallback, + ) + ).isOkOr: + return err("failed to run query with cursor: " & $error) + + if entree.len == 0: + return ok(entree) + + let storetime = entree[0][3] + let comp = if ascendingOrder: ">" else: "<" statements.add("(storedAt, messageHash) " & comp & " (?,?)") - args.add($cursor.get().storeTime) - args.add(toHex(cursor.get().hash)) + args.add($storetime) + args.add(hashHex) if startTime.isSome(): statements.add("storedAt >= ?") @@ -472,14 +499,35 @@ proc getMessagesPreparedStmt( let limit = $maxPageSize if cursor.isSome(): + let hash = toHex(cursor.get().hash) + + var entree: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] + + proc entreeCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, entree) + + ( + await s.readConnPool.runStmt( + SelectMessageByHashName, + SelectMessageByHashDef, + @[hash], + @[int32(hash.len)], + @[int32(0)], + entreeCallback, + ) + ).isOkOr: + return err("failed to run query with cursor: " & $error) + + if entree.len == 0: + return ok(entree) + + let storeTime = $entree[0][3] + var stmtName = if ascOrder: SelectWithCursorAscStmtName else: SelectWithCursorDescStmtName var stmtDef = if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef - let hash = toHex(cursor.get().hash) - let storeTime = $cursor.get().storeTime - ( await s.readConnPool.runStmt( stmtName, diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index 48d89d5e3..67e8d81c2 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -18,8 +18,7 @@ const EmptyWakuMessageHash*: WakuMessageHash = [ converter fromBytes*(array: openArray[byte]): WakuMessageHash = var hash: WakuMessageHash - let copiedBytes = copyFrom(hash, array) - assert copiedBytes == 32, "Waku message hash is 32 bytes" + discard copyFrom(hash, array) hash converter toBytesArray*(digest: MDigest[256]): WakuMessageHash =