refactor(store): HistoryQuery.direction (#2263)

* Fix issue with default history query ascending value in serde operations: Should use the same value.
* Update direction types to PagingDirection.
This commit is contained in:
Álex Cabeza Romero 2023-12-19 15:10:27 +01:00 committed by GitHub
parent 8b37919ee0
commit fae20bff20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 97 additions and 52 deletions

View File

@ -9,6 +9,7 @@ import
import
../../../waku/common/databases/db_sqlite,
../../../waku/common/paging,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../../../waku/waku_archive/driver/sqlite_driver,
@ -341,7 +342,7 @@ procSuite "Waku Archive - find messages":
## Given
let req = ArchiveQuery(
pageSize: 4,
ascending: true
direction: PagingDirection.FORWARD
)
## When
@ -377,7 +378,7 @@ procSuite "Waku Archive - find messages":
## Given
let req = ArchiveQuery(
pageSize: 4,
ascending: false # backward
direction: PagingDirection.BACKWARD
)
## When
@ -454,7 +455,7 @@ procSuite "Waku Archive - find messages":
contentTopics: @[ContentTopic("1")],
startTime: some(ts(15, timeOrigin)),
endTime: some(ts(55, timeOrigin)),
ascending: true
direction: PagingDirection.FORWARD
)
## When

View File

@ -6,6 +6,7 @@ import
chronos
import
../../../waku/common/protobuf,
../../../waku/common/paging,
../../../waku/waku_core,
../../../waku/waku_store/rpc,
../../../waku/waku_store/rpc_codec,
@ -53,7 +54,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.FORWARD))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.FORWARD))
## When
let pb = pagingInfo.encode()
@ -88,7 +89,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.BACKWARD))
query = HistoryQueryRPC(
contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)],
pagingInfo: some(pagingInfo),
@ -129,7 +130,7 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.BACKWARD))
res = HistoryResponseRPC(messages: @[message], pagingInfo: some(pagingInfo), error: HistoryResponseErrorRPC.INVALID_CURSOR)
## When

View File

@ -9,6 +9,7 @@ import
import
../../../waku/[
common/paging,
node/peer_manager,
waku_core,
waku_store,
@ -46,7 +47,7 @@ suite "Waku Store - query handler":
server = await newTestWakuStore(serverSwitch, handler=queryhandler)
client = newTestWakuStoreClient(clientSwitch)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD)
## When
let queryRes = await client.query(req, peer=serverPeerInfo)
@ -88,7 +89,7 @@ suite "Waku Store - query handler":
server = await newTestWakuStore(serverSwitch, handler=queryhandler)
client = newTestWakuStoreClient(clientSwitch)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD)
## When
let queryRes = await client.query(req, peer=serverPeerInfo)

View File

@ -14,6 +14,7 @@ import
libp2p/protocols/pubsub/gossipsub
import
../../../waku/common/databases/db_sqlite,
../../../waku/common/paging,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../../../waku/node/peer_manager,
@ -107,7 +108,7 @@ procSuite "WakuNode - Store":
client.mountStoreClient()
## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: true)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, direction: PagingDirection.FORWARD)
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
@ -158,7 +159,7 @@ procSuite "WakuNode - Store":
client.mountStoreClient()
## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: false)
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, direction: PagingDirection.BACKWARD)
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When

35
waku/common/paging.nim Normal file
View File

@ -0,0 +1,35 @@
import std/options
type
PagingDirection* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)
proc default*(): PagingDirection {.inline.} =
PagingDirection.FORWARD
proc into*(b: bool): PagingDirection =
PagingDirection(b)
proc into*(b: Option[bool]): PagingDirection =
if b.isNone():
return default()
b.get().into()
proc into*(d: PagingDirection): bool =
d == PagingDirection.FORWARD
proc into*(d: Option[PagingDirection]): bool =
if d.isNone():
return false
d.get().into()
proc into*(s: string): PagingDirection =
(s == "true").into()

View File

