Update store protocol interface: add history content filter (#488)

* replaces topics with seq of ContentFilters

* update topics to contentFilter

* updates the contentFilter  structure

 one content  topic per content filter instead of a sequence of topics

* updates store json rpc api

* renames ContentFilter to HistoryContentFilter

* unit test for a query with several content filters

* makes shortcut for store api

* updates chat2

* clean up

* renames topic to contentTopic

* clarifies the use of content topic in store api

* clarifies the use of contentTopic in the init method of HistoryContentFilter
This commit is contained in:
Sanaz Taheri Boshrooyeh 2021-04-19 10:38:30 -07:00 committed by GitHub
parent 192d4dd66b
commit fb2ea06a4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 102 additions and 24 deletions

View File

@ -302,7 +302,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
echo &"{chatLine}" echo &"{chatLine}"
info "Hit store handler" info "Hit store handler"
await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler) await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), storeHandler)
if conf.filternode != "": if conf.filternode != "":
node.mountFilter() node.mountFilter()

View File

@ -34,7 +34,7 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription() subscription = proto.subscription()
rpc = HistoryQuery(topics: @[topic]) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
@ -56,6 +56,52 @@ procSuite "Waku Store":
await proto.query(rpc, handler) await proto.query(rpc, handler)
check:
(await completionFut.withTimeout(5.seconds)) == true
asyncTest "handle query with multiple content filters":
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
topic1 = defaultContentTopic
topic2 = ContentTopic("2")
topic3 = ContentTopic("3")
msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic1)
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic2)
msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic3)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)])
proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto)
await subscriptions.notify("foo", msg1)
await subscriptions.notify("foo", msg2)
await subscriptions.notify("foo", msg3)
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it == msg1)
response.messages.anyIt(it == msg3)
completionFut.complete(true)
await proto.query(rpc, handler)
check: check:
(await completionFut.withTimeout(5.seconds)) == true (await completionFut.withTimeout(5.seconds)) == true
@ -78,7 +124,7 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
subscription = proto.subscription() subscription = proto.subscription()
rpc = HistoryQuery(topics: @[topic]) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
@ -152,7 +198,7 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription() subscription = proto.subscription()
rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
@ -225,7 +271,7 @@ procSuite "Waku Store":
response.pagingInfo.cursor != Index() response.pagingInfo.cursor != Index()
completionFut.complete(true) completionFut.complete(true)
let rpc = HistoryQuery(topics: @[defaultContentTopic], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) ) let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) )
await proto.query(rpc, handler) await proto.query(rpc, handler)
check: check:
@ -274,7 +320,7 @@ procSuite "Waku Store":
response.pagingInfo == PagingInfo() response.pagingInfo == PagingInfo()
completionFut.complete(true) completionFut.complete(true)
let rpc = HistoryQuery(topics: @[defaultContentTopic] ) let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)] )
await proto.query(rpc, handler) await proto.query(rpc, handler)
@ -329,7 +375,7 @@ procSuite "Waku Store":
let let
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)) index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query=HistoryQuery(topics: @[defaultContentTopic], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11)) query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
pb = query.encode() pb = query.encode()
decodedQuery = HistoryQuery.init(pb.buffer) decodedQuery = HistoryQuery.init(pb.buffer)
@ -417,7 +463,7 @@ procSuite "Waku Store":
response.messages.anyIt(it.timestamp == float(5)) response.messages.anyIt(it.timestamp == float(5))
completionFut.complete(true) completionFut.complete(true)
let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(5)) let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
await proto.query(rpc, handler) await proto.query(rpc, handler)
check: check:
@ -433,7 +479,7 @@ procSuite "Waku Store":
response.messages.len() == 0 response.messages.len() == 0
completionFut.complete(true) completionFut.complete(true)
let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(2)) let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
await proto.query(rpc, handler) await proto.query(rpc, handler)
check: check:
@ -450,7 +496,7 @@ procSuite "Waku Store":
completionFut.complete(true) completionFut.complete(true)
# time window is invalid since start time > end time # time window is invalid since start time > end time
let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(5), endTime: float(2)) let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
await proto.query(rpc, handler) await proto.query(rpc, handler)
check: check:

View File

