nwaku/waku/waku_archive/driver/queue_driver/queue_driver.nim

362 lines
10 KiB
Nim

# Turn on exception tracking for every routine declared below in this module.
# Compilers older than Nim 1.4 get `Defect` listed explicitly in the allowed
# set; newer compilers get an empty `raises` list.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/options, stew/results, stew/sorted_set, chronicles, chronos
import ../../../waku_core, ../../common, ../../driver, ./index
# Log topics attached to the log statements emitted from this module.
logScope:
topics = "waku archive queue_store"
# Capacity used by `QueueDriver.new` when the caller does not pass one.
const QueueDriverDefaultMaxCapacity* = 25_000
type
# Predicate evaluated for each stored entry during a page query; an entry is
# added to the page only when the predicate is nil or returns `true`.
QueryFilterMatcher = proc(index: Index, msg: WakuMessage): bool {.gcsafe, closure.}
QueueDriver* = ref object of ArchiveDriver
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
# TODO: a circular/ring buffer may be a more efficient implementation
items: SortedSet[Index, WakuMessage] # sorted set of stored messages, keyed by `Index`
capacity: int # Maximum amount of messages to keep
# Failure reasons reported by the internal page query.
QueueDriverErrorKind {.pure.} = enum
# The cursor supplied to the page query does not match any stored entry.
INVALID_CURSOR
# Result of the internal page query: the collected rows or an error kind.
QueueDriverGetPageResult = Result[seq[ArchiveRow], QueueDriverErrorKind]
proc `$`(error: QueueDriverErrorKind): string =
  ## Render a page-query error kind as its lowercase text label.
  case error
  of QueueDriverErrorKind.INVALID_CURSOR:
    result = "invalid_cursor"
### Helpers
proc walkToCursor(
    w: SortedSetWalkRef[Index, WakuMessage], startCursor: Index, forward: bool
): SortedSetResult[Index, WakuMessage] =
  ## Walk until we find the cursor.
  ##
  ## The walker `w` is positioned on the first entry (when `forward` is set)
  ## or on the last entry (otherwise) and then stepped one entry at a time
  ## until it rests on the entry whose key equals `startCursor`.
  ##
  ## Returns that entry, or the walker's error result once the scan runs past
  ## the end of the set without finding a match.
  ## TODO: Improve performance here with a binary/tree search
  var entry =
    if forward:
      w.first()
    else:
      w.last()

  # Keep stepping while there is an entry and it is not the cursor yet
  while entry.isOk() and entry.value.key != startCursor:
    entry =
      if forward:
        w.next()
      else:
        w.prev()

  return entry
#### API
proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T =
  ## Create an empty queue driver that keeps at most `capacity` messages.
  QueueDriver(items: SortedSet[Index, WakuMessage].init(), capacity: capacity)
proc contains*(driver: QueueDriver, index: Index): bool =
  ## Tell whether the store queue already holds an entry keyed by `index`.
  let lookup = driver.items.eq(index)
  lookup.isOk()
proc len*(driver: QueueDriver): int {.noSideEffect.} =
  ## Number of messages currently held in the store queue.
  driver.items.len
proc getPage(
    driver: QueueDriver,
    pageSize: uint = 0,
    forward: bool = true,
    cursor: Option[Index] = none(Index),
    predicate: QueryFilterMatcher = nil,
): QueueDriverGetPageResult =
  ## Collect a single page of at most `pageSize` rows from the queue.
  ##
  ## * `forward` selects the walking direction: first entry towards the last
  ##   when set, last entry towards the first otherwise.
  ## * `cursor`, when given, must match a stored entry; the page starts right
  ##   after it (exclusive) in the walking direction. Without a cursor the
  ##   page starts at the first/last entry (inclusive).
  ## * `predicate`, when not nil, must return `true` for an entry to be
  ##   included; entries that do not match are skipped and not counted.
  ##
  ## Returns the collected rows, or `INVALID_CURSOR` when the cursor is not
  ## found in the queue.
  var walker = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
  defer:
    walker.destroy()

  # Move the walker one entry further in the requested direction
  template step(): SortedSetResult[Index, WakuMessage] =
    (if forward: walker.next() else: walker.prev())

  var entry: SortedSetResult[Index, WakuMessage]
  if cursor.isNone():
    # No cursor: begin at the matching end of the queue
    entry =
      if forward:
        walker.first()
      else:
        walker.last()
  else:
    # Park the walker on the cursor entry; an unknown cursor is an error
    if walker.walkToCursor(cursor.get(), forward).isErr():
      return err(QueueDriverErrorKind.INVALID_CURSOR)
    # The cursor entry itself is excluded, so step past it
    entry = step()

  trace "Starting page query", currentEntry = entry

  ## Walk the queue from the starting entry, appending every entry accepted
  ## by the predicate, until the queue is exhausted or the page is full.
  var rows: seq[ArchiveRow]
  var matched: uint = 0
  while matched < pageSize and entry.isOk():
    trace "Continuing page query", currentEntry = entry, numberOfItems = matched

    let
      idx = entry.value.key
      msg = entry.value.data

    if predicate.isNil() or predicate(idx, msg):
      inc matched
      rows.add((idx.pubsubTopic, msg, @(idx.digest.data), idx.receiverTime, idx.hash))

    entry = step()

  trace "Successfully retrieved page", len = rows.len

  return ok(rows)
