From 67a964be28523bf44c3aaed51cb0e9bdda097e63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oskar=20Thor=C3=A9n?= Date: Tue, 1 Sep 2020 11:03:17 +0800 Subject: [PATCH] Partial filter (#120) * start work on filter protocol * init functions for protobufs * comment * added init * playing around with subscribers * Update waku_filter.nim * added test for protobuf * fixes * fix * Update test_waku_filter.nim * Update waku_filter.nim * Add note on status Co-authored-by: decanus <7621705+decanus@users.noreply.github.com> --- tests/all_tests_v2.nim | 3 +- tests/v2/test_waku_filter.nim | 30 +++++++++ waku/protocol/v2/waku_filter.nim | 102 +++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 tests/v2/test_waku_filter.nim create mode 100644 waku/protocol/v2/waku_filter.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 4e8688534..0044effa9 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -2,4 +2,5 @@ import # Waku v2 tests ./v2/test_waku, ./v2/test_wakunode, - ./v2/test_waku_store + ./v2/test_waku_store, + ./v2/test_waku_filter diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim new file mode 100644 index 000000000..55ccd2255 --- /dev/null +++ b/tests/v2/test_waku_filter.nim @@ -0,0 +1,30 @@ + +import unittest, options, tables, sets, sequtils +import chronos, chronicles +import utils, + libp2p/errors, + libp2p/switch, + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/rpc/message, + libp2p/multistream, + libp2p/transports/transport, + libp2p/transports/tcptransport +import ../../waku/protocol/v2/[waku_protocol2, waku_filter, filter] + +import ../test_helpers + +procSuite "Waku Filter": + + test "encoding and decoding FilterRPC": + let rpc = FilterRPC(filters: @[ContentFilter(topics: @["foo", "bar"])]) + + let buf = rpc.encode() + + let decode = FilterRPC.init(buf.buffer) + + check: + decode.isErr == false + decode.value == rpc diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim new file mode 100644 index 000000000..2b26a3e09 --- /dev/null +++ b/waku/protocol/v2/waku_filter.nim @@ -0,0 +1,102 @@ +import chronos, chronicles +import ./filter +import tables +import libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/pubsubpeer, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/[messages, protobuf], + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection + +import metrics + +import stew/results + + +# NOTE This is just a start, the design of this protocol isn't done yet. It +# should be direct payload exchange (a la req-resp), not be coupled with the +# relay protocol. + +const + WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha2" + +type + ContentFilter* = object + topics*: seq[string] + + FilterRPC* = object + filters*: seq[ContentFilter] + + Subscriber = object + connection: Connection + filter: FilterRPC + + WakuFilter* = ref object of LPProtocol + subscribers*: seq[Subscriber] + +method encode*(filter: ContentFilter): ProtoBuffer = + result = initProtoBuffer() + + for topic in filter.topics: + result.write(1, topic) + +method encode*(rpc: FilterRPC): ProtoBuffer = + result = initProtoBuffer() + + for filter in rpc.filters: + result.write(1, filter.encode()) + +proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var topics: seq[string] + discard ? pb.getRepeatedField(1, topics) + + ok(ContentFilter(topics: topics)) + +proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = FilterRPC(filters: @[]) + let pb = initProtoBuffer(buffer) + + var buffs: seq[seq[byte]] + discard ? pb.getRepeatedField(1, buffs) + + for buf in buffs: + rpc.filters.add(? ContentFilter.init(buf)) + + ok(rpc) + +method init*(T: type WakuFilter): T = + var ws = WakuFilter(subscribers: newSeq[Subscriber](0)) + + # From my understanding we need to set up filters, + # then on every message received we need the handle function to send it to the connection + # if the peer subscribed. + + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + var message = await conn.readLp(64*1024) + var res = FilterRPC.init(message) + if res.isErr: + return + + ws.subscribers.add(Subscriber(connection: conn, filter: res.value)) + # @TODO THIS IS A VERY ROUGH EXPERIMENT + + ws.handler = handle + ws.codec = WakuFilterCodec + result = ws + +proc filter*(proto: WakuFilter): Filter = + ## Returns a Filter for the specific protocol + ## This filter can then be used to send messages to subscribers that match conditions. + proc handle(msg: Message) = + for subscriber in proto.subscribers: + for f in subscriber.filter.filters: + for topic in f.topics: + if topic in msg.topicIDs: + discard subscriber.connection.writeLp(msg.encodeMessage()) + break + + Filter.init(@[], handle)