From e43612e153e2136e1d60fab74b7011316cdfdf6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Tue, 5 Jan 2021 12:52:10 +0800 Subject: [PATCH] Refactor: waku_types -> waku_filter (#331) Also create a folder with README for filter protocol. --- CHANGELOG.md | 1 + examples/v2/chat2.nim | 3 +- tests/v2/test_jsonrpc_waku.nim | 2 +- tests/v2/test_waku_filter.nim | 3 +- tests/v2/test_wakunode.nim | 3 +- waku/v2/node/jsonrpc/admin_api.nim | 2 +- waku/v2/node/jsonrpc/filter_api.nim | 3 +- waku/v2/node/rpc/wakurpc.nim | 1 + waku/v2/node/wakunode2.nim | 3 +- waku/v2/protocol/waku_filter/README.md | 3 + .../{ => waku_filter}/waku_filter.nim | 25 +++++++- .../waku_filter/waku_filter_types.nim | 49 +++++++++++++++ waku/v2/waku_types.nim | 62 +------------------ 13 files changed, 92 insertions(+), 68 deletions(-) create mode 100644 waku/v2/protocol/waku_filter/README.md rename waku/v2/protocol/{ => waku_filter}/waku_filter.nim (88%) create mode 100644 waku/v2/protocol/waku_filter/waku_filter_types.nim diff --git a/CHANGELOG.md b/CHANGELOG.md index f6d1f89b8..0722d6d18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Added JSON-RPC Admin API to retrieve information about peers registered on the `wakunode2` - `StrictNoSign` enabled. - Added JSON-RPC Private API to enable using symmetric or asymmetric cryptography to encrypt/decrypt message payloads +- Refactor: Move `waku_filter` protocol into its own module. ## 2020-11-30 v0.1 diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 5081f223b..d04d7c006 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -19,8 +19,9 @@ import libp2p/[switch, # manage transports, a single entry poi muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection muxers/mplex/mplex] # define some contants and message types for stream multiplexing import ../../waku/v2/node/[config, wakunode2, waku_payload], - ../../waku/v2/protocol/[waku_relay, waku_filter], + ../../waku/v2/protocol/[waku_relay], ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/common/utils/nat, ../../waku/v2/waku_types diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 48e194a28..1877d9c5a 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -19,9 +19,9 @@ import admin_api, private_api], ../../waku/v2/protocol/message_notifier, - ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, + ../../waku/v2/protocol/waku_filter/waku_filter, ../test_helpers template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 11ab325db..c80211526 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -8,7 +8,8 @@ import libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, libp2p/multistream, - ../../waku/v2/protocol/[waku_filter, message_notifier], + ../../waku/v2/protocol/[message_notifier], + ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/waku_types, ../test_helpers, ./utils diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 51a211191..b2e27a890 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -7,8 +7,9 @@ import libp2p/crypto/secp, libp2p/switch, eth/keys, - ../../waku/v2/protocol/[waku_relay, waku_filter, message_notifier], + ../../waku/v2/protocol/[waku_relay, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/node/wakunode2, ../test_helpers, ../../waku/v2/waku_types diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index c69b359af..835510f88 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -7,7 +7,7 @@ import ../../waku_types, ../../protocol/waku_store/[waku_store_types, waku_store], ../../protocol/waku_swap/[waku_swap_types, waku_swap], - ../../protocol/waku_filter, + ../../protocol/waku_filter/[waku_filter_types, waku_filter], ../wakunode2, ./jsonrpc_types diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index ac3cce858..d5f1796bf 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -5,6 +5,7 @@ import json_rpc/rpcserver, eth/[common, rlp, keys, p2p], ../../waku_types, + ../../protocol/waku_filter/waku_filter_types, ../wakunode2, ./jsonrpc_types @@ -87,4 +88,4 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: return true else: # Failed to unsubscribe from one or more content filters - raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq)) \ No newline at end of file + raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq)) diff --git a/waku/v2/node/rpc/wakurpc.nim b/waku/v2/node/rpc/wakurpc.nim index de35f08e6..c5e05feae 100644 --- a/waku/v2/node/rpc/wakurpc.nim +++ b/waku/v2/node/rpc/wakurpc.nim @@ -6,6 +6,7 @@ import ../../protocol/waku_relay, ../../waku_types, ../../protocol/waku_store/waku_store, + ../../protocol/waku_filter/waku_filter, ../wakunode2 proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 1cc1026c2..96ce32885 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -10,9 +10,10 @@ import libp2p/protocols/pubsub/pubsub, libp2p/peerinfo, libp2p/standard_setup, - ../protocol/[waku_relay, waku_filter, message_notifier], + ../protocol/[waku_relay, message_notifier], ../protocol/waku_store/waku_store, ../protocol/waku_swap/waku_swap, + ../protocol/waku_filter/waku_filter, ../waku_types, ./message_store, ./sqlite diff --git a/waku/v2/protocol/waku_filter/README.md b/waku/v2/protocol/waku_filter/README.md new file mode 100644 index 000000000..8fd844684 --- /dev/null +++ b/waku/v2/protocol/waku_filter/README.md @@ -0,0 +1,3 @@ +# Waku Filter protocol + +The filter protocol implements bandwidth preserving filtering for light nodes. See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md for more information. diff --git a/waku/v2/protocol/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim similarity index 88% rename from waku/v2/protocol/waku_filter.nim rename to waku/v2/protocol/waku_filter/waku_filter.nim index 4a20fd80f..35f2ab062 100644 --- a/waku/v2/protocol/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -10,19 +10,40 @@ import libp2p/stream/connection, libp2p/crypto/crypto, libp2p/switch, - ./message_notifier, - ../waku_types + ../message_notifier, + ../../waku_types, + waku_filter_types # NOTE This is just a start, the design of this protocol isn't done yet. It # should be direct payload exchange (a la req-resp), not be coupled with the # relay protocol. +export waku_filter_types + logScope: topics = "wakufilter" const WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" +proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = + for key in filters.keys: + let filter = filters[key] + # We do this because the key for the filter is set to the requestId received from the filter protocol. + # This means we do not need to check the content filter explicitly as all MessagePushs already contain + # the requestId of the coresponding filter. + if requestId != "" and requestId == key: + filter.handler(msg) + continue + + # TODO: In case of no topics we should either trigger here for all messages, + # or we should not allow such filter to exist in the first place. + for contentFilter in filter.contentFilters: + if contentFilter.topics.len > 0: + if msg.contentTopic in contentFilter.topics: + filter.handler(msg) + break + proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) = # Flatten all unsubscribe topics into single seq var unsubscribeTopics: seq[ContentTopic] diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim new file mode 100644 index 000000000..2c7eaa4ab --- /dev/null +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -0,0 +1,49 @@ +import + std/[tables], + bearssl, + libp2p/[switch, peerinfo], + libp2p/protocols/protocol, + ../../waku_types + +type + ContentFilter* = object + topics*: seq[ContentTopic] + + ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} + + Filter* = object + contentFilters*: seq[ContentFilter] + handler*: ContentFilterHandler + + # @TODO MAYBE MORE INFO? + Filters* = Table[string, Filter] + + FilterRequest* = object + contentFilters*: seq[ContentFilter] + topic*: string + subscribe*: bool + + MessagePush* = object + messages*: seq[WakuMessage] + + FilterRPC* = object + requestId*: string + request*: FilterRequest + push*: MessagePush + + Subscriber* = object + peer*: PeerInfo + requestId*: string + filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? + + MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} + + FilterPeer* = object + peerInfo*: PeerInfo + + WakuFilter* = ref object of LPProtocol + rng*: ref BrHmacDrbgContext + switch*: Switch + peers*: seq[FilterPeer] + subscribers*: seq[Subscriber] + pushHandler*: MessagePushHandler diff --git a/waku/v2/waku_types.nim b/waku/v2/waku_types.nim index d8d6be1a2..d034c5a97 100644 --- a/waku/v2/waku_types.nim +++ b/waku/v2/waku_types.nim @@ -20,6 +20,8 @@ const MaxPageSize* = 100 # Maximum number of waku messages in each page # Common data types ----------------------------------------------------------- type + # Message ------------------------------------------------------------------- + Index* = object ## This type contains the description of an Index used in the pagination of WakuMessages digest*: MDigest[256] @@ -46,47 +48,7 @@ type MessageNotificationSubscriptions* = TableRef[MessageNotificationSubscriptionIdentifier, MessageNotificationSubscription] - FilterRequest* = object - contentFilters*: seq[ContentFilter] - topic*: string - subscribe*: bool - - MessagePush* = object - messages*: seq[WakuMessage] - - FilterRPC* = object - requestId*: string - request*: FilterRequest - push*: MessagePush - - Subscriber* = object - peer*: PeerInfo - requestId*: string - filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? - - MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} - - FilterPeer* = object - peerInfo*: PeerInfo - - WakuFilter* = ref object of LPProtocol - rng*: ref BrHmacDrbgContext - switch*: Switch - peers*: seq[FilterPeer] - subscribers*: seq[Subscriber] - pushHandler*: MessagePushHandler - - ContentFilter* = object - topics*: seq[ContentTopic] - - ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} - - Filter* = object - contentFilters*: seq[ContentFilter] - handler*: ContentFilterHandler - - # @TODO MAYBE MORE INFO? - Filters* = Table[string, Filter] + # Relay protocol types ------------------------------------------------------- WakuRelay* = ref object of GossipSub @@ -122,24 +84,6 @@ proc encode*(message: WakuMessage): ProtoBuffer = result.write(2, message.contentTopic) result.write(3, message.version) -proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = - for key in filters.keys: - let filter = filters[key] - # We do this because the key for the filter is set to the requestId received from the filter protocol. - # This means we do not need to check the content filter explicitly as all MessagePushs already contain - # the requestId of the coresponding filter. - if requestId != "" and requestId == key: - filter.handler(msg) - continue - - # TODO: In case of no topics we should either trigger here for all messages, - # or we should not allow such filter to exist in the first place. - for contentFilter in filter.contentFilters: - if contentFilter.topics.len > 0: - if msg.contentTopic in contentFilter.topics: - filter.handler(msg) - break - proc generateRequestId*(rng: ref BrHmacDrbgContext): string = var bytes: array[10, byte] brHmacDrbgGenerate(rng[], bytes)