From 5b4e429a195e73488d437416e3d4e4f35807dc5b Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Mon, 7 Sep 2020 11:12:27 +0200 Subject: [PATCH] enhancement/push-based-filter (#136) * updated protobuf * updated test --- tests/v2/test_waku_filter.nim | 17 +++++-- waku/protocol/v2/waku_filter.nim | 85 ++++++++++++++++++++++++++------ 2 files changed, 83 insertions(+), 19 deletions(-) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 1af5ef946..3a786e6dc 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -17,7 +17,12 @@ import procSuite "Waku Filter": test "encoding and decoding FilterRPC": - let rpc = FilterRPC(filters: @[ContentFilter(topics: @["foo", "bar"])]) + let + peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + rpc = FilterRPC( + filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @["foo", "bar"])], topic: "foo")], + messagePush: @[MessagePush(message: @[Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)])] + ) let buf = rpc.encode() @@ -62,7 +67,7 @@ procSuite "Waku Filter": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - var rpc = FilterRPC(filters: @[ContentFilter(topics: @["topic"])]) + var rpc = FilterRPC(filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic")]) discard await msDial.select(conn, WakuFilterCodec) await conn.writeLP(rpc.encode().buffer) @@ -73,7 +78,9 @@ procSuite "Waku Filter": var message = await conn.readLp(64*1024) - let response = protobuf.decodeMessage(initProtoBuffer(message)) - + let response = FilterRPC.init(message) + let res = response.value check: - msg == response.value + res.messagePush.len() == 1 + res.messagePush[0].message.len() == 1 + res.messagePush[0].message[0] == msg diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 08858087b..632e8c463 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -15,18 +15,26 @@ import # relay protocol. const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha2" + WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha3" type ContentFilter* = object topics*: seq[string] + FilterRequest* = object + contentFilter*: seq[ContentFilter] + topic*: string + + MessagePush* = object + message*: seq[Message] + FilterRPC* = object - filters*: seq[ContentFilter] + filterRequest*: seq[FilterRequest] + messagePush*: seq[MessagePush] Subscriber = object connection: Connection - filter: FilterRPC + filter: seq[FilterRequest] WakuFilter* = ref object of LPProtocol subscribers*: seq[Subscriber] @@ -37,12 +45,14 @@ proc encode*(filter: ContentFilter): ProtoBuffer = for topic in filter.topics: result.write(1, topic) -proc encode*(rpc: FilterRPC): ProtoBuffer = +proc encode*(rpc: FilterRequest): ProtoBuffer = result = initProtoBuffer() - for filter in rpc.filters: + for filter in rpc.contentFilter: result.write(1, filter.encode()) + result.write(2, rpc.topic) + proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) @@ -51,15 +61,62 @@ proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = ok(ContentFilter(topics: topics)) -proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRPC(filters: @[]) +proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = + var rpc = FilterRequest(contentFilter: @[], topic: "") let pb = initProtoBuffer(buffer) var buffs: seq[seq[byte]] discard ? pb.getRepeatedField(1, buffs) for buf in buffs: - rpc.filters.add(? ContentFilter.init(buf)) + rpc.contentFilter.add(? ContentFilter.init(buf)) + + discard ? pb.getField(2, rpc.topic) + + ok(rpc) + +proc encode*(push: MessagePush): ProtoBuffer = + result = initProtoBuffer() + + for push in push.message: + result.write(1, push.encodeMessage()) + +proc encode*(rpc: FilterRPC): ProtoBuffer = + result = initProtoBuffer() + + for request in rpc.filterRequest: + result.write(1, request.encode()) + + for push in rpc.messagePush: + result.write(2, push.encode()) + +proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = + var push = MessagePush() + let pb = initProtoBuffer(buffer) + + var messages: seq[seq[byte]] + discard ? pb.getRepeatedField(1, messages) + + for buf in messages: + push.message.add(? protobuf.decodeMessage(initProtoBuffer(buf))) + + ok(push) + +proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = FilterRPC() + let pb = initProtoBuffer(buffer) + + var requests: seq[seq[byte]] + discard ? pb.getRepeatedField(1, requests) + + for buffer in requests: + rpc.filterRequest.add(? FilterRequest.init(buffer)) + + var pushes: seq[seq[byte]] + discard ? pb.getRepeatedField(2, pushes) + + for buffer in pushes: + rpc.messagePush.add(? MessagePush.init(buffer)) ok(rpc) @@ -76,7 +133,7 @@ proc init*(T: type WakuFilter): T = if res.isErr: return - ws.subscribers.add(Subscriber(connection: conn, filter: res.value)) + ws.subscribers.add(Subscriber(connection: conn, filter: res.value.filterRequest)) # @TODO THIS IS A VERY ROUGH EXPERIMENT ws.handler = handle @@ -88,10 +145,10 @@ proc filter*(proto: WakuFilter): Filter = ## This filter can then be used to send messages to subscribers that match conditions. proc handle(msg: Message) = for subscriber in proto.subscribers: - for f in subscriber.filter.filters: - for topic in f.topics: - if topic in msg.topicIDs: - discard subscriber.connection.writeLp(msg.encodeMessage()) - break + for filter in subscriber.filter: + if filter.topic in msg.topicIDs: + # @TODO PROBABLY WANT TO BATCH MESSAGES + discard subscriber.connection.writeLp(FilterRPC(messagePush: @[MessagePush(message: @[msg])]).encode().buffer) + break Filter.init(@[], handle)