feature/mount-filter-node-api (#181)

* started working on filter mount

* fixes

* fix indentation

* fixes

* save

* change

* rm

* Update waku_types.nim

* Update waku_filter.nim

* fixes

* fix

* using new filter

* did some testing stuff

* rm

* fix

* updated

* update doc

* fix

* fix

* unasynced

* fix

* w -> node

* rename

* move to content filter

* fix

* method

* rename

* renamed contentFilter -> contentFilters

* readded

* moved

* first test fix

* readded

* no more need

* fixes

* fix

* fix

* fixes

* todo

* removes mother fuck

* Update waku_types.nim
This commit is contained in:
Dean Eigenmann 2020-10-02 14:48:56 +02:00 committed by GitHub
parent 75be455272
commit a975bf6d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 160 additions and 74 deletions

View File

@ -25,14 +25,11 @@ method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler)
## NOTE The data field SHOULD be decoded as a WakuMessage.
## Status: Implemented.
method subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFilterHandler)
## Subscribes to a ContentFilter. Triggers handler when receiving messages on
## this content filter. ContentFilter is a method that takes some content
## filter, specifically with `ContentTopic`, and a `Message`. The `Message`
## has to match the `ContentTopic`.
## Status: Not yet implemented.
## TODO Implement as wrapper around `waku_filter` and `subscribe` above.
method subscribe*(w: WakuNode, filter: FilterRequest, handler: ContentFilterHandler)
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
## FilterHandler is a method that takes a MessagePush.
##
## Status: Implemented.
method unsubscribe*(w: WakuNode, topic: Topic)
## Unsubscribe from a topic.

View File

@ -29,20 +29,21 @@ procSuite "Waku Filter":
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
var completionFut = newFuture[bool]()
proc handle(msg: MessagePush) {.async, gcsafe, closure.} =
var responseRequestIdFuture = newFuture[string]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
completionFut.complete(true)
responseRequestIdFuture.complete(requestId)
let
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic")
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo)
proc emptyHandle(msg: MessagePush) {.async, gcsafe, closure.} =
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
let
@ -53,11 +54,11 @@ procSuite "Waku Filter":
subscriptions["test"] = subscription
listenSwitch.mount(proto2)
await proto.subscribe(listenSwitch.peerInfo, rpc)
let id = await proto.subscribe(rpc)
await sleepAsync(2.seconds)
await subscriptions.notify("topic", post)
check:
(await completionFut.withTimeout(5.seconds)) == true
(await responseRequestIdFuture) == id

View File

@ -7,7 +7,7 @@ import
libp2p/crypto/secp,
libp2p/switch,
eth/keys,
../../waku/protocol/v2/[waku_relay, waku_store, message_notifier],
../../waku/protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier],
../../waku/node/v2/[wakunode2, waku_types],
../test_helpers
@ -20,7 +20,7 @@ procSuite "WakuNode":
Port(60000))
pubSubTopic = "chat"
contentTopic = "foobar"
contentFilter = ContentFilter(topics: @[contentTopic])
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])])
message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic)
@ -31,24 +31,28 @@ procSuite "WakuNode":
if msg.isOk():
check:
topic == "chat"
node.filters.notify(msg[])
node.filters.notify(msg.value(), topic)
var completionFut = newFuture[bool]()
# This would be the actual application handler
proc contentHandler(message: seq[byte]) {.gcsafe, closure.} =
let msg = string.fromBytes(message)
proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} =
let message = string.fromBytes(msg.payload)
check:
msg == "hello world"
message == "hello world"
completionFut.complete(true)
await node.start()
# Subscribe our node to the pubSubTopic where all chat data go onto.
await node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node.subscribe(contentFilter, contentHandler)
# node2.wakuFilter.setPeer(node1.peerInfo)
await node.subscribe(filterRequest, contentHandler)
await sleepAsync(2000.millis)
node.publish(pubSubTopic, message)
@ -67,7 +71,7 @@ procSuite "WakuNode":
Port(60002))
pubSubTopic = "chat"
contentTopic = "foobar"
contentFilter = ContentFilter(topics: @[contentTopic])
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])])
message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic)
@ -80,13 +84,13 @@ procSuite "WakuNode":
if msg.isOk():
check:
topic == "chat"
node1.filters.notify(msg[])
node1.filters.notify(msg.value(), topic)
# This would be the actual application handler
proc contentHandler1(message: seq[byte]) {.gcsafe, closure.} =
let msg = string.fromBytes(message)
proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} =
let message = string.fromBytes(msg.payload)
check:
msg == "hello world"
message == "hello world"
completionFut.complete(true)
await allFutures([node1.start(), node2.start()])
@ -95,10 +99,13 @@ procSuite "WakuNode":
await node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
await node1.subscribe(contentFilter, contentHandler1)
node1.wakuFilter.setPeer(node2.peerInfo)
await node1.subscribe(filterRequest, contentHandler)
await sleepAsync(2000.millis)
# Connect peers by dialing from node2 to node1
let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec)
#
# We need to sleep to allow the subscription to go through
info "Going to sleep to allow subscribe to go through"
await sleepAsync(2000.millis)
@ -144,3 +151,39 @@ procSuite "WakuNode":
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Filter protocol returns expected message":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
contentTopic = "foobar"
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]()
await node1.start()
await node2.start()
node1.wakuFilter.setPeer(node2.peerInfo)
proc handler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg == message
completionFut.complete(true)
await node1.subscribe(FilterRequest(topic: "waku", contentFilters: @[ContentFilter(topics: @[contentTopic])]), handler)
await sleepAsync(2000.millis)
await node2.subscriptions.notify("waku", message)
await sleepAsync(2000.millis)
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()