## --- SortedSet accessors ---
iterator fwdIterator*(driver: QueueDriver): (Index, WakuMessage) =
  ## Forward iterator over the entire store queue.
  ##
  ## Yields every `(index, message)` pair, from the first entry of the sorted
  ## set to the last one.
  ##
  ## The walker is released in a `finally` clause so that it is destroyed
  ## even when the consuming `for` loop exits early (`break`, `return`),
  ## not only after a complete traversal.
  var w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
  try:
    var res = w.first()
    while res.isOk():
      yield (res.value.key, res.value.data)
      res = w.next()
  finally:
    w.destroy()
iterator bwdIterator*(driver: QueueDriver): (Index, WakuMessage) =
  ## Backwards iterator over the entire store queue.
  ##
  ## Yields every `(index, message)` pair, from the last entry of the sorted
  ## set to the first one.
  ##
  ## The walker is released in a `finally` clause so that it is destroyed
  ## even when the consuming `for` loop exits early (`break`, `return`),
  ## not only after a complete traversal.
  var w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
  try:
    var res = w.last()
    while res.isOk():
      yield (res.value.key, res.value.data)
      res = w.prev()
  finally:
    w.destroy()
proc first*(driver: QueueDriver): ArchiveDriverResult[Index] =
  ## Key of the first entry of the sorted set, or `err("Not found")` when the
  ## walker cannot provide one.
  var walker = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
  let head = walker.first()
  walker.destroy() # the walker is only needed for this single lookup

  if head.isOk():
    return ok(head.value.key)

  return err("Not found")
proc last*(driver: QueueDriver): ArchiveDriverResult[Index] =
  ## Key of the last entry of the sorted set, or `err("Not found")` when the
  ## walker cannot provide one.
  var walker = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
  let tail = walker.last()
  walker.destroy() # the walker is only needed for this single lookup

  if tail.isOk():
    return ok(tail.value.key)

  return err("Not found")
## --- Queue API ---
proc add*(
    driver: QueueDriver, index: Index, msg: WakuMessage
): ArchiveDriverResult[void] =
  ## Add a message to the queue.
  ##
  ## If we're at capacity, the oldest (first) item is removed to make room.
  ##
  ## Errors:
  ## * `"duplicate"`   - an entry with the same `index` is already stored
  ## * `"too_old"`     - the queue is full and `index` sorts before the
  ##                     oldest stored entry
  ## * `"no_capacity"` - the queue is at capacity but holds no entry to evict
  ##                     (i.e. the configured capacity is zero or negative)
  if driver.contains(index):
    trace "could not add item to store queue. Index already exists", index = index
    return err("duplicate")

  # TODO: the below delete block can be removed if we convert to circular buffer
  if driver.items.len >= driver.capacity:
    var w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
    # Runs when this block is left, on every path, i.e. also after the delete
    defer:
      w.destroy()

    let firstItem = w.first()
    if firstItem.isErr():
      # Nothing stored yet "at capacity": there is no oldest entry to evict
      # and no room for a new one. Reading `.value` here would raise a Defect.
      return err("no_capacity")

    let oldestKey = firstItem.value.key
    if cmp(index, oldestKey) < 0:
      # When at capacity, we won't add if message index is smaller (older)
      # than our oldest item
      return err("too_old")

    discard driver.items.delete(oldestKey)

  driver.items.insert(index).value.data = msg

  return ok()
method put*(
    driver: QueueDriver,
    pubsubTopic: PubsubTopic,
    message: WakuMessage,
    digest: MessageDigest,
    messageHash: WakuMessageHash,
    receivedTime: Timestamp,
): Future[ArchiveDriverResult[void]] {.async.} =
  ## Store `message` in the queue.
  ##
  ## The entry key is an `Index` assembled from the pubsub topic, the
  ## message's own timestamp (sender time), the received time, the digest and
  ## the message hash. The outcome of `add` is returned unchanged.
  let outcome = driver.add(
    Index(
      hash: messageHash,
      digest: digest,
      receiverTime: receivedTime,
      senderTime: message.timestamp,
      pubsubTopic: pubsubTopic,
    ),
    message,
  )
  return outcome
