mirror of https://github.com/waku-org/nwaku.git
Refactor: waku_types -> waku_filter (#331)
Also create a folder with README for filter protocol.
This commit is contained in:
parent
3242b0020b
commit
f860c0dda4
|
@ -6,6 +6,7 @@
|
|||
- Added JSON-RPC Admin API to retrieve information about peers registered on the `wakunode2`
|
||||
- `StrictNoSign` enabled.
|
||||
- Added JSON-RPC Private API to enable using symmetric or asymmetric cryptography to encrypt/decrypt message payloads
|
||||
- Refactor: Move `waku_filter` protocol into its own module.
|
||||
|
||||
## 2020-11-30 v0.1
|
||||
|
||||
|
|
|
@ -19,8 +19,9 @@ import libp2p/[switch, # manage transports, a single entry poi
|
|||
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
||||
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
|
||||
import ../../waku/v2/node/[config, wakunode2, waku_payload],
|
||||
../../waku/v2/protocol/[waku_relay, waku_filter],
|
||||
../../waku/v2/protocol/[waku_relay],
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../../waku/common/utils/nat,
|
||||
../../waku/v2/waku_types
|
||||
|
||||
|
|
|
@ -19,9 +19,9 @@ import
|
|||
admin_api,
|
||||
private_api],
|
||||
../../waku/v2/protocol/message_notifier,
|
||||
../../waku/v2/protocol/waku_filter,
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
../../waku/v2/protocol/waku_swap/waku_swap,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../test_helpers
|
||||
|
||||
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
|
|
|
@ -8,7 +8,8 @@ import
|
|||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multistream,
|
||||
../../waku/v2/protocol/[waku_filter, message_notifier],
|
||||
../../waku/v2/protocol/[message_notifier],
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../../waku/v2/waku_types,
|
||||
../test_helpers, ./utils
|
||||
|
||||
|
|
|
@ -7,8 +7,9 @@ import
|
|||
libp2p/crypto/secp,
|
||||
libp2p/switch,
|
||||
eth/keys,
|
||||
../../waku/v2/protocol/[waku_relay, waku_filter, message_notifier],
|
||||
../../waku/v2/protocol/[waku_relay, message_notifier],
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../../waku/v2/node/wakunode2,
|
||||
../test_helpers,
|
||||
../../waku/v2/waku_types
|
||||
|
|
|
@ -7,7 +7,7 @@ import
|
|||
../../waku_types,
|
||||
../../protocol/waku_store/[waku_store_types, waku_store],
|
||||
../../protocol/waku_swap/[waku_swap_types, waku_swap],
|
||||
../../protocol/waku_filter,
|
||||
../../protocol/waku_filter/[waku_filter_types, waku_filter],
|
||||
../wakunode2,
|
||||
./jsonrpc_types
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import
|
|||
json_rpc/rpcserver,
|
||||
eth/[common, rlp, keys, p2p],
|
||||
../../waku_types,
|
||||
../../protocol/waku_filter/waku_filter_types,
|
||||
../wakunode2,
|
||||
./jsonrpc_types
|
||||
|
||||
|
@ -87,4 +88,4 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
|
|||
return true
|
||||
else:
|
||||
# Failed to unsubscribe from one or more content filters
|
||||
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
|
||||
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
|
||||
|
|
|
@ -6,6 +6,7 @@ import
|
|||
../../protocol/waku_relay,
|
||||
../../waku_types,
|
||||
../../protocol/waku_store/waku_store,
|
||||
../../protocol/waku_filter/waku_filter,
|
||||
../wakunode2
|
||||
|
||||
proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
|
|
|
@ -10,9 +10,10 @@ import
|
|||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/peerinfo,
|
||||
libp2p/standard_setup,
|
||||
../protocol/[waku_relay, waku_filter, message_notifier],
|
||||
../protocol/[waku_relay, message_notifier],
|
||||
../protocol/waku_store/waku_store,
|
||||
../protocol/waku_swap/waku_swap,
|
||||
../protocol/waku_filter/waku_filter,
|
||||
../waku_types,
|
||||
./message_store,
|
||||
./sqlite
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
# Waku Filter protocol
|
||||
|
||||
The filter protocol implements bandwidth preserving filtering for light nodes. See https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md for more information.
|
|
@ -10,19 +10,40 @@ import
|
|||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/switch,
|
||||
./message_notifier,
|
||||
../waku_types
|
||||
../message_notifier,
|
||||
../../waku_types,
|
||||
waku_filter_types
|
||||
|
||||
# NOTE This is just a start, the design of this protocol isn't done yet. It
|
||||
# should be direct payload exchange (a la req-resp), not be coupled with the
|
||||
# relay protocol.
|
||||
|
||||
export waku_filter_types
|
||||
|
||||
logScope:
|
||||
topics = "wakufilter"
|
||||
|
||||
const
|
||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
||||
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
for key in filters.keys:
|
||||
let filter = filters[key]
|
||||
# We do this because the key for the filter is set to the requestId received from the filter protocol.
|
||||
# This means we do not need to check the content filter explicitly as all MessagePushs already contain
|
||||
# the requestId of the coresponding filter.
|
||||
if requestId != "" and requestId == key:
|
||||
filter.handler(msg)
|
||||
continue
|
||||
|
||||
# TODO: In case of no topics we should either trigger here for all messages,
|
||||
# or we should not allow such filter to exist in the first place.
|
||||
for contentFilter in filter.contentFilters:
|
||||
if contentFilter.topics.len > 0:
|
||||
if msg.contentTopic in contentFilter.topics:
|
||||
filter.handler(msg)
|
||||
break
|
||||
|
||||
proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) =
|
||||
# Flatten all unsubscribe topics into single seq
|
||||
var unsubscribeTopics: seq[ContentTopic]
|
|
@ -0,0 +1,49 @@
|
|||
import
|
||||
std/[tables],
|
||||
bearssl,
|
||||
libp2p/[switch, peerinfo],
|
||||
libp2p/protocols/protocol,
|
||||
../../waku_types
|
||||
|
||||
type
|
||||
ContentFilter* = object
|
||||
topics*: seq[ContentTopic]
|
||||
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
|
||||
|
||||
Filter* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
handler*: ContentFilterHandler
|
||||
|
||||
# @TODO MAYBE MORE INFO?
|
||||
Filters* = Table[string, Filter]
|
||||
|
||||
FilterRequest* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
topic*: string
|
||||
subscribe*: bool
|
||||
|
||||
MessagePush* = object
|
||||
messages*: seq[WakuMessage]
|
||||
|
||||
FilterRPC* = object
|
||||
requestId*: string
|
||||
request*: FilterRequest
|
||||
push*: MessagePush
|
||||
|
||||
Subscriber* = object
|
||||
peer*: PeerInfo
|
||||
requestId*: string
|
||||
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||
|
||||
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
|
||||
|
||||
FilterPeer* = object
|
||||
peerInfo*: PeerInfo
|
||||
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
switch*: Switch
|
||||
peers*: seq[FilterPeer]
|
||||
subscribers*: seq[Subscriber]
|
||||
pushHandler*: MessagePushHandler
|
|
@ -20,6 +20,8 @@ const MaxPageSize* = 100 # Maximum number of waku messages in each page
|
|||
# Common data types -----------------------------------------------------------
|
||||
type
|
||||
|
||||
# Message -------------------------------------------------------------------
|
||||
|
||||
Index* = object
|
||||
## This type contains the description of an Index used in the pagination of WakuMessages
|
||||
digest*: MDigest[256]
|
||||
|
@ -46,47 +48,7 @@ type
|
|||
|
||||
MessageNotificationSubscriptions* = TableRef[MessageNotificationSubscriptionIdentifier, MessageNotificationSubscription]
|
||||
|
||||
FilterRequest* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
topic*: string
|
||||
subscribe*: bool
|
||||
|
||||
MessagePush* = object
|
||||
messages*: seq[WakuMessage]
|
||||
|
||||
FilterRPC* = object
|
||||
requestId*: string
|
||||
request*: FilterRequest
|
||||
push*: MessagePush
|
||||
|
||||
Subscriber* = object
|
||||
peer*: PeerInfo
|
||||
requestId*: string
|
||||
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||
|
||||
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
|
||||
|
||||
FilterPeer* = object
|
||||
peerInfo*: PeerInfo
|
||||
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
switch*: Switch
|
||||
peers*: seq[FilterPeer]
|
||||
subscribers*: seq[Subscriber]
|
||||
pushHandler*: MessagePushHandler
|
||||
|
||||
ContentFilter* = object
|
||||
topics*: seq[ContentTopic]
|
||||
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
|
||||
|
||||
Filter* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
handler*: ContentFilterHandler
|
||||
|
||||
# @TODO MAYBE MORE INFO?
|
||||
Filters* = Table[string, Filter]
|
||||
# Relay protocol types -------------------------------------------------------
|
||||
|
||||
WakuRelay* = ref object of GossipSub
|
||||
|
||||
|
@ -122,24 +84,6 @@ proc encode*(message: WakuMessage): ProtoBuffer =
|
|||
result.write(2, message.contentTopic)
|
||||
result.write(3, message.version)
|
||||
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
for key in filters.keys:
|
||||
let filter = filters[key]
|
||||
# We do this because the key for the filter is set to the requestId received from the filter protocol.
|
||||
# This means we do not need to check the content filter explicitly as all MessagePushs already contain
|
||||
# the requestId of the coresponding filter.
|
||||
if requestId != "" and requestId == key:
|
||||
filter.handler(msg)
|
||||
continue
|
||||
|
||||
# TODO: In case of no topics we should either trigger here for all messages,
|
||||
# or we should not allow such filter to exist in the first place.
|
||||
for contentFilter in filter.contentFilters:
|
||||
if contentFilter.topics.len > 0:
|
||||
if msg.contentTopic in contentFilter.topics:
|
||||
filter.handler(msg)
|
||||
break
|
||||
|
||||
proc generateRequestId*(rng: ref BrHmacDrbgContext): string =
|
||||
var bytes: array[10, byte]
|
||||
brHmacDrbgGenerate(rng[], bytes)
|
||||
|
|
Loading…
Reference in New Issue