nim-datastore/datastore/memoryds.nim

150 lines
3.1 KiB
Nim
Raw Permalink Normal View History

2023-08-28 18:22:53 -07:00
import std/tables
2023-09-12 13:50:07 -06:00
import std/sharedtables
import std/sharedlist
2023-08-28 19:22:16 -07:00
import std/sequtils
import std/strutils
2023-08-28 21:32:28 -07:00
import std/algorithm
import std/locks
import std/atomics
2023-08-28 18:22:53 -07:00
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ./key
import ./query
import ./datastore
export key, query
push: {.upraises: [].}
type
MemoryDatastore* = ref object of Datastore
2023-09-12 13:50:07 -06:00
store*: SharedTable[Key, seq[byte]]
keys*: SharedList[Key]
lock: Lock # yes, we need the lock since we need to update both the table and the list :facepalm:
template withLock(lock: Lock, body: untyped) =
try:
lock.acquire()
body
finally:
lock.release()
2023-08-28 18:22:53 -07:00
method has*(
2023-09-11 14:48:53 -06:00
self: MemoryDatastore,
key: Key): Future[?!bool] {.async.} =
let
keys = toSeq(self.keys)
for k in keys:
if k == key:
return success true
2023-08-28 18:22:53 -07:00
return success false
2023-08-28 18:22:53 -07:00
method delete*(
2023-09-08 15:09:15 -06:00
self: MemoryDatastore,
key: Key): Future[?!void] {.async.} =
2023-08-28 18:22:53 -07:00
withLock(self.lock):
self.keys.iterAndMutate(proc(k: Key): bool = k == key)
self.store.del(key)
2023-08-28 18:37:14 -07:00
return success()
2023-08-28 18:22:53 -07:00
method delete*(
self: MemoryDatastore,
keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure err
return success()
method get*(
2023-09-08 15:09:15 -06:00
self: MemoryDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
2023-08-28 18:37:14 -07:00
withLock(self.lock):
let
has = (await self.has(key))
if has.isOk and has.get:
return self.store.mget(key).catch()
return failure (ref DatastoreError)(msg: "not found")
2023-08-28 18:22:53 -07:00
method put*(
2023-09-08 15:09:15 -06:00
self: MemoryDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
2023-08-28 18:22:53 -07:00
withLock(self.lock):
if not self.store.hasKeyOrPut(key, data):
self.keys.add(key)
else:
self.store[key] = data
2023-08-28 18:22:53 -07:00
method put*(
self: MemoryDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
return success()
method query*(
self: MemoryDatastore,
query: Query,
): Future[?!QueryIter] {.async.} =
2023-08-28 19:22:16 -07:00
let
queryKey = query.key.id()
keys = toSeq(self.keys)
2023-08-28 19:22:16 -07:00
var
iter = QueryIter.new()
pos = 0
2023-08-28 19:22:16 -07:00
proc next(): Future[?!QueryResponse] {.async.} =
defer:
pos.inc
2023-08-28 21:32:28 -07:00
if iter.finished:
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
2023-08-28 19:22:16 -07:00
if pos > keys.len - 1:
iter.finished = true
return success (Key.none, EmptyBytes)
2023-08-28 19:22:16 -07:00
return success (
Key.init(keys[pos]).expect("Should not fail!").some,
if query.value: self.store.mget(keys[pos]) else: EmptyBytes)
2023-08-28 19:22:16 -07:00
iter.next = next
return success iter
2023-08-28 21:33:52 -07:00
2023-08-28 18:22:53 -07:00
method close*(self: MemoryDatastore): Future[?!void] {.async.} =
2023-09-12 13:50:07 -06:00
self.store.deinitSharedTable()
self.keys.deinitSharedList()
self.lock.deinitLock()
2023-08-28 18:22:53 -07:00
return success()
2023-09-12 13:50:07 -06:00
proc new*(tp: type MemoryDatastore): MemoryDatastore =
var
table: SharedTable[Key, seq[byte]]
keys: SharedList[Key]
lock: Lock
2023-09-12 13:50:07 -06:00
table.init()
keys.init()
lock.initLock()
MemoryDatastore(store: table, keys: keys, lock: lock)