@ -765,7 +765,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
startTime: request.startTime,
endTime: request.endTime,
pageSize: request.pageSize.uint,
ascending: request.ascending
direction: request.direction
)
# TODO: Review this mapping logic. Maybe, move it to the appplication code

View File

@ -8,11 +8,14 @@ import
chronicles,
json_rpc/rpcserver
import
../../../waku_core,
../../../waku_store,
../../../[
waku_core,
waku_store,
waku_node
],
../../../waku_store/rpc,
../../../waku_node,
../../../node/peer_manager,
../../../common/paging,
./types
@ -27,16 +30,14 @@ proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfoRPC(
pageSize: some(pagingOptions.pageSize),
cursor: pagingOptions.cursor,
direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
direction: some(pagingOptions.forward.into())
)
proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(
pageSize: pagingInfo.pageSize.get(0'u64),
cursor: pagingInfo.cursor,
forward: if pagingInfo.direction.isNone(): true
else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD
forward: pagingInfo.direction.into()
)
proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
@ -65,8 +66,8 @@ proc installStoreApiHandlers*(node: WakuNode, server: RpcServer) =
contentTopics: contentFiltersOption.get(@[]).mapIt(it.contentTopic),
startTime: startTime,
endTime: endTime,
ascending: if pagingOptions.isNone(): true
else: pagingOptions.get().forward,
direction: if pagingOptions.isNone(): default()
else: pagingOptions.get().forward.into(),
pageSize: if pagingOptions.isNone(): DefaultPageSize
else: min(pagingOptions.get().pageSize, MaxPageSize),
cursor: if pagingOptions.isNone(): none(HistoryCursor)

View File

@ -15,6 +15,7 @@ import
../../../waku_store/common,
../../../waku_node,
../../../node/peer_manager,
../../../common/paging,
../../handlers,
../responses,
../serdes,
@ -123,7 +124,7 @@ proc createHistoryQuery(pubsubTopic: Option[string],
startTime: Option[string],
endTime: Option[string],
pageSize: Option[string],
ascending: Option[string]):
direction: Option[string]):
Result[HistoryQuery, string] =
@ -164,16 +165,16 @@ proc createHistoryQuery(pubsubTopic: Option[string],
let parsedEndTime = ? parseTime(endTime)
# Parse ascending field
var parsedAscending = true
if ascending.isSome() and ascending.get() != "":
parsedAscending = ascending.get() == "true"
var parsedDirection = default()
if direction.isSome() and direction.get() != "":
parsedDirection = direction.get().into()
return ok(
HistoryQuery(pubsubTopic: parsedPubsubTopic,
contentTopics: parsedContentTopics,
startTime: parsedStartTime,
endTime: parsedEndTime,
ascending: parsedAscending,
direction: parsedDirection,
pageSize: parsedPagedSize,
cursor: parsedCursor
))

View File

@ -124,7 +124,8 @@ paths:
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward
"true" for paging forward, "false" for paging backward.
If not specified or if specified with an invalid value, the default is "true".
example: "true"
responses:

View File

@ -11,8 +11,11 @@ import
regex,
metrics
import
../common/databases/dburl,
../common/databases/db_sqlite,
../common/[
databases/dburl,
databases/db_sqlite,
paging
],
./driver,
./retention_policy,
./retention_policy/retention_policy_capacity,
@ -131,7 +134,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
qEndTime = query.endTime
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
else: min(query.pageSize, MaxPageSize)
qAscendingOrder = query.ascending
isAscendingOrder = query.direction.into()
if qContentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))
@ -145,7 +148,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize + 1,
ascendingOrder = qAscendingOrder
ascendingOrder = isAscendingOrder
)
let queryDuration = getTime().toUnixFloat() - queryStartTime
@ -188,7 +191,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
))
# All messages MUST be returned in chronological order
if not qAscendingOrder:
if not isAscendingOrder:
reverse(messages)
return ok(ArchiveResponse(messages: messages, cursor: cursor))

View File

