mirror of https://github.com/waku-org/nwaku.git
parent
3c11734bbf
commit
75d7d4780e
|
@ -1,9 +1,11 @@
|
|||
import libp2p/protocols/pubsub/rpc/messages
|
||||
|
||||
import
|
||||
tables
|
||||
|
||||
type
|
||||
|
||||
FilterMessageHandler* = proc(msg: seq[byte]) {.gcsafe, closure.}
|
||||
FilterMessageHandler* = proc(msg: Message) {.gcsafe, closure.}
|
||||
|
||||
Filter* = object
|
||||
topics: seq[string] # @TODO TOPIC
|
||||
|
@ -17,9 +19,17 @@ proc init*(T: type Filter, topics: seq[string], handler: FilterMessageHandler):
|
|||
handler: handler
|
||||
)
|
||||
|
||||
proc notify*(filters: var Filters, topic: string, msg: seq[byte]) {.gcsafe.} =
|
||||
proc containsMatch(lhs: seq[string], rhs: seq[string]): bool =
|
||||
for leftItem in lhs:
|
||||
if leftItem in rhs:
|
||||
return true
|
||||
|
||||
return false
|
||||
|
||||
proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
|
||||
for filter in filters.mvalues:
|
||||
if filter.topics.len > 0 and topic notin filter.topics:
|
||||
# @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
|
||||
if filter.topics.len > 0 and not filter.topics.containsMatch(msg.topicIDs):
|
||||
continue
|
||||
|
||||
filter.handler(msg)
|
||||
|
|
|
@ -89,7 +89,6 @@ method subscribeTopic*(w: WakuSub,
|
|||
|
||||
# Currently we are using the libp2p topic here.
|
||||
# This will need to be change to a Waku topic.
|
||||
w.filters.notify(topic, data)
|
||||
|
||||
debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId
|
||||
|
||||
|
@ -125,6 +124,10 @@ method rpcHandler*(w: WakuSub,
|
|||
else:
|
||||
await procCall FloodSub(w).rpcHandler(peer, rpcMsgs)
|
||||
# XXX: here
|
||||
|
||||
for rpcs in rpcMsgs:
|
||||
for msg in rpcs.messages:
|
||||
w.filters.notify(msg)
|
||||
|
||||
method publish*(w: WakuSub,
|
||||
topic: string,
|
||||
|
|
Loading…
Reference in New Issue