mirror of https://github.com/waku-org/nwaku.git
fix: invalid cursor returning messages (#2724)
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
parent
6fbab633a2
commit
a65b13fcb8
|
@ -675,11 +675,20 @@ suite "Postgres driver - queries":
|
||||||
)
|
)
|
||||||
).isOk()
|
).isOk()
|
||||||
|
|
||||||
let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
|
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
|
||||||
|
let cursor = ArchiveCursor(hash: fakeCursor)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = await driver.getMessages(
|
let res = await driver.getMessages(
|
||||||
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
|
includeData = true,
|
||||||
|
contentTopicSeq = @[DefaultContentTopic],
|
||||||
|
pubsubTopic = none(PubsubTopic),
|
||||||
|
cursor = some(cursor),
|
||||||
|
startTime = none(Timestamp),
|
||||||
|
endTime = none(Timestamp),
|
||||||
|
hashes = @[],
|
||||||
|
maxPageSize = 5,
|
||||||
|
ascendingOrder = true,
|
||||||
)
|
)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
|
|
|
@ -637,11 +637,20 @@ suite "Queue driver - query by cursor":
|
||||||
)
|
)
|
||||||
require retFut.isOk()
|
require retFut.isOk()
|
||||||
|
|
||||||
let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
|
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
|
||||||
|
let cursor = ArchiveCursor(hash: fakeCursor)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = waitFor driver.getMessages(
|
let res = waitFor driver.getMessages(
|
||||||
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
|
includeData = true,
|
||||||
|
contentTopic = @[DefaultContentTopic],
|
||||||
|
pubsubTopic = none(PubsubTopic),
|
||||||
|
cursor = some(cursor),
|
||||||
|
startTime = none(Timestamp),
|
||||||
|
endTime = none(Timestamp),
|
||||||
|
hashes = @[],
|
||||||
|
maxPageSize = 5,
|
||||||
|
ascendingOrder = true,
|
||||||
)
|
)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
|
|
|
@ -701,17 +701,26 @@ suite "SQLite driver - query by cursor":
|
||||||
)
|
)
|
||||||
).isOk()
|
).isOk()
|
||||||
|
|
||||||
let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage())
|
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
|
||||||
|
let cursor = ArchiveCursor(hash: fakeCursor)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = await driver.getMessages(
|
let res = await driver.getMessages(
|
||||||
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
|
includeData = true,
|
||||||
|
contentTopic = @[DefaultContentTopic],
|
||||||
|
pubsubTopic = none(PubsubTopic),
|
||||||
|
cursor = some(cursor),
|
||||||
|
startTime = none(Timestamp),
|
||||||
|
endTime = none(Timestamp),
|
||||||
|
hashes = @[],
|
||||||
|
maxPageSize = 5,
|
||||||
|
ascendingOrder = true,
|
||||||
)
|
)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
res.isOk()
|
res.isErr()
|
||||||
res.value.len == 0
|
res.error == "cursor not found"
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
(await driver.close()).expect("driver to close")
|
(await driver.close()).expect("driver to close")
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, times, sugar],
|
std/[options, sugar],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
chronicles,
|
chronicles,
|
||||||
|
chronos/timer,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
eth/keys,
|
eth/keys,
|
||||||
presto,
|
presto,
|
||||||
|
@ -24,8 +25,10 @@ import
|
||||||
../../../waku/waku_api/rest/store/types,
|
../../../waku/waku_api/rest/store/types,
|
||||||
../../../waku/waku_archive,
|
../../../waku/waku_archive,
|
||||||
../../../waku/waku_archive/driver/queue_driver,
|
../../../waku/waku_archive/driver/queue_driver,
|
||||||
|
../../../waku/waku_archive/driver/sqlite_driver,
|
||||||
|
../../../waku/common/databases/db_sqlite,
|
||||||
|
../../../waku/waku_archive/driver/postgres_driver,
|
||||||
../../../waku/waku_store as waku_store,
|
../../../waku/waku_store as waku_store,
|
||||||
../../../waku/common/base64,
|
|
||||||
../testlib/wakucore,
|
../testlib/wakucore,
|
||||||
../testlib/wakunode
|
../testlib/wakunode
|
||||||
|
|
||||||
|
@ -42,7 +45,7 @@ proc put(
|
||||||
if message.timestamp > 0:
|
if message.timestamp > 0:
|
||||||
message.timestamp
|
message.timestamp
|
||||||
else:
|
else:
|
||||||
getNanosecondTime(getTime().toUnixFloat())
|
getNowInNanosecondTime()
|
||||||
|
|
||||||
store.put(pubsubTopic, message, digest, msgHash, receivedTime)
|
store.put(pubsubTopic, message, digest, msgHash, receivedTime)
|
||||||
|
|
||||||
|
@ -84,6 +87,84 @@ procSuite "Waku Rest API - Store v3":
|
||||||
check:
|
check:
|
||||||
expected.get() == msgHashRes.get().get().toRestStringWakuMessageHash()
|
expected.get() == msgHashRes.get().get().toRestStringWakuMessageHash()
|
||||||
|
|
||||||
|
asyncTest "invalid cursor":
|
||||||
|
let node = testWakuNode()
|
||||||
|
await node.start()
|
||||||
|
await node.mountRelay()
|
||||||
|
|
||||||
|
let restPort = Port(58011)
|
||||||
|
let restAddress = parseIpAddress("0.0.0.0")
|
||||||
|
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
||||||
|
|
||||||
|
installStoreApiHandlers(restServer.router, node)
|
||||||
|
restServer.start()
|
||||||
|
|
||||||
|
# WakuStore setup
|
||||||
|
let db: SqliteDatabase =
|
||||||
|
SqliteDatabase.new(string.none().get(":memory:")).expect("valid DB")
|
||||||
|
let driver: ArchiveDriver = SqliteDriver.new(db).expect("valid driver")
|
||||||
|
let mountArchiveRes = node.mountArchive(driver)
|
||||||
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
||||||
|
|
||||||
|
await node.mountStore()
|
||||||
|
node.mountStoreClient()
|
||||||
|
|
||||||
|
let key = generateEcdsaKey()
|
||||||
|
var peerSwitch = newStandardSwitch(some(key))
|
||||||
|
await peerSwitch.start()
|
||||||
|
|
||||||
|
peerSwitch.mount(node.wakuStore)
|
||||||
|
|
||||||
|
await sleepAsync(1.seconds())
|
||||||
|
|
||||||
|
# Now prime it with some history before tests
|
||||||
|
let msgList =
|
||||||
|
@[
|
||||||
|
fakeWakuMessage(@[byte 0], contentTopic = ContentTopic("ct1"), ts = 0),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = 1),
|
||||||
|
fakeWakuMessage(@[byte 1, byte 2], ts = 2),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = 3),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = 4),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = 5),
|
||||||
|
fakeWakuMessage(@[byte 1], ts = 6),
|
||||||
|
fakeWakuMessage(@[byte 9], contentTopic = ContentTopic("c2"), ts = 9),
|
||||||
|
]
|
||||||
|
for msg in msgList:
|
||||||
|
require (await driver.put(DefaultPubsubTopic, msg)).isOk()
|
||||||
|
|
||||||
|
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
|
|
||||||
|
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||||
|
let fullAddr = $remotePeerInfo.addrs[0] & "/p2p/" & $remotePeerInfo.peerId
|
||||||
|
|
||||||
|
await sleepAsync(1.seconds())
|
||||||
|
|
||||||
|
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
|
||||||
|
let encodedCursor = fakeCursor.toRestStringWakuMessageHash()
|
||||||
|
|
||||||
|
# Apply filter by start and end timestamps
|
||||||
|
var response = await client.getStoreMessagesV3(
|
||||||
|
encodeUrl(fullAddr),
|
||||||
|
"true", # include data
|
||||||
|
"", # pubsub topic
|
||||||
|
"ct1,c2", # empty content topics.
|
||||||
|
"", # start time
|
||||||
|
"", # end time
|
||||||
|
"", # hashes
|
||||||
|
encodedCursor, # base64-encoded hash
|
||||||
|
"true", # ascending
|
||||||
|
"5", # empty implies default page size
|
||||||
|
)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
$response.contentType == $MIMETYPE_JSON
|
||||||
|
response.data.messages.len == 0
|
||||||
|
|
||||||
|
await restServer.stop()
|
||||||
|
await restServer.closeWait()
|
||||||
|
await node.stop()
|
||||||
|
|
||||||
asyncTest "Filter by start and end time":
|
asyncTest "Filter by start and end time":
|
||||||
let node = testWakuNode()
|
let node = testWakuNode()
|
||||||
await node.start()
|
await node.start()
|
||||||
|
|
|
@ -35,6 +35,9 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string]
|
||||||
let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
|
let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
|
||||||
return err("waku message hash parsing error: " & error)
|
return err("waku message hash parsing error: " & error)
|
||||||
|
|
||||||
|
if decodedBytes.len != 32:
|
||||||
|
return err("waku message hash parsing error: invalid hash length: " & $decodedBytes.len)
|
||||||
|
|
||||||
let hash: WakuMessageHash = fromBytes(decodedBytes)
|
let hash: WakuMessageHash = fromBytes(decodedBytes)
|
||||||
|
|
||||||
return ok(some(hash))
|
return ok(some(hash))
|
||||||
|
|
|
@ -674,16 +674,18 @@ proc selectMessagesByStoreQueryWithLimit*(
|
||||||
if cursor.isSome() and cursor.get() != EmptyWakuMessageHash:
|
if cursor.isSome() and cursor.get() != EmptyWakuMessageHash:
|
||||||
let hash: WakuMessageHash = cursor.get()
|
let hash: WakuMessageHash = cursor.get()
|
||||||
|
|
||||||
var wakuMessage: WakuMessage
|
var wakuMessage: Option[WakuMessage]
|
||||||
|
|
||||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||||
wakuMessage = queryRowWakuMessageCallback(
|
wakuMessage = some(
|
||||||
s,
|
queryRowWakuMessageCallback(
|
||||||
contentTopicCol = 0,
|
s,
|
||||||
payloadCol = 1,
|
contentTopicCol = 0,
|
||||||
versionCol = 2,
|
payloadCol = 1,
|
||||||
senderTimestampCol = 3,
|
versionCol = 2,
|
||||||
metaCol = 4,
|
senderTimestampCol = 3,
|
||||||
|
metaCol = 4,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
let query = selectMessageByHashQuery()
|
let query = selectMessageByHashQuery()
|
||||||
|
@ -691,9 +693,12 @@ proc selectMessagesByStoreQueryWithLimit*(
|
||||||
?dbStmt.execSelectMessageByHash(hash, queryRowCallback)
|
?dbStmt.execSelectMessageByHash(hash, queryRowCallback)
|
||||||
dbStmt.dispose()
|
dbStmt.dispose()
|
||||||
|
|
||||||
let time: Timestamp = wakuMessage.timestamp
|
if wakuMessage.isSome():
|
||||||
|
let time = wakuMessage.get().timestamp
|
||||||
|
|
||||||
some((time, hash))
|
some((time, hash))
|
||||||
|
else:
|
||||||
|
return err("cursor not found")
|
||||||
else:
|
else:
|
||||||
none((Timestamp, WakuMessageHash))
|
none((Timestamp, WakuMessageHash))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue