2023-09-21 17:52:06 -07:00

119 lines
2.8 KiB
Nim

import std/times
import std/options
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/sqlite3_abi
from pkg/stew/results as stewResults import isErr
import pkg/upraises
import std/sequtils
import ../datastore
import ./backend
import ./sql/sqliteds
export datastore
push: {.upraises: [].}
type
SQLiteDatastore* = ref object of Datastore
db: SQLiteBackend
proc path*(self: SQLiteDatastore): string =
self.db.path()
proc readOnly*(self: SQLiteDatastore): bool =
self.db.readOnly()
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
return self.db.has(KeyId.new key.id())
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
return self.db.delete(KeyId.new key.id())
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
let dkeys = keys.mapIt(KeyId.new it.id())
return self.db.delete(dkeys)
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
self.db.get(KeyId.new key.id())
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
self.db.put(KeyId.new key.id(), DataBuffer.new data)
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
var dbatch: seq[tuple[key: string, data: seq[byte]]]
for entry in batch:
dbatch.add((entry.key.id(), entry.data))
self.db.put(dbatch)
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
method query*(
self: SQLiteDatastore,
query: Query
): Future[?!QueryIter] {.async.} =
var iter = QueryIter()
let dbquery = DbQuery(
key: KeyId.new query.key.id(),
value: query.value,
limit: query.limit,
offset: query.offset,
sort: query.sort,
)
var queries = ? self.db.query(dbquery)
let lock = newAsyncLock()
proc next(): Future[?!QueryResponse] {.async.} =
defer:
if lock.locked:
lock.release()
if lock.locked:
return failure (ref DatastoreError)(msg: "Should always await query features")
if iter.finished:
return failure((ref QueryEndedError)(msg: "Calling next on a finished query!"))
await lock.acquire()
let res = queries()
iter.result = res
iter.finished = true
iter.dispose = proc(): Future[?!void] {.async.} =
discard sqlite3_reset(s)
discard sqlite3_clear_bindings(s)
s.dispose
return success()
iter.next = next
return success iter
proc new*(
T: type SQLiteDatastore,
path: string,
readOnly = false): ?!T =
let
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
success T(
db: ? SQLiteDsDb.open(path, flags),
readOnly: readOnly)
proc new*(
T: type SQLiteDatastore,
db: SQLiteDsDb): ?!T =
success T(
db: db,
readOnly: db.readOnly)