diff --git a/CHANGELOG.md b/CHANGELOG.md index 304d1b464..087683a84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ which is a sequence of string. ### Fixes - All `HistoryResponse` messages are now auto-paginated to a maximum of 100 messages per response - Increased maximum length for reading from a libp2p input stream to allow largest possible protocol messages, including `HistoryResponse` messages at max size. +- Significantly improved store node query performance - Added GossipSub `MessageIdProvider` for `11/WAKU2-RELAY` messages. ## 2021-11-05 v0.6 diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index ed607f67f..ecbab596c 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -278,8 +278,9 @@ proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] = return some(i) return none(int) -proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) = - ## takes list, and performs paging based on pinfo +proc paginate*(msgList: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) = + ## takes a message list, and performs paging based on pinfo + ## the message list must be sorted ## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request var cursor = pinfo.cursor @@ -287,8 +288,8 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa dir = pinfo.direction output: (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) - if list.len == 0: # no pagination is needed for an empty list - output = (list, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE) + if msgList.len == 0: # no pagination is needed for an empty list + output = (msgList, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE) return output ## Adjust pageSize: @@ -298,28 +299,26 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa if (pageSize == uint64(0)) or (pageSize > MaxPageSize): pageSize = MaxPageSize - # sort the existing messages - var - msgList = list # makes a copy of the list - total = uint64(msgList.len) - # sorts msgList based on the custom comparison proc indexedWakuMessageComparison - msgList.sort(indexedWakuMessageComparison) + let total = uint64(msgList.len) # set the cursor of the initial paging request var isInitialQuery = false + var cursorIndex: uint64 if cursor == Index(): # an empty cursor means it is an initial query isInitialQuery = true case dir - of PagingDirection.FORWARD: - cursor = msgList[0].index # set the cursor to the beginning of the list - of PagingDirection.BACKWARD: - cursor = msgList[list.len - 1].index # set the cursor to the end of the list - - var cursorIndexOption = msgList.findIndex(cursor) - if cursorIndexOption.isNone: # the cursor is not valid - output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR) - return output - var cursorIndex = uint64(cursorIndexOption.get()) + of PagingDirection.FORWARD: + cursorIndex = 0 + cursor = msgList[cursorIndex].index # set the cursor to the beginning of the list + of PagingDirection.BACKWARD: + cursorIndex = total - 1 + cursor = msgList[cursorIndex].index # set the cursor to the end of the list + else: + var cursorIndexOption = msgList.findIndex(cursor) + if cursorIndexOption.isNone: # the cursor is not valid + output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR) + return output + cursorIndex = uint64(cursorIndexOption.get()) case dir of PagingDirection.FORWARD: # forward pagination @@ -374,42 +373,57 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = - var data : seq[IndexedWakuMessage] = w.messages.allItems() - - # filter based on content filters - # an empty list of contentFilters means no content filter is requested - if ((query.contentFilters).len != 0): - # matchedMessages holds IndexedWakuMessage whose content topics match the queried Content filters - var matchedMessages : seq[IndexedWakuMessage] = @[] - for filter in query.contentFilters: - var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic) - matchedMessages.add(matched) - # remove duplicates - # duplicates may exist if two content filters target the same content topic, then the matched message gets added more than once - data = matchedMessages.deduplicate() - - # filter based on pubsub topic - # an empty pubsub topic means no pubsub topic filter is requested - if ((query.pubsubTopic).len != 0): - data = data.filterIt(it.pubsubTopic == query.pubsubTopic) - - # temporal filtering - # check whether the history query contains a time filter - if (query.endTime != float64(0) and query.startTime != float64(0)): - # for a valid time query, select messages whose sender generated timestamps fall bw the queried start time and end time - data = data.filterIt(it.msg.timestamp <= query.endTime and it.msg.timestamp >= query.startTime) - + ## Extract query criteria + ## All query criteria are optional + let + qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic)) + else: none(seq[ContentTopic]) + qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic) + else: none(string) + qStartTime = if query.startTime != float64(0): some(query.startTime) + else: none(float64) + qEndTime = if query.endTime != float64(0): some(query.endTime) + else: none(float64) - # perform pagination - var (indexedWakuMsgList, updatedPagingInfo, error) = paginate(data, query.pagingInfo) + ## Compose filter predicate for message from query criteria + proc matchesQuery(indMsg: IndexedWakuMessage): bool = + if qPubSubTopic.isSome(): + # filter on pubsub topic + if indMsg.pubsubTopic != qPubSubTopic.get(): + return false + + if qStartTime.isSome() and qEndTime.isSome(): + # temporal filtering + # select only messages whose sender generated timestamps fall bw the queried start time and end time + if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get(): + return false + + if qContentTopics.isSome(): + # filter on content + if indMsg.msg.contentTopic notin qContentTopics.get(): + return false + + return true - # extract waku messages - var wakuMsgList = indexedWakuMsgList.mapIt(it.msg) + ## Filter history using predicate and sort on indexedWakuMessageComparison + ## TODO: since MaxPageSize is likely much smaller than w.messages.len, + ## we could optimise here by only filtering a portion of w.messages, + ## and repeat until we have populated a full page. + ## TODO: we can gain a lot by rather sorting on insert. Perhaps use a nim-stew + ## sorted set? + let filteredMsgs = w.messages.filterIt(it.matchesQuery) + .sorted(indexedWakuMessageComparison) + + ## Paginate the filtered messages + let (indexedWakuMsgList, updatedPagingInfo, error) = paginate(filteredMsgs, query.pagingInfo) - let historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error) + ## Extract and return response + let + wakuMsgList = indexedWakuMsgList.mapIt(it.msg) + historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error) + return historyRes - proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = proc handler(conn: Connection, proto: string) {.async.} =