method getAllMessages*(
    driver: QueueDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Not implemented for this driver: always returns an error.
  # TODO: Implement this message_store method
  const reason = "interface method not implemented"
  return err(reason)
method existsTable*(
    driver: QueueDriver, tableName: string
): Future[ArchiveDriverResult[bool]] {.async.} =
  ## Not implemented for this driver: always returns an error, whatever
  ## `tableName` is.
  const reason = "interface method not implemented"
  return err(reason)
method getMessages*(
    driver: QueueDriver,
    contentTopic: seq[ContentTopic] = @[],
    pubsubTopic = none(PubsubTopic),
    cursor = none(ArchiveCursor),
    startTime = none(Timestamp),
    endTime = none(Timestamp),
    hashes: seq[WakuMessageHash] = @[],
    maxPageSize = DefaultPageSize,
    ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Query one page of stored messages.
  ##
  ## Every filter is optional; an entry is returned only when it satisfies all
  ## filters that were supplied:
  ## * `pubsubTopic`  - the entry's pubsub topic must be equal to it
  ## * `contentTopic` - the message's content topic must be in the list
  ## * `startTime` / `endTime` - inclusive bounds on the message's own
  ##   `timestamp` field
  ## * `hashes`       - the entry's message hash must be in the list
  ##
  ## At most `maxPageSize` rows are returned, walking from the first entry
  ## when `ascendingOrder` is set and from the last one otherwise. A `cursor`
  ## makes the page start right after the entry it designates.
  ##
  ## Errors carry the exception message when the page query raises, or the
  ## textual form of the page-query error (e.g. an unknown cursor).
  let startIndex = cursor.map(toIndex)

  let accepts: QueryFilterMatcher =
    func (index: Index, msg: WakuMessage): bool =
      (pubsubTopic.isNone() or index.pubsubTopic == pubsubTopic.get()) and
        (contentTopic.len == 0 or msg.contentTopic in contentTopic) and
        (startTime.isNone() or msg.timestamp >= startTime.get()) and
        (endTime.isNone() or msg.timestamp <= endTime.get()) and
        (hashes.len == 0 or index.hash in hashes)

  var page: QueueDriverGetPageResult
  try:
    page = driver.getPage(maxPageSize, ascendingOrder, startIndex, accepts)
  except CatchableError, Exception:
    return err(getCurrentExceptionMsg())

  if page.isOk():
    return ok(page.value)

  return err($page.error)
method getMessagesCount*(
    driver: QueueDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
  ## Number of messages currently stored in the queue.
  let stored = driver.len().int64
  return ok(stored)
method getPagesCount*(
    driver: QueueDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
  ## Returns the number of messages currently stored in the queue.
  # NOTE(review): this is the entry count, not a page count - confirm that
  # callers of this driver expect that.
  let stored = driver.len().int64
  return ok(stored)
method getPagesSize*(
    driver: QueueDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
  ## Returns the number of messages currently stored in the queue.
  # NOTE(review): this is the entry count, not a page size - confirm that
  # callers of this driver expect that.
  let stored = driver.len().int64
  return ok(stored)
method getDatabaseSize*(
    driver: QueueDriver
): Future[ArchiveDriverResult[int64]] {.async.} =
  ## Returns the number of messages currently stored in the queue.
  # NOTE(review): this is the entry count, not a size in bytes - confirm that
  # callers of this driver expect that.
  let stored = driver.len().int64
  return ok(stored)
method performVacuum*(
    driver: QueueDriver
): Future[ArchiveDriverResult[void]] {.async.} =
  ## Not implemented for this driver: always returns an error.
  const reason = "interface method not implemented"
  return err(reason)
method getOldestMessageTimestamp*(
    driver: QueueDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
  ## Receiver time of the first entry in the queue; the error of `first`
  ## is passed through when no such entry exists.
  let oldest = driver.first()
  if oldest.isErr():
    return err(oldest.error)

  return ok(oldest.value.receiverTime)
method getNewestMessageTimestamp*(
    driver: QueueDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
  ## Receiver time of the last entry in the queue; the error of `last`
  ## is passed through when no such entry exists.
  let newest = driver.last()
  if newest.isErr():
    return err(newest.error)

  return ok(newest.value.receiverTime)
method deleteMessagesOlderThanTimestamp*(
    driver: QueueDriver, ts: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
  ## Not implemented for this driver: always returns an error and removes
  ## nothing.
  # TODO: Implement this message_store method
  const reason = "interface method not implemented"
  return err(reason)
method deleteOldestMessagesNotWithinLimit*(
    driver: QueueDriver, limit: int
): Future[ArchiveDriverResult[void]] {.async.} =
  ## Not implemented for this driver: always returns an error and removes
  ## nothing.
  # TODO: Implement this message_store method
  const reason = "interface method not implemented"
  return err(reason)
method decreaseDatabaseSize*(
    driver: QueueDriver, targetSizeInBytes: int64, forceRemoval: bool = false
): Future[ArchiveDriverResult[void]] {.async.} =
  ## Not implemented for this driver: always returns an error, whatever the
  ## arguments are.
  const reason = "interface method not implemented"
  return err(reason)
method close*(driver: QueueDriver): Future[ArchiveDriverResult[void]] {.async.} =
  ## Nothing is released here; always reports success.
  return ArchiveDriverResult[void].ok()