mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-28 06:50:56 +00:00
feature/add-filter (#244)
* started working on adding filter * fix * fix * fix * not subscribing * fix * filter Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
parent
16c4e65762
commit
45d57efaea
@ -19,7 +19,7 @@ import libp2p/[switch, # manage transports, a single entry poi
|
|||||||
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
||||||
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
|
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
|
||||||
import ../../waku/node/v2/[config, wakunode2, waku_types],
|
import ../../waku/node/v2/[config, wakunode2, waku_types],
|
||||||
../../waku/protocol/v2/[waku_relay, waku_store],
|
../../waku/protocol/v2/[waku_relay, waku_store, waku_filter],
|
||||||
../../waku/node/common
|
../../waku/node/common
|
||||||
|
|
||||||
const Help = """
|
const Help = """
|
||||||
@ -158,7 +158,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
|||||||
|
|
||||||
# waitFor vs await
|
# waitFor vs await
|
||||||
await node.start()
|
await node.start()
|
||||||
await node.mountRelay(conf.topics.split(" "))
|
|
||||||
|
if conf.filternode != "":
|
||||||
|
await node.mountRelay(conf.topics.split(" "))
|
||||||
|
else:
|
||||||
|
await node.mountRelay(@[])
|
||||||
|
|
||||||
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
|
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
|
||||||
|
|
||||||
@ -182,6 +186,21 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
|||||||
|
|
||||||
await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler)
|
await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler)
|
||||||
|
|
||||||
|
if conf.filternode != "":
|
||||||
|
node.mountFilter()
|
||||||
|
|
||||||
|
node.wakuFilter.setPeer(parsePeer(conf.filternode))
|
||||||
|
|
||||||
|
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
|
||||||
|
let payload = cast[string](msg.payload)
|
||||||
|
echo &"{payload}"
|
||||||
|
info "Hit filter handler"
|
||||||
|
|
||||||
|
await node.subscribe(
|
||||||
|
FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic),
|
||||||
|
filterHandler
|
||||||
|
)
|
||||||
|
|
||||||
# Subscribe to a topic
|
# Subscribe to a topic
|
||||||
# TODO To get end to end sender would require more information in payload
|
# TODO To get end to end sender would require more information in payload
|
||||||
# We could possibly indicate the relayer point with connection somehow probably (?)
|
# We could possibly indicate the relayer point with connection somehow probably (?)
|
||||||
|
@ -35,7 +35,6 @@ type
|
|||||||
|
|
||||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||||
|
|
||||||
|
|
||||||
Index* = object
|
Index* = object
|
||||||
## This type contains the description of an index used in the pagination of waku messages
|
## This type contains the description of an index used in the pagination of waku messages
|
||||||
digest*: MDigest[256]
|
digest*: MDigest[256]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user