fix: store cursors should be exclusive and match a DB item (#1263)

* fix(store): use exclusive cursor

* fix(store): read correct column length

Co-authored-by: Lorenzo Delgado <lorenzo@status.im>
This commit is contained in:
Hanno Cornelius 2022-10-20 13:24:40 +03:00 committed by GitHub
parent 78e224f45c
commit a12a359b79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 52 deletions

View File

@ -332,39 +332,56 @@ procSuite "Waku Store - history query":
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let currentTime = getNanosecondTime(getTime().toUnixFloat())
let msgList = @[
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
]
for msg in msgList:
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
## When
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
)
let res = await clientProto.query(rpc)
var rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
)
var res = await clientProto.query(rpc)
require res.isOk()
var
response = res.tryGet()
totalMessages = response.messages.len()
totalQueries = 1
while response.pagingInfo.cursor != PagingIndex():
require:
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
response.messages.len() == 2
response.pagingInfo.pageSize == 2
response.pagingInfo.direction == PagingDirection.FORWARD
rpc.pagingInfo = response.pagingInfo
# Continue querying
res = await clientProto.query(rpc)
require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len()
totalQueries += 1
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.messages.len() == 2
response.pagingInfo.pageSize == 2
response.pagingInfo.direction == PagingDirection.FORWARD
response.pagingInfo.cursor != PagingIndex()
totalQueries == 4 # 4 queries of pageSize 2
totalMessages == 8 # 8 messages in total
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
@ -384,39 +401,56 @@ procSuite "Waku Store - history query":
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let currentTime = getNanosecondTime(getTime().toUnixFloat())
let msgList = @[
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
]
for msg in msgList:
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
## When
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
)
let res = await clientProto.query(rpc)
var rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
)
var res = await clientProto.query(rpc)
require res.isOk()
var
response = res.tryGet()
totalMessages = response.messages.len()
totalQueries = 1
while response.pagingInfo.cursor != PagingIndex():
require:
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
response.messages.len() == 2
response.pagingInfo.pageSize == 2
response.pagingInfo.direction == PagingDirection.BACKWARD
rpc.pagingInfo = response.pagingInfo
# Continue querying
res = await clientProto.query(rpc)
require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len()
totalQueries += 1
## Then
check:
res.isOk()
let response = res.tryGet()
check:
response.messages.len() == 2
response.pagingInfo.pageSize == 2
response.pagingInfo.direction == PagingDirection.BACKWARD
response.pagingInfo.cursor != PagingIndex()
totalQueries == 4 # 4 queries of pageSize 2
totalMessages == 8 # 8 messages in total
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

View File

@ -22,11 +22,11 @@ type DbCursor* = (Timestamp, seq[byte], string)
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, 1)
topicLength = sqlite3_column_bytes(s, contentTopicCol)
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
length = sqlite3_column_bytes(s, 2)
length = sqlite3_column_bytes(s, payloadCol)
payload = @(toOpenArray(p, 0, length-1))
let version = sqlite3_column_int64(s, versionCol)
let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
@ -45,7 +45,7 @@ proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint):
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
let
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
pubsubTopicLength = sqlite3_column_bytes(s, 3)
pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
pubsubTopic
@ -53,7 +53,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
let
digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol))
digestLength = sqlite3_column_bytes(s, 3)
digestLength = sqlite3_column_bytes(s, digestCol)
digest = @(toOpenArray(digestPointer, 0, digestLength-1))
digest

View File

@ -150,8 +150,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
if rows.len > int(qMaxPageSize):
# Build last message cursor
let (pubsubTopic, message, digest, storeTimestamp) = rows[^1]
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type
var messageDigest: array[32, byte]