From b07cdb1841390b64abb9f7cbb67ff0d2654f6c77 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Wed, 9 Nov 2022 18:50:18 +0100 Subject: [PATCH] refactor(store): decouple waku store public api types from rpc types --- apps/chat2/chat2.nim | 2 +- tests/v2/test_jsonrpc_waku.nim | 3 +- .../test_message_store_queue_pagination.nim | 18 +- tests/v2/test_message_store_sqlite.nim | 19 +- tests/v2/test_message_store_sqlite_query.nim | 30 +- tests/v2/test_waku_store.nim | 119 ++++--- tests/v2/test_waku_store_client.nim | 27 +- tests/v2/test_waku_store_rpc_codec.nim | 69 ++-- tests/v2/test_waku_swap.nim | 10 +- tests/v2/test_wakunode_store.nim | 4 +- tools/scripts/rpc_query.nim | 4 +- tools/simulation/quicksim2.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_types.nim | 4 +- waku/v2/node/jsonrpc/jsonrpc_utils.nim | 26 +- waku/v2/node/jsonrpc/store_api.nim | 32 +- .../node/message_store/queue_store/index.nim | 12 +- .../message_store/queue_store/queue_store.nim | 6 +- .../sqlite_store/sqlite_store.nim | 6 +- waku/v2/node/waku_node.nim | 2 +- waku/v2/protocol/waku_store.nim | 16 +- waku/v2/protocol/waku_store/client.nim | 85 +++-- waku/v2/protocol/waku_store/common.nim | 104 ++++++ waku/v2/protocol/waku_store/message_store.nim | 13 +- waku/v2/protocol/waku_store/pagination.nim | 70 ---- waku/v2/protocol/waku_store/protocol.nim | 325 ++++++++++-------- .../protocol/waku_store/protocol_metrics.nim | 4 +- waku/v2/protocol/waku_store/rpc.nim | 240 ++++++++++++- waku/v2/protocol/waku_store/rpc_codec.nim | 56 +-- 29 files changed, 831 insertions(+), 479 deletions(-) create mode 100644 waku/v2/protocol/waku_store/common.nim delete mode 100644 waku/v2/protocol/waku_store/pagination.nim diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 775090ccd..88ccf27b4 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -484,7 +484,7 @@ proc processInput(rfd: AsyncFD) {.async.} = echo &"{chatLine}" info "Hit store handler" - let queryRes = await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)])) + let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic])) if queryRes.isOk(): storeHandler(queryRes.value) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 227a63c3d..17a7377ea 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -25,6 +25,7 @@ import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/rpc, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_filter/client, @@ -263,7 +264,7 @@ procSuite "Waku v2 JSON-RPC API": let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) - let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilter(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) + let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) check: response.messages.len() == 8 response.pagingOptions.isNone() diff --git a/tests/v2/test_message_store_queue_pagination.nim b/tests/v2/test_message_store_queue_pagination.nim index c99875bee..69dcd8bca 100644 --- a/tests/v2/test_message_store_queue_pagination.nim +++ b/tests/v2/test_message_store_queue_pagination.nim @@ -151,7 +151,14 @@ procSuite "Queue store - pagination": test "Forward pagination - invalid cursor": ## Given - let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic).toIndex() + let msg = fakeWakuMessage(payload= @[byte 10]) + let index = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: msg.timestamp, + storeTime: msg.timestamp, + digest: computeDigest(msg) + ).toIndex() + let pageSize: uint64 = 10 cursor: Option[Index] = some(index) @@ -325,7 +332,14 @@ procSuite "Queue store - pagination": test "Backward pagination - invalid cursor": ## Given - let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic).toIndex() + let msg = fakeWakuMessage(payload= @[byte 10]) + let index = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: msg.timestamp, + storeTime: msg.timestamp, + digest: computeDigest(msg) + ).toIndex() + let pageSize: uint64 = 2 cursor: Option[Index] = some(index) diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index a5df6ea5a..61e90dd82 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -10,7 +10,7 @@ import ../../waku/v2/node/message_store/message_retention_policy, ../../waku/v2/node/message_store/message_retention_policy_capacity, ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_store/pagination, + ../../waku/v2/protocol/waku_store, ../../waku/v2/utils/time, ./utils, ./testlib/common @@ -137,12 +137,17 @@ suite "Message Store": WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3), ] - var indexes: seq[PagingIndex] = @[] + var indexes: seq[HistoryCursor] = @[] for msg in msgs: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let index = PagingIndex.compute(msg, msg.timestamp, DefaultPubsubTopic) - indexes.add(index) + let cursor = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: msg.timestamp, + storeTime: msg.timestamp, + digest: computeDigest(msg) + ) + indexes.add(cursor) ## When let res = store.getAllMessages() @@ -167,9 +172,9 @@ suite "Message Store": pubsubTopic == DefaultPubsubTopic # check correct retrieval of receiver timestamps - if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true - if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true - if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true + if receiverTimestamp == indexes[0].storeTime: rt1Flag = true + if receiverTimestamp == indexes[1].storeTime: rt2Flag = true + if receiverTimestamp == indexes[2].storeTime: rt3Flag = true check: msg in msgs diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index 54fff2aec..c578402cd 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -8,7 +8,7 @@ import ../../waku/common/sqlite, ../../waku/v2/node/message_store/sqlite_store, ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_store/pagination, + ../../waku/v2/protocol/waku_store, ./utils, ./testlib/common @@ -231,7 +231,12 @@ suite "message store - history query": for msg in messages: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = PagingIndex.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) + let cursor = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: messages[4].timestamp, + storeTime: messages[4].timestamp, + digest: computeDigest(messages[4]) + ) ## When let res = store.getMessagesByHistoryQuery( @@ -279,7 +284,12 @@ suite "message store - history query": for msg in messages: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = PagingIndex.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) + let cursor = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: messages[6].timestamp, + storeTime: messages[6].timestamp, + digest: computeDigest(messages[6]) + ) ## When let res = store.getMessagesByHistoryQuery( @@ -330,7 +340,12 @@ suite "message store - history query": for msg in messages2: require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = PagingIndex.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) + let cursor = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: messages2[0].timestamp, + storeTime: messages2[0].timestamp, + digest: computeDigest(messages2[0]) + ) ## When let res = store.getMessagesByHistoryQuery( @@ -597,7 +612,12 @@ suite "message store - history query": for msg in messages: require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - let cursor = PagingIndex.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) + let cursor = HistoryCursor( + pubsubTopic: DefaultPubsubTopic, + senderTime: messages[3].timestamp, + storeTime: messages[3].timestamp, + digest: computeDigest(messages[3]) + ) ## When let res = store.getMessagesByHistoryQuery( diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 268f326e0..0db00c2d4 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -3,7 +3,8 @@ import std/[options, sequtils], testutils/unittests, - chronos, + chronos, + chronicles, libp2p/crypto/crypto import ../../waku/common/sqlite, @@ -90,14 +91,14 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) - let resQuery = await client.query(rpc, peer=serverPeerInfo) + let req = HistoryQuery(contentTopics: @[topic]) + let queryRes = await client.query(req, peer=serverPeerInfo) ## Then check: - resQuery.isOk() + queryRes.isOk() - let response = resQuery.tryGet() + let response = queryRes.tryGet() check: response.messages.len == 1 response.messages == @[msg1] @@ -135,17 +136,14 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let rpc = HistoryQuery(contentFilters: @[ - HistoryContentFilter(contentTopic: topic1), - HistoryContentFilter(contentTopic: topic3) - ]) - let resQuery = await client.query(rpc, peer=serverPeerInfo) + let req = HistoryQuery(contentTopics: @[topic1, topic3]) + let queryRes = await client.query(req, peer=serverPeerInfo) ## Then check: - resQuery.isOk() + queryRes.isOk() - let response = resQuery.tryGet() + let response = queryRes.tryGet() check: response.messages.len() == 2 response.messages.anyIt(it == msg1) @@ -189,18 +187,17 @@ procSuite "Waku Store - history query": ## When # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), - HistoryContentFilter(contentTopic: contentTopic3)], - pubsubTopic: pubsubTopic1 + let req = HistoryQuery( + pubsubTopic: some(pubsubTopic1), + contentTopics: @[contentTopic1, contentTopic3] ) - let resQuery = await client.query(rpc, peer=serverPeerInfo) + let queryRes = await client.query(req, peer=serverPeerInfo) ## Then check: - resQuery.isOk() + queryRes.isOk() - let response = resQuery.tryGet() + let response = queryRes.tryGet() check: response.messages.len() == 1 response.messages.anyIt(it == msg1) @@ -237,8 +234,8 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let rpc = HistoryQuery(pubsubTopic: pubsubTopic1) - let res = await client.query(rpc, peer=serverPeerInfo) + let req = HistoryQuery(pubsubTopic: some(pubsubTopic1)) + let res = await client.query(req, peer=serverPeerInfo) ## Then check: @@ -278,8 +275,8 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let rpc = HistoryQuery(pubsubTopic: pubsubTopic) - let res = await client.query(rpc, peer=serverPeerInfo) + let req = HistoryQuery(pubsubTopic: some(pubsubTopic)) + let res = await client.query(req, peer=serverPeerInfo) ## Then check: @@ -328,11 +325,12 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - var rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) + var req = HistoryQuery( + contentTopics: @[DefaultContentTopic], + pageSize: 2, + ascending: true ) - var res = await client.query(rpc, peer=serverPeerInfo) + var res = await client.query(req, peer=serverPeerInfo) require res.isOk() var @@ -340,17 +338,17 @@ procSuite "Waku Store - history query": totalMessages = response.messages.len() totalQueries = 1 - while response.pagingInfo.cursor != PagingIndex(): + while response.cursor.isSome(): require: totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever response.messages.len() == 2 - response.pagingInfo.pageSize == 2 - response.pagingInfo.direction == PagingDirection.FORWARD + response.pageSize == 2 + response.ascending == true - rpc.pagingInfo = response.pagingInfo + req.cursor = response.cursor # Continue querying - res = await client.query(rpc, peer=serverPeerInfo) + res = await client.query(req, peer=serverPeerInfo) require res.isOk() response = res.tryGet() totalMessages += response.messages.len() @@ -397,11 +395,12 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - var rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) + var req = HistoryQuery( + contentTopics: @[DefaultContentTopic], + pageSize: 2, + ascending: false ) - var res = await client.query(rpc, peer=serverPeerInfo) + var res = await client.query(req, peer=serverPeerInfo) require res.isOk() var @@ -409,17 +408,17 @@ procSuite "Waku Store - history query": totalMessages = response.messages.len() totalQueries = 1 - while response.pagingInfo.cursor != PagingIndex(): + while response.cursor.isSome(): require: totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever response.messages.len() == 2 - response.pagingInfo.pageSize == 2 - response.pagingInfo.direction == PagingDirection.BACKWARD + response.pageSize == 2 + response.ascending == false - rpc.pagingInfo = response.pagingInfo + req.cursor = response.cursor # Continue querying - res = await client.query(rpc, peer=serverPeerInfo) + res = await client.query(req, peer=serverPeerInfo) require res.isOk() response = res.tryGet() totalMessages += response.messages.len() @@ -465,8 +464,8 @@ procSuite "Waku Store - history query": let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) - let res = await client.query(rpc, peer=serverPeerInfo) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) + let res = await client.query(req, peer=serverPeerInfo) ## Then check: @@ -477,7 +476,7 @@ procSuite "Waku Store - history query": ## No pagination specified. Response will be auto-paginated with ## up to MaxPageSize messages per page. response.messages.len() == 8 - response.pagingInfo == PagingInfo() + response.cursor.isNone() ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) @@ -495,16 +494,16 @@ procSuite "Waku Store - history query": client = newTestWakuStoreClient(clientSwitch) ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], - startTime: Timestamp(2), - endTime: Timestamp(5) + let req = HistoryQuery( + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(2)), + endTime: some(Timestamp(5)) ) let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let res = await client.query(rpc, peer=serverPeerInfo) + let res = await client.query(req, peer=serverPeerInfo) ## Then check res.isOk() @@ -532,16 +531,16 @@ procSuite "Waku Store - history query": client = newTestWakuStoreClient(clientSwitch) ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], - startTime: Timestamp(2), - endTime: Timestamp(2) + let req = HistoryQuery( + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(2)), + endTime: some(Timestamp(2)) ) let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - let res = await client.query(rpc, peer=serverPeerInfo) + let res = await client.query(req, peer=serverPeerInfo) ## Then check res.isOk() @@ -567,16 +566,16 @@ procSuite "Waku Store - history query": client = newTestWakuStoreClient(clientSwitch) ## Given - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], - startTime: Timestamp(5), - endTime: Timestamp(2) + let req = HistoryQuery( + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(5)), + endTime: some(Timestamp(2)) ) - let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() ## When - let res = await client.query(rpc, peer=serverPeerInfo) + let res = await client.query(req, peer=serverPeerInfo) ## Then check res.isOk() diff --git a/tests/v2/test_waku_store_client.nim b/tests/v2/test_waku_store_client.nim index 77e0611ed..7a0b066c8 100644 --- a/tests/v2/test_waku_store_client.nim +++ b/tests/v2/test_waku_store_client.nim @@ -11,7 +11,7 @@ import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, - ../../waku/v2/protocol/waku_store/client, + ../../waku/v2/protocol/waku_store/client {.all.}, ./testlib/common, ./testlib/switch @@ -81,13 +81,10 @@ procSuite "Waku Store Client": ## Given let peer = serverSwitch.peerInfo.toRemotePeerInfo() - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 8) - ) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 8) ## When - let res = await client.query(rpc, peer) + let res = await client.query(req, peer) ## Then check: @@ -98,7 +95,7 @@ procSuite "Waku Store Client": ## No pagination specified. Response will be auto-paginated with ## up to MaxPageSize messages per page. response.messages.len() == 8 - response.pagingInfo != PagingInfo() + response.cursor.isSome() ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) @@ -117,13 +114,10 @@ procSuite "Waku Store Client": ## Given let peer = serverSwitch.peerInfo.toRemotePeerInfo() - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 5) - ) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5) ## When - let res = await client.queryWithPaging(rpc, peer) + let res = await client.queryAll(req, peer) ## Then check: @@ -136,6 +130,8 @@ procSuite "Waku Store Client": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + # TODO: Move to resume test suite asyncTest "multiple query to multiple peers with pagination": ## Setup let @@ -155,13 +151,10 @@ procSuite "Waku Store Client": serverSwitchA.peerInfo.toRemotePeerInfo(), serverSwitchB.peerInfo.toRemotePeerInfo() ] - let rpc = HistoryQuery( - contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], - pagingInfo: PagingInfo(pageSize: 5) - ) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5) ## When - let res = await client.queryLoop(rpc, peers) + let res = await client.queryLoop(req, peers) ## Then check: diff --git a/tests/v2/test_waku_store_rpc_codec.nim b/tests/v2/test_waku_store_rpc_codec.nim index f575cdfc0..828cfc05f 100644 --- a/tests/v2/test_waku_store_rpc_codec.nim +++ b/tests/v2/test_waku_store_rpc_codec.nim @@ -5,20 +5,21 @@ import testutils/unittests, chronos import - ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/rpc, + ../../waku/v2/protocol/waku_store/rpc_codec, ../../waku/v2/utils/time, ./testlib/common procSuite "Waku Store - RPC codec": - test "PagingIndex protobuf codec": + test "PagingIndexRPC protobuf codec": ## Given - let index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) + let index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) ## When let encodedIndex = index.encode() - let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer) + let decodedIndexRes = PagingIndexRPC.decode(encodedIndex.buffer) ## Then check: @@ -29,12 +30,12 @@ procSuite "Waku Store - RPC codec": # The fields of decodedIndex must be the same as the original index decodedIndex == index - test "PagingIndex protobuf codec - empty index": + test "PagingIndexRPC protobuf codec - empty index": ## Given - let emptyIndex = PagingIndex() + let emptyIndex = PagingIndexRPC() let encodedIndex = emptyIndex.encode() - let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer) + let decodedIndexRes = PagingIndexRPC.decode(encodedIndex.buffer) ## Then check: @@ -42,18 +43,18 @@ procSuite "Waku Store - RPC codec": let decodedIndex = decodedIndexRes.tryGet() check: - # Check the correctness of init and encode for an empty PagingIndex + # Check the correctness of init and encode for an empty PagingIndexRPC decodedIndex == emptyIndex - test "PagingInfo protobuf codec": + test "PagingInfoRPC protobuf codec": ## Given let - index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD) + index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) + pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.FORWARD) ## When let pb = pagingInfo.encode() - let decodedPagingInfo = PagingInfo.decode(pb.buffer) + let decodedPagingInfo = PagingInfoRPC.decode(pb.buffer) ## Then check: @@ -64,32 +65,32 @@ procSuite "Waku Store - RPC codec": decodedPagingInfo.value == pagingInfo decodedPagingInfo.value.direction == pagingInfo.direction - test "PagingInfo protobuf codec - empty paging info": + test "PagingInfoRPC protobuf codec - empty paging info": ## Given - let emptyPagingInfo = PagingInfo() + let emptyPagingInfo = PagingInfoRPC() ## When let pb = emptyPagingInfo.encode() - let decodedEmptyPagingInfo = PagingInfo.decode(pb.buffer) + let decodedEmptyPagingInfo = PagingInfoRPC.decode(pb.buffer) ## Then check: decodedEmptyPagingInfo.isOk() check: - # check the correctness of init and encode for an empty PagingInfo + # check the correctness of init and encode for an empty PagingInfoRPC decodedEmptyPagingInfo.value == emptyPagingInfo - test "HistoryQuery protobuf codec": + test "HistoryQueryRPC protobuf codec": ## Given let - index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) - query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic), HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) + index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) + pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD) + query = HistoryQueryRPC(contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) ## When let pb = query.encode() - let decodedQuery = HistoryQuery.decode(pb.buffer) + let decodedQuery = HistoryQueryRPC.decode(pb.buffer) ## Then check: @@ -99,33 +100,33 @@ procSuite "Waku Store - RPC codec": # the fields of decoded query decodedQuery must be the same as the original query query decodedQuery.value == query - test "HistoryQuery protobuf codec - empty history query": + test "HistoryQueryRPC protobuf codec - empty history query": ## Given - let emptyQuery = HistoryQuery() + let emptyQuery = HistoryQueryRPC() ## When let pb = emptyQuery.encode() - let decodedEmptyQuery = HistoryQuery.decode(pb.buffer) + let decodedEmptyQuery = HistoryQueryRPC.decode(pb.buffer) ## Then check: decodedEmptyQuery.isOk() check: - # check the correctness of init and encode for an empty HistoryQuery + # check the correctness of init and encode for an empty HistoryQueryRPC decodedEmptyQuery.value == emptyQuery - test "HistoryResponse protobuf codec": + test "HistoryResponseRPC protobuf codec": ## Given let message = fakeWakuMessage() - index = PagingIndex.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) - pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) - res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR) + index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic) + pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD) + res = HistoryResponseRPC(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseErrorRPC.INVALID_CURSOR) ## When let pb = res.encode() - let decodedRes = HistoryResponse.decode(pb.buffer) + let decodedRes = HistoryResponseRPC.decode(pb.buffer) ## Then check: @@ -135,18 +136,18 @@ procSuite "Waku Store - RPC codec": # the fields of decoded response decodedRes must be the same as the original response res decodedRes.value == res - test "HistoryResponse protobuf codec - empty history response": + test "HistoryResponseRPC protobuf codec - empty history response": ## Given - let emptyRes = HistoryResponse() + let emptyRes = HistoryResponseRPC() ## When let pb = emptyRes.encode() - let decodedEmptyRes = HistoryResponse.decode(pb.buffer) + let decodedEmptyRes = HistoryResponseRPC.decode(pb.buffer) ## Then check: decodedEmptyRes.isOk() check: - # check the correctness of init and encode for an empty HistoryResponse + # check the correctness of init and encode for an empty HistoryResponseRPC decodedEmptyRes.value == emptyRes \ No newline at end of file diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 0ed86626f..8c0e98711 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -81,10 +81,10 @@ procSuite "Waku SWAP Accounting": require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() let serverPeer = server.peerInfo.toRemotePeerInfo() - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) ## When - let queryRes = await client.query(rpc, peer=serverPeer) + let queryRes = await client.query(req, peer=serverPeer) ## Then check queryRes.isOk() @@ -133,12 +133,12 @@ procSuite "Waku SWAP Accounting": require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() let serverPeer = server.peerInfo.toRemotePeerInfo() - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) ## When # TODO: Handshakes - for now we assume implicit, e2e still works for PoC - let res1 = await client.query(rpc, peer=serverPeer) - let res2 = await client.query(rpc, peer=serverPeer) + let res1 = await client.query(req, peer=serverPeer) + let res2 = await client.query(req, peer=serverPeer) require: res1.isOk() diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index 05e443a2f..064ef03a3 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -54,7 +54,7 @@ procSuite "WakuNode - Store": let serverPeer = server.peerInfo.toRemotePeerInfo() ## When - let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) let queryRes = await client.query(req, peer=serverPeer) ## Then @@ -106,7 +106,7 @@ procSuite "WakuNode - Store": # Wait for the server filter to receive the push message require await filterFut.withTimeout(5.seconds) - let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), peer=serverPeer) + let res = await client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=serverPeer) ## Then check res.isOk() diff --git a/tools/scripts/rpc_query.nim b/tools/scripts/rpc_query.nim index 518418e04..86a043a6c 100644 --- a/tools/scripts/rpc_query.nim +++ b/tools/scripts/rpc_query.nim @@ -10,7 +10,7 @@ import ../../waku/v2/node/waku_payload, ../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/protocol/waku_filter, - ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/rpc, ../../waku/v1/node/rpc/hexstrings @@ -39,5 +39,5 @@ echo "Content topic is:", input var node = newRpcHttpClient() waitfor node.connect("localhost", rpcPort) -var res = waitfor node.get_waku_v2_store_v1_messages(some(pubsubTopic), some(@[HistoryContentFilter(contentTopic: ContentTopic(input))]), none(StorePagingOptions)) +var res = waitfor node.get_waku_v2_store_v1_messages(some(pubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: ContentTopic(input))]), none(StorePagingOptions)) echo "Waku query response: ", res diff --git a/tools/simulation/quicksim2.nim b/tools/simulation/quicksim2.nim index 12490f98b..7cc50b28a 100644 --- a/tools/simulation/quicksim2.nim +++ b/tools/simulation/quicksim2.nim @@ -7,7 +7,7 @@ import libp2p/protobuf/minprotobuf import ../../waku/v2/protocol/waku_filter, - ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_store/rpc, ../../waku/v2/protocol/waku_message, ../../waku/v2/utils/time, ../../waku/v2/node/waku_node, diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 24aba4322..6011c4040 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool # Store API -proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse +proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse # Filter API diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index 8ae5248c5..08a57b84a 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -7,7 +7,7 @@ import std/[options,tables], eth/keys, ../../protocol/waku_message, - ../../protocol/waku_store/pagination, + ../../protocol/waku_store/rpc, ../../utils/time type @@ -18,7 +18,7 @@ type StorePagingOptions* = object ## This type holds some options for pagination pageSize*: uint64 - cursor*: Option[PagingIndex] + cursor*: Option[PagingIndexRPC] forward*: bool WakuRelayMessage* = object diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index eafe7d06c..124694ed7 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -9,6 +9,7 @@ import ../../../v1/node/rpc/hexstrings, ../../protocol/waku_message, ../../protocol/waku_store, + ../../protocol/waku_store/rpc, ../../utils/time, ../waku_payload, ./jsonrpc_types @@ -29,19 +30,26 @@ proc `%`*(value: WakuMessage): JsonNode = ## Since the Waku v2 JSON-RPC API has its own defined types, ## we need to convert between these and the types for the Nim API -proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfo = - PagingInfo(pageSize: pagingOptions.pageSize, - cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndex(), - direction: if pagingOptions.forward: PagingDirection.FORWARD else: PagingDirection.BACKWARD) +proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC = + PagingInfoRPC(pageSize: pagingOptions.pageSize, + cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndexRPC(), + direction: if pagingOptions.forward: PagingDirectionRPC.FORWARD else: PagingDirectionRPC.BACKWARD) -proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions = +proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions = StorePagingOptions(pageSize: pagingInfo.pageSize, cursor: some(pagingInfo.cursor), - forward: if pagingInfo.direction == PagingDirection.FORWARD: true else: false) + forward: if pagingInfo.direction == PagingDirectionRPC.FORWARD: true else: false) -proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = - StoreResponse(messages: historyResponse.messages, - pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) +proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse = + StoreResponse( + messages: response.messages, + pagingOptions: if response.cursor.isNone(): none(StorePagingOptions) + else: some(StorePagingOptions( + pageSize: response.pageSize, + forward: response.ascending, + cursor: response.cursor.map(toRPC) + )) + ) proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = var t: Timestamp diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 9cf0c9c84..f5e75b60a 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -4,14 +4,15 @@ else: {.push raises: [].} import - std/options, + std/[options, sequtils], chronicles, json_rpc/rpcserver import - ../peer_manager/peer_manager, - ../waku_node, ../../protocol/waku_store, + ../../protocol/waku_store/rpc, ../../utils/time, + ../waku_node, + ../peer_manager/peer_manager, ./jsonrpc_types, ./jsonrpc_utils @@ -25,7 +26,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Store API version 1 definitions - rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" @@ -33,12 +34,21 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = if peerOpt.isNone(): raise newException(ValueError, "no suitable remote store peers") - let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", - contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], - startTime: if startTime.isSome: startTime.get() else: Timestamp(0), - endTime: if endTime.isSome: endTime.get() else: Timestamp(0), - pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) - let queryFut = node.query(historyQuery, peerOpt.get()) + let req = HistoryQuery( + pubsubTopic: pubsubTopicOption, + contentTopics: if contentFiltersOption.isNone(): @[] + else: contentFiltersOption.get().mapIt(it.contentTopic), + startTime: startTime, + endTime: endTime, + ascending: if pagingOptions.isNone(): true + else: pagingOptions.get().forward, + pageSize: if pagingOptions.isNone(): DefaultPageSize + else: min(pagingOptions.get().pageSize, MaxPageSize), + cursor: if pagingOptions.isNone(): none(HistoryCursor) + else: pagingOptions.get().cursor.map(toAPI) + ) + + let queryFut = node.query(req, peerOpt.get()) if not await queryFut.withTimeout(futTimeout): raise newException(ValueError, "No history response received (timeout)") @@ -48,4 +58,4 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = raise newException(ValueError, $res.error) debug "get_waku_v2_store_v1_messages response" - return res.value.toStoreResponse() + return res.value.toJsonRPCStoreResponse() diff --git a/waku/v2/node/message_store/queue_store/index.nim b/waku/v2/node/message_store/queue_store/index.nim index 83ffda3cf..3bcb5c08c 100644 --- a/waku/v2/node/message_store/queue_store/index.nim +++ b/waku/v2/node/message_store/queue_store/index.nim @@ -8,7 +8,7 @@ import nimcrypto/sha2 import ../../../protocol/waku_message, - ../../../protocol/waku_store/pagination, + ../../../protocol/waku_store/common, ../../../utils/time @@ -33,19 +33,19 @@ proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTo ) -proc toPagingIndex*(index: Index): PagingIndex = - PagingIndex( +proc tohistoryCursor*(index: Index): HistoryCursor = + HistoryCursor( pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, - receiverTime: index.receiverTime, + storeTime: index.receiverTime, digest: index.digest ) -proc toIndex*(index: PagingIndex): Index = +proc toIndex*(index: HistoryCursor): Index = Index( pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, - receiverTime: index.receiverTime, + receiverTime: index.storeTime, digest: index.digest ) diff --git a/waku/v2/node/message_store/queue_store/queue_store.nim b/waku/v2/node/message_store/queue_store/queue_store.nim index 1fa0694cf..9e2579580 100644 --- a/waku/v2/node/message_store/queue_store/queue_store.nim +++ b/waku/v2/node/message_store/queue_store/queue_store.nim @@ -10,7 +10,7 @@ import chronicles import ../../../protocol/waku_message, - ../../../protocol/waku_store/pagination, + ../../../protocol/waku_store/common, ../../../protocol/waku_store/message_store, ../../../utils/time, ./index @@ -271,8 +271,8 @@ method getAllMessages*(store: StoreQueueRef): MessageStoreResult[seq[MessageStor method getMessagesByHistoryQuery*( store: StoreQueueRef, contentTopic = none(seq[ContentTopic]), - pubsubTopic = none(string), - cursor = none(PagingIndex), + pubsubTopic = none(PubsubTopic), + cursor = none(HistoryCursor), startTime = none(Timestamp), endTime = none(Timestamp), maxPageSize = DefaultPageSize, diff --git a/waku/v2/node/message_store/sqlite_store/sqlite_store.nim b/waku/v2/node/message_store/sqlite_store/sqlite_store.nim index 0add7ffb2..929bd1d0d 100644 --- a/waku/v2/node/message_store/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/message_store/sqlite_store/sqlite_store.nim @@ -12,7 +12,7 @@ import import ../../../../common/sqlite, ../../../protocol/waku_message, - ../../../protocol/waku_store/pagination, + ../../../protocol/waku_store/common, ../../../protocol/waku_store/message_store, ../../../utils/time, ./queries @@ -99,13 +99,13 @@ method getMessagesByHistoryQuery*( s: SqliteStore, contentTopic = none(seq[ContentTopic]), pubsubTopic = none(PubsubTopic), - cursor = none(PagingIndex), + cursor = none(HistoryCursor), startTime = none(Timestamp), endTime = none(Timestamp), maxPageSize = DefaultPageSize, ascendingOrder = true ): MessageStoreResult[seq[MessageStoreRow]] = - let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic)) + let cursor = cursor.map(proc(c: HistoryCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic)) return s.db.selectMessagesByHistoryQueryWithLimit( contentTopic, diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 9c879b519..042361fd9 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -611,7 +611,7 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W let queryRes = await node.wakuStoreClient.query(query, peer) if queryRes.isErr(): - return err(queryRes.error) + return err($queryRes.error) let response = queryRes.get() diff --git a/waku/v2/protocol/waku_store.nim b/waku/v2/protocol/waku_store.nim index 2491e05a3..3233d02b2 100644 --- a/waku/v2/protocol/waku_store.nim +++ b/waku/v2/protocol/waku_store.nim @@ -1,13 +1,9 @@ import - ./waku_store/protocol, - ./waku_store/rpc, - ./waku_store/rpc_codec, - ./waku_store/pagination, - ./waku_store/message_store + ./waku_store/common, + ./waku_store/message_store, + ./waku_store/protocol export - protocol, - rpc, - rpc_codec, - pagination, - message_store \ No newline at end of file + common, + message_store, + protocol \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/client.nim b/waku/v2/protocol/waku_store/client.nim index 69e4c2f82..9741e0f34 100644 --- a/waku/v2/protocol/waku_store/client.nim +++ b/waku/v2/protocol/waku_store/client.nim @@ -16,9 +16,8 @@ import ../../utils/time, ../waku_message, ../waku_swap/waku_swap, - ./protocol, ./protocol_metrics, - ./pagination, + ./common, ./rpc, ./rpc_codec, ./message_store @@ -28,6 +27,10 @@ logScope: topics = "waku store client" +const + DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page + + type WakuStoreClient* = ref object peerManager: PeerManager rng: ref rand.HmacDrbgContext @@ -40,28 +43,45 @@ proc new*(T: type WakuStoreClient, store: MessageStore): T = WakuStoreClient(peerManager: peerManager, rng: rng, store: store) +proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} = -proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) if connOpt.isNone(): waku_store_errors.inc(labelValues = [dialFailure]) - return err(dialFailure) + return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer)) + let connection = connOpt.get() - let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req) - await connection.writeLP(rpc.encode().buffer) - var message = await connection.readLp(MaxRpcSize.int) - let response = HistoryRPC.decode(message) + let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req.toRPC()) + await connection.writeLP(reqRpc.encode().buffer) - if response.isErr(): - error "failed to decode response" + + let buf = await connection.readLp(MaxRpcSize.int) + let respDecodeRes = HistoryRPC.decode(buf) + if respDecodeRes.isErr(): waku_store_errors.inc(labelValues = [decodeRpcFailure]) - return err(decodeRpcFailure) + return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: decodeRpcFailure)) - return ok(response.value.response) + let respRpc = respDecodeRes.get() -proc queryWithPaging*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = + + # Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0)) + # TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK)) + # and rework the protobuf parsing to return Option[T] when empty values are received + # if respRpc.response == default(HistoryResponseRPC): + # waku_store_errors.inc(labelValues = [emptyRpcResponseFailure]) + # return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure)) + + let resp = respRpc.response + + return resp.toAPI() + + +proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} = + return await w.sendHistoryQueryRPC(req, peer) + +proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, ## it retrieves the historical messages in pages. ## Returns all the fetched messages, if error occurs, returns an error string @@ -72,29 +92,34 @@ proc queryWithPaging*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerI var messageList: seq[WakuMessage] = @[] while true: - let res = await w.query(req, peer) - if res.isErr(): - return err(res.error) + let queryRes = await w.query(req, peer) + if queryRes.isErr(): + return err($queryRes.error) - let response = res.get() + let response = queryRes.get() messageList.add(response.messages) # Check whether it is the last page - if response.pagingInfo == PagingInfo(): + if response.cursor.isNone(): break # Update paging cursor - req.pagingInfo.cursor = response.pagingInfo.cursor + req.cursor = response.cursor return ok(messageList) -proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = + +## Resume store + +const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds + +proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## Loops through the peers candidate list in order and sends the query to each ## ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. ## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq. - let queryFuturesList = peers.mapIt(w.queryWithPaging(req, it)) + let queryFuturesList = peers.mapIt(w.queryAll(req, it)) await allFutures(queryFuturesList) @@ -115,10 +140,6 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo return ok(messagesList) -## Resume store - -const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds - proc resume*(w: WakuStoreClient, peerList = none(seq[RemotePeerInfo]), pageSize = DefaultPageSize, @@ -153,13 +174,11 @@ proc resume*(w: WakuStoreClient, queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0) let req = HistoryQuery( - pubsubTopic: pubsubTopic, - startTime: queryStartTime, - endTime: queryEndTime, - pagingInfo: PagingInfo( - direction:PagingDirection.FORWARD, - pageSize: uint64(pageSize) - ) + pubsubTopic: some(pubsubTopic), + startTime: some(queryStartTime), + endTime: some(queryEndTime), + pageSize: uint64(pageSize), + ascending: true ) var res: WakuStoreResult[seq[WakuMessage]] @@ -177,7 +196,7 @@ proc resume*(w: WakuStoreClient, return err("no suitable remote peers") debug "a peer is selected from peer manager" - res = await w.queryWithPaging(req, peerOpt.get()) + res = await w.queryAll(req, peerOpt.get()) if res.isErr(): debug "failed to resume the history" diff --git a/waku/v2/protocol/waku_store/common.nim b/waku/v2/protocol/waku_store/common.nim new file mode 100644 index 000000000..89f7b62ae --- /dev/null +++ b/waku/v2/protocol/waku_store/common.nim @@ -0,0 +1,104 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options, + stew/results, + stew/byteutils, + nimcrypto/sha2 +import + ../../utils/time, + ../waku_message + + +const + WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" + + DefaultPageSize*: uint64 = 20 + + MaxPageSize*: uint64 = 100 + + +type WakuStoreResult*[T] = Result[T, string] + + +## Waku message digest + +type MessageDigest* = MDigest[256] + +proc computeDigest*(msg: WakuMessage): MessageDigest = + var ctx: sha256 + ctx.init() + defer: ctx.clear() + + ctx.update(msg.contentTopic.toBytes()) + ctx.update(msg.payload) + + # Computes the hash + return ctx.finish() + + +## Public API types + +type + HistoryCursor* = object + pubsubTopic*: PubsubTopic + senderTime*: Timestamp + storeTime*: Timestamp + digest*: MessageDigest + + HistoryQuery* = object + pubsubTopic*: Option[PubsubTopic] + contentTopics*: seq[ContentTopic] + cursor*: Option[HistoryCursor] + startTime*: Option[Timestamp] + endTime*: Option[Timestamp] + pageSize*: uint64 + ascending*: bool + + HistoryResponse* = object + messages*: seq[WakuMessage] + pageSize*: uint64 + ascending*: bool + cursor*: Option[HistoryCursor] + + HistoryErrorKind* {.pure.} = enum + UNKNOWN = uint32(000) + PEER_DIAL_FAILURE = uint32(200) + BAD_RESPONSE = uint32(300) + BAD_REQUEST = uint32(400) + SERVICE_UNAVAILABLE = uint32(503) + + HistoryError* = object + case kind*: HistoryErrorKind + of PEER_DIAL_FAILURE: + address*: string + of BAD_RESPONSE, BAD_REQUEST: + cause*: string + else: + discard + + HistoryResult* = Result[HistoryResponse, HistoryError] + + +proc parse*(T: type HistoryErrorKind, kind: uint32): T = + case kind: + of 000, 200, 300, 400, 503: + HistoryErrorKind(kind) + else: + HistoryErrorKind.UNKNOWN + +proc `$`*(err: HistoryError): string = + case err.kind: + of HistoryErrorKind.PEER_DIAL_FAILURE: + "PEER_DIAL_FAILURE: " & err.address + of HistoryErrorKind.BAD_RESPONSE: + "BAD_RESPONSE: " & err.cause + of HistoryErrorKind.BAD_REQUEST: + "BAD_REQUEST: " & err.cause + of HistoryErrorKind.SERVICE_UNAVAILABLE: + "SERVICE_UNAVAILABLE" + of HistoryErrorKind.UNKNOWN: + "UNKNOWN" \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/message_store.nim b/waku/v2/protocol/waku_store/message_store.nim index 03f46612f..458804a5a 100644 --- a/waku/v2/protocol/waku_store/message_store.nim +++ b/waku/v2/protocol/waku_store/message_store.nim @@ -10,27 +10,28 @@ import std/[options, times], stew/results import + ../../utils/time, ../waku_message, - ./pagination, - ../../utils/time + ./common type MessageStoreResult*[T] = Result[T, string] - MessageStoreRow* = (string, WakuMessage, seq[byte], Timestamp) + MessageStoreRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp) MessageStore* = ref object of RootObj # MessageStore interface + method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] {.base.} = let digest = computeDigest(message) receivedTime = if message.timestamp > 0: message.timestamp - else: getNanosecondTime(getTime().toUnixFloat()) + else: getNanosecondTime(getTime().toUnixFloat()) ms.put(pubsubTopic, message, digest, receivedTime) @@ -40,8 +41,8 @@ method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow method getMessagesByHistoryQuery*( ms: MessageStore, contentTopic = none(seq[ContentTopic]), - pubsubTopic = none(string), - cursor = none(PagingIndex), + pubsubTopic = none(PubsubTopic), + cursor = none(HistoryCursor), startTime = none(Timestamp), endTime = none(Timestamp), maxPageSize = DefaultPageSize, diff --git a/waku/v2/protocol/waku_store/pagination.nim b/waku/v2/protocol/waku_store/pagination.nim deleted file mode 100644 index ebb98ce0f..000000000 --- a/waku/v2/protocol/waku_store/pagination.nim +++ /dev/null @@ -1,70 +0,0 @@ -## Contains types and utilities for pagination. -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - stew/byteutils, - nimcrypto/sha2 -import - ../waku_message, - ../../utils/time - -const - MaxPageSize*: uint64 = 100 - - DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page - - -type MessageDigest* = MDigest[256] - -type PagingIndex* = object - ## This type contains the description of an Index used in the pagination of WakuMessages - pubsubTopic*: string - senderTime*: Timestamp # the time at which the message is generated - receiverTime*: Timestamp - digest*: MessageDigest # calculated over payload and content topic - -proc computeDigest*(msg: WakuMessage): MessageDigest = - var ctx: sha256 - ctx.init() - defer: ctx.clear() - - ctx.update(msg.contentTopic.toBytes()) - ctx.update(msg.payload) - - # Computes the hash - return ctx.finish() - -proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = - ## Takes a WakuMessage with received timestamp and returns its Index. - let - digest = computeDigest(msg) - senderTime = msg.timestamp - - PagingIndex( - pubsubTopic: pubsubTopic, - senderTime: senderTime, - receiverTime: receivedTime, - digest: digest - ) - -proc `==`*(x, y: PagingIndex): bool = - ## receiverTime plays no role in index equality - (x.senderTime == y.senderTime) and - (x.digest == y.digest) and - (x.pubsubTopic == y.pubsubTopic) - - -type - PagingDirection* {.pure.} = enum - ## PagingDirection determines the direction of pagination - BACKWARD = uint32(0) - FORWARD = uint32(1) - - PagingInfo* = object - ## This type holds the information needed for the pagination - pageSize*: uint64 - cursor*: PagingIndex - direction*: PagingDirection diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 0fe831279..cf7ded6f6 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -23,9 +23,9 @@ import ../../utils/time, ../waku_message, ../waku_swap/waku_swap, + ./common, ./rpc, ./rpc_codec, - ./pagination, ./message_store, ./protocol_metrics @@ -35,14 +35,10 @@ logScope: const - WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" - MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" type - WakuStoreResult*[T] = Result[T, string] - WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref rand.HmacDrbgContext @@ -51,6 +47,7 @@ type retentionPolicy: Option[MessageRetentionPolicy] +# TODO: Move to a message store wrapper proc executeMessageRetentionPolicy*(w: WakuStore) = if w.retentionPolicy.isNone(): return @@ -65,7 +62,7 @@ proc executeMessageRetentionPolicy*(w: WakuStore) = waku_store_errors.inc(labelValues = [retPolicyFailure]) debug "failed execution of retention policy", error=retPolicyRes.error - +# TODO: Move to a message store wrapper proc reportStoredMessagesMetric*(w: WakuStore) = if w.store.isNil(): return @@ -76,147 +73,7 @@ proc reportStoredMessagesMetric*(w: WakuStore) = waku_store_messages.set(resCount.value, labelValues = ["stored"]) - -proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = - ## Query history to return a single page of messages matching the query - - # Extract query criteria. All query criteria are optional - let - qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic)) - else: none(seq[ContentTopic]) - qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic) - else: none(string) - qCursor = if query.pagingInfo.cursor != PagingIndex(): some(query.pagingInfo.cursor) - else: none(PagingIndex) - qStartTime = if query.startTime != Timestamp(0): some(query.startTime) - else: none(Timestamp) - qEndTime = if query.endTime != Timestamp(0): some(query.endTime) - else: none(Timestamp) - qMaxPageSize = if query.pagingInfo.pageSize <= 0: DefaultPageSize - else: min(query.pagingInfo.pageSize, MaxPageSize) - qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD - - - let queryStartTime = getTime().toUnixFloat() - - let queryRes = w.store.getMessagesByHistoryQuery( - contentTopic = qContentTopics, - pubsubTopic = qPubSubTopic, - cursor = qCursor, - startTime = qStartTime, - endTime = qEndTime, - maxPageSize = qMaxPageSize + 1, - ascendingOrder = qAscendingOrder - ) - - let queryDuration = getTime().toUnixFloat() - queryStartTime - waku_store_query_duration_seconds.observe(queryDuration) - - - # Build response - # TODO: Improve error reporting - if queryRes.isErr(): - return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR) - - let rows = queryRes.get() - - if rows.len <= 0: - return HistoryResponse(messages: @[], error: HistoryResponseError.NONE) - - var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1]) - else: rows[0..^2].mapIt(it[1]) - var pagingInfo = none(PagingInfo) - - # The retrieved messages list should always be in chronological order - if not qAscendingOrder: - messages.reverse() - - - if rows.len > int(qMaxPageSize): - ## Build last message cursor - ## The cursor is built from the last message INCLUDED in the response - ## (i.e. the second last message in the rows list) - let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] - - # TODO: Improve coherence of MessageDigest type - var messageDigest: array[32, byte] - for i in 0.. int(qMaxPageSize): + ## Build last message cursor + ## The cursor is built from the last message INCLUDED in the response + ## (i.e. the second last message in the rows list) + let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] + + # TODO: Improve coherence of MessageDigest type + var messageDigest: array[32, byte] + for i in 0..