View File

@ -6,6 +6,7 @@ proc waku_publish(topic: string, message: seq[byte]): bool
proc waku_publish2(topic: string, message: seq[byte]): bool
proc waku_subscribe(topic: string): bool
proc waku_query(topics: seq[string]): bool
proc waku_subscribe_filter(topic: string, contentFilters: seq[seq[string]]): bool
#proc waku_subscribe(topic: string, handler: Topichandler): bool
# NYI

View File

@ -75,3 +75,17 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
await node.query(HistoryQuery(topics: topics), handler)
return true
rpcsrv.rpc("waku_subscribe_filter") do(topic: string, contentFilters: seq[seq[string]]) -> bool:
debug "waku_subscribe_filter"
# XXX: Hacky in-line handler
proc handler(msg: WakuMessage) {.gcsafe, closure.} =
info "Hit subscribe response", message=msg
var filters = newSeq[ContentFilter]()
for topics in contentFilters:
filters.add(ContentFilter(topics: topics))
await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler)
return true

View File

@ -18,20 +18,6 @@ type
Topic* = string
Message* = seq[byte]
# TODO: these filter structures can be simplified but like this for now to
# match Node API
# Also, should reuse in filter/wakufilter code, but cyclic imports right now.
ContentFilter* = object
topics*: seq[string]
ContentFilterHandler* = proc(message: seq[byte]) {.gcsafe, closure.}
Filter* = object
contentFilter*: ContentFilter
handler*: ContentFilterHandler
Filters* = Table[string, Filter]
WakuMessage* = object
payload*: seq[byte]
contentTopic*: string
@ -67,7 +53,7 @@ type
messages*: seq[WakuMessage]
FilterRequest* = object
contentFilter*: seq[ContentFilter]
contentFilters*: seq[ContentFilter]
topic*: string
MessagePush* = object
@ -83,14 +69,30 @@ type
requestId*: string
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.}
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
FilterPeer* = object
peerInfo*: PeerInfo
WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
switch*: Switch
peers*: seq[FilterPeer]
subscribers*: seq[Subscriber]
pushHandler*: MessagePushHandler
ContentFilter* = object
topics*: seq[string]
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
Filter* = object
contentFilters*: seq[ContentFilter]
handler*: ContentFilterHandler
# @TODO MAYBE MORE INFO?
Filters* = Table[string, Filter]
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj
switch*: Switch
@ -125,13 +127,23 @@ proc encode*(message: WakuMessage): ProtoBuffer =
result.write(1, message.payload)
result.write(2, message.contentTopic)
proc notify*(filters: Filters, msg: WakuMessage) =
for filter in filters.values:
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
for key in filters.keys:
let filter = filters[key]
# We do this because the key for the filter is set to the requestId received from the filter protocol.
# This means we do not need to check the content filter explicitly as all MessagePushs already contain
# the requestId of the coresponding filter.
if requestId != "" and requestId == key:
filter.handler(msg)
continue
# TODO: In case of no topics we should either trigger here for all messages,
# or we should not allow such filter to exist in the first place.
if filter.contentFilter.topics.len > 0:
if msg.contentTopic in filter.contentFilter.topics:
filter.handler(msg.payload)
for contentFilter in filter.contentFilters:
if contentFilter.topics.len > 0:
if msg.contentTopic in contentFilter.topics:
filter.handler(msg)
break
proc generateRequestId*(rng: ref BrHmacDrbgContext): string =
var bytes: array[10, byte]

