mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-16 00:47:24 +00:00
deploy: f1a60271587072ca5db5b9aa6173ec56bbfc491d
This commit is contained in:
parent
5dba9ca6d8
commit
bcbac0f906
@ -318,7 +318,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||
info "Hit filter handler"
|
||||
|
||||
await node.subscribe(
|
||||
FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic, subscribe: true),
|
||||
FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[DefaultContentTopic])], pubSubTopic: DefaultTopic, subscribe: true),
|
||||
filterHandler
|
||||
)
|
||||
|
||||
|
@ -291,8 +291,8 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
# Light node has not yet subscribed to any filters
|
||||
node.filters.len() == 0
|
||||
|
||||
let contentFilters = @[ContentFilter(topics: @[defaultContentTopic, ContentTopic("2")]),
|
||||
ContentFilter(topics: @[ContentTopic("3"), ContentTopic("4")])]
|
||||
let contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic, ContentTopic("2")]),
|
||||
ContentFilter(contentTopics: @[ContentTopic("3"), ContentTopic("4")])]
|
||||
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||
|
||||
check:
|
||||
@ -330,7 +330,7 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
|
||||
# First ensure subscription exists
|
||||
|
||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[defaultContentTopic])], topic = some(defaultTopic))
|
||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic])], topic = some(defaultTopic))
|
||||
check:
|
||||
sub
|
||||
|
||||
|
@ -39,7 +39,7 @@ procSuite "Waku Filter":
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
@ -88,7 +88,7 @@ procSuite "Waku Filter":
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
@ -118,7 +118,7 @@ procSuite "Waku Filter":
|
||||
responseCompletionFuture = newFuture[bool]()
|
||||
|
||||
let
|
||||
rpcU = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: false)
|
||||
rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: false)
|
||||
|
||||
await proto.unsubscribe(rpcU)
|
||||
|
||||
@ -145,7 +145,7 @@ procSuite "Waku Filter":
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
|
||||
|
@ -28,7 +28,7 @@ procSuite "WakuNode":
|
||||
Port(60000))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
||||
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true)
|
||||
message = WakuMessage(payload: "hello world".toBytes(),
|
||||
contentTopic: contentTopic)
|
||||
|
||||
@ -80,7 +80,7 @@ procSuite "WakuNode":
|
||||
Port(60002))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
|
||||
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true)
|
||||
message = WakuMessage(payload: "hello world".toBytes(),
|
||||
contentTopic: contentTopic)
|
||||
|
||||
@ -147,8 +147,8 @@ procSuite "WakuNode":
|
||||
otherPayload = @[byte 9]
|
||||
defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic)
|
||||
otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic)
|
||||
defaultFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[defaultContentTopic])], subscribe: true)
|
||||
otherFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[otherContentTopic])], subscribe: true)
|
||||
defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[defaultContentTopic])], subscribe: true)
|
||||
otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[otherContentTopic])], subscribe: true)
|
||||
|
||||
await node1.start()
|
||||
node1.mountRelay()
|
||||
@ -272,7 +272,7 @@ procSuite "WakuNode":
|
||||
msg == message
|
||||
completionFut.complete(true)
|
||||
|
||||
await node1.subscribe(FilterRequest(topic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true), handler)
|
||||
await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true), handler)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az272-270:
|
||||
# Libtool was configured on host fv-az173-665:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -58,12 +58,12 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO use default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
|
||||
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
|
||||
|
||||
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
|
||||
# Successfully subscribed to all content filters
|
||||
|
||||
for cTopic in concat(contentFilters.mapIt(it.topics)):
|
||||
for cTopic in concat(contentFilters.mapIt(it.contentTopics)):
|
||||
# Create message cache for each subscribed content topic
|
||||
messageCache[cTopic] = @[]
|
||||
|
||||
@ -78,12 +78,12 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO consider using default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
|
||||
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
|
||||
|
||||
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
|
||||
# Successfully unsubscribed from all content filters
|
||||
|
||||
for cTopic in concat(contentFilters.mapIt(it.topics)):
|
||||
for cTopic in concat(contentFilters.mapIt(it.contentTopics)):
|
||||
# Remove message cache for each unsubscribed content topic
|
||||
messageCache.del(cTopic)
|
||||
|
||||
|
@ -81,7 +81,7 @@ proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilte
|
||||
# Flatten all unsubscribe topics into single seq
|
||||
var unsubscribeTopics: seq[ContentTopic]
|
||||
for cf in contentFilters:
|
||||
unsubscribeTopics = unsubscribeTopics.concat(cf.topics)
|
||||
unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics)
|
||||
|
||||
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
|
||||
|
||||
@ -90,10 +90,10 @@ proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilte
|
||||
# Iterate filter entries to remove matching content topics
|
||||
for cf in f.contentFilters.mitems:
|
||||
# Iterate content filters in filter entry
|
||||
cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
|
||||
cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
|
||||
# make sure we delete the content filter
|
||||
# if no more topics are left
|
||||
f.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0)
|
||||
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0)
|
||||
|
||||
if f.contentFilters.len == 0:
|
||||
rIdToRemove.add(rId)
|
||||
@ -186,7 +186,6 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
|
||||
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
node.filters.notify(msg.value(), "") # Trigger filter handlers on a light node
|
||||
await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node
|
||||
waku_node_messages.inc(labelValues = ["relay"])
|
||||
|
||||
@ -329,7 +328,7 @@ proc mountFilter*(node: WakuNode) =
|
||||
proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} =
|
||||
info "push received"
|
||||
for message in msg.messages:
|
||||
node.filters.notify(message, requestId)
|
||||
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
||||
waku_node_messages.inc(labelValues = ["filter"])
|
||||
|
||||
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
|
||||
|
@ -48,8 +48,8 @@ proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
# TODO: In case of no topics we should either trigger here for all messages,
|
||||
# or we should not allow such filter to exist in the first place.
|
||||
for contentFilter in filter.contentFilters:
|
||||
if contentFilter.topics.len > 0:
|
||||
if msg.contentTopic in contentFilter.topics:
|
||||
if contentFilter.contentTopics.len > 0:
|
||||
if msg.contentTopic in contentFilter.contentTopics:
|
||||
filter.handler(msg)
|
||||
break
|
||||
|
||||
@ -57,7 +57,7 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest
|
||||
# Flatten all unsubscribe topics into single seq
|
||||
var unsubscribeTopics: seq[ContentTopic]
|
||||
for cf in request.contentFilters:
|
||||
unsubscribeTopics = unsubscribeTopics.concat(cf.topics)
|
||||
unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics)
|
||||
|
||||
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
|
||||
|
||||
@ -67,11 +67,11 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest
|
||||
# Iterate through subscriber entries matching peer ID to remove matching content topics
|
||||
for cf in subscriber.filter.contentFilters.mitems:
|
||||
# Iterate content filters in filter entry
|
||||
cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
|
||||
cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
|
||||
|
||||
# make sure we delete the content filter
|
||||
# if no more topics are left
|
||||
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0)
|
||||
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0)
|
||||
|
||||
# make sure we delete the subscriber
|
||||
# if no more content filters left
|
||||
@ -83,15 +83,15 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest
|
||||
proc encode*(filter: ContentFilter): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for topic in filter.topics:
|
||||
result.write(1, topic)
|
||||
for contentTopic in filter.contentTopics:
|
||||
result.write(1, contentTopic)
|
||||
|
||||
proc encode*(rpc: FilterRequest): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
result.write(1, uint64(rpc.subscribe))
|
||||
|
||||
result.write(2, rpc.topic)
|
||||
result.write(2, rpc.pubSubTopic)
|
||||
|
||||
for filter in rpc.contentFilters:
|
||||
result.write(3, filter.encode())
|
||||
@ -99,20 +99,20 @@ proc encode*(rpc: FilterRequest): ProtoBuffer =
|
||||
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var topics: seq[ContentTopic]
|
||||
discard ? pb.getRepeatedField(1, topics)
|
||||
var contentTopics: seq[ContentTopic]
|
||||
discard ? pb.getRepeatedField(1, contentTopics)
|
||||
|
||||
ok(ContentFilter(topics: topics))
|
||||
ok(ContentFilter(contentTopics: contentTopics))
|
||||
|
||||
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = FilterRequest(contentFilters: @[], topic: "")
|
||||
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var subflag: uint64
|
||||
if ? pb.getField(1, subflag):
|
||||
rpc.subscribe = bool(subflag)
|
||||
|
||||
discard ? pb.getField(2, rpc.topic)
|
||||
discard ? pb.getField(2, rpc.pubSubTopic)
|
||||
|
||||
var buffs: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(3, buffs)
|
||||
@ -205,12 +205,16 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||
## Returns a Filter for the specific protocol
|
||||
## This filter can then be used to send messages to subscribers that match conditions.
|
||||
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
||||
trace "handle WakuFilter subscription", topic=topic, msg=msg
|
||||
|
||||
for subscriber in proto.subscribers:
|
||||
if subscriber.filter.topic != topic:
|
||||
if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
|
||||
trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
|
||||
continue
|
||||
|
||||
for filter in subscriber.filter.contentFilters:
|
||||
if msg.contentTopic in filter.topics:
|
||||
if msg.contentTopic in filter.contentTopics:
|
||||
trace "Found matching contentTopic", filter=filter, msg=msg
|
||||
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
|
||||
|
||||
let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
|
||||
|
@ -10,7 +10,7 @@ export waku_message
|
||||
|
||||
type
|
||||
ContentFilter* = object
|
||||
topics*: seq[ContentTopic]
|
||||
contentTopics*: seq[ContentTopic]
|
||||
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
|
||||
|
||||
@ -23,7 +23,7 @@ type
|
||||
|
||||
FilterRequest* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
topic*: string
|
||||
pubSubTopic*: string
|
||||
subscribe*: bool
|
||||
|
||||
MessagePush* = object
|
||||
|
Loading…
x
Reference in New Issue
Block a user