diff --git a/CHANGELOG.md b/CHANGELOG.md index af6abfa6d..69e76954f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Next version - Refactor: Split out `waku_types` types into right place; create utils folder. +- Refactor: Replace sequence of ContentTopics in ContentFilter with a single ContentTopic. - Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules. - PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation) - Added a peer manager for `relay`, `filter`, `store` and `swap` peers. diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 7b127a5e3..ca9b022c6 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -318,7 +318,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = info "Hit filter handler" await node.subscribe( - FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[DefaultContentTopic])], pubSubTopic: DefaultTopic, subscribe: true), + FilterRequest(contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], pubSubTopic: DefaultTopic, subscribe: true), filterHandler ) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index cd8ea3a00..79b140d31 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -291,8 +291,11 @@ procSuite "Waku v2 JSON-RPC API": # Light node has not yet subscribed to any filters node.filters.len() == 0 - let contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic, ContentTopic("2")]), - ContentFilter(contentTopics: @[ContentTopic("3"), ContentTopic("4")])] + let contentFilters = @[ContentFilter(contentTopic: defaultContentTopic), + ContentFilter(contentTopic: ContentTopic("2")), + ContentFilter(contentTopic: ContentTopic("3")), + ContentFilter(contentTopic: ContentTopic("4")), + ] var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) check: @@ -330,7 +333,7 @@ procSuite "Waku v2 JSON-RPC API": # First ensure subscription exists - let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic])], topic = some(defaultTopic)) + let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: defaultContentTopic)], topic = some(defaultTopic)) check: sub @@ -730,4 +733,4 @@ procSuite "Waku v2 JSON-RPC API": server3.close() await node1.stop() await node2.stop() - await node3.stop() + await node3.stop() \ No newline at end of file diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 40309c347..80f7cd8b7 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -1,155 +1,155 @@ -{.used.} - -import - std/[options, tables, sets], - testutils/unittests, chronos, chronicles, - libp2p/switch, - libp2p/protobuf/minprotobuf, - libp2p/stream/[bufferstream, connection], - libp2p/crypto/crypto, - libp2p/multistream, - ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/protocol/message_notifier, - ../../waku/v2/protocol/waku_filter/waku_filter, - ../test_helpers, ./utils - -procSuite "Waku Filter": - - asyncTest "handle filter": - const defaultTopic = "/waku/2/default-waku/proto" - - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.init(key) - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) - - var dialSwitch = newStandardSwitch() - discard await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - discard await listenSwitch.start() - - var responseRequestIdFuture = newFuture[string]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = - check: - msg.messages.len() == 1 - msg.messages[0] == post - responseRequestIdFuture.complete(requestId) - - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) - - dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo) - - proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = - discard - - let - proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - subscription = proto2.subscription() - - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto2) - - let id = (await proto.subscribe(rpc)).get() - - await sleepAsync(2.seconds) - - await subscriptions.notify(defaultTopic, post) - - check: - (await responseRequestIdFuture) == id - - asyncTest "Can subscribe and unsubscribe from content filter": - const defaultTopic = "/waku/2/default-waku/proto" - - let - key = PrivateKey.random(ECDSA, rng[]).get() - peer = PeerInfo.init(key) - contentTopic = ContentTopic("/waku/2/default-content/proto") - post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) - - var dialSwitch = newStandardSwitch() - discard await dialSwitch.start() - - var listenSwitch = newStandardSwitch(some(key)) - discard await listenSwitch.start() - - var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = - check: - msg.messages.len() == 1 - msg.messages[0] == post - responseCompletionFuture.complete(true) - - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) - - dialSwitch.mount(proto) - proto.setPeer(listenSwitch.peerInfo) - - proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = - discard - - let - proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) - subscription = proto2.subscription() - - var subscriptions = newTable[string, MessageNotificationSubscription]() - subscriptions["test"] = subscription - listenSwitch.mount(proto2) - - let id = (await proto.subscribe(rpc)).get() - - await sleepAsync(2.seconds) - - await subscriptions.notify(defaultTopic, post) - - check: - # Check that subscription works as expected - (await responseCompletionFuture.withTimeout(3.seconds)) == true - - # Reset to test unsubscribe - responseCompletionFuture = newFuture[bool]() - - let - rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: false) - - await proto.unsubscribe(rpcU) - - await sleepAsync(2.seconds) - - await subscriptions.notify(defaultTopic, post) - - check: - # Check that unsubscribe works as expected - (await responseCompletionFuture.withTimeout(5.seconds)) == false - - asyncTest "handle filter subscribe failures": - const defaultTopic = "/waku/2/default-waku/proto" - - let - contentTopic = ContentTopic("/waku/2/default-content/proto") - - var dialSwitch = newStandardSwitch() - discard await dialSwitch.start() - - var responseRequestIdFuture = newFuture[string]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = - discard - - let - proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) - - dialSwitch.mount(proto) - - let idOpt = (await proto.subscribe(rpc)) - - check: - idOpt.isNone +{.used.} + +import + std/[options, tables, sets], + testutils/unittests, chronos, chronicles, + libp2p/switch, + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/multistream, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/protocol/message_notifier, + ../../waku/v2/protocol/waku_filter/waku_filter, + ../test_helpers, ./utils + +procSuite "Waku Filter": + + asyncTest "handle filter": + const defaultTopic = "/waku/2/default-waku/proto" + + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + contentTopic = ContentTopic("/waku/2/default-content/proto") + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var responseRequestIdFuture = newFuture[string]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + check: + msg.messages.len() == 1 + msg.messages[0] == post + responseRequestIdFuture.complete(requestId) + + let + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo) + + proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let + proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) + subscription = proto2.subscription() + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + listenSwitch.mount(proto2) + + let id = (await proto.subscribe(rpc)).get() + + await sleepAsync(2.seconds) + + await subscriptions.notify(defaultTopic, post) + + check: + (await responseRequestIdFuture) == id + + asyncTest "Can subscribe and unsubscribe from content filter": + const defaultTopic = "/waku/2/default-waku/proto" + + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + contentTopic = ContentTopic("/waku/2/default-content/proto") + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var responseCompletionFuture = newFuture[bool]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + check: + msg.messages.len() == 1 + msg.messages[0] == post + responseCompletionFuture.complete(true) + + let + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo) + + proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let + proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) + subscription = proto2.subscription() + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + listenSwitch.mount(proto2) + + let id = (await proto.subscribe(rpc)).get() + + await sleepAsync(2.seconds) + + await subscriptions.notify(defaultTopic, post) + + check: + # Check that subscription works as expected + (await responseCompletionFuture.withTimeout(3.seconds)) == true + + # Reset to test unsubscribe + responseCompletionFuture = newFuture[bool]() + + let + rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: false) + + await proto.unsubscribe(rpcU) + + await sleepAsync(2.seconds) + + await subscriptions.notify(defaultTopic, post) + + check: + # Check that unsubscribe works as expected + (await responseCompletionFuture.withTimeout(5.seconds)) == false + + asyncTest "handle filter subscribe failures": + const defaultTopic = "/waku/2/default-waku/proto" + + let + contentTopic = ContentTopic("/waku/2/default-content/proto") + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var responseRequestIdFuture = newFuture[string]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let + proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + + let idOpt = (await proto.subscribe(rpc)) + + check: + idOpt.isNone diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 1df4fe1c2..b3acd563e 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -30,7 +30,7 @@ procSuite "WakuNode": Port(60000)) pubSubTopic = "chat" contentTopic = ContentTopic("/waku/2/default-content/proto") - filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) + filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -82,7 +82,7 @@ procSuite "WakuNode": Port(60002)) pubSubTopic = "chat" contentTopic = ContentTopic("/waku/2/default-content/proto") - filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) + filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -149,8 +149,8 @@ procSuite "WakuNode": otherPayload = @[byte 9] defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic) otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic) - defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[defaultContentTopic])], subscribe: true) - otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[otherContentTopic])], subscribe: true) + defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: defaultContentTopic)], subscribe: true) + otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: otherContentTopic)], subscribe: true) await node1.start() node1.mountRelay() @@ -221,7 +221,7 @@ procSuite "WakuNode": contentTopic = "defaultCT" payload = @[byte 1] message = WakuMessage(payload: payload, contentTopic: contentTopic) - filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) + filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true) await node1.start() node1.mountRelay() @@ -322,7 +322,7 @@ procSuite "WakuNode": msg == message completionFut.complete(true) - await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true), handler) + await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), handler) await sleepAsync(2000.millis) @@ -679,4 +679,4 @@ procSuite "WakuNode": # (await completionFutLightPush.withTimeout(5.seconds)) == true # await node1.stop() # await node2.stop() - # await node3.stop() + # await node3.stop() \ No newline at end of file diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 17117af58..bf261ee52 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az193-336: +# Libtool was configured on host fv-az190-951: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index d986cb765..f0eb6989e 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -192,4 +192,4 @@ proc completeCmdArg*(T: type Port, val: TaintedString): seq[string] = func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress = # TODO: How should we select between IPv4 and IPv6 # Maybe there should be a config option for this. - (static ValidIpAddress.init("0.0.0.0")) + (static ValidIpAddress.init("0.0.0.0")) \ No newline at end of file diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 415c97902..98067a1de 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -1,93 +1,93 @@ -{.push raises: [Exception, Defect].} - -import - std/[tables,sequtils], - json_rpc/rpcserver, - eth/[common, rlp, keys, p2p], - ../../protocol/waku_filter/waku_filter_types, - ../wakunode2, - ./jsonrpc_types - -export jsonrpc_types - -logScope: - topics = "filter api" - -const futTimeout* = 5.seconds # Max time to wait for futures -const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable - -proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) = - - proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = - # Add message to current cache - trace "WakuMessage received", msg=msg - - # Make a copy of msgs for this topic to modify - var msgs = messageCache.getOrDefault(msg.contentTopic, @[]) - - if msgs.len >= maxCache: - # Message cache on this topic exceeds maximum. Delete oldest. - # @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. - msgs.delete(0,0) - msgs.add(msg) - - # Replace indexed entry with copy - # @TODO max number of content topics could be limited in node - messageCache[msg.contentTopic] = msgs - - ## Filter API version 1 definitions - - rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]: - ## Returns all WakuMessages received on a content topic since the - ## last time this method was called - ## @TODO ability to specify a return message limit - debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic - - if messageCache.hasKey(contentTopic): - let msgs = messageCache[contentTopic] - # Clear cache before next call - messageCache[contentTopic] = @[] - return msgs - else: - # Not subscribed to this content topic - raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic) - - rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: - ## Subscribes a node to a list of content filters - debug "post_waku_v2_filter_v1_subscription" - - # Construct a filter request - # @TODO use default PubSub topic if undefined - let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) - - if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): - # Successfully subscribed to all content filters - - for cTopic in concat(contentFilters.mapIt(it.contentTopics)): - # Create message cache for each subscribed content topic - messageCache[cTopic] = @[] - - return true - else: - # Failed to subscribe to one or more content filters - raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq)) - - rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: - ## Unsubscribes a node from a list of content filters - debug "delete_waku_v2_filter_v1_subscription" - - # Construct a filter request - # @TODO consider using default PubSub topic if undefined - let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false) - - if (await node.unsubscribe(fReq).withTimeout(futTimeout)): - # Successfully unsubscribed from all content filters - - for cTopic in concat(contentFilters.mapIt(it.contentTopics)): - # Remove message cache for each unsubscribed content topic - messageCache.del(cTopic) - - return true - else: - # Failed to unsubscribe from one or more content filters - raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq)) +{.push raises: [Exception, Defect].} + +import + std/[tables,sequtils], + json_rpc/rpcserver, + eth/[common, rlp, keys, p2p], + ../../protocol/waku_filter/waku_filter_types, + ../wakunode2, + ./jsonrpc_types + +export jsonrpc_types + +logScope: + topics = "filter api" + +const futTimeout* = 5.seconds # Max time to wait for futures +const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable + +proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) = + + proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = + # Add message to current cache + trace "WakuMessage received", msg=msg + + # Make a copy of msgs for this topic to modify + var msgs = messageCache.getOrDefault(msg.contentTopic, @[]) + + if msgs.len >= maxCache: + # Message cache on this topic exceeds maximum. Delete oldest. + # @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. + msgs.delete(0,0) + msgs.add(msg) + + # Replace indexed entry with copy + # @TODO max number of content topics could be limited in node + messageCache[msg.contentTopic] = msgs + + ## Filter API version 1 definitions + + rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]: + ## Returns all WakuMessages received on a content topic since the + ## last time this method was called + ## @TODO ability to specify a return message limit + debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + + if messageCache.hasKey(contentTopic): + let msgs = messageCache[contentTopic] + # Clear cache before next call + messageCache[contentTopic] = @[] + return msgs + else: + # Not subscribed to this content topic + raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic) + + rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: + ## Subscribes a node to a list of content filters + debug "post_waku_v2_filter_v1_subscription" + + # Construct a filter request + # @TODO use default PubSub topic if undefined + let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) + + if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): + # Successfully subscribed to all content filters + + for cTopic in contentFilters.mapIt(it.contentTopic): + # Create message cache for each subscribed content topic + messageCache[cTopic] = @[] + + return true + else: + # Failed to subscribe to one or more content filters + raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq)) + + rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: + ## Unsubscribes a node from a list of content filters + debug "delete_waku_v2_filter_v1_subscription" + + # Construct a filter request + # @TODO consider using default PubSub topic if undefined + let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false) + + if (await node.unsubscribe(fReq).withTimeout(futTimeout)): + # Successfully unsubscribed from all content filters + + for cTopic in contentFilters.mapIt(it.contentTopic): + # Remove message cache for each unsubscribed content topic + messageCache.del(cTopic) + + return true + else: + # Failed to unsubscribe from one or more content filters + raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq)) diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 0ce5db071..5fe9389fc 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -32,4 +32,4 @@ proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): s # Asymmetric proc get_waku_v2_private_v1_asymmetric_keypair(): WakuKeyPair proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuRelayMessage, publicKey: string): bool -proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage] +proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage] \ No newline at end of file diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 44ddf9198..d1cbcc555 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -81,21 +81,17 @@ func asEthKey*(key: PrivateKey): keys.PrivateKey = proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = # Flatten all unsubscribe topics into single seq - var unsubscribeTopics: seq[ContentTopic] - for cf in contentFilters: - unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics) + let unsubscribeTopics = contentFilters.mapIt(it.contentTopic) debug "unsubscribing", unsubscribeTopics=unsubscribeTopics var rIdToRemove: seq[string] = @[] for rId, f in filters.mpairs: # Iterate filter entries to remove matching content topics - for cf in f.contentFilters.mitems: - # Iterate content filters in filter entry - cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) + # make sure we delete the content filter # if no more topics are left - f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0) + f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics) if f.contentFilters.len == 0: rIdToRemove.add(rId) @@ -744,4 +740,4 @@ when isMainModule: c_signal(SIGTERM, handleSigterm) - runForever() + runForever() \ No newline at end of file diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 780b31367..9cad753fc 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -1,268 +1,258 @@ -import - std/[tables, sequtils, options], - bearssl, - chronos, chronicles, metrics, stew/results, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/protocol, - libp2p/protobuf/minprotobuf, - libp2p/stream/connection, - libp2p/crypto/crypto, - ../message_notifier, - waku_filter_types, - ../../utils/requests, - ../../node/peer_manager/peer_manager - -# NOTE This is just a start, the design of this protocol isn't done yet. It -# should be direct payload exchange (a la req-resp), not be coupled with the -# relay protocol. - -export waku_filter_types - -declarePublicGauge waku_filter_peers, "number of filter peers" -declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" -declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] - -logScope: - topics = "wakufilter" - -const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" - -# Error types (metric label values) -const - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" - -proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = - for key in filters.keys: - let filter = filters[key] - # We do this because the key for the filter is set to the requestId received from the filter protocol. - # This means we do not need to check the content filter explicitly as all MessagePushs already contain - # the requestId of the coresponding filter. - if requestId != "" and requestId == key: - filter.handler(msg) - continue - - # TODO: In case of no topics we should either trigger here for all messages, - # or we should not allow such filter to exist in the first place. - for contentFilter in filter.contentFilters: - if contentFilter.contentTopics.len > 0: - if msg.contentTopic in contentFilter.contentTopics: - filter.handler(msg) - break - -proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) = - # Flatten all unsubscribe topics into single seq - var unsubscribeTopics: seq[ContentTopic] - for cf in request.contentFilters: - unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics) - - debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics - - for subscriber in subscribers.mitems: - if subscriber.peer.peerId != peerId: continue - - # Iterate through subscriber entries matching peer ID to remove matching content topics - for cf in subscriber.filter.contentFilters.mitems: - # Iterate content filters in filter entry - cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) - - # make sure we delete the content filter - # if no more topics are left - subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0) - - # make sure we delete the subscriber - # if no more content filters left - subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0) - - debug "subscribers modified", subscribers=subscribers - # @TODO: metrics? - -proc encode*(filter: ContentFilter): ProtoBuffer = - result = initProtoBuffer() - - for contentTopic in filter.contentTopics: - result.write(1, contentTopic) - -proc encode*(rpc: FilterRequest): ProtoBuffer = - result = initProtoBuffer() - - result.write(1, uint64(rpc.subscribe)) - - result.write(2, rpc.pubSubTopic) - - for filter in rpc.contentFilters: - result.write(3, filter.encode()) - -proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = - let pb = initProtoBuffer(buffer) - - var contentTopics: seq[ContentTopic] - discard ? pb.getRepeatedField(1, contentTopics) - - ok(ContentFilter(contentTopics: contentTopics)) - -proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") - let pb = initProtoBuffer(buffer) - - var subflag: uint64 - if ? pb.getField(1, subflag): - rpc.subscribe = bool(subflag) - - discard ? pb.getField(2, rpc.pubSubTopic) - - var buffs: seq[seq[byte]] - discard ? pb.getRepeatedField(3, buffs) - - for buf in buffs: - rpc.contentFilters.add(? ContentFilter.init(buf)) - - ok(rpc) - -proc encode*(push: MessagePush): ProtoBuffer = - result = initProtoBuffer() - - for push in push.messages: - result.write(1, push.encode()) - -proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = - var push = MessagePush() - let pb = initProtoBuffer(buffer) - - var messages: seq[seq[byte]] - discard ? pb.getRepeatedField(1, messages) - - for buf in messages: - push.messages.add(? WakuMessage.init(buf)) - - ok(push) - -proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRPC() - let pb = initProtoBuffer(buffer) - - discard ? pb.getField(1, rpc.requestId) - - var requestBuffer: seq[byte] - discard ? pb.getField(2, requestBuffer) - - rpc.request = ? FilterRequest.init(requestBuffer) - - var pushBuffer: seq[byte] - discard ? pb.getField(3, pushBuffer) - - rpc.push = ? MessagePush.init(pushBuffer) - - ok(rpc) - -proc encode*(rpc: FilterRPC): ProtoBuffer = - result = initProtoBuffer() - - result.write(1, rpc.requestId) - result.write(2, rpc.request.encode()) - result.write(3, rpc.push.encode()) - -method init*(wf: WakuFilter) = - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var message = await conn.readLp(64*1024) - var res = FilterRPC.init(message) - if res.isErr: - error "failed to decode rpc" - waku_filter_errors.inc(labelValues = [decodeRpcFailure]) - return - - info "filter message received" - - let value = res.value - if value.push != MessagePush(): - wf.pushHandler(value.requestId, value.push) - if value.request != FilterRequest(): - if value.request.subscribe: - wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) - else: - wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId) - - waku_filter_subscribers.set(wf.subscribers.len.int64) - - wf.handler = handle - wf.codec = WakuFilterCodec - -proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T = - new result - result.rng = crypto.newRng() - result.peerManager = peerManager - result.pushHandler = handler - result.init() - -proc setPeer*(wf: WakuFilter, peer: PeerInfo) = - wf.peerManager.addPeer(peer, WakuFilterCodec) - waku_filter_peers.inc() - -proc subscription*(proto: WakuFilter): MessageNotificationSubscription = - ## Returns a Filter for the specific protocol - ## This filter can then be used to send messages to subscribers that match conditions. - proc handle(topic: string, msg: WakuMessage) {.async.} = - trace "handle WakuFilter subscription", topic=topic, msg=msg - - for subscriber in proto.subscribers: - if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: - trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic - continue - - for filter in subscriber.filter.contentFilters: - if msg.contentTopic in filter.contentTopics: - trace "Found matching contentTopic", filter=filter, msg=msg - let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) - - let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) - - if connOpt.isSome: - await connOpt.get().writeLP(push.encode().buffer) - else: - # @TODO more sophisticated error handling here - error "failed to push messages to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) - break - - MessageNotificationSubscription.init(@[], handle) - -proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = - let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) - - if peerOpt.isSome: - let peer = peerOpt.get() - - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) - - if connOpt.isSome: - # This is the only successful path to subscription - let id = generateRequestId(wf.rng) - await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) - return some(id) - else: - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) - return none(string) - -proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = - # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. - let - id = generateRequestId(wf.rng) - peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) - - if peerOpt.isSome: - # @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers - let peer = peerOpt.get() - - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) - - if connOpt.isSome: - await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) - else: - # @TODO more sophisticated error handling here - error "failed to connect to remote peer" - waku_filter_errors.inc(labelValues = [dialFailure]) +import + std/[tables, sequtils, options], + bearssl, + chronos, chronicles, metrics, stew/results, + libp2p/protocols/pubsub/pubsubpeer, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection, + libp2p/crypto/crypto, + ../message_notifier, + waku_filter_types, + ../../utils/requests, + ../../node/peer_manager/peer_manager + +# NOTE This is just a start, the design of this protocol isn't done yet. It +# should be direct payload exchange (a la req-resp), not be coupled with the +# relay protocol. + +export waku_filter_types + +declarePublicGauge waku_filter_peers, "number of filter peers" +declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" +declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] + +logScope: + topics = "wakufilter" + +const + WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + +# Error types (metric label values) +const + dialFailure = "dial_failure" + decodeRpcFailure = "decode_rpc_failure" + +proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = + for key in filters.keys: + let filter = filters[key] + # We do this because the key for the filter is set to the requestId received from the filter protocol. + # This means we do not need to check the content filter explicitly as all MessagePushs already contain + # the requestId of the coresponding filter. + if requestId != "" and requestId == key: + filter.handler(msg) + continue + + # TODO: In case of no topics we should either trigger here for all messages, + # or we should not allow such filter to exist in the first place. + for contentFilter in filter.contentFilters: + if msg.contentTopic == contentFilter.contentTopic: + filter.handler(msg) + break + +proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) = + # Flatten all unsubscribe topics into single seq + let unsubscribeTopics = request.contentFilters.mapIt(it.contentTopic) + debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics + + for subscriber in subscribers.mitems: + if subscriber.peer.peerId != peerId: continue + + # make sure we delete the content filter + # if no more topics are left + subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics) + + # make sure we delete the subscriber + # if no more content filters left + subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0) + + debug "subscribers modified", subscribers=subscribers + # @TODO: metrics? + +proc encode*(filter: ContentFilter): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, filter.contentTopic) + +proc encode*(rpc: FilterRequest): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, uint64(rpc.subscribe)) + + result.write(2, rpc.pubSubTopic) + + for filter in rpc.contentFilters: + result.write(3, filter.encode()) + +proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var contentTopic: ContentTopic + discard ? pb.getField(1, contentTopic) + + ok(ContentFilter(contentTopic: contentTopic)) + +proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = + var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") + let pb = initProtoBuffer(buffer) + + var subflag: uint64 + if ? pb.getField(1, subflag): + rpc.subscribe = bool(subflag) + + discard ? pb.getField(2, rpc.pubSubTopic) + + var buffs: seq[seq[byte]] + discard ? pb.getRepeatedField(3, buffs) + + for buf in buffs: + rpc.contentFilters.add(? ContentFilter.init(buf)) + + ok(rpc) + +proc encode*(push: MessagePush): ProtoBuffer = + result = initProtoBuffer() + + for push in push.messages: + result.write(1, push.encode()) + +proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = + var push = MessagePush() + let pb = initProtoBuffer(buffer) + + var messages: seq[seq[byte]] + discard ? pb.getRepeatedField(1, messages) + + for buf in messages: + push.messages.add(? WakuMessage.init(buf)) + + ok(push) + +proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = FilterRPC() + let pb = initProtoBuffer(buffer) + + discard ? pb.getField(1, rpc.requestId) + + var requestBuffer: seq[byte] + discard ? pb.getField(2, requestBuffer) + + rpc.request = ? FilterRequest.init(requestBuffer) + + var pushBuffer: seq[byte] + discard ? pb.getField(3, pushBuffer) + + rpc.push = ? MessagePush.init(pushBuffer) + + ok(rpc) + +proc encode*(rpc: FilterRPC): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, rpc.requestId) + result.write(2, rpc.request.encode()) + result.write(3, rpc.push.encode()) + +method init*(wf: WakuFilter) = + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + var message = await conn.readLp(64*1024) + var res = FilterRPC.init(message) + if res.isErr: + error "failed to decode rpc" + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + return + + info "filter message received" + + let value = res.value + if value.push != MessagePush(): + wf.pushHandler(value.requestId, value.push) + if value.request != FilterRequest(): + if value.request.subscribe: + wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) + else: + wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId) + + waku_filter_subscribers.set(wf.subscribers.len.int64) + + wf.handler = handle + wf.codec = WakuFilterCodec + +proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T = + new result + result.rng = crypto.newRng() + result.peerManager = peerManager + result.pushHandler = handler + result.init() + +proc setPeer*(wf: WakuFilter, peer: PeerInfo) = + wf.peerManager.addPeer(peer, WakuFilterCodec) + waku_filter_peers.inc() + +proc subscription*(proto: WakuFilter): MessageNotificationSubscription = + ## Returns a Filter for the specific protocol + ## This filter can then be used to send messages to subscribers that match conditions. + proc handle(topic: string, msg: WakuMessage) {.async.} = + trace "handle WakuFilter subscription", topic=topic, msg=msg + + for subscriber in proto.subscribers: + if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: + trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic + continue + + for filter in subscriber.filter.contentFilters: + if msg.contentTopic == filter.contentTopic: + trace "Found matching contentTopic", filter=filter, msg=msg + let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) + + let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) + + if connOpt.isSome: + await connOpt.get().writeLP(push.encode().buffer) + else: + # @TODO more sophisticated error handling here + error "failed to push messages to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) + break + + MessageNotificationSubscription.init(@[], handle) + +proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = + let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isSome: + let peer = peerOpt.get() + + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + + if connOpt.isSome: + # This is the only successful path to subscription + let id = generateRequestId(wf.rng) + await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) + return some(id) + else: + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) + return none(string) + +proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = + # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. + let + id = generateRequestId(wf.rng) + peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isSome: + # @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers + let peer = peerOpt.get() + + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + + if connOpt.isSome: + await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer) + else: + # @TODO more sophisticated error handling here + error "failed to connect to remote peer" + waku_filter_errors.inc(labelValues = [dialFailure]) diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index 92c957581..f594c8adc 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -1,48 +1,48 @@ -import - std/[tables], - bearssl, - libp2p/peerinfo, - libp2p/protocols/protocol, - ../../node/peer_manager/peer_manager, - ../waku_message - -export waku_message - -type - ContentFilter* = object - contentTopics*: seq[ContentTopic] - - ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} - - Filter* = object - contentFilters*: seq[ContentFilter] - handler*: ContentFilterHandler - - # @TODO MAYBE MORE INFO? - Filters* = Table[string, Filter] - - FilterRequest* = object - contentFilters*: seq[ContentFilter] - pubSubTopic*: string - subscribe*: bool - - MessagePush* = object - messages*: seq[WakuMessage] - - FilterRPC* = object - requestId*: string - request*: FilterRequest - push*: MessagePush - - Subscriber* = object - peer*: PeerInfo - requestId*: string - filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? - - MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} - - WakuFilter* = ref object of LPProtocol - rng*: ref BrHmacDrbgContext - peerManager*: PeerManager - subscribers*: seq[Subscriber] - pushHandler*: MessagePushHandler +import + std/[tables], + bearssl, + libp2p/peerinfo, + libp2p/protocols/protocol, + ../../node/peer_manager/peer_manager, + ../waku_message + +export waku_message + +type + ContentFilter* = object + contentTopic*: ContentTopic + + ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} + + Filter* = object + contentFilters*: seq[ContentFilter] + handler*: ContentFilterHandler + + # @TODO MAYBE MORE INFO? + Filters* = Table[string, Filter] + + FilterRequest* = object + contentFilters*: seq[ContentFilter] + pubSubTopic*: string + subscribe*: bool + + MessagePush* = object + messages*: seq[WakuMessage] + + FilterRPC* = object + requestId*: string + request*: FilterRequest + push*: MessagePush + + Subscriber* = object + peer*: PeerInfo + requestId*: string + filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? + + MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} + + WakuFilter* = ref object of LPProtocol + rng*: ref BrHmacDrbgContext + peerManager*: PeerManager + subscribers*: seq[Subscriber] + pushHandler*: MessagePushHandler