diff --git a/waku/protocol/v2/filter.nim b/waku/protocol/v2/filter.nim index c174c4a9b..d39aa961f 100644 --- a/waku/protocol/v2/filter.nim +++ b/waku/protocol/v2/filter.nim @@ -1,9 +1,11 @@ +import libp2p/protocols/pubsub/rpc/messages + import tables type - FilterMessageHandler* = proc(msg: seq[byte]) {.gcsafe, closure.} + FilterMessageHandler* = proc(msg: Message) {.gcsafe, closure.} Filter* = object topics: seq[string] # @TODO TOPIC @@ -17,9 +19,17 @@ proc init*(T: type Filter, topics: seq[string], handler: FilterMessageHandler): handler: handler ) -proc notify*(filters: var Filters, topic: string, msg: seq[byte]) {.gcsafe.} = +proc containsMatch(lhs: seq[string], rhs: seq[string]): bool = + for leftItem in lhs: + if leftItem in rhs: + return true + + return false + +proc notify*(filters: var Filters, msg: Message) {.gcsafe.} = for filter in filters.mvalues: - if filter.topics.len > 0 and topic notin filter.topics: + # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES + if filter.topics.len > 0 and not filter.topics.containsMatch(msg.topicIDs): continue filter.handler(msg) diff --git a/waku/protocol/v2/waku_protocol2.nim b/waku/protocol/v2/waku_protocol2.nim index a32291c3c..f55ff43c0 100644 --- a/waku/protocol/v2/waku_protocol2.nim +++ b/waku/protocol/v2/waku_protocol2.nim @@ -89,7 +89,6 @@ method subscribeTopic*(w: WakuSub, # Currently we are using the libp2p topic here. # This will need to be change to a Waku topic. - w.filters.notify(topic, data) debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId @@ -125,6 +124,10 @@ method rpcHandler*(w: WakuSub, else: await procCall FloodSub(w).rpcHandler(peer, rpcMsgs) # XXX: here + + for rpcs in rpcMsgs: + for msg in rpcs.messages: + w.filters.notify(msg) method publish*(w: WakuSub, topic: string,