diff --git a/waku/protocol/v2/filter.nim b/waku/protocol/v2/filter.nim new file mode 100644 index 000000000..c174c4a9b --- /dev/null +++ b/waku/protocol/v2/filter.nim @@ -0,0 +1,25 @@ +import + tables + +type + + FilterMessageHandler* = proc(msg: seq[byte]) {.gcsafe, closure.} + + Filter* = object + topics: seq[string] # @TODO TOPIC + handler: FilterMessageHandler + + Filters* = Table[string, Filter] + +proc init*(T: type Filter, topics: seq[string], handler: FilterMessageHandler): T = + result = T( + topics: topics, + handler: handler + ) + +proc notify*(filters: var Filters, topic: string, msg: seq[byte]) {.gcsafe.} = + for filter in filters.mvalues: + if filter.topics.len > 0 and topic notin filter.topics: + continue + + filter.handler(msg) diff --git a/waku/protocol/v2/waku_protocol2.nim b/waku/protocol/v2/waku_protocol2.nim index bd9096c41..115939148 100644 --- a/waku/protocol/v2/waku_protocol2.nim +++ b/waku/protocol/v2/waku_protocol2.nim @@ -5,6 +5,8 @@ import strutils import chronos, chronicles +import ./filter +import tables import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsubpeer, libp2p/protocols/pubsub/floodsub, @@ -28,6 +30,8 @@ type text*: string gossip_enabled*: bool + filters: Filters + method init(w: WakuSub) = debug "init" proc handler(conn: Connection, proto: string) {.async.} = @@ -44,6 +48,7 @@ method init(w: WakuSub) = # XXX: Handler hijack GossipSub here? w.handler = handler + w.filters = initTable[string, Filter]() w.codec = WakuSubCodec method initPubSub*(w: WakuSub) = @@ -82,6 +87,10 @@ method subscribeTopic*(w: WakuSub, proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = info "Hit NOOP handler", topic + # Currently we are using the libp2p topic here. + # This will need to be change to a Waku topic. + w.filters.notify(topic, data) + debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId if w.gossip_enabled: