mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-04 11:05:38 +00:00
Improved store
query performance (#812)
This commit is contained in:
parent
f3ecdb80ea
commit
c422866a49
@ -15,6 +15,7 @@ which is a sequence of string.
|
||||
### Fixes
|
||||
- All `HistoryResponse` messages are now auto-paginated to a maximum of 100 messages per response
|
||||
- Increased maximum length for reading from a libp2p input stream to allow largest possible protocol messages, including `HistoryResponse` messages at max size.
|
||||
- Significantly improved store node query performance
|
||||
- Added GossipSub `MessageIdProvider` for `11/WAKU2-RELAY` messages.
|
||||
|
||||
## 2021-11-05 v0.6
|
||||
|
@ -278,8 +278,9 @@ proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] =
|
||||
return some(i)
|
||||
return none(int)
|
||||
|
||||
proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) =
|
||||
## takes list, and performs paging based on pinfo
|
||||
proc paginate*(msgList: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) =
|
||||
## takes a message list, and performs paging based on pinfo
|
||||
## the message list must be sorted
|
||||
## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
|
||||
var
|
||||
cursor = pinfo.cursor
|
||||
@ -287,8 +288,8 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa
|
||||
dir = pinfo.direction
|
||||
output: (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError)
|
||||
|
||||
if list.len == 0: # no pagination is needed for an empty list
|
||||
output = (list, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
|
||||
if msgList.len == 0: # no pagination is needed for an empty list
|
||||
output = (msgList, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
|
||||
return output
|
||||
|
||||
## Adjust pageSize:
|
||||
@ -298,28 +299,26 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa
|
||||
if (pageSize == uint64(0)) or (pageSize > MaxPageSize):
|
||||
pageSize = MaxPageSize
|
||||
|
||||
# sort the existing messages
|
||||
var
|
||||
msgList = list # makes a copy of the list
|
||||
total = uint64(msgList.len)
|
||||
# sorts msgList based on the custom comparison proc indexedWakuMessageComparison
|
||||
msgList.sort(indexedWakuMessageComparison)
|
||||
let total = uint64(msgList.len)
|
||||
|
||||
# set the cursor of the initial paging request
|
||||
var isInitialQuery = false
|
||||
var cursorIndex: uint64
|
||||
if cursor == Index(): # an empty cursor means it is an initial query
|
||||
isInitialQuery = true
|
||||
case dir
|
||||
of PagingDirection.FORWARD:
|
||||
cursor = msgList[0].index # set the cursor to the beginning of the list
|
||||
of PagingDirection.BACKWARD:
|
||||
cursor = msgList[list.len - 1].index # set the cursor to the end of the list
|
||||
|
||||
var cursorIndexOption = msgList.findIndex(cursor)
|
||||
if cursorIndexOption.isNone: # the cursor is not valid
|
||||
output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR)
|
||||
return output
|
||||
var cursorIndex = uint64(cursorIndexOption.get())
|
||||
of PagingDirection.FORWARD:
|
||||
cursorIndex = 0
|
||||
cursor = msgList[cursorIndex].index # set the cursor to the beginning of the list
|
||||
of PagingDirection.BACKWARD:
|
||||
cursorIndex = total - 1
|
||||
cursor = msgList[cursorIndex].index # set the cursor to the end of the list
|
||||
else:
|
||||
var cursorIndexOption = msgList.findIndex(cursor)
|
||||
if cursorIndexOption.isNone: # the cursor is not valid
|
||||
output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR)
|
||||
return output
|
||||
cursorIndex = uint64(cursorIndexOption.get())
|
||||
|
||||
case dir
|
||||
of PagingDirection.FORWARD: # forward pagination
|
||||
@ -374,42 +373,57 @@ proc paginate*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWa
|
||||
|
||||
|
||||
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
||||
var data : seq[IndexedWakuMessage] = w.messages.allItems()
|
||||
|
||||
# filter based on content filters
|
||||
# an empty list of contentFilters means no content filter is requested
|
||||
if ((query.contentFilters).len != 0):
|
||||
# matchedMessages holds IndexedWakuMessage whose content topics match the queried Content filters
|
||||
var matchedMessages : seq[IndexedWakuMessage] = @[]
|
||||
for filter in query.contentFilters:
|
||||
var matched = w.messages.filterIt(it.msg.contentTopic == filter.contentTopic)
|
||||
matchedMessages.add(matched)
|
||||
# remove duplicates
|
||||
# duplicates may exist if two content filters target the same content topic, then the matched message gets added more than once
|
||||
data = matchedMessages.deduplicate()
|
||||
|
||||
# filter based on pubsub topic
|
||||
# an empty pubsub topic means no pubsub topic filter is requested
|
||||
if ((query.pubsubTopic).len != 0):
|
||||
data = data.filterIt(it.pubsubTopic == query.pubsubTopic)
|
||||
|
||||
# temporal filtering
|
||||
# check whether the history query contains a time filter
|
||||
if (query.endTime != float64(0) and query.startTime != float64(0)):
|
||||
# for a valid time query, select messages whose sender generated timestamps fall bw the queried start time and end time
|
||||
data = data.filterIt(it.msg.timestamp <= query.endTime and it.msg.timestamp >= query.startTime)
|
||||
|
||||
## Extract query criteria
|
||||
## All query criteria are optional
|
||||
let
|
||||
qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic))
|
||||
else: none(seq[ContentTopic])
|
||||
qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
|
||||
else: none(string)
|
||||
qStartTime = if query.startTime != float64(0): some(query.startTime)
|
||||
else: none(float64)
|
||||
qEndTime = if query.endTime != float64(0): some(query.endTime)
|
||||
else: none(float64)
|
||||
|
||||
# perform pagination
|
||||
var (indexedWakuMsgList, updatedPagingInfo, error) = paginate(data, query.pagingInfo)
|
||||
## Compose filter predicate for message from query criteria
|
||||
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
|
||||
if qPubSubTopic.isSome():
|
||||
# filter on pubsub topic
|
||||
if indMsg.pubsubTopic != qPubSubTopic.get():
|
||||
return false
|
||||
|
||||
if qStartTime.isSome() and qEndTime.isSome():
|
||||
# temporal filtering
|
||||
# select only messages whose sender generated timestamps fall bw the queried start time and end time
|
||||
if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get():
|
||||
return false
|
||||
|
||||
if qContentTopics.isSome():
|
||||
# filter on content
|
||||
if indMsg.msg.contentTopic notin qContentTopics.get():
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
# extract waku messages
|
||||
var wakuMsgList = indexedWakuMsgList.mapIt(it.msg)
|
||||
## Filter history using predicate and sort on indexedWakuMessageComparison
|
||||
## TODO: since MaxPageSize is likely much smaller than w.messages.len,
|
||||
## we could optimise here by only filtering a portion of w.messages,
|
||||
## and repeat until we have populated a full page.
|
||||
## TODO: we can gain a lot by rather sorting on insert. Perhaps use a nim-stew
|
||||
## sorted set?
|
||||
let filteredMsgs = w.messages.filterIt(it.matchesQuery)
|
||||
.sorted(indexedWakuMessageComparison)
|
||||
|
||||
## Paginate the filtered messages
|
||||
let (indexedWakuMsgList, updatedPagingInfo, error) = paginate(filteredMsgs, query.pagingInfo)
|
||||
|
||||
let historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
|
||||
## Extract and return response
|
||||
let
|
||||
wakuMsgList = indexedWakuMsgList.mapIt(it.msg)
|
||||
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
|
||||
|
||||
return historyRes
|
||||
|
||||
|
||||
proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
|
||||
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
|
Loading…
x
Reference in New Issue
Block a user