mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-28 15:46:33 +00:00
deploy: a12a359b79c0add814c51fbb5ed80c462b8801db
This commit is contained in:
parent
c447d51fcb
commit
1fbe9ce013
@ -332,39 +332,56 @@ procSuite "Waku Store - history query":
|
|||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
|
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
||||||
let msgList = @[
|
let msgList = @[
|
||||||
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
|
||||||
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
|
||||||
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
|
||||||
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
|
||||||
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
|
||||||
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
|
||||||
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
|
||||||
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
|
||||||
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
|
||||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
|
||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(
|
var rpc = HistoryQuery(
|
||||||
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
||||||
)
|
)
|
||||||
let res = await clientProto.query(rpc)
|
var res = await clientProto.query(rpc)
|
||||||
|
require res.isOk()
|
||||||
|
|
||||||
|
var
|
||||||
|
response = res.tryGet()
|
||||||
|
totalMessages = response.messages.len()
|
||||||
|
totalQueries = 1
|
||||||
|
|
||||||
|
while response.pagingInfo.cursor != PagingIndex():
|
||||||
|
require:
|
||||||
|
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
|
||||||
|
response.messages.len() == 2
|
||||||
|
response.pagingInfo.pageSize == 2
|
||||||
|
response.pagingInfo.direction == PagingDirection.FORWARD
|
||||||
|
|
||||||
|
rpc.pagingInfo = response.pagingInfo
|
||||||
|
|
||||||
|
# Continue querying
|
||||||
|
res = await clientProto.query(rpc)
|
||||||
|
require res.isOk()
|
||||||
|
response = res.tryGet()
|
||||||
|
totalMessages += response.messages.len()
|
||||||
|
totalQueries += 1
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
res.isOk()
|
totalQueries == 4 # 4 queries of pageSize 2
|
||||||
|
totalMessages == 8 # 8 messages in total
|
||||||
let response = res.tryGet()
|
|
||||||
check:
|
|
||||||
response.messages.len() == 2
|
|
||||||
response.pagingInfo.pageSize == 2
|
|
||||||
response.pagingInfo.direction == PagingDirection.FORWARD
|
|
||||||
response.pagingInfo.cursor != PagingIndex()
|
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
@ -384,39 +401,56 @@ procSuite "Waku Store - history query":
|
|||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
|
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
||||||
let msgList = @[
|
let msgList = @[
|
||||||
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
|
||||||
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
|
||||||
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
|
||||||
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
|
||||||
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
|
||||||
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
|
||||||
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
|
||||||
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
|
||||||
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
|
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
|
||||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
|
||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(
|
var rpc = HistoryQuery(
|
||||||
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
||||||
)
|
)
|
||||||
let res = await clientProto.query(rpc)
|
var res = await clientProto.query(rpc)
|
||||||
|
require res.isOk()
|
||||||
|
|
||||||
|
var
|
||||||
|
response = res.tryGet()
|
||||||
|
totalMessages = response.messages.len()
|
||||||
|
totalQueries = 1
|
||||||
|
|
||||||
|
while response.pagingInfo.cursor != PagingIndex():
|
||||||
|
require:
|
||||||
|
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
|
||||||
|
response.messages.len() == 2
|
||||||
|
response.pagingInfo.pageSize == 2
|
||||||
|
response.pagingInfo.direction == PagingDirection.BACKWARD
|
||||||
|
|
||||||
|
rpc.pagingInfo = response.pagingInfo
|
||||||
|
|
||||||
|
# Continue querying
|
||||||
|
res = await clientProto.query(rpc)
|
||||||
|
require res.isOk()
|
||||||
|
response = res.tryGet()
|
||||||
|
totalMessages += response.messages.len()
|
||||||
|
totalQueries += 1
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
res.isOk()
|
totalQueries == 4 # 4 queries of pageSize 2
|
||||||
|
totalMessages == 8 # 8 messages in total
|
||||||
let response = res.tryGet()
|
|
||||||
check:
|
|
||||||
response.messages.len() == 2
|
|
||||||
response.pagingInfo.pageSize == 2
|
|
||||||
response.pagingInfo.direction == PagingDirection.BACKWARD
|
|
||||||
response.pagingInfo.cursor != PagingIndex()
|
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# libtool - Provide generalized library-building support services.
|
# libtool - Provide generalized library-building support services.
|
||||||
# Generated automatically by config.status (libbacktrace) version-unused
|
# Generated automatically by config.status (libbacktrace) version-unused
|
||||||
# Libtool was configured on host fv-az456-432:
|
# Libtool was configured on host fv-az554-756:
|
||||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||||
#
|
#
|
||||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||||
|
@ -22,11 +22,11 @@ type DbCursor* = (Timestamp, seq[byte], string)
|
|||||||
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
|
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
|
||||||
let
|
let
|
||||||
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
|
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
|
||||||
topicLength = sqlite3_column_bytes(s, 1)
|
topicLength = sqlite3_column_bytes(s, contentTopicCol)
|
||||||
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))
|
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))
|
||||||
let
|
let
|
||||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
|
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
|
||||||
length = sqlite3_column_bytes(s, 2)
|
length = sqlite3_column_bytes(s, payloadCol)
|
||||||
payload = @(toOpenArray(p, 0, length-1))
|
payload = @(toOpenArray(p, 0, length-1))
|
||||||
let version = sqlite3_column_int64(s, versionCol)
|
let version = sqlite3_column_int64(s, versionCol)
|
||||||
let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
|
let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
|
||||||
@ -45,7 +45,7 @@ proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint):
|
|||||||
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
|
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
|
||||||
let
|
let
|
||||||
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
|
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
|
||||||
pubsubTopicLength = sqlite3_column_bytes(s, 3)
|
pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol)
|
||||||
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
|
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
|
||||||
|
|
||||||
pubsubTopic
|
pubsubTopic
|
||||||
@ -53,7 +53,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
|
|||||||
proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
|
proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
|
||||||
let
|
let
|
||||||
digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol))
|
digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol))
|
||||||
digestLength = sqlite3_column_bytes(s, 3)
|
digestLength = sqlite3_column_bytes(s, digestCol)
|
||||||
digest = @(toOpenArray(digestPointer, 0, digestLength-1))
|
digest = @(toOpenArray(digestPointer, 0, digestLength-1))
|
||||||
|
|
||||||
digest
|
digest
|
||||||
|
@ -150,8 +150,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
|
|||||||
|
|
||||||
|
|
||||||
if rows.len > int(qMaxPageSize):
|
if rows.len > int(qMaxPageSize):
|
||||||
# Build last message cursor
|
## Build last message cursor
|
||||||
let (pubsubTopic, message, digest, storeTimestamp) = rows[^1]
|
## The cursor is built from the last message INCLUDED in the response
|
||||||
|
## (i.e. the second last message in the rows list)
|
||||||
|
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
|
||||||
|
|
||||||
# TODO: Improve coherence of MessageDigest type
|
# TODO: Improve coherence of MessageDigest type
|
||||||
var messageDigest: array[32, byte]
|
var messageDigest: array[32, byte]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user