@ -9,7 +9,8 @@ import
stew/byteutils,
nimcrypto/sha2
import
../waku_core
../waku_core,
../common/paging
## Waku message digest
@ -54,7 +55,7 @@ type
startTime*: Option[Timestamp]
endTime*: Option[Timestamp]
pageSize*: uint
ascending*: bool
direction*: PagingDirection
ArchiveResponse* = object
messages*: seq[WakuMessage]

View File

@ -199,7 +199,7 @@ when defined(waku_exp_store_resume):
startTime: some(queryStartTime),
endTime: some(queryEndTime),
pageSize: uint64(pageSize),
ascending: true
direction: default()
)
var res: WakuStoreResult[seq[WakuMessage]]

View File

@ -9,7 +9,8 @@ import
stew/byteutils,
nimcrypto/sha2
import
../waku_core
../waku_core,
../common/paging
const
@ -55,7 +56,7 @@ type
startTime*: Option[Timestamp]
endTime*: Option[Timestamp]
pageSize*: uint64
ascending*: bool
direction*: PagingDirection
HistoryResponse* = object
messages*: seq[WakuMessage]

View File

@ -8,11 +8,14 @@ import
stew/results
import
../waku_core,
../common/paging,
./common
## Wire protocol
const HistoryQueryDirectionDefaultValue = default(type HistoryQuery.direction)
type PagingIndexRPC* = object
## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: PubsubTopic
@ -41,16 +44,11 @@ proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp,
type
PagingDirectionRPC* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)
PagingInfoRPC* = object
## This type holds the information needed for the pagination
pageSize*: Option[uint64]
cursor*: Option[PagingIndexRPC]
direction*: Option[PagingDirectionRPC]
direction*: Option[PagingDirection]
type
@ -120,14 +118,14 @@ proc toRPC*(query: HistoryQuery): HistoryQueryRPC =
rpc.pagingInfo = block:
if query.cursor.isNone() and
query.pageSize == default(type query.pageSize) and
query.ascending == default(type query.ascending):
query.direction == HistoryQueryDirectionDefaultValue:
none(PagingInfoRPC)
else:
let
pageSize = some(query.pageSize)
cursor = query.cursor.map(toRPC)
direction = if query.ascending: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
direction = some(query.direction)
some(PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
@ -156,8 +154,8 @@ proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery =
pageSize = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().pageSize.isNone(): 0'u64
else: rpc.pagingInfo.get().pageSize.get()
ascending = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): true
else: rpc.pagingInfo.get().direction.get() == PagingDirectionRPC.FORWARD
direction = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): HistoryQueryDirectionDefaultValue
else: rpc.pagingInfo.get().direction.get()
HistoryQuery(
pubsubTopic: pubsubTopic,
@ -166,7 +164,7 @@ proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery =
startTime: startTime,
endTime: endTime,
pageSize: pageSize,
ascending: ascending
direction: direction
)

View File

@ -7,7 +7,7 @@ import
std/options,
nimcrypto/hash
import
../common/protobuf,
../common/[protobuf, paging],
../waku_core,
./common,
./rpc
@ -74,7 +74,7 @@ proc encode*(rpc: PagingInfoRPC): ProtoBuffer =
pb.write3(1, rpc.pageSize)
pb.write3(2, rpc.cursor.map(encode))
pb.write3(3, rpc.direction.map(proc(d: PagingDirectionRPC): uint32 = uint32(ord(d))))
pb.write3(3, rpc.direction.map(proc(d: PagingDirection): uint32 = uint32(ord(d))))
pb.finish3()
pb
@ -99,9 +99,9 @@ proc decode*(T: type PagingInfoRPC, buffer: seq[byte]): ProtobufResult[T] =
var direction: uint32
if not ?pb.getField(3, direction):
rpc.direction = none(PagingDirectionRPC)
rpc.direction = none(PagingDirection)
else:
rpc.direction = some(PagingDirectionRPC(direction))
rpc.direction = some(PagingDirection(direction))
ok(rpc)