nwaku/waku/waku_archive/driver/queue_driver/queue_driver.nim

305 lines
9.9 KiB
Nim

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options,
stew/results,
stew/sorted_set,
chronicles,
chronos
import
../../../waku_core,
../../common,
../../driver,
./index
logScope:
topics = "waku archive queue_store"
const QueueDriverDefaultMaxCapacity* = 25_000
type
IndexedWakuMessage = object
# TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage): bool {.gcsafe, closure.}
type
QueueDriverErrorKind {.pure.} = enum
INVALID_CURSOR
QueueDriverGetPageResult = Result[seq[ArchiveRow], QueueDriverErrorKind]
proc `$`(error: QueueDriverErrorKind): string =
case error:
of INVALID_CURSOR:
"invalid_cursor"
type QueueDriver* = ref object of ArchiveDriver
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
##
## TODO: a circular/ring buffer may be a more efficient implementation
## TODO: we don't need to store the Index twice (as key and in the value)
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
### Helpers
proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index,
forward: bool): SortedSetResult[Index, IndexedWakuMessage] =
## Walk to util we find the cursor
## TODO: Improve performance here with a binary/tree search
var nextItem = if forward: w.first()
else: w.last()
## Fast forward until we reach the startCursor
while nextItem.isOk():
if nextItem.value.key == startCursor:
break
# Not yet at cursor. Continue advancing
nextItem = if forward: w.next()
else: w.prev()
return nextItem
#### API
proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
return QueueDriver(items: items, capacity: capacity)
proc contains*(driver: QueueDriver, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
driver.items.eq(index).isOk()
proc len*(driver: QueueDriver): int {.noSideEffect.} =
driver.items.len
proc getPage(driver: QueueDriver,
pageSize: uint = 0,
forward: bool = true,
cursor: Option[Index] = none(Index),
predicate: QueryFilterMatcher = nil): QueueDriverGetPageResult =
## Populate a single page in forward direction
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
var outSeq: seq[ArchiveRow]
var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
defer: w.destroy()
var currentEntry: SortedSetResult[Index, IndexedWakuMessage]
# Find starting entry
if cursor.isSome():
let cursorEntry = w.walkToCursor(cursor.get(), forward)
if cursorEntry.isErr():
return err(QueueDriverErrorKind.INVALID_CURSOR)
# Advance walker once more
currentEntry = if forward: w.next()
else: w.prev()
else:
# Start from the beginning of the queue
currentEntry = if forward: w.first()
else: w.last()
trace "Starting page query", currentEntry=currentEntry
## This loop walks forward over the queue:
## 1. from the given cursor (or first/last entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the end of the queue or maxPageSize is reached
var numberOfItems: uint = 0
while currentEntry.isOk() and numberOfItems < pageSize:
trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if predicate.isNil() or predicate(currentEntry.value.data):
let
key = currentEntry.value.key
data = currentEntry.value.data
numberOfItems += 1
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
currentEntry = if forward: w.next()
else: w.prev()
trace "Successfully retrieved page", len=outSeq.len
return ok(outSeq)
## --- SortedSet accessors ---
iterator fwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) =
## Forward iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
res = w.first()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.next()
w.destroy()
iterator bwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) =
## Backwards iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
res = w.last()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.prev()
w.destroy()
proc first*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
res = w.first()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
proc last*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
res = w.last()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
## --- Queue API ---
proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[void] =
## Add a message to the queue
##
## If we're at capacity, we will be removing, the oldest (first) item
if driver.contains(msg.index):
trace "could not add item to store queue. Index already exists", index=msg.index
return err("duplicate")
# TODO: the below delete block can be removed if we convert to circular buffer
if driver.items.len >= driver.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(driver.items)
firstItem = w.first
if cmp(msg.index, firstItem.value.key) < 0:
# When at capacity, we won't add if message index is smaller (older) than our oldest item
w.destroy # Clean up walker
return err("too_old")
discard driver.items.delete(firstItem.value.key)
w.destroy # better to destroy walker after a delete operation
driver.items.insert(msg.index).value.data = msg
return ok()
method put*(driver: QueueDriver,
pubsubTopic: PubsubTopic,
message: WakuMessage,
digest: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
return driver.add(message)
method getAllMessages*(driver: QueueDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
# TODO: Implement this message_store method
return err("interface method not implemented")
method getMessages*(driver: QueueDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.}=
let cursor = cursor.map(toIndex)
let matchesQuery: QueryFilterMatcher = func(row: IndexedWakuMessage): bool =
if pubsubTopic.isSome() and row.pubsubTopic != pubsubTopic.get():
return false
if contentTopic.len > 0 and row.msg.contentTopic notin contentTopic:
return false
if startTime.isSome() and row.msg.timestamp < startTime.get():
return false
if endTime.isSome() and row.msg.timestamp > endTime.get():
return false
return true
var pageRes: QueueDriverGetPageResult
try:
pageRes = driver.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery)
except CatchableError, Exception:
return err(getCurrentExceptionMsg())
if pageRes.isErr():
return err($pageRes.error)
return ok(pageRes.value)
method getMessagesCount*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
method getOldestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getNewestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method deleteMessagesOlderThanTimestamp*(driver: QueueDriver,
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
# TODO: Implement this message_store method
return err("interface method not implemented")
method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =
# TODO: Implement this message_store method
return err("interface method not implemented")
method close*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return ok()