From fae20bff20738397dd7327e56aabfb6036cf19b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Tue, 19 Dec 2023 15:10:27 +0100 Subject: [PATCH] refactor(store): HistoryQuery.direction (#2263) * Fix issue with default history query ascending value in serde operations: Should use the same value. * Update direction types to PagingDirection. --- tests/waku_archive/test_waku_archive.nim | 7 +++-- tests/waku_store/test_rpc_codec.nim | 7 +++-- tests/waku_store/test_waku_store.nim | 5 ++-- tests/waku_store/test_wakunode_store.nim | 5 ++-- waku/common/paging.nim | 35 ++++++++++++++++++++++++ waku/node/waku_node.nim | 2 +- waku/waku_api/jsonrpc/store/handlers.nim | 19 +++++++------ waku/waku_api/rest/store/handlers.nim | 11 ++++---- waku/waku_api/rest/store/openapi.yaml | 3 +- waku/waku_archive/archive.nim | 13 +++++---- waku/waku_archive/common.nim | 5 ++-- waku/waku_store/client.nim | 2 +- waku/waku_store/common.nim | 5 ++-- waku/waku_store/rpc.nim | 22 +++++++-------- waku/waku_store/rpc_codec.nim | 8 +++--- 15 files changed, 97 insertions(+), 52 deletions(-) create mode 100644 waku/common/paging.nim diff --git a/tests/waku_archive/test_waku_archive.nim b/tests/waku_archive/test_waku_archive.nim index fca4f467d..bd445839d 100644 --- a/tests/waku_archive/test_waku_archive.nim +++ b/tests/waku_archive/test_waku_archive.nim @@ -9,6 +9,7 @@ import import ../../../waku/common/databases/db_sqlite, + ../../../waku/common/paging, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, ../../../waku/waku_archive/driver/sqlite_driver, @@ -341,7 +342,7 @@ procSuite "Waku Archive - find messages": ## Given let req = ArchiveQuery( pageSize: 4, - ascending: true + direction: PagingDirection.FORWARD ) ## When @@ -377,7 +378,7 @@ procSuite "Waku Archive - find messages": ## Given let req = ArchiveQuery( pageSize: 4, - ascending: false # backward + direction: PagingDirection.BACKWARD ) ## When @@ -454,7 +455,7 @@ procSuite "Waku Archive - find messages": contentTopics: @[ContentTopic("1")], startTime: some(ts(15, timeOrigin)), endTime: some(ts(55, timeOrigin)), - ascending: true + direction: PagingDirection.FORWARD ) ## When diff --git a/tests/waku_store/test_rpc_codec.nim b/tests/waku_store/test_rpc_codec.nim index f8582ab5b..c06354297 100644 --- a/tests/waku_store/test_rpc_codec.nim +++ b/tests/waku_store/test_rpc_codec.nim @@ -6,6 +6,7 @@ import chronos import ../../../waku/common/protobuf, + ../../../waku/common/paging, ../../../waku/waku_core, ../../../waku/waku_store/rpc, ../../../waku/waku_store/rpc_codec, @@ -53,7 +54,7 @@ procSuite "Waku Store - RPC codec": ## Given let index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.FORWARD)) + pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.FORWARD)) ## When let pb = pagingInfo.encode() @@ -88,7 +89,7 @@ procSuite "Waku Store - RPC codec": ## Given let index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD)) + pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.BACKWARD)) query = HistoryQueryRPC( contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], pagingInfo: some(pagingInfo), @@ -129,7 +130,7 @@ procSuite "Waku Store - RPC codec": let message = fakeWakuMessage() index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD)) + pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirection.BACKWARD)) res = HistoryResponseRPC(messages: @[message], pagingInfo: some(pagingInfo), error: HistoryResponseErrorRPC.INVALID_CURSOR) ## When diff --git a/tests/waku_store/test_waku_store.nim b/tests/waku_store/test_waku_store.nim index b854416d1..0436ba6a4 100644 --- a/tests/waku_store/test_waku_store.nim +++ b/tests/waku_store/test_waku_store.nim @@ -9,6 +9,7 @@ import import ../../../waku/[ + common/paging, node/peer_manager, waku_core, waku_store, @@ -46,7 +47,7 @@ suite "Waku Store - query handler": server = await newTestWakuStore(serverSwitch, handler=queryhandler) client = newTestWakuStoreClient(clientSwitch) - let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD) ## When let queryRes = await client.query(req, peer=serverPeerInfo) @@ -88,7 +89,7 @@ suite "Waku Store - query handler": server = await newTestWakuStore(serverSwitch, handler=queryhandler) client = newTestWakuStoreClient(clientSwitch) - let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], direction: PagingDirection.FORWARD) ## When let queryRes = await client.query(req, peer=serverPeerInfo) diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 7ac30bdb0..6c52d853c 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -14,6 +14,7 @@ import libp2p/protocols/pubsub/gossipsub import ../../../waku/common/databases/db_sqlite, + ../../../waku/common/paging, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, ../../../waku/node/peer_manager, @@ -107,7 +108,7 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: true) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, direction: PagingDirection.FORWARD) let serverPeer = server.peerInfo.toRemotePeerInfo() ## When @@ -158,7 +159,7 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: false) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, direction: PagingDirection.BACKWARD) let serverPeer = server.peerInfo.toRemotePeerInfo() ## When diff --git a/waku/common/paging.nim b/waku/common/paging.nim new file mode 100644 index 000000000..287ee63a8 --- /dev/null +++ b/waku/common/paging.nim @@ -0,0 +1,35 @@ +import std/options + +type + PagingDirection* {.pure.} = enum + ## PagingDirection determines the direction of pagination + BACKWARD = uint32(0) + FORWARD = uint32(1) + + +proc default*(): PagingDirection {.inline.} = + PagingDirection.FORWARD + + +proc into*(b: bool): PagingDirection = + PagingDirection(b) + + +proc into*(b: Option[bool]): PagingDirection = + if b.isNone(): + return default() + b.get().into() + + +proc into*(d: PagingDirection): bool = + d == PagingDirection.FORWARD + + +proc into*(d: Option[PagingDirection]): bool = + if d.isNone(): + return false + d.get().into() + + +proc into*(s: string): PagingDirection = + (s == "true").into() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e50eb94c7..96e28c051 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -765,7 +765,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery = startTime: request.startTime, endTime: request.endTime, pageSize: request.pageSize.uint, - ascending: request.ascending + direction: request.direction ) # TODO: Review this mapping logic. Maybe, move it to the appplication code diff --git a/waku/waku_api/jsonrpc/store/handlers.nim b/waku/waku_api/jsonrpc/store/handlers.nim index fa0897f53..db53a79fc 100644 --- a/waku/waku_api/jsonrpc/store/handlers.nim +++ b/waku/waku_api/jsonrpc/store/handlers.nim @@ -8,11 +8,14 @@ import chronicles, json_rpc/rpcserver import - ../../../waku_core, - ../../../waku_store, + ../../../[ + waku_core, + waku_store, + waku_node + ], ../../../waku_store/rpc, - ../../../waku_node, ../../../node/peer_manager, + ../../../common/paging, ./types @@ -27,16 +30,14 @@ proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC = PagingInfoRPC( pageSize: some(pagingOptions.pageSize), cursor: pagingOptions.cursor, - direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD) - else: some(PagingDirectionRPC.BACKWARD) + direction: some(pagingOptions.forward.into()) ) proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions = StorePagingOptions( pageSize: pagingInfo.pageSize.get(0'u64), cursor: pagingInfo.cursor, - forward: if pagingInfo.direction.isNone(): true - else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD + forward: pagingInfo.direction.into() ) proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse = @@ -65,8 +66,8 @@ proc installStoreApiHandlers*(node: WakuNode, server: RpcServer) = contentTopics: contentFiltersOption.get(@[]).mapIt(it.contentTopic), startTime: startTime, endTime: endTime, - ascending: if pagingOptions.isNone(): true - else: pagingOptions.get().forward, + direction: if pagingOptions.isNone(): default() + else: pagingOptions.get().forward.into(), pageSize: if pagingOptions.isNone(): DefaultPageSize else: min(pagingOptions.get().pageSize, MaxPageSize), cursor: if pagingOptions.isNone(): none(HistoryCursor) diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index de1ae34c1..57bcdaf61 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -15,6 +15,7 @@ import ../../../waku_store/common, ../../../waku_node, ../../../node/peer_manager, + ../../../common/paging, ../../handlers, ../responses, ../serdes, @@ -123,7 +124,7 @@ proc createHistoryQuery(pubsubTopic: Option[string], startTime: Option[string], endTime: Option[string], pageSize: Option[string], - ascending: Option[string]): + direction: Option[string]): Result[HistoryQuery, string] = @@ -164,16 +165,16 @@ proc createHistoryQuery(pubsubTopic: Option[string], let parsedEndTime = ? parseTime(endTime) # Parse ascending field - var parsedAscending = true - if ascending.isSome() and ascending.get() != "": - parsedAscending = ascending.get() == "true" + var parsedDirection = default() + if direction.isSome() and direction.get() != "": + parsedDirection = direction.get().into() return ok( HistoryQuery(pubsubTopic: parsedPubsubTopic, contentTopics: parsedContentTopics, startTime: parsedStartTime, endTime: parsedEndTime, - ascending: parsedAscending, + direction: parsedDirection, pageSize: parsedPagedSize, cursor: parsedCursor )) diff --git a/waku/waku_api/rest/store/openapi.yaml b/waku/waku_api/rest/store/openapi.yaml index cbd653ca3..fdd552e19 100644 --- a/waku/waku_api/rest/store/openapi.yaml +++ b/waku/waku_api/rest/store/openapi.yaml @@ -124,7 +124,8 @@ paths: schema: type: string description: > - "true" for paging forward, "false" for paging backward + "true" for paging forward, "false" for paging backward. + If not specified or if specified with an invalid value, the default is "true". example: "true" responses: diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index db642e383..0e16b1d3c 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -11,8 +11,11 @@ import regex, metrics import - ../common/databases/dburl, - ../common/databases/db_sqlite, + ../common/[ + databases/dburl, + databases/db_sqlite, + paging + ], ./driver, ./retention_policy, ./retention_policy/retention_policy_capacity, @@ -131,7 +134,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { qEndTime = query.endTime qMaxPageSize = if query.pageSize <= 0: DefaultPageSize else: min(query.pageSize, MaxPageSize) - qAscendingOrder = query.ascending + isAscendingOrder = query.direction.into() if qContentTopics.len > 10: return err(ArchiveError.invalidQuery("too many content topics")) @@ -145,7 +148,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { startTime = qStartTime, endTime = qEndTime, maxPageSize = qMaxPageSize + 1, - ascendingOrder = qAscendingOrder + ascendingOrder = isAscendingOrder ) let queryDuration = getTime().toUnixFloat() - queryStartTime @@ -188,7 +191,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { )) # All messages MUST be returned in chronological order - if not qAscendingOrder: + if not isAscendingOrder: reverse(messages) return ok(ArchiveResponse(messages: messages, cursor: cursor)) diff --git a/waku/waku_archive/common.nim b/waku/waku_archive/common.nim index 29d2bfd22..78ae5b87f 100644 --- a/waku/waku_archive/common.nim +++ b/waku/waku_archive/common.nim @@ -9,7 +9,8 @@ import stew/byteutils, nimcrypto/sha2 import - ../waku_core + ../waku_core, + ../common/paging ## Waku message digest @@ -54,7 +55,7 @@ type startTime*: Option[Timestamp] endTime*: Option[Timestamp] pageSize*: uint - ascending*: bool + direction*: PagingDirection ArchiveResponse* = object messages*: seq[WakuMessage] diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 735c01cf5..792f25848 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -199,7 +199,7 @@ when defined(waku_exp_store_resume): startTime: some(queryStartTime), endTime: some(queryEndTime), pageSize: uint64(pageSize), - ascending: true + direction: default() ) var res: WakuStoreResult[seq[WakuMessage]] diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index 7d06bf4cf..6302e6478 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -9,7 +9,8 @@ import stew/byteutils, nimcrypto/sha2 import - ../waku_core + ../waku_core, + ../common/paging const @@ -55,7 +56,7 @@ type startTime*: Option[Timestamp] endTime*: Option[Timestamp] pageSize*: uint64 - ascending*: bool + direction*: PagingDirection HistoryResponse* = object messages*: seq[WakuMessage] diff --git a/waku/waku_store/rpc.nim b/waku/waku_store/rpc.nim index c0b105c7b..dadd9140d 100644 --- a/waku/waku_store/rpc.nim +++ b/waku/waku_store/rpc.nim @@ -8,11 +8,14 @@ import stew/results import ../waku_core, + ../common/paging, ./common ## Wire protocol +const HistoryQueryDirectionDefaultValue = default(type HistoryQuery.direction) + type PagingIndexRPC* = object ## This type contains the description of an Index used in the pagination of WakuMessages pubsubTopic*: PubsubTopic @@ -41,16 +44,11 @@ proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp, type - PagingDirectionRPC* {.pure.} = enum - ## PagingDirection determines the direction of pagination - BACKWARD = uint32(0) - FORWARD = uint32(1) - PagingInfoRPC* = object ## This type holds the information needed for the pagination pageSize*: Option[uint64] cursor*: Option[PagingIndexRPC] - direction*: Option[PagingDirectionRPC] + direction*: Option[PagingDirection] type @@ -120,14 +118,14 @@ proc toRPC*(query: HistoryQuery): HistoryQueryRPC = rpc.pagingInfo = block: if query.cursor.isNone() and query.pageSize == default(type query.pageSize) and - query.ascending == default(type query.ascending): + query.direction == HistoryQueryDirectionDefaultValue: none(PagingInfoRPC) else: let pageSize = some(query.pageSize) cursor = query.cursor.map(toRPC) - direction = if query.ascending: some(PagingDirectionRPC.FORWARD) - else: some(PagingDirectionRPC.BACKWARD) + direction = some(query.direction) + some(PagingInfoRPC( pageSize: pageSize, cursor: cursor, @@ -156,8 +154,8 @@ proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery = pageSize = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().pageSize.isNone(): 0'u64 else: rpc.pagingInfo.get().pageSize.get() - ascending = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): true - else: rpc.pagingInfo.get().direction.get() == PagingDirectionRPC.FORWARD + direction = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): HistoryQueryDirectionDefaultValue + else: rpc.pagingInfo.get().direction.get() HistoryQuery( pubsubTopic: pubsubTopic, @@ -166,7 +164,7 @@ proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery = startTime: startTime, endTime: endTime, pageSize: pageSize, - ascending: ascending + direction: direction ) diff --git a/waku/waku_store/rpc_codec.nim b/waku/waku_store/rpc_codec.nim index 3223fb7ec..5d44b3ba9 100644 --- a/waku/waku_store/rpc_codec.nim +++ b/waku/waku_store/rpc_codec.nim @@ -7,7 +7,7 @@ import std/options, nimcrypto/hash import - ../common/protobuf, + ../common/[protobuf, paging], ../waku_core, ./common, ./rpc @@ -74,7 +74,7 @@ proc encode*(rpc: PagingInfoRPC): ProtoBuffer = pb.write3(1, rpc.pageSize) pb.write3(2, rpc.cursor.map(encode)) - pb.write3(3, rpc.direction.map(proc(d: PagingDirectionRPC): uint32 = uint32(ord(d)))) + pb.write3(3, rpc.direction.map(proc(d: PagingDirection): uint32 = uint32(ord(d)))) pb.finish3() pb @@ -99,9 +99,9 @@ proc decode*(T: type PagingInfoRPC, buffer: seq[byte]): ProtobufResult[T] = var direction: uint32 if not ?pb.getField(3, direction): - rpc.direction = none(PagingDirectionRPC) + rpc.direction = none(PagingDirection) else: - rpc.direction = some(PagingDirectionRPC(direction)) + rpc.direction = some(PagingDirection(direction)) ok(rpc)