Partial filter (#120)

* start work on filter protocol

* init functions for protobufs

* comment

* added init

* playing around with subscribers

* Update waku_filter.nim

* added test for protobuf

* fixes

* fix

* Update test_waku_filter.nim

* Update waku_filter.nim

* Add note on status

Co-authored-by: decanus <7621705+decanus@users.noreply.github.com>
This commit is contained in:
Oskar Thorén 2020-09-01 11:03:17 +08:00 committed by GitHub
parent ea0d62993d
commit 67a964be28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 1 deletions

View File

@ -2,4 +2,5 @@ import
# Waku v2 tests
./v2/test_waku,
./v2/test_wakunode,
./v2/test_waku_store
./v2/test_waku_store,
./v2/test_waku_filter

View File

@ -0,0 +1,30 @@
import unittest, options, tables, sets, sequtils
import chronos, chronicles
import utils,
libp2p/errors,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/rpc/message,
libp2p/multistream,
libp2p/transports/transport,
libp2p/transports/tcptransport
import ../../waku/protocol/v2/[waku_protocol2, waku_filter, filter]
import ../test_helpers
procSuite "Waku Filter":
test "encoding and decoding FilterRPC":
let rpc = FilterRPC(filters: @[ContentFilter(topics: @["foo", "bar"])])
let buf = rpc.encode()
let decode = FilterRPC.init(buf.buffer)
check:
decode.isErr == false
decode.value == rpc

View File

@ -0,0 +1,102 @@
import chronos, chronicles
import ./filter
import tables
import libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/[messages, protobuf],
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection
import metrics
import stew/results
# NOTE This is just a start, the design of this protocol isn't done yet. It
# should be direct payload exchange (a la req-resp), not be coupled with the
# relay protocol.
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha2"
type
ContentFilter* = object
topics*: seq[string]
FilterRPC* = object
filters*: seq[ContentFilter]
Subscriber = object
connection: Connection
filter: FilterRPC
WakuFilter* = ref object of LPProtocol
subscribers*: seq[Subscriber]
method encode*(filter: ContentFilter): ProtoBuffer =
result = initProtoBuffer()
for topic in filter.topics:
result.write(1, topic)
method encode*(rpc: FilterRPC): ProtoBuffer =
result = initProtoBuffer()
for filter in rpc.filters:
result.write(1, filter.encode())
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var topics: seq[string]
discard ? pb.getRepeatedField(1, topics)
ok(ContentFilter(topics: topics))
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC(filters: @[])
let pb = initProtoBuffer(buffer)
var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(1, buffs)
for buf in buffs:
rpc.filters.add(? ContentFilter.init(buf))
ok(rpc)
method init*(T: type WakuFilter): T =
var ws = WakuFilter(subscribers: newSeq[Subscriber](0))
# From my understanding we need to set up filters,
# then on every message received we need the handle function to send it to the connection
# if the peer subscribed.
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var res = FilterRPC.init(message)
if res.isErr:
return
ws.subscribers.add(Subscriber(connection: conn, filter: res.value))
# @TODO THIS IS A VERY ROUGH EXPERIMENT
ws.handler = handle
ws.codec = WakuFilterCodec
result = ws
proc filter*(proto: WakuFilter): Filter =
## Returns a Filter for the specific protocol
## This filter can then be used to send messages to subscribers that match conditions.
proc handle(msg: Message) =
for subscriber in proto.subscribers:
for f in subscriber.filter.filters:
for topic in f.topics:
if topic in msg.topicIDs:
discard subscriber.connection.writeLp(msg.encodeMessage())
break
Filter.init(@[], handle)