enhancement/push-based-filter (#136)

* updated protobuf

* updated test
This commit is contained in:
Dean Eigenmann 2020-09-07 11:12:27 +02:00 committed by GitHub
parent 196c54d6ab
commit 5b4e429a19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 19 deletions

View File

@ -17,7 +17,12 @@ import
procSuite "Waku Filter":
test "encoding and decoding FilterRPC":
let rpc = FilterRPC(filters: @[ContentFilter(topics: @["foo", "bar"])])
let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
rpc = FilterRPC(
filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @["foo", "bar"])], topic: "foo")],
messagePush: @[MessagePush(message: @[Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)])]
)
let buf = rpc.encode()
@ -62,7 +67,7 @@ procSuite "Waku Filter":
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
var rpc = FilterRPC(filters: @[ContentFilter(topics: @["topic"])])
var rpc = FilterRPC(filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic")])
discard await msDial.select(conn, WakuFilterCodec)
await conn.writeLP(rpc.encode().buffer)
@ -73,7 +78,9 @@ procSuite "Waku Filter":
var message = await conn.readLp(64*1024)
let response = protobuf.decodeMessage(initProtoBuffer(message))
let response = FilterRPC.init(message)
let res = response.value
check:
msg == response.value
res.messagePush.len() == 1
res.messagePush[0].message.len() == 1
res.messagePush[0].message[0] == msg

View File

@ -15,18 +15,26 @@ import
# relay protocol.
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha2"
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha3"
type
ContentFilter* = object
topics*: seq[string]
FilterRequest* = object
contentFilter*: seq[ContentFilter]
topic*: string
MessagePush* = object
message*: seq[Message]
FilterRPC* = object
filters*: seq[ContentFilter]
filterRequest*: seq[FilterRequest]
messagePush*: seq[MessagePush]
Subscriber = object
connection: Connection
filter: FilterRPC
filter: seq[FilterRequest]
WakuFilter* = ref object of LPProtocol
subscribers*: seq[Subscriber]
@ -37,12 +45,14 @@ proc encode*(filter: ContentFilter): ProtoBuffer =
for topic in filter.topics:
result.write(1, topic)
proc encode*(rpc: FilterRPC): ProtoBuffer =
proc encode*(rpc: FilterRequest): ProtoBuffer =
result = initProtoBuffer()
for filter in rpc.filters:
for filter in rpc.contentFilter:
result.write(1, filter.encode())
result.write(2, rpc.topic)
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
@ -51,15 +61,62 @@ proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
ok(ContentFilter(topics: topics))
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC(filters: @[])
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRequest(contentFilter: @[], topic: "")
let pb = initProtoBuffer(buffer)
var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(1, buffs)
for buf in buffs:
rpc.filters.add(? ContentFilter.init(buf))
rpc.contentFilter.add(? ContentFilter.init(buf))
discard ? pb.getField(2, rpc.topic)
ok(rpc)
proc encode*(push: MessagePush): ProtoBuffer =
result = initProtoBuffer()
for push in push.message:
result.write(1, push.encodeMessage())
proc encode*(rpc: FilterRPC): ProtoBuffer =
result = initProtoBuffer()
for request in rpc.filterRequest:
result.write(1, request.encode())
for push in rpc.messagePush:
result.write(2, push.encode())
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
var push = MessagePush()
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ? pb.getRepeatedField(1, messages)
for buf in messages:
push.message.add(? protobuf.decodeMessage(initProtoBuffer(buf)))
ok(push)
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC()
let pb = initProtoBuffer(buffer)
var requests: seq[seq[byte]]
discard ? pb.getRepeatedField(1, requests)
for buffer in requests:
rpc.filterRequest.add(? FilterRequest.init(buffer))
var pushes: seq[seq[byte]]
discard ? pb.getRepeatedField(2, pushes)
for buffer in pushes:
rpc.messagePush.add(? MessagePush.init(buffer))
ok(rpc)
@ -76,7 +133,7 @@ proc init*(T: type WakuFilter): T =
if res.isErr:
return
ws.subscribers.add(Subscriber(connection: conn, filter: res.value))
ws.subscribers.add(Subscriber(connection: conn, filter: res.value.filterRequest))
# @TODO THIS IS A VERY ROUGH EXPERIMENT
ws.handler = handle
@ -88,10 +145,10 @@ proc filter*(proto: WakuFilter): Filter =
## This filter can then be used to send messages to subscribers that match conditions.
proc handle(msg: Message) =
for subscriber in proto.subscribers:
for f in subscriber.filter.filters:
for topic in f.topics:
if topic in msg.topicIDs:
discard subscriber.connection.writeLp(msg.encodeMessage())
for filter in subscriber.filter:
if filter.topic in msg.topicIDs:
# @TODO PROBABLY WANT TO BATCH MESSAGES
discard subscriber.connection.writeLp(FilterRPC(messagePush: @[MessagePush(message: @[msg])]).encode().buffer)
break
Filter.init(@[], handle)