View File

@ -88,7 +88,8 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
rng: crypto.newRng(),
peerInfo: peerInfo,
wakuRelay: wakuRelay,
subscriptions: newTable[string, MessageNotificationSubscription]()
subscriptions: newTable[string, MessageNotificationSubscription](),
filters: initTable[string, Filter]()
)
for topic in topics:
@ -108,17 +109,19 @@ proc start*(node: WakuNode) {.async.} =
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc pushHandler(msg: MessagePush) {.async, gcsafe.} =
proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} =
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId)
node.wakuFilter = WakuFilter.init(node.switch, node.rng, pushHandler)
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
await node.subscriptions.notify(topic, msg.value())
node.filters.notify(msg.value(), "")
await node.wakuRelay.subscribe("waku", relayHandler)
@ -146,15 +149,16 @@ proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
let wakuRelay = node.wakuRelay
await wakuRelay.subscribe(topic, handler)
proc subscribe*(node: WakuNode, contentFilter: waku_types.ContentFilter, handler: ContentFilterHandler) {.async.} =
## Subscribes to a ContentFilter. Triggers handler when receiving messages on
## this content filter. ContentFilter is a method that takes some content
## filter, specifically with `ContentTopic`, and a `Message`. The `Message`
## has to match the `ContentTopic`.
info "subscribe content", contentFilter=contentFilter
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
## FilterHandler is a method that takes a MessagePush.
##
## Status: Implemented.
info "subscribe content", filter=request
# TODO: get some random id, or use the Filter directly as key
node.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler))
# @TODO: ERROR HANDLING
let id = await node.wakuFilter.subscribe(request)
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
proc unsubscribe*(w: WakuNode, topic: Topic) =
echo "NYI"

View File

@ -32,7 +32,7 @@ proc encode*(filter: ContentFilter): ProtoBuffer =
proc encode*(rpc: FilterRequest): ProtoBuffer =
result = initProtoBuffer()
for filter in rpc.contentFilter:
for filter in rpc.contentFilters:
result.write(1, filter.encode())
result.write(2, rpc.topic)
@ -46,14 +46,14 @@ proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
ok(ContentFilter(topics: topics))
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRequest(contentFilter: @[], topic: "")
var rpc = FilterRequest(contentFilters: @[], topic: "")
let pb = initProtoBuffer(buffer)
var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(1, buffs)
for buf in buffs:
rpc.contentFilter.add(? ContentFilter.init(buf))
rpc.contentFilters.add(? ContentFilter.init(buf))
discard ? pb.getField(2, rpc.topic)
@ -81,13 +81,15 @@ proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var requestBuffer: seq[byte]
discard ? pb.getField(1, requestBuffer)
discard ? pb.getField(2, requestBuffer)
rpc.request = ? FilterRequest.init(requestBuffer)
var pushBuffer: seq[byte]
discard ? pb.getField(2, pushBuffer)
discard ? pb.getField(3, pushBuffer)
rpc.push = ? MessagePush.init(pushBuffer)
@ -96,8 +98,9 @@ proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
proc encode*(rpc: FilterRPC): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rpc.request.encode())
result.write(2, rpc.push.encode())
result.write(1, rpc.requestId)
result.write(2, rpc.request.encode())
result.write(3, rpc.push.encode())
method init*(wf: WakuFilter) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
@ -107,9 +110,11 @@ method init*(wf: WakuFilter) =
error "failed to decode rpc"
return
info "filter message received"
let value = res.value
if value.push != MessagePush():
await wf.pushHandler(value.push)
wf.pushHandler(value.requestId, value.push)
if value.request != FilterRequest():
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
@ -123,6 +128,10 @@ proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handl
result.pushHandler = handler
result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
wf.peers.add(FilterPeer(peerInfo: peer))
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
## Returns a Filter for the specific protocol
## This filter can then be used to send messages to subscribers that match conditions.
@ -131,7 +140,7 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
if subscriber.filter.topic != topic:
continue
for filter in subscriber.filter.contentFilter:
for filter in subscriber.filter.contentFilters:
if msg.contentTopic in filter.topics:
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec)
@ -140,6 +149,11 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
MessageNotificationSubscription.init(@[], handle)
proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} =
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async, gcsafe.} =
let id = generateRequestId(wf.rng)
if wf.peers.len >= 1:
let peer = wf.peers[0].peerInfo
# @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC.
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
await conn.writeLP(FilterRPC(requestId: generateRequestId(wf.rng), request: request).encode().buffer)
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
result = id