diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index ab6d0de70..628967d01 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -25,14 +25,11 @@ method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) ## NOTE The data field SHOULD be decoded as a WakuMessage. ## Status: Implemented. -method subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFilterHandler) - ## Subscribes to a ContentFilter. Triggers handler when receiving messages on - ## this content filter. ContentFilter is a method that takes some content - ## filter, specifically with `ContentTopic`, and a `Message`. The `Message` - ## has to match the `ContentTopic`. - - ## Status: Not yet implemented. - ## TODO Implement as wrapper around `waku_filter` and `subscribe` above. +method subscribe*(w: WakuNode, filter: FilterRequest, handler: ContentFilterHandler) + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + ## FilterHandler is a method that takes a MessagePush. + ## + ## Status: Implemented. method unsubscribe*(w: WakuNode, topic: Topic) ## Unsubscribe from a topic. diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index f20329d3d..eaf4e290b 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -29,20 +29,21 @@ procSuite "Waku Filter": var listenSwitch = newStandardSwitch(some(key)) discard await listenSwitch.start() - var completionFut = newFuture[bool]() - proc handle(msg: MessagePush) {.async, gcsafe, closure.} = + var responseRequestIdFuture = newFuture[string]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = check: msg.messages.len() == 1 msg.messages[0] == post - completionFut.complete(true) + responseRequestIdFuture.complete(requestId) let proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) - rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic") + rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic") dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo) - proc emptyHandle(msg: MessagePush) {.async, gcsafe, closure.} = + proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = discard let @@ -53,11 +54,11 @@ procSuite "Waku Filter": subscriptions["test"] = subscription listenSwitch.mount(proto2) - await proto.subscribe(listenSwitch.peerInfo, rpc) + let id = await proto.subscribe(rpc) await sleepAsync(2.seconds) await subscriptions.notify("topic", post) check: - (await completionFut.withTimeout(5.seconds)) == true + (await responseRequestIdFuture) == id diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index b53e0d5ec..dc56b64bf 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -7,7 +7,7 @@ import libp2p/crypto/secp, libp2p/switch, eth/keys, - ../../waku/protocol/v2/[waku_relay, waku_store, message_notifier], + ../../waku/protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier], ../../waku/node/v2/[wakunode2, waku_types], ../test_helpers @@ -20,7 +20,7 @@ procSuite "WakuNode": Port(60000)) pubSubTopic = "chat" contentTopic = "foobar" - contentFilter = ContentFilter(topics: @[contentTopic]) + filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -31,24 +31,28 @@ procSuite "WakuNode": if msg.isOk(): check: topic == "chat" - node.filters.notify(msg[]) + node.filters.notify(msg.value(), topic) var completionFut = newFuture[bool]() # This would be the actual application handler - proc contentHandler(message: seq[byte]) {.gcsafe, closure.} = - let msg = string.fromBytes(message) + proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} = + let message = string.fromBytes(msg.payload) check: - msg == "hello world" + message == "hello world" completionFut.complete(true) await node.start() # Subscribe our node to the pubSubTopic where all chat data go onto. await node.subscribe(pubSubTopic, relayHandler) + # Subscribe a contentFilter to trigger a specific application handler when # WakuMessages with that content are received - await node.subscribe(contentFilter, contentHandler) + # node2.wakuFilter.setPeer(node1.peerInfo) + await node.subscribe(filterRequest, contentHandler) + + await sleepAsync(2000.millis) node.publish(pubSubTopic, message) @@ -67,7 +71,7 @@ procSuite "WakuNode": Port(60002)) pubSubTopic = "chat" contentTopic = "foobar" - contentFilter = ContentFilter(topics: @[contentTopic]) + filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -80,13 +84,13 @@ procSuite "WakuNode": if msg.isOk(): check: topic == "chat" - node1.filters.notify(msg[]) + node1.filters.notify(msg.value(), topic) # This would be the actual application handler - proc contentHandler1(message: seq[byte]) {.gcsafe, closure.} = - let msg = string.fromBytes(message) + proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} = + let message = string.fromBytes(msg.payload) check: - msg == "hello world" + message == "hello world" completionFut.complete(true) await allFutures([node1.start(), node2.start()]) @@ -95,10 +99,13 @@ procSuite "WakuNode": await node1.subscribe(pubSubTopic, relayHandler) # Subscribe a contentFilter to trigger a specific application handler when # WakuMessages with that content are received - await node1.subscribe(contentFilter, contentHandler1) + node1.wakuFilter.setPeer(node2.peerInfo) + await node1.subscribe(filterRequest, contentHandler) + await sleepAsync(2000.millis) + # Connect peers by dialing from node2 to node1 let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec) - # + # We need to sleep to allow the subscription to go through info "Going to sleep to allow subscribe to go through" await sleepAsync(2000.millis) @@ -144,3 +151,39 @@ procSuite "WakuNode": (await completionFut.withTimeout(5.seconds)) == true await node1.stop() await node2.stop() + + asyncTest "Filter protocol returns expected message": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + contentTopic = "foobar" + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + var completionFut = newFuture[bool]() + + await node1.start() + await node2.start() + + node1.wakuFilter.setPeer(node2.peerInfo) + + proc handler(msg: WakuMessage) {.gcsafe, closure.} = + check: + msg == message + completionFut.complete(true) + + await node1.subscribe(FilterRequest(topic: "waku", contentFilters: @[ContentFilter(topics: @[contentTopic])]), handler) + + await sleepAsync(2000.millis) + + await node2.subscriptions.notify("waku", message) + + await sleepAsync(2000.millis) + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() diff --git a/waku/node/v2/rpc/wakucallsigs.nim b/waku/node/v2/rpc/wakucallsigs.nim index 6ca19d38b..112b2af3f 100644 --- a/waku/node/v2/rpc/wakucallsigs.nim +++ b/waku/node/v2/rpc/wakucallsigs.nim @@ -6,6 +6,7 @@ proc waku_publish(topic: string, message: seq[byte]): bool proc waku_publish2(topic: string, message: seq[byte]): bool proc waku_subscribe(topic: string): bool proc waku_query(topics: seq[string]): bool +proc waku_subscribe_filter(topic: string, contentFilters: seq[seq[string]]): bool #proc waku_subscribe(topic: string, handler: Topichandler): bool # NYI diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 8fa410677..8f5f1838f 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -75,3 +75,17 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = await node.query(HistoryQuery(topics: topics), handler) return true + + rpcsrv.rpc("waku_subscribe_filter") do(topic: string, contentFilters: seq[seq[string]]) -> bool: + debug "waku_subscribe_filter" + + # XXX: Hacky in-line handler + proc handler(msg: WakuMessage) {.gcsafe, closure.} = + info "Hit subscribe response", message=msg + + var filters = newSeq[ContentFilter]() + for topics in contentFilters: + filters.add(ContentFilter(topics: topics)) + + await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler) + return true diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 3f2024a7f..e4f77216b 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -18,20 +18,6 @@ type Topic* = string Message* = seq[byte] - # TODO: these filter structures can be simplified but like this for now to - # match Node API - # Also, should reuse in filter/wakufilter code, but cyclic imports right now. - ContentFilter* = object - topics*: seq[string] - - ContentFilterHandler* = proc(message: seq[byte]) {.gcsafe, closure.} - - Filter* = object - contentFilter*: ContentFilter - handler*: ContentFilterHandler - - Filters* = Table[string, Filter] - WakuMessage* = object payload*: seq[byte] contentTopic*: string @@ -67,7 +53,7 @@ type messages*: seq[WakuMessage] FilterRequest* = object - contentFilter*: seq[ContentFilter] + contentFilters*: seq[ContentFilter] topic*: string MessagePush* = object @@ -83,14 +69,30 @@ type requestId*: string filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? - MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.} + MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} + + FilterPeer* = object + peerInfo*: PeerInfo WakuFilter* = ref object of LPProtocol rng*: ref BrHmacDrbgContext switch*: Switch + peers*: seq[FilterPeer] subscribers*: seq[Subscriber] pushHandler*: MessagePushHandler + ContentFilter* = object + topics*: seq[string] + + ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} + + Filter* = object + contentFilters*: seq[ContentFilter] + handler*: ContentFilterHandler + + # @TODO MAYBE MORE INFO? + Filters* = Table[string, Filter] + # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch @@ -125,13 +127,23 @@ proc encode*(message: WakuMessage): ProtoBuffer = result.write(1, message.payload) result.write(2, message.contentTopic) -proc notify*(filters: Filters, msg: WakuMessage) = - for filter in filters.values: +proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = + for key in filters.keys: + let filter = filters[key] + # We do this because the key for the filter is set to the requestId received from the filter protocol. + # This means we do not need to check the content filter explicitly as all MessagePushs already contain + # the requestId of the coresponding filter. + if requestId != "" and requestId == key: + filter.handler(msg) + continue + # TODO: In case of no topics we should either trigger here for all messages, # or we should not allow such filter to exist in the first place. - if filter.contentFilter.topics.len > 0: - if msg.contentTopic in filter.contentFilter.topics: - filter.handler(msg.payload) + for contentFilter in filter.contentFilters: + if contentFilter.topics.len > 0: + if msg.contentTopic in contentFilter.topics: + filter.handler(msg) + break proc generateRequestId*(rng: ref BrHmacDrbgContext): string = var bytes: array[10, byte] diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index dba05af6b..73cee2624 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -88,7 +88,8 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, rng: crypto.newRng(), peerInfo: peerInfo, wakuRelay: wakuRelay, - subscriptions: newTable[string, MessageNotificationSubscription]() + subscriptions: newTable[string, MessageNotificationSubscription](), + filters: initTable[string, Filter]() ) for topic in topics: @@ -108,17 +109,19 @@ proc start*(node: WakuNode) {.async.} = node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) - proc pushHandler(msg: MessagePush) {.async, gcsafe.} = + proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} = info "push received" + for message in msg.messages: + node.filters.notify(message, requestId) - node.wakuFilter = WakuFilter.init(node.switch, node.rng, pushHandler) + node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler) node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) if msg.isOk(): - await node.subscriptions.notify(topic, msg.value()) + node.filters.notify(msg.value(), "") await node.wakuRelay.subscribe("waku", relayHandler) @@ -146,15 +149,16 @@ proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = let wakuRelay = node.wakuRelay await wakuRelay.subscribe(topic, handler) -proc subscribe*(node: WakuNode, contentFilter: waku_types.ContentFilter, handler: ContentFilterHandler) {.async.} = - ## Subscribes to a ContentFilter. Triggers handler when receiving messages on - ## this content filter. ContentFilter is a method that takes some content - ## filter, specifically with `ContentTopic`, and a `Message`. The `Message` - ## has to match the `ContentTopic`. - info "subscribe content", contentFilter=contentFilter +proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + ## FilterHandler is a method that takes a MessagePush. + ## + ## Status: Implemented. + info "subscribe content", filter=request - # TODO: get some random id, or use the Filter directly as key - node.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler)) + # @TODO: ERROR HANDLING + let id = await node.wakuFilter.subscribe(request) + node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) proc unsubscribe*(w: WakuNode, topic: Topic) = echo "NYI" diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 537fa9b20..953e8408c 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -32,7 +32,7 @@ proc encode*(filter: ContentFilter): ProtoBuffer = proc encode*(rpc: FilterRequest): ProtoBuffer = result = initProtoBuffer() - for filter in rpc.contentFilter: + for filter in rpc.contentFilters: result.write(1, filter.encode()) result.write(2, rpc.topic) @@ -46,14 +46,14 @@ proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = ok(ContentFilter(topics: topics)) proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRequest(contentFilter: @[], topic: "") + var rpc = FilterRequest(contentFilters: @[], topic: "") let pb = initProtoBuffer(buffer) var buffs: seq[seq[byte]] discard ? pb.getRepeatedField(1, buffs) for buf in buffs: - rpc.contentFilter.add(? ContentFilter.init(buf)) + rpc.contentFilters.add(? ContentFilter.init(buf)) discard ? pb.getField(2, rpc.topic) @@ -81,13 +81,15 @@ proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = var rpc = FilterRPC() let pb = initProtoBuffer(buffer) + discard ? pb.getField(1, rpc.requestId) + var requestBuffer: seq[byte] - discard ? pb.getField(1, requestBuffer) + discard ? pb.getField(2, requestBuffer) rpc.request = ? FilterRequest.init(requestBuffer) var pushBuffer: seq[byte] - discard ? pb.getField(2, pushBuffer) + discard ? pb.getField(3, pushBuffer) rpc.push = ? MessagePush.init(pushBuffer) @@ -96,8 +98,9 @@ proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = proc encode*(rpc: FilterRPC): ProtoBuffer = result = initProtoBuffer() - result.write(1, rpc.request.encode()) - result.write(2, rpc.push.encode()) + result.write(1, rpc.requestId) + result.write(2, rpc.request.encode()) + result.write(3, rpc.push.encode()) method init*(wf: WakuFilter) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = @@ -107,9 +110,11 @@ method init*(wf: WakuFilter) = error "failed to decode rpc" return + info "filter message received" + let value = res.value if value.push != MessagePush(): - await wf.pushHandler(value.push) + wf.pushHandler(value.requestId, value.push) if value.request != FilterRequest(): wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) @@ -123,6 +128,10 @@ proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handl result.pushHandler = handler result.init() +# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY +proc setPeer*(wf: WakuFilter, peer: PeerInfo) = + wf.peers.add(FilterPeer(peerInfo: peer)) + proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol ## This filter can then be used to send messages to subscribers that match conditions. @@ -131,7 +140,7 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = if subscriber.filter.topic != topic: continue - for filter in subscriber.filter.contentFilter: + for filter in subscriber.filter.contentFilters: if msg.contentTopic in filter.topics: let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec) @@ -140,6 +149,11 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = MessageNotificationSubscription.init(@[], handle) -proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} = - let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) - await conn.writeLP(FilterRPC(requestId: generateRequestId(wf.rng), request: request).encode().buffer) +proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async, gcsafe.} = + let id = generateRequestId(wf.rng) + if wf.peers.len >= 1: + let peer = wf.peers[0].peerInfo + # @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC. + let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) + await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer) + result = id