@ -81,7 +81,7 @@ procSuite "Waku SWAP Accounting":
response.messages[0] == message response.messages[0] == message
completionFut.complete(true) completionFut.complete(true)
await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler) await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler)
check: check:
(await completionFut.withTimeout(5.seconds)) == true (await completionFut.withTimeout(5.seconds)) == true
@ -127,8 +127,8 @@ procSuite "Waku SWAP Accounting":
futures[1].complete(true) futures[1].complete(true)
# TODO Handshakes - for now we assume implicit, e2e still works for PoC # TODO Handshakes - for now we assume implicit, e2e still works for PoC
await node1.query(HistoryQuery(topics: @[contentTopic]), handler1) await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler1)
await node1.query(HistoryQuery(topics: @[contentTopic]), handler2) await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), handler2)
check: check:
(await allFutures(futures).withTimeout(5.seconds)) == true (await allFutures(futures).withTimeout(5.seconds)) == true

View File

@ -238,7 +238,9 @@ procSuite "WakuNode":
response.messages[0] == message response.messages[0] == message
completionFut.complete(true) completionFut.complete(true)
await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler)
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler)
check: check:
(await completionFut.withTimeout(5.seconds)) == true (await completionFut.withTimeout(5.seconds)) == true

View File

@ -17,7 +17,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Store API version 1 definitions ## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: rpcsrv.rpc("get_waku_v2_store_v1_messages") do(contentTopics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging ## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages" debug "get_waku_v2_store_v1_messages"
@ -27,7 +27,11 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
debug "get_waku_v2_store_v1_messages response" debug "get_waku_v2_store_v1_messages response"
responseFut.complete(response.toStoreResponse()) responseFut.complete(response.toStoreResponse())
let historyQuery = HistoryQuery(topics: topics, var contentFilters: seq[HistoryContentFilter] = @[]
# items in contentTopics map to the contentTopic field of waku message (not to be confused with pubsub topic)
for ct in contentTopics:
contentFilters.add(HistoryContentFilter(contentTopic: ct))
let historyQuery = HistoryQuery(contentFilters: contentFilters,
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
await node.query(historyQuery, queryFuncHandler) await node.query(historyQuery, queryFuncHandler)

View File

@ -111,15 +111,29 @@ proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
ok(pagingInfo) ok(pagingInfo)
proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
# ContentTopic corresponds to the contentTopic field of waku message (not to be confused with pubsub topic)
var contentTopic: ContentTopic
discard ? pb.getField(1, contentTopic)
ok(HistoryContentFilter(contentTopic: contentTopic))
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery() var msg = HistoryQuery()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var topics: seq[ContentTopic] # var topics: seq[ContentTopic]
discard ? pb.getRepeatedField(2, topics) # discard ? pb.getRepeatedField(2, topics)
# msg.topics = topics
var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(2, buffs)
for buf in buffs:
msg.contentFilters.add(? HistoryContentFilter.init(buf))
msg.topics = topics
var pagingInfoBuffer: seq[byte] var pagingInfoBuffer: seq[byte]
discard ? pb.getField(3, pagingInfoBuffer) discard ? pb.getField(3, pagingInfoBuffer)
@ -166,11 +180,17 @@ proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
ok(rpc) ok(rpc)
proc encode*(filter: HistoryContentFilter): ProtoBuffer =
result = initProtoBuffer()
result.write(1, filter.contentTopic)
proc encode*(query: HistoryQuery): ProtoBuffer = proc encode*(query: HistoryQuery): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
for topic in query.topics: # for topic in query.topics:
result.write(2, topic) # result.write(2, topic)
for filter in query.contentFilters:
result.write(2, filter.encode())
result.write(3, query.pagingInfo.encode()) result.write(3, query.pagingInfo.encode())
@ -292,7 +312,11 @@ proc paginateWithoutIndex(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (se
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
result = HistoryResponse(messages: newSeq[WakuMessage]()) result = HistoryResponse(messages: newSeq[WakuMessage]())
# data holds IndexedWakuMessage whose topics match the query # data holds IndexedWakuMessage whose topics match the query
var data = w.messages.filterIt(it.msg.contentTopic in query.topics) var data : seq[IndexedWakuMessage] = @[]
for filter in query.contentFilters:
var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
# TODO remove duplicates from data
data.add(matched)
# temporal filtering # temporal filtering
# check whether the history query contains a time filter # check whether the history query contains a time filter

View File

@ -17,6 +17,8 @@ export pagination
const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
type type
HistoryContentFilter* = object
contentTopic*: ContentTopic
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
@ -37,7 +39,7 @@ type
direction*: PagingDirection direction*: PagingDirection
HistoryQuery* = object HistoryQuery* = object
topics*: seq[ContentTopic] contentFilters*: seq[HistoryContentFilter]
pagingInfo*: PagingInfo # used for pagination pagingInfo*: PagingInfo # used for pagination
startTime*: float64 # used for time-window query startTime*: float64 # used for time-window query
endTime*: float64 # used for time-window query endTime*: float64 # used for time-window query