From e1414ac922ca4716a92fb0f94f80d7be36369299 Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Wed, 23 Sep 2020 06:19:37 +0200 Subject: [PATCH] fix/make-filter-work (#182) * starts to actually get the filter protocol working * made it work * Update all_tests_v2.nim * Update test_waku_filter.nim --- tests/all_tests_v2.nim | 5 +-- tests/v2/test_waku_filter.nim | 74 +++++++++++++++----------------- waku/node/v2/waku_types.nim | 10 ++++- waku/node/v2/wakunode2.nim | 5 ++- waku/protocol/v2/waku_filter.nim | 49 +++++++++++++++++---- 5 files changed, 89 insertions(+), 54 deletions(-) diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index bd234863d..c25cd2a6a 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -3,6 +3,5 @@ import # TODO: enable this when it is altered into a proper waku relay test # ./v2/test_waku, ./v2/test_wakunode, - ./v2/test_waku_store -# NOTE: Disabling broken filter protocol, we don't rely on it for Nangang -# ./v2/test_waku_filter + ./v2/test_waku_store, + ./v2/test_waku_filter diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index df8da2b95..28e658485 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -17,53 +17,47 @@ import procSuite "Waku Filter": asyncTest "handle filter": + let - proto = WakuFilter.init() - subscription = proto.subscription() + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2") + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var completionFut = newFuture[bool]() + proc handle(msg: MessagePush) {.async, gcsafe, closure.} = + check: + msg.messages.len() == 1 + msg.messages[0] == post + completionFut.complete(true) + + let + proto = WakuFilter.init(dialSwitch, handle) + rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic") + + dialSwitch.mount(proto) + + proc emptyHandle(msg: MessagePush) {.async, gcsafe, closure.} = + discard + + let + proto2 = WakuFilter.init(listenSwitch, emptyHandle) + subscription = proto2.subscription() var subscriptions = newTable[string, MessageNotificationSubscription]() subscriptions["test"] = subscription + listenSwitch.mount(proto2) - let - peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew") - msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2") - - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() - let remotePeerInfo = PeerInfo.init( - remoteSecKey, - [ma], - ["/test/proto1/1.0.0", "/test/proto2/1.0.0"] - ) - - var serverFut: Future[void] - let msListen = newMultistream() - - msListen.addHandler(WakuFilterCodec, proto) - proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = - await msListen.handle(conn) - - var transport1 = TcpTransport.init() - serverFut = await transport1.listen(ma, connHandler) - - let msDial = newMultistream() - let transport2: TcpTransport = TcpTransport.init() - let conn = await transport2.dial(transport1.ma) - - var rpc = FilterRequest(contentFilter: @[waku_filter.ContentFilter(topics: @["pew", "pew2"])], topic: "topic") - discard await msDial.select(conn, WakuFilterCodec) - await conn.writeLP(rpc.encode().buffer) + await proto.subscribe(listenSwitch.peerInfo, rpc) await sleepAsync(2.seconds) - subscriptions.notify("topic", msg) - subscriptions.notify("topic", msg2) - - var message = await conn.readLp(64*1024) + await subscriptions.notify("topic", post) - let response = MessagePush.init(message) - let res = response.value check: - res.messages.len() == 1 - res.messages[0] == msg + (await completionFut.withTimeout(5.seconds)) == true diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index d1ee8c568..68546fe47 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -61,12 +61,20 @@ type MessagePush* = object messages*: seq[WakuMessage] + FilterRPC* = object + request*: FilterRequest + push*: MessagePush + Subscriber* = object - connection*: Connection + peer*: PeerInfo filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? + MessagePushHandler* = proc(msg: MessagePush): Future[void] {.gcsafe, closure.} + WakuFilter* = ref object of LPProtocol + switch*: Switch subscribers*: seq[Subscriber] + pushHandler*: MessagePushHandler # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 85ab99e6c..fded29b12 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -106,7 +106,10 @@ proc start*(node: WakuNode) {.async.} = node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) - node.wakuFilter = WakuFilter.init() + proc pushHandler(msg: MessagePush) {.async, gcsafe.} = + info "push received" + + node.wakuFilter = WakuFilter.init(node.switch, pushHandler) node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index bb4456a09..76a0a8ed2 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -7,6 +7,7 @@ import libp2p/protocols/protocol, libp2p/protobuf/minprotobuf, libp2p/stream/connection, + libp2p/switch, ./message_notifier, ./../../node/v2/waku_types @@ -74,38 +75,68 @@ proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = ok(push) +proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = FilterRPC() + let pb = initProtoBuffer(buffer) + + var requestBuffer: seq[byte] + discard ? pb.getField(1, requestBuffer) + + rpc.request = ? FilterRequest.init(requestBuffer) + + var pushBuffer: seq[byte] + discard ? pb.getField(2, pushBuffer) + + rpc.push = ? MessagePush.init(pushBuffer) + + ok(rpc) + +proc encode*(rpc: FilterRPC): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, rpc.request.encode()) + result.write(2, rpc.push.encode()) + method init*(wf: WakuFilter) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = var message = await conn.readLp(64*1024) - var res = FilterRequest.init(message) + var res = FilterRPC.init(message) if res.isErr: + error "failed to decode rpc" return - wf.subscribers.add(Subscriber(connection: conn, filter: res.value)) - # @TODO THIS IS A VERY ROUGH EXPERIMENT + let value = res.value + if value.push != MessagePush(): + await wf.pushHandler(value.push) + if value.request != FilterRequest(): + wf.subscribers.add(Subscriber(peer: conn.peerInfo, filter: value.request)) wf.handler = handle wf.codec = WakuFilterCodec -proc init*(T: type WakuFilter): T = +proc init*(T: type WakuFilter, switch: Switch, handler: MessagePushHandler): T = new result + result.switch = switch + result.pushHandler = handler result.init() proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol ## This filter can then be used to send messages to subscribers that match conditions. proc handle(topic: string, msg: WakuMessage) {.async.} = - var futures = newSeq[Future[void]]() - for subscriber in proto.subscribers: if subscriber.filter.topic != topic: continue for filter in subscriber.filter.contentFilter: if msg.contentTopic in filter.topics: - futures.add(subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer)) + let push = FilterRPC(push: MessagePush(messages: @[msg])) + let conn = await proto.switch.dial(subscriber.peer.peerId, subscriber.peer.addrs, WakuFilterCodec) + await conn.writeLP(push.encode().buffer) break - await allFutures(futures) - MessageNotificationSubscription.init(@[], handle) + +proc subscribe*(wf: WakuFilter, peer: PeerInfo, request: FilterRequest) {.async, gcsafe.} = + let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) + await conn.writeLP(FilterRPC(request: request).encode().buffer)