From 7821ebc36e3f202a340776b9851cff14d2d5772c Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Mon, 19 Apr 2021 10:38:30 -0700 Subject: [PATCH] Update store protocol interface: add history content filter (#488) * replaces topics with seq of ContentFilters * update topics to contentFilter * updates the contentFilter structure one content topic per content filter instead of a sequence of topics * updates store json rpc api * renames ContentFilter to HistoryContentFilter * unit test for a query with several content filters * makes shortcut for store api * updates chat2 * clean up * renames topic to contentTopic * clarifies the use of content topic in store api * clarifies the use of contentTopic in the init method of HistoryContentFilter --- examples/v2/chat2.nim | 2 +- tests/v2/test_waku_store.nim | 64 ++++++++++++++++--- tests/v2/test_waku_swap.nim | 6 +- tests/v2/test_wakunode.nim | 4 +- waku/v2/node/jsonrpc/store_api.nim | 8 ++- waku/v2/protocol/waku_store/waku_store.nim | 38 +++++++++-- .../protocol/waku_store/waku_store_types.nim | 4 +- 7 files changed, 102 insertions(+), 24 deletions(-) diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 7d36e6857..9792c24ca 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -302,7 +302,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = echo &"{chatLine}" info "Hit store handler" - await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler) + await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), storeHandler) if conf.filternode != "": node.mountFilter() diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 16244db42..60ad520df 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -34,7 +34,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() - rpc = HistoryQuery(topics: @[topic]) + rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) proto.setPeer(listenSwitch.peerInfo) @@ -56,6 +56,52 @@ procSuite "Waku Store": await proto.query(rpc, handler) + check: + (await completionFut.withTimeout(5.seconds)) == true + asyncTest "handle query with multiple content filters": + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + topic1 = defaultContentTopic + topic2 = ContentTopic("2") + topic3 = ContentTopic("3") + msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic1) + msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic2) + msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic3) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + let + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + subscription = proto.subscription() + rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)]) + + proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + + await subscriptions.notify("foo", msg1) + await subscriptions.notify("foo", msg2) + await subscriptions.notify("foo", msg3) + + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 2 + response.messages.anyIt(it == msg1) + response.messages.anyIt(it == msg3) + completionFut.complete(true) + + await proto.query(rpc, handler) + check: (await completionFut.withTimeout(5.seconds)) == true @@ -78,7 +124,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) subscription = proto.subscription() - rpc = HistoryQuery(topics: @[topic]) + rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) proto.setPeer(listenSwitch.peerInfo) @@ -152,7 +198,7 @@ procSuite "Waku Store": let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) subscription = proto.subscription() - rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) + rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) proto.setPeer(listenSwitch.peerInfo) @@ -225,7 +271,7 @@ procSuite "Waku Store": response.pagingInfo.cursor != Index() completionFut.complete(true) - let rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) await proto.query(rpc, handler) check: @@ -274,7 +320,7 @@ procSuite "Waku Store": response.pagingInfo == PagingInfo() completionFut.complete(true) - let rpc = HistoryQuery(topics: @[defaultContentTopic] ) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)] ) await proto.query(rpc, handler) @@ -329,7 +375,7 @@ procSuite "Waku Store": let index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) - query=HistoryQuery(topics: @[defaultContentTopic], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11)) + query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11)) pb = query.encode() decodedQuery = HistoryQuery.init(pb.buffer) @@ -417,7 +463,7 @@ procSuite "Waku Store": response.messages.anyIt(it.timestamp == float(5)) completionFut.complete(true) - let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(5)) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5)) await proto.query(rpc, handler) check: @@ -433,7 +479,7 @@ procSuite "Waku Store": response.messages.len() == 0 completionFut.complete(true) - let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(2)) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2)) await proto.query(rpc, handler) check: @@ -450,7 +496,7 @@ procSuite "Waku Store": completionFut.complete(true) # time window is invalid since start time > end time - let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(5), endTime: float(2)) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2)) await proto.query(rpc, handler) check: diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 5ef7832d2..0c005ecb4 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -81,7 +81,7 @@ procSuite "Waku SWAP Accounting": response.messages[0] == message completionFut.complete(true) - await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler) + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) check: (await completionFut.withTimeout(5.seconds)) == true @@ -127,8 +127,8 @@ procSuite "Waku SWAP Accounting": futures[1].complete(true) # TODO Handshakes - for now we assume implicit, e2e still works for PoC - await node1.query(HistoryQuery(topics: @[contentTopic]), handler1) - await node1.query(HistoryQuery(topics: @[contentTopic]), handler2) + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler1) + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler2) check: (await allFutures(futures).withTimeout(5.seconds)) == true diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 40ed3bc4e..ceacd5166 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -238,7 +238,9 @@ procSuite "WakuNode": response.messages[0] == message completionFut.complete(true) - await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler) + + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) + check: (await completionFut.withTimeout(5.seconds)) == true diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 05806ff1e..ce79eb14f 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -17,7 +17,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Store API version 1 definitions - rpcsrv.rpc("get_waku_v2_store_v1_messages") do(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + rpcsrv.rpc("get_waku_v2_store_v1_messages") do(contentTopics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" @@ -27,7 +27,11 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = debug "get_waku_v2_store_v1_messages response" responseFut.complete(response.toStoreResponse()) - let historyQuery = HistoryQuery(topics: topics, + var contentFilters: seq[HistoryContentFilter] = @[] + # items in contentTopics map to the contentTopic field of waku message (not to be confused with pubsub topic) + for ct in contentTopics: + contentFilters.add(HistoryContentFilter(contentTopic: ct)) + let historyQuery = HistoryQuery(contentFilters: contentFilters, pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) await node.query(historyQuery, queryFuncHandler) diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index fa991a41e..006edb9fc 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -110,16 +110,30 @@ proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = pagingInfo.direction = PagingDirection(direction) ok(pagingInfo) - + +proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + # ContentTopic corresponds to the contentTopic field of waku message (not to be confused with pubsub topic) + var contentTopic: ContentTopic + discard ? pb.getField(1, contentTopic) + + ok(HistoryContentFilter(contentTopic: contentTopic)) + proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() let pb = initProtoBuffer(buffer) - var topics: seq[ContentTopic] + # var topics: seq[ContentTopic] - discard ? pb.getRepeatedField(2, topics) + # discard ? pb.getRepeatedField(2, topics) + # msg.topics = topics + var buffs: seq[seq[byte]] + discard ? pb.getRepeatedField(2, buffs) + + for buf in buffs: + msg.contentFilters.add(? HistoryContentFilter.init(buf)) - msg.topics = topics var pagingInfoBuffer: seq[byte] discard ? pb.getField(3, pagingInfoBuffer) @@ -166,11 +180,17 @@ proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = ok(rpc) +proc encode*(filter: HistoryContentFilter): ProtoBuffer = + result = initProtoBuffer() + result.write(1, filter.contentTopic) + proc encode*(query: HistoryQuery): ProtoBuffer = result = initProtoBuffer() - for topic in query.topics: - result.write(2, topic) + # for topic in query.topics: + # result.write(2, topic) + for filter in query.contentFilters: + result.write(2, filter.encode()) result.write(3, query.pagingInfo.encode()) @@ -292,7 +312,11 @@ proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (se proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(messages: newSeq[WakuMessage]()) # data holds IndexedWakuMessage whose topics match the query - var data = w.messages.filterIt(it.msg.contentTopic in query.topics) + var data : seq[IndexedWakuMessage] = @[] + for filter in query.contentFilters: + var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic) + # TODO remove duplicates from data + data.add(matched) # temporal filtering # check whether the history query contains a time filter diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index c5dee2def..3046de3fb 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -17,6 +17,8 @@ export pagination const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page type + HistoryContentFilter* = object + contentTopic*: ContentTopic QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} @@ -37,7 +39,7 @@ type direction*: PagingDirection HistoryQuery* = object - topics*: seq[ContentTopic] + contentFilters*: seq[HistoryContentFilter] pagingInfo*: PagingInfo # used for pagination startTime*: float64 # used for time-window query endTime*: float64 # used for time-window query