From 888f7cb3127e4cea03b5959135bb0cb1a6c740ff Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Mon, 25 Jul 2022 13:01:37 +0200 Subject: [PATCH] refactor(waku-store): major code reorganization, move StoreQueue to message_store folder --- examples/v2/chat2.nim | 3 +- tests/v2/test_jsonrpc_waku.nim | 2 +- tests/v2/test_message_store.nim | 5 +- tests/v2/test_peer_exchange.nim | 8 +- tests/v2/test_waku_bridge.nim | 2 +- tests/v2/test_waku_dnsdisc.nim | 8 +- tests/v2/test_waku_pagination.nim | 14 +- tests/v2/test_waku_store.nim | 14 +- tests/v2/test_waku_store_queue.nim | 11 +- tests/v2/test_waku_swap.nim | 2 +- tests/v2/test_wakunode.nim | 3 +- waku/v2/node/jsonrpc/jsonrpc_utils.nim | 3 +- waku/v2/node/jsonrpc/store_api.nim | 8 +- waku/v2/node/quicksim2.nim | 11 +- waku/v2/node/scripts/rpc_info.nim | 2 +- waku/v2/node/scripts/rpc_publish.nim | 2 +- waku/v2/node/scripts/rpc_query.nim | 2 +- waku/v2/node/scripts/rpc_subscribe.nim | 2 +- waku/v2/node/scripts/rpc_subscribe_filter.nim | 2 +- .../v2/node/storage/message/message_store.nim | 21 +- .../storage/message/waku_message_store.nim | 14 +- .../storage/message/waku_store_queue.nim} | 114 ++----- waku/v2/node/wakunode2.nim | 9 +- waku/v2/node/wakunode2_types.nim | 2 +- waku/v2/protocol/waku_store.nim | 9 + .../{waku_store.nim => protocol.nim} | 303 +++--------------- waku/v2/protocol/waku_store/rpc.nim | 35 ++ waku/v2/protocol/waku_store/rpc_codec.nim | 204 ++++++++++++ waku/v2/utils/pagination.nim | 62 +++- 29 files changed, 471 insertions(+), 406 deletions(-) rename waku/v2/{protocol/waku_store/waku_store_types.nim => node/storage/message/waku_store_queue.nim} (82%) create mode 100644 waku/v2/protocol/waku_store.nim rename waku/v2/protocol/waku_store/{waku_store.nim => protocol.nim} (74%) create mode 100644 waku/v2/protocol/waku_store/rpc.nim create mode 100644 waku/v2/protocol/waku_store/rpc_codec.nim diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index b9d0fd9dd..e20caf240 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -22,7 +22,8 @@ import libp2p/[switch, # manage transports, a single entry poi nameresolving/dnsresolver,# define DNS resolution muxers/muxer] # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection import ../../waku/v2/node/[wakunode2, waku_payload], - ../../waku/v2/node/./dnsdisc/waku_dnsdisc, + ../../waku/v2/node/dnsdisc/waku_dnsdisc, + ../../waku/v2/protocol/waku_store, ../../waku/v2/utils/[peers,time], ../../waku/common/utils/nat, ./config_chat2 diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 83341708b..40a75654a 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -21,7 +21,7 @@ import admin_api, private_api], ../../waku/v2/protocol/waku_relay, - ../../waku/v2/protocol/waku_store/[waku_store, waku_store_types], + ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/utils/peers, diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 40a7022c8..8f07c5ca2 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -6,9 +6,12 @@ import sqlite3_abi, stew/byteutils, ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/sqlite, - ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_store, ../../waku/v2/utils/time, + ../../waku/v2/utils/pagination, ./utils diff --git a/tests/v2/test_peer_exchange.nim b/tests/v2/test_peer_exchange.nim index a2f27526f..c434f1b91 100644 --- a/tests/v2/test_peer_exchange.nim +++ b/tests/v2/test_peer_exchange.nim @@ -2,13 +2,15 @@ import std/[sequtils, options], + stew/shims/net, + testutils/unittests, chronicles, chronos, libp2p/crypto/crypto, - libp2p/protocols/pubsub/gossipsub, - stew/shims/net, - testutils/unittests, + libp2p/protocols/pubsub/gossipsub +import ../../waku/v2/node/wakunode2, + ../../waku/v2/utils/peers, ../test_helpers procSuite "Peer Exchange": diff --git a/tests/v2/test_waku_bridge.nim b/tests/v2/test_waku_bridge.nim index 6f33e8c0e..b84053e5b 100644 --- a/tests/v2/test_waku_bridge.nim +++ b/tests/v2/test_waku_bridge.nim @@ -16,7 +16,7 @@ import ../../waku/common/wakubridge, ../../waku/v1/protocol/waku_protocol, ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/node/[wakunode2, waku_payload], ../../waku/v2/utils/peers, diff --git a/tests/v2/test_waku_dnsdisc.nim b/tests/v2/test_waku_dnsdisc.nim index ca636103d..595df70b7 100644 --- a/tests/v2/test_waku_dnsdisc.nim +++ b/tests/v2/test_waku_dnsdisc.nim @@ -2,14 +2,16 @@ import std/[sequtils, tables], - chronicles, - chronos, testutils/unittests, stew/shims/net, stew/[base32, results], + chronicles, + chronos, libp2p/crypto/crypto, eth/keys, - discovery/dnsdisc/builder, + discovery/dnsdisc/builder +import + ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/wakunode2, ../test_helpers diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index 84789dd55..3c3356fec 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -1,10 +1,16 @@ {.used.} + import std/[options, sequtils], - testutils/unittests, nimcrypto/sha2, - libp2p/protobuf/minprotobuf, - ../../waku/v2/protocol/waku_store/waku_store, - ../../waku/v2/utils/time + testutils/unittests, + nimcrypto/sha2, + libp2p/protobuf/minprotobuf +import + ../../waku/v2/node/storage/message/waku_store_queue, + ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/utils/time, + ../../waku/v2/utils/pagination proc createSampleStoreQueue(s: int): StoreQueueRef = diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index d94bf5a41..4572b01a5 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -2,18 +2,24 @@ import std/[options, tables, sets, sequtils], - testutils/unittests, chronos, chronicles, + chronos, + chronicles, + testutils/unittests, libp2p/switch, libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, - libp2p/protocols/pubsub/rpc/message, + libp2p/protocols/pubsub/rpc/message +import ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_store, ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/utils/pagination, ../../waku/v2/utils/time, - ../test_helpers, ./utils + ../test_helpers, + ./utils procSuite "Waku Store": const defaultContentTopic = ContentTopic("1") diff --git a/tests/v2/test_waku_store_queue.nim b/tests/v2/test_waku_store_queue.nim index 85ad96d5d..c744475fd 100644 --- a/tests/v2/test_waku_store_queue.nim +++ b/tests/v2/test_waku_store_queue.nim @@ -2,9 +2,16 @@ import std/[sequtils, strutils], + stew/results, testutils/unittests, - ../../waku/v2/protocol/waku_store/waku_store_types, - ../../waku/v2/utils/time + nimcrypto/hash +import + ../../waku/v2/node/storage/message/waku_store_queue, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_store, + ../../waku/v2/utils/time, + ../../waku/v2/utils/pagination + procSuite "Sorted store queue": diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 51c107021..8fbf28049 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -11,7 +11,7 @@ import libp2p/switch, eth/keys, ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/node/wakunode2, ../../waku/v2/utils/peers, diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index accadf4c9..f8b37f2bf 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -15,8 +15,9 @@ import eth/keys, ../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/message/waku_message_store, + ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/protocol/[waku_relay, waku_message], - ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_lightpush/waku_lightpush, ../../waku/v2/node/peer_manager/peer_manager, diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index ddff71654..461f1474b 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -4,9 +4,10 @@ import std/[options, json], eth/keys, ../../../v1/node/rpc/hexstrings, - ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_store, ../../protocol/waku_message, ../../utils/time, + ../../utils/pagination, ../waku_payload, ./jsonrpc_types diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 95b7168c3..0097c2cda 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -3,10 +3,14 @@ import std/options, chronicles, - json_rpc/rpcserver, + json_rpc/rpcserver +import ../wakunode2, + ../../protocol/waku_store, ../../utils/time, - ./jsonrpc_types, ./jsonrpc_utils + ../../utils/pagination, + ./jsonrpc_types, + ./jsonrpc_utils export jsonrpc_types diff --git a/waku/v2/node/quicksim2.nim b/waku/v2/node/quicksim2.nim index f3cac49ed..165f58b42 100644 --- a/waku/v2/node/quicksim2.nim +++ b/waku/v2/node/quicksim2.nim @@ -1,16 +1,13 @@ -# Group by std, external then internal imports import - # std imports - std/ [os, strutils, times, options], #options as what # TODO: Huh? Redefinition? - # external imports + std/[os, strutils, times, options], #options as what # TODO: Huh? Redefinition? chronicles, eth/common as eth_common, eth/keys, json_rpc/[rpcclient, rpcserver], - libp2p/protobuf/minprotobuf, - # internal imports + libp2p/protobuf/minprotobuf +import ../protocol/waku_filter/waku_filter_types, - ../protocol/waku_store/waku_store_types, + ../protocol/waku_store, ../protocol/waku_message, ../utils/time, ./wakunode2, diff --git a/waku/v2/node/scripts/rpc_info.nim b/waku/v2/node/scripts/rpc_info.nim index 0f1fdd62c..068405b3c 100644 --- a/waku/v2/node/scripts/rpc_info.nim +++ b/waku/v2/node/scripts/rpc_info.nim @@ -9,7 +9,7 @@ import ../waku_payload, ../jsonrpc/jsonrpc_types, ../../protocol/waku_filter/waku_filter_types, - ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings from strutils import rsplit diff --git a/waku/v2/node/scripts/rpc_publish.nim b/waku/v2/node/scripts/rpc_publish.nim index a23682a23..ba85a1ef4 100644 --- a/waku/v2/node/scripts/rpc_publish.nim +++ b/waku/v2/node/scripts/rpc_publish.nim @@ -9,7 +9,7 @@ import ../waku_payload, ../jsonrpc/jsonrpc_types, ../../protocol/waku_filter/waku_filter_types, - ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings from strutils import rsplit diff --git a/waku/v2/node/scripts/rpc_query.nim b/waku/v2/node/scripts/rpc_query.nim index a72a42980..3ff76357d 100644 --- a/waku/v2/node/scripts/rpc_query.nim +++ b/waku/v2/node/scripts/rpc_query.nim @@ -9,7 +9,7 @@ import ../waku_payload, ../jsonrpc/jsonrpc_types, ../../protocol/waku_filter/waku_filter_types, - ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings from strutils import rsplit diff --git a/waku/v2/node/scripts/rpc_subscribe.nim b/waku/v2/node/scripts/rpc_subscribe.nim index a5e426fc9..e15ed22bd 100644 --- a/waku/v2/node/scripts/rpc_subscribe.nim +++ b/waku/v2/node/scripts/rpc_subscribe.nim @@ -8,7 +8,7 @@ import ../waku_payload, ../jsonrpc/jsonrpc_types, ../../protocol/waku_filter/waku_filter_types, - ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings from strutils import rsplit diff --git a/waku/v2/node/scripts/rpc_subscribe_filter.nim b/waku/v2/node/scripts/rpc_subscribe_filter.nim index ad7f14190..70094d08f 100644 --- a/waku/v2/node/scripts/rpc_subscribe_filter.nim +++ b/waku/v2/node/scripts/rpc_subscribe_filter.nim @@ -9,7 +9,7 @@ import ../waku_payload, ../jsonrpc/jsonrpc_types, ../../protocol/waku_filter/waku_filter_types, - ../../protocol/waku_store/waku_store_types, + ../../protocol/waku_store, ../../../v1/node/rpc/hexstrings from strutils import rsplit diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 0e650c003..b87e8408c 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -1,16 +1,16 @@ +## This module defines a message store interface. Implementations of +## MessageStore are used by the `WakuStore` protocol to store and +## retrieve historical messages {.push raises: [Defect].} import std/options, stew/results, ../../../protocol/waku_message, - ../../../protocol/waku_store/waku_store_types, + ../../../protocol/waku_store/rpc, ../../../utils/time, ../../../utils/pagination -## This module defines a message store interface. Implementations of -## MessageStore are used by the `WakuStore` protocol to store and -## retrieve historical messages type DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].} @@ -19,6 +19,19 @@ type MessageStore* = ref object of RootObj + +# TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim +type + IndexedWakuMessage* = object + # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message + ## This type is used to encapsulate a WakuMessage and its Index + msg*: WakuMessage + index*: Index + pubsubTopic*: string + + QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} + + # MessageStore interface method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 6210cd0a5..aff2b2849 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -2,16 +2,18 @@ import std/[options, tables, times], - sqlite3_abi, stew/[byteutils, results], - chronicles, chronos, - ./message_store, - ../sqlite, + chronicles, + sqlite3_abi +import ../../../protocol/waku_message, - ../../../protocol/waku_store/waku_store, + ../../../protocol/waku_store, ../../../utils/pagination, - ../../../utils/time + ../../../utils/time, + ../sqlite, + ./message_store, + ./waku_store_queue export sqlite diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/node/storage/message/waku_store_queue.nim similarity index 82% rename from waku/v2/protocol/waku_store/waku_store_types.nim rename to waku/v2/node/storage/message/waku_store_queue.nim index f90bb27a5..545655733 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -1,95 +1,42 @@ -## Types for waku_store protocol. - {.push raises: [Defect].} -# Group by std, external then internal imports import - std/[algorithm, options], - # external imports - bearssl, - chronicles, - libp2p/protocols/protocol, + std/[options, algorithm], stew/[results, sorted_set], - # internal imports - ../../utils/pagination, - ../../utils/time, - ../../node/peer_manager/peer_manager, - ../waku_swap/waku_swap_types, - ../waku_message + chronicles +import + ../../../protocol/waku_message, + ../../../protocol/waku_store/rpc, + ../../../utils/pagination, + ../../../utils/time, + ./message_store + + +# TODO: Remove after resolving nwaku #1026 +export + message_store + + +logScope: + topics = "message_store.storequeue" -# export all modules whose types are used in public functions/types -export - bearssl, - results, - peer_manager, - waku_swap_types, - waku_message, - pagination const - # Constants required for pagination ------------------------------------------- - MaxPageSize* = uint64(100) # Maximum number of waku messages in each page - # TODO the DefaultPageSize can be changed, it's current value is random - DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page - - MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead - - MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future - - DefaultTopic* = "/waku/2/default-waku/proto" + MaxPageSize = uint64(100) # Maximum number of waku messages in each page + + MaxTimeVariance = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future type - HistoryContentFilter* = object - contentTopic*: ContentTopic - QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} - QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.} - - IndexedWakuMessage* = object - # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message - ## This type is used to encapsulate a WakuMessage and its Index - msg*: WakuMessage - index*: Index - pubsubTopic*: string - - PagingDirection* {.pure.} = enum - ## PagingDirection determines the direction of pagination - BACKWARD = uint32(0) - FORWARD = uint32(1) - - PagingInfo* = object - ## This type holds the information needed for the pagination - pageSize*: uint64 - cursor*: Index - direction*: PagingDirection - - HistoryResponseError* {.pure.} = enum - ## HistoryResponseError contains error message to inform the querying node about the state of its request - NONE = uint32(0) - INVALID_CURSOR = uint32(1) - - HistoryQuery* = object - contentFilters*: seq[HistoryContentFilter] - pubsubTopic*: string - pagingInfo*: PagingInfo # used for pagination - startTime*: Timestamp # used for time-window query - endTime*: Timestamp # used for time-window query - - HistoryResponse* = object - messages*: seq[WakuMessage] - pagingInfo*: PagingInfo # used for pagination - error*: HistoryResponseError - - HistoryRPC* = object - requestId*: string - query*: HistoryQuery - response*: HistoryResponse - QueryResult* = Result[uint64, string] + MessagesResult* = Result[seq[WakuMessage], string] +type + StoreQueueResult*[T] = Result[T, cstring] + StoreQueueRef* = ref object ## Bounded repository for indexed messages ## @@ -105,15 +52,8 @@ type items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages capacity: int # Maximum amount of messages to keep - StoreQueueResult*[T] = Result[T, cstring] - -###################### -# StoreQueue helpers # -###################### - -logScope: - topics = "wakustorequeue" +### Helpers proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], startCursor: Index): @@ -305,10 +245,8 @@ proc bwdPage(storeQueue: StoreQueueRef, outPagingInfo, outError) -################## -# StoreQueue API # -################## +#### API ## --- SortedSet accessors --- iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 80ec95341..e7c57710f 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -13,18 +13,21 @@ import libp2p/protocols/pubsub/[gossipsub, rpc/messages], libp2p/nameresolving/nameresolver, libp2p/[builders, multihash], - libp2p/transports/[transport, tcptransport, wstransport], + libp2p/transports/[transport, tcptransport, wstransport] +import + ../node/storage/message/waku_store_queue, ../protocol/[waku_relay, waku_message], - ../protocol/waku_store/waku_store, + ../protocol/waku_store, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter/waku_filter, ../protocol/waku_lightpush/waku_lightpush, ../protocol/waku_rln_relay/[waku_rln_relay_types], ../utils/[peers, requests, wakuswitch, wakuenr], ./peer_manager/peer_manager, + ./storage/message/message_store, ./dnsdisc/waku_dnsdisc, ./discv5/waku_discv5, - wakunode2_types + ./wakunode2_types export builders, diff --git a/waku/v2/node/wakunode2_types.nim b/waku/v2/node/wakunode2_types.nim index 591f5063b..e22e46c34 100644 --- a/waku/v2/node/wakunode2_types.nim +++ b/waku/v2/node/wakunode2_types.nim @@ -3,7 +3,7 @@ import libp2p/crypto/crypto, libp2p/protocols/ping, ../protocol/waku_relay, - ../protocol/waku_store/waku_store, + ../protocol/waku_store, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter/waku_filter, ../protocol/waku_lightpush/waku_lightpush, diff --git a/waku/v2/protocol/waku_store.nim b/waku/v2/protocol/waku_store.nim new file mode 100644 index 000000000..b34494b5f --- /dev/null +++ b/waku/v2/protocol/waku_store.nim @@ -0,0 +1,9 @@ +import + ./waku_store/protocol, + ./waku_store/rpc, + ./waku_store/rpc_codec + +export + protocol, + rpc, + rpc_codec \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/protocol.nim similarity index 74% rename from waku/v2/protocol/waku_store/waku_store.nim rename to waku/v2/protocol/waku_store/protocol.nim index a2776c030..032ff3835 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -1,42 +1,31 @@ ## Waku Store protocol for historical messaging support. ## See spec for more details: ## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md - {.push raises: [Defect].} -# Group by std, external then internal imports import - # std imports std/[tables, times, sequtils, options, math], - # external imports - bearssl, + stew/[results, byteutils], chronicles, chronos, + bearssl, libp2p/crypto/crypto, libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, - libp2p/varint, - metrics, - stew/[results, byteutils], - # internal imports + metrics +import ../../node/storage/message/message_store, + ../../node/storage/message/waku_store_queue, ../../node/peer_manager/peer_manager, - ../../utils/protobuf, - ../../utils/requests, ../../utils/time, + ../../utils/pagination, + ../../utils/requests, + ../waku_message, ../waku_swap/waku_swap, - ./waku_store_types + ./rpc, + ./rpc_codec -# export all modules whose types are used in public functions/types -export - options, - chronos, - bearssl, - minprotobuf, - peer_manager, - waku_store_types, - message_store declarePublicGauge waku_store_messages, "number of historical messages", ["type"] declarePublicGauge waku_store_peers, "number of store peers" @@ -46,18 +35,30 @@ declarePublicGauge waku_store_queries, "number of store queries received" logScope: topics = "wakustore" + +const + # Constants required for pagination ------------------------------------------- + MaxPageSize* = uint64(100) # Maximum number of waku messages in each page + + # TODO the DefaultPageSize can be changed, it's current value is random + DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page + + MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + + MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future + + DefaultTopic* = "/waku/2/default-waku/proto" + + const WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" - DefaultStoreCapacity* = 50000 # Default maximum of 50k messages stored + DefaultStoreCapacity* = 50_000 # Default maximum of 50k messages stored # Error types (metric label values) const dialFailure = "dial_failure" decodeRpcFailure = "decode_rpc_failure" -# TODO Move serialization function to separate file, too noisy -# TODO Move pagination to separate file, self-contained logic - type WakuStore* = ref object of LPProtocol peerManager*: PeerManager @@ -70,230 +71,6 @@ type isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB -proc computeIndex*(msg: WakuMessage, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = DefaultTopic): Index = - ## Takes a WakuMessage with received timestamp and returns its Index. - ## Received timestamp will default to system time if not provided. - var ctx: sha256 - ctx.init() - ctx.update(msg.contentTopic.toBytes()) # converts the contentTopic to bytes - ctx.update(msg.payload) - let digest = ctx.finish() # computes the hash - ctx.clear() - - let - receiverTime = receivedTime - index = Index(digest:digest, - receiverTime: receiverTime, - senderTime: msg.timestamp, - pubsubTopic: pubsubTopic) - - return index - -proc encode*(index: Index): ProtoBuffer = - ## encodes an Index object into a ProtoBuffer - ## returns the resultant ProtoBuffer - - # intiate a ProtoBuffer - var output = initProtoBuffer() - - # encodes index - output.write3(1, index.digest.data) - output.write3(2, zint64(index.receiverTime)) - output.write3(3, zint64(index.senderTime)) - output.write3(4, index.pubsubTopic) - - output.finish3() - - return output - -proc encode*(pinfo: PagingInfo): ProtoBuffer = - ## encodes a PagingInfo object into a ProtoBuffer - ## returns the resultant ProtoBuffer - - # intiate a ProtoBuffer - var output = initProtoBuffer() - - # encodes pinfo - output.write3(1, pinfo.pageSize) - output.write3(2, pinfo.cursor.encode()) - output.write3(3, uint32(ord(pinfo.direction))) - - output.finish3() - - return output - -proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] = - ## creates and returns an Index object out of buffer - var index = Index() - let pb = initProtoBuffer(buffer) - - var data: seq[byte] - discard ? pb.getField(1, data) - - # create digest from data - index.digest = MDigest[256]() - for count, b in data: - index.digest.data[count] = b - - # read the timestamp - var receiverTime: zint64 - discard ? pb.getField(2, receiverTime) - index.receiverTime = Timestamp(receiverTime) - - # read the timestamp - var senderTime: zint64 - discard ? pb.getField(3, senderTime) - index.senderTime = Timestamp(senderTime) - - # read the pubsubTopic - discard ? pb.getField(4, index.pubsubTopic) - - return ok(index) - -proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = - ## creates and returns a PagingInfo object out of buffer - var pagingInfo = PagingInfo() - let pb = initProtoBuffer(buffer) - - var pageSize: uint64 - discard ? pb.getField(1, pageSize) - pagingInfo.pageSize = pageSize - - - var cursorBuffer: seq[byte] - discard ? pb.getField(2, cursorBuffer) - pagingInfo.cursor = ? Index.init(cursorBuffer) - - var direction: uint32 - discard ? pb.getField(3, direction) - pagingInfo.direction = PagingDirection(direction) - - return ok(pagingInfo) - -proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = - let pb = initProtoBuffer(buffer) - - # ContentTopic corresponds to the contentTopic field of waku message (not to be confused with pubsub topic) - var contentTopic: ContentTopic - discard ? pb.getField(1, contentTopic) - - ok(HistoryContentFilter(contentTopic: contentTopic)) - -proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = - var msg = HistoryQuery() - let pb = initProtoBuffer(buffer) - - discard ? pb.getField(2, msg.pubsubTopic) - - var buffs: seq[seq[byte]] - discard ? pb.getRepeatedField(3, buffs) - - for buf in buffs: - msg.contentFilters.add(? HistoryContentFilter.init(buf)) - - var pagingInfoBuffer: seq[byte] - discard ? pb.getField(4, pagingInfoBuffer) - - msg.pagingInfo = ? PagingInfo.init(pagingInfoBuffer) - - var startTime: zint64 - discard ? pb.getField(5, startTime) - msg.startTime = Timestamp(startTime) - - var endTime: zint64 - discard ? pb.getField(6, endTime) - msg.endTime = Timestamp(endTime) - - return ok(msg) - -proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = - var msg = HistoryResponse() - let pb = initProtoBuffer(buffer) - - var messages: seq[seq[byte]] - discard ? pb.getRepeatedField(2, messages) - - for buf in messages: - msg.messages.add(? WakuMessage.init(buf)) - - var pagingInfoBuffer: seq[byte] - discard ? pb.getField(3, pagingInfoBuffer) - msg.pagingInfo= ? PagingInfo.init(pagingInfoBuffer) - - var error: uint32 - discard ? pb.getField(4, error) - msg.error = HistoryResponseError(error) - - return ok(msg) - -proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = HistoryRPC() - let pb = initProtoBuffer(buffer) - - discard ? pb.getField(1, rpc.requestId) - - var queryBuffer: seq[byte] - discard ? pb.getField(2, queryBuffer) - - rpc.query = ? HistoryQuery.init(queryBuffer) - - var responseBuffer: seq[byte] - discard ? pb.getField(3, responseBuffer) - - rpc.response = ? HistoryResponse.init(responseBuffer) - - return ok(rpc) - -proc encode*(filter: HistoryContentFilter): ProtoBuffer = - var output = initProtoBuffer() - output.write3(1, filter.contentTopic) - output.finish3() - return output - -proc encode*(query: HistoryQuery): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(2, query.pubsubTopic) - - for filter in query.contentFilters: - output.write3(3, filter.encode()) - - output.write3(4, query.pagingInfo.encode()) - - output.write3(5, zint64(query.startTime)) - output.write3(6, zint64(query.endTime)) - - output.finish3() - - return output - -proc encode*(response: HistoryResponse): ProtoBuffer = - var output = initProtoBuffer() - - for msg in response.messages: - output.write3(2, msg.encode()) - - output.write3(3, response.pagingInfo.encode()) - - output.write3(4, uint32(ord(response.error))) - - output.finish3() - - return output - -proc encode*(rpc: HistoryRPC): ProtoBuffer = - var output = initProtoBuffer() - - output.write3(1, rpc.requestId) - output.write3(2, rpc.query.encode()) - output.write3(3, rpc.response.encode()) - - output.finish3() - - return output - proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = ## Query history to return a single page of messages matching the query @@ -397,7 +174,11 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) = # TODO index should not be recalculated - discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime, pubsubTopic), pubsubTopic: pubsubTopic)) + discard ws.messages.add(IndexedWakuMessage( + msg: msg, + index: Index.compute(msg, receiverTime, pubsubTopic), + pubsubTopic: pubsubTopic + )) info "attempting to load messages from persistent storage" @@ -411,7 +192,6 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = debug "the number of messages in the memory", messageNum=ws.messages.len waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) - proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, capacity = DefaultStoreCapacity, isSqliteOnly = false): T = @@ -430,7 +210,11 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = # Store is mounted but new messages should not be stored return - let index = msg.computeIndex(pubsubTopic = topic) + let index = Index.compute( + msg, + receivedTime = getNanosecondTime(getTime().toUnixFloat()), + pubsubTopic = topic + ) # add message to in-memory store if not w.isSqliteOnly: @@ -614,7 +398,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem # exclude index from the comparison criteria for msg in msgList: - let index = msg.computeIndex(pubsubTopic = DefaultTopic) + let index = Index.compute( + msg, + receivedTime = getNanosecondTime(getTime().toUnixFloat()), + pubsubTopic = DefaultTopic + ) + # check for duplicate messages # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic if ws.messages.contains(index): @@ -712,3 +501,11 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) + + + +# TODO: Remove the following deprecated method +proc computeIndex*(msg: WakuMessage, + receivedTime = getNanosecondTime(getTime().toUnixFloat()), + pubsubTopic = DefaultTopic): Index {.deprecated: "Use Index.compute() instead".}= + Index.compute(msg, receivedTime, pubsubTopic) diff --git a/waku/v2/protocol/waku_store/rpc.nim b/waku/v2/protocol/waku_store/rpc.nim new file mode 100644 index 000000000..02215895f --- /dev/null +++ b/waku/v2/protocol/waku_store/rpc.nim @@ -0,0 +1,35 @@ +{.push raises: [Defect].} + +import + nimcrypto/hash +import + ../waku_message, + ../../utils/pagination, + ../../utils/time + + +type + HistoryContentFilter* = object + contentTopic*: ContentTopic + + HistoryQuery* = object + contentFilters*: seq[HistoryContentFilter] + pubsubTopic*: string + pagingInfo*: PagingInfo # used for pagination + startTime*: Timestamp # used for time-window query + endTime*: Timestamp # used for time-window query + + HistoryResponseError* {.pure.} = enum + ## HistoryResponseError contains error message to inform the querying node about the state of its request + NONE = uint32(0) + INVALID_CURSOR = uint32(1) + + HistoryResponse* = object + messages*: seq[WakuMessage] + pagingInfo*: PagingInfo # used for pagination + error*: HistoryResponseError + + HistoryRPC* = object + requestId*: string + query*: HistoryQuery + response*: HistoryResponse diff --git a/waku/v2/protocol/waku_store/rpc_codec.nim b/waku/v2/protocol/waku_store/rpc_codec.nim new file mode 100644 index 000000000..fa7dda965 --- /dev/null +++ b/waku/v2/protocol/waku_store/rpc_codec.nim @@ -0,0 +1,204 @@ +{.push raises: [Defect].} + +import + nimcrypto/hash, + libp2p/protobuf/minprotobuf, + libp2p/varint +import + ../waku_message, + ../../utils/protobuf, + ../../utils/pagination, + ../../utils/time, + ./rpc + + +proc encode*(index: Index): ProtoBuffer = + ## Encode an Index object into a ProtoBuffer + ## returns the resultant ProtoBuffer + + var output = initProtoBuffer() + output.write3(1, index.digest.data) + output.write3(2, zint64(index.receiverTime)) + output.write3(3, zint64(index.senderTime)) + output.write3(4, index.pubsubTopic) + output.finish3() + + return output + +proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] = + ## creates and returns an Index object out of buffer + var index = Index() + let pb = initProtoBuffer(buffer) + + var data: seq[byte] + discard ?pb.getField(1, data) + + # create digest from data + index.digest = MDigest[256]() + for count, b in data: + index.digest.data[count] = b + + # read the timestamp + var receiverTime: zint64 + discard ?pb.getField(2, receiverTime) + index.receiverTime = Timestamp(receiverTime) + + # read the timestamp + var senderTime: zint64 + discard ?pb.getField(3, senderTime) + index.senderTime = Timestamp(senderTime) + + # read the pubsubTopic + discard ?pb.getField(4, index.pubsubTopic) + + return ok(index) + + +proc encode*(pinfo: PagingInfo): ProtoBuffer = + ## Encodes a PagingInfo object into a ProtoBuffer + ## returns the resultant ProtoBuffer + + var output = initProtoBuffer() + output.write3(1, pinfo.pageSize) + output.write3(2, pinfo.cursor.encode()) + output.write3(3, uint32(ord(pinfo.direction))) + output.finish3() + + return output + +proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = + ## creates and returns a PagingInfo object out of buffer + var pagingInfo = PagingInfo() + let pb = initProtoBuffer(buffer) + + var pageSize: uint64 + discard ?pb.getField(1, pageSize) + pagingInfo.pageSize = pageSize + + var cursorBuffer: seq[byte] + discard ?pb.getField(2, cursorBuffer) + pagingInfo.cursor = ?Index.init(cursorBuffer) + + var direction: uint32 + discard ?pb.getField(3, direction) + pagingInfo.direction = PagingDirection(direction) + + return ok(pagingInfo) + + +proc encode*(filter: HistoryContentFilter): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, filter.contentTopic) + output.finish3() + return output + +proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + var contentTopic: ContentTopic + discard ?pb.getField(1, contentTopic) + + ok(HistoryContentFilter(contentTopic: contentTopic)) + + +proc encode*(query: HistoryQuery): ProtoBuffer = + var output = initProtoBuffer() + output.write3(2, query.pubsubTopic) + + for filter in query.contentFilters: + output.write3(3, filter.encode()) + + output.write3(4, query.pagingInfo.encode()) + output.write3(5, zint64(query.startTime)) + output.write3(6, zint64(query.endTime)) + output.finish3() + + return output + +proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = + var msg = HistoryQuery() + let pb = initProtoBuffer(buffer) + + discard ?pb.getField(2, msg.pubsubTopic) + + var buffs: seq[seq[byte]] + discard ?pb.getRepeatedField(3, buffs) + + for buf in buffs: + msg.contentFilters.add(? HistoryContentFilter.init(buf)) + + var pagingInfoBuffer: seq[byte] + discard ?pb.getField(4, pagingInfoBuffer) + + msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer) + + var startTime: zint64 + discard ?pb.getField(5, startTime) + msg.startTime = Timestamp(startTime) + + var endTime: zint64 + discard ?pb.getField(6, endTime) + msg.endTime = Timestamp(endTime) + + return ok(msg) + + +proc encode*(response: HistoryResponse): ProtoBuffer = + var output = initProtoBuffer() + + for msg in response.messages: + output.write3(2, msg.encode()) + + output.write3(3, response.pagingInfo.encode()) + output.write3(4, uint32(ord(response.error))) + output.finish3() + + return output + +proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = + var msg = HistoryResponse() + let pb = initProtoBuffer(buffer) + + var messages: seq[seq[byte]] + discard ?pb.getRepeatedField(2, messages) + + for buf in messages: + let message = ?WakuMessage.init(buf) + msg.messages.add(message) + + var pagingInfoBuffer: seq[byte] + discard ?pb.getField(3, pagingInfoBuffer) + msg.pagingInfo = ?PagingInfo.init(pagingInfoBuffer) + + var error: uint32 + discard ?pb.getField(4, error) + msg.error = HistoryResponseError(error) + + return ok(msg) + + +proc encode*(rpc: HistoryRPC): ProtoBuffer = + var output = initProtoBuffer() + + output.write3(1, rpc.requestId) + output.write3(2, rpc.query.encode()) + output.write3(3, rpc.response.encode()) + + output.finish3() + + return output + +proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = HistoryRPC() + let pb = initProtoBuffer(buffer) + discard ?pb.getField(1, rpc.requestId) + + var queryBuffer: seq[byte] + discard ?pb.getField(2, queryBuffer) + rpc.query = ?HistoryQuery.init(queryBuffer) + + var responseBuffer: seq[byte] + discard ?pb.getField(3, responseBuffer) + rpc.response = ?HistoryResponse.init(responseBuffer) + + return ok(rpc) + diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index ba5c3a6a4..6218522ab 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -1,23 +1,44 @@ ## Contains types and utilities for pagination. -## -## Used by both message store and store protocol. - {.push raises: [Defect].} import - ./time, - nimcrypto/hash, - stew/byteutils + stew/byteutils, + nimcrypto +import + ../protocol/waku_message, + ./time -export hash -type - Index* = object - ## This type contains the description of an Index used in the pagination of WakuMessages - digest*: MDigest[256] # calculated over payload and content topic - receiverTime*: Timestamp - senderTime*: Timestamp # the time at which the message is generated - pubsubTopic*: string +type Index* = object + ## This type contains the description of an Index used in the pagination of WakuMessages + digest*: MDigest[256] # calculated over payload and content topic + receiverTime*: Timestamp + senderTime*: Timestamp # the time at which the message is generated + pubsubTopic*: string + + +proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T = + ## Takes a WakuMessage with received timestamp and returns its Index. + ## Received timestamp will default to system time if not provided. + + let + contentTopic = toBytes(msg.contentTopic) + payload = msg.payload + senderTime = msg.timestamp + + var ctx: sha256 + ctx.init() + ctx.update(contentTopic) + ctx.update(payload) + let digest = ctx.finish() # computes the hash + ctx.clear() + + Index( + digest:digest, + receiverTime: receivedTime, + senderTime: senderTime, + pubsubTopic: pubsubTopic + ) proc `==`*(x, y: Index): bool = ## receiverTime plays no role in index equality @@ -59,3 +80,16 @@ proc cmp*(x, y: Index): int = return digestcmp return cmp(x.pubsubTopic, y.pubsubTopic) + + +type + PagingDirection* {.pure.} = enum + ## PagingDirection determines the direction of pagination + BACKWARD = uint32(0) + FORWARD = uint32(1) + + PagingInfo* = object + ## This type holds the information needed for the pagination + pageSize*: uint64 + cursor*: Index + direction*: PagingDirection