This commit is contained in:
Jaremy Creechley 2023-09-20 22:12:53 -07:00
parent 61bdc6b88a
commit 3b66afba6e
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
5 changed files with 226 additions and 235 deletions

View File

@ -3,6 +3,5 @@ import ./datastore/fsds
import ./datastore/sql
import ./datastore/mountedds
import ./datastore/tieredds
import ./datastore/threads/threadproxyds
export datastore, fsds, mountedds, tieredds, sql, threadproxyds
export datastore, fsds, mountedds, tieredds, sql

View File

@ -9,239 +9,239 @@ from pkg/stew/results as stewResults import isErr
import pkg/upraises
import ../datastore
import ./sqlitedsdb
# import ./sqlitedsdb
export datastore, sqlitedsdb
export datastore
push: {.upraises: [].}
type
SQLiteDatastore* = ref object of Datastore
readOnly: bool
db: SQLiteDsDb
# type
# SQLiteDatastore* = ref object of Datastore
# readOnly: bool
# db: SQLiteDsDb
proc path*(self: SQLiteDatastore): string =
self.db.dbPath
# proc path*(self: SQLiteDatastore): string =
# self.db.dbPath
proc `readOnly=`*(self: SQLiteDatastore): bool
{.error: "readOnly should not be assigned".}
# proc `readOnly=`*(self: SQLiteDatastore): bool
# {.error: "readOnly should not be assigned".}
proc timestamp*(t = epochTime()): int64 =
(t * 1_000_000).int64
# proc timestamp*(t = epochTime()): int64 =
# (t * 1_000_000).int64
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
var
exists = false
# method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
# var
# exists = false
proc onData(s: RawStmtPtr) =
exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool
# proc onData(s: RawStmtPtr) =
# exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool
if err =? self.db.containsStmt.query((key.id), onData).errorOption:
return failure err
# if err =? self.db.containsStmt.query((key.id), onData).errorOption:
# return failure err
return success exists
# return success exists
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
return self.db.deleteStmt.exec((key.id))
# method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
# return self.db.deleteStmt.exec((key.id))
method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
if err =? self.db.beginStmt.exec().errorOption:
return failure(err)
# method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} =
# if err =? self.db.beginStmt.exec().errorOption:
# return failure(err)
for key in keys:
if err =? self.db.deleteStmt.exec((key.id)).errorOption:
if err =? self.db.rollbackStmt.exec().errorOption:
return failure err.msg
# for key in keys:
# if err =? self.db.deleteStmt.exec((key.id)).errorOption:
# if err =? self.db.rollbackStmt.exec().errorOption:
# return failure err.msg
return failure err.msg
# return failure err.msg
if err =? self.db.endStmt.exec().errorOption:
return failure err.msg
# if err =? self.db.endStmt.exec().errorOption:
# return failure err.msg
return success()
# return success()
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
# see comment in ./filesystem_datastore re: finer control of memory
# allocation in `method get`, could apply here as well if bytes were read
# incrementally with `sqlite3_blob_read`
# method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
# # see comment in ./filesystem_datastore re: finer control of memory
# # allocation in `method get`, could apply here as well if bytes were read
# # incrementally with `sqlite3_blob_read`
var
bytes: seq[byte]
# var
# bytes: seq[byte]
proc onData(s: RawStmtPtr) =
bytes = self.db.getDataCol()
# proc onData(s: RawStmtPtr) =
# bytes = self.db.getDataCol()
if err =? self.db.getStmt.query((key.id), onData).errorOption:
return failure(err)
# if err =? self.db.getStmt.query((key.id), onData).errorOption:
# return failure(err)
if bytes.len <= 0:
return failure(
newException(DatastoreKeyNotFound, "Key doesn't exist"))
# if bytes.len <= 0:
# return failure(
# newException(DatastoreKeyNotFound, "Key doesn't exist"))
return success bytes
# return success bytes
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
return self.db.putStmt.exec((key.id, data, timestamp()))
# method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
# return self.db.putStmt.exec((key.id, data, timestamp()))
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
if err =? self.db.beginStmt.exec().errorOption:
return failure err
# method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
# if err =? self.db.beginStmt.exec().errorOption:
# return failure err
for entry in batch:
if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption:
if err =? self.db.rollbackStmt.exec().errorOption:
return failure err
# for entry in batch:
# if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption:
# if err =? self.db.rollbackStmt.exec().errorOption:
# return failure err
return failure err
# return failure err
if err =? self.db.endStmt.exec().errorOption:
return failure err
# if err =? self.db.endStmt.exec().errorOption:
# return failure err
return success()
# return success()
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
# method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
# self.db.close()
return success()
# return success()
method query*(
self: SQLiteDatastore,
query: Query): Future[?!QueryIter] {.async.} =
# method query*(
# self: SQLiteDatastore,
# query: Query): Future[?!QueryIter] {.async.} =
var
iter = QueryIter()
queryStr = if query.value:
QueryStmtDataIdStr
else:
QueryStmtIdStr
# var
# iter = QueryIter()
# queryStr = if query.value:
# QueryStmtDataIdStr
# else:
# QueryStmtIdStr
if query.sort == SortOrder.Descending:
queryStr &= QueryStmtOrderDescending
else:
queryStr &= QueryStmtOrderAscending
# if query.sort == SortOrder.Descending:
# queryStr &= QueryStmtOrderDescending
# else:
# queryStr &= QueryStmtOrderAscending
if query.limit != 0:
queryStr &= QueryStmtLimit
# if query.limit != 0:
# queryStr &= QueryStmtLimit
if query.offset != 0:
queryStr &= QueryStmtOffset
# if query.offset != 0:
# queryStr &= QueryStmtOffset
let
queryStmt = QueryStmt.prepare(
self.db.env, queryStr).expect("should not fail")
# let
# queryStmt = QueryStmt.prepare(
# self.db.env, queryStr).expect("should not fail")
s = RawStmtPtr(queryStmt)
# s = RawStmtPtr(queryStmt)
var
v = sqlite3_bind_text(
s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE)
# var
# v = sqlite3_bind_text(
# s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE)
if not (v == SQLITE_OK):
return failure newException(DatastoreError, $sqlite3_errstr(v))
# if not (v == SQLITE_OK):
# return failure newException(DatastoreError, $sqlite3_errstr(v))
if query.limit != 0:
v = sqlite3_bind_int(s, 2.cint, query.limit.cint)
# if query.limit != 0:
# v = sqlite3_bind_int(s, 2.cint, query.limit.cint)
if not (v == SQLITE_OK):
return failure newException(DatastoreError, $sqlite3_errstr(v))
# if not (v == SQLITE_OK):
# return failure newException(DatastoreError, $sqlite3_errstr(v))
if query.offset != 0:
v = sqlite3_bind_int(s, 3.cint, query.offset.cint)
# if query.offset != 0:
# v = sqlite3_bind_int(s, 3.cint, query.offset.cint)
if not (v == SQLITE_OK):
return failure newException(DatastoreError, $sqlite3_errstr(v))
# if not (v == SQLITE_OK):
# return failure newException(DatastoreError, $sqlite3_errstr(v))
let lock = newAsyncLock()
proc next(): Future[?!QueryResponse] {.async.} =
defer:
if lock.locked:
lock.release()
# let lock = newAsyncLock()
# proc next(): Future[?!QueryResponse] {.async.} =
# defer:
# if lock.locked:
# lock.release()
if lock.locked:
return failure (ref DatastoreError)(msg: "Should always await query features")
# if lock.locked:
# return failure (ref DatastoreError)(msg: "Should always await query features")
if iter.finished:
return failure((ref QueryEndedError)(msg: "Calling next on a finished query!"))
# if iter.finished:
# return failure((ref QueryEndedError)(msg: "Calling next on a finished query!"))
await lock.acquire()
# await lock.acquire()
let
v = sqlite3_step(s)
# let
# v = sqlite3_step(s)
case v
of SQLITE_ROW:
let
key = Key.init(
$sqlite3_column_text_not_null(s, QueryStmtIdCol))
.expect("should not fail")
# case v
# of SQLITE_ROW:
# let
# key = Key.init(
# $sqlite3_column_text_not_null(s, QueryStmtIdCol))
# .expect("should not fail")
blob: ?pointer =
if query.value:
sqlite3_column_blob(s, QueryStmtDataCol).some
else:
pointer.none
# blob: ?pointer =
# if query.value:
# sqlite3_column_blob(s, QueryStmtDataCol).some
# else:
# pointer.none
# detect out-of-memory error
# see the conversion table and final paragraph of:
# https://www.sqlite.org/c3ref/column_blob.html
# see also https://www.sqlite.org/rescode.html
# # detect out-of-memory error
# # see the conversion table and final paragraph of:
# # https://www.sqlite.org/c3ref/column_blob.html
# # see also https://www.sqlite.org/rescode.html
# the "data" column can be NULL so in order to detect an out-of-memory
# error it is necessary to check that the result is a null pointer and
# that the result code is an error code
if blob.isSome and blob.get().isNil:
let
v = sqlite3_errcode(sqlite3_db_handle(s))
# # the "data" column can be NULL so in order to detect an out-of-memory
# # error it is necessary to check that the result is a null pointer and
# # that the result code is an error code
# if blob.isSome and blob.get().isNil:
# let
# v = sqlite3_errcode(sqlite3_db_handle(s))
if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
iter.finished = true
return failure newException(DatastoreError, $sqlite3_errstr(v))
# if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
# iter.finished = true
# return failure newException(DatastoreError, $sqlite3_errstr(v))
let
dataLen = sqlite3_column_bytes(s, QueryStmtDataCol)
data = if blob.isSome:
@(
toOpenArray(cast[ptr UncheckedArray[byte]](blob.get),
0,
dataLen - 1))
else:
@[]
# let
# dataLen = sqlite3_column_bytes(s, QueryStmtDataCol)
# data = if blob.isSome:
# @(
# toOpenArray(cast[ptr UncheckedArray[byte]](blob.get),
# 0,
# dataLen - 1))
# else:
# @[]
return success (key.some, data)
of SQLITE_DONE:
iter.finished = true
return success (Key.none, EmptyBytes)
else:
iter.finished = true
return failure newException(DatastoreError, $sqlite3_errstr(v))
# return success (key.some, data)
# of SQLITE_DONE:
# iter.finished = true
# return success (Key.none, EmptyBytes)
# else:
# iter.finished = true
# return failure newException(DatastoreError, $sqlite3_errstr(v))
iter.dispose = proc(): Future[?!void] {.async.} =
discard sqlite3_reset(s)
discard sqlite3_clear_bindings(s)
s.dispose
return success()
# iter.dispose = proc(): Future[?!void] {.async.} =
# discard sqlite3_reset(s)
# discard sqlite3_clear_bindings(s)
# s.dispose
# return success()
iter.next = next
return success iter
# iter.next = next
# return success iter
proc new*(
T: type SQLiteDatastore,
path: string,
readOnly = false): ?!T =
# proc new*(
# T: type SQLiteDatastore,
# path: string,
# readOnly = false): ?!T =
let
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
# let
# flags =
# if readOnly: SQLITE_OPEN_READONLY
# else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
success T(
db: ? SQLiteDsDb.open(path, flags),
readOnly: readOnly)
# success T(
# db: ? SQLiteDsDb.open(path, flags),
# readOnly: readOnly)
proc new*(
T: type SQLiteDatastore,
db: SQLiteDsDb): ?!T =
# proc new*(
# T: type SQLiteDatastore,
# db: SQLiteDsDb): ?!T =
success T(
db: db,
readOnly: db.readOnly)
# success T(
# db: db,
# readOnly: db.readOnly)

View File

@ -69,7 +69,7 @@ proc get*(self: SQLiteDatastore, key: DbKey): ?!seq[byte] =
proc onData(s: RawStmtPtr) =
bytes = self.db.getDataCol()
if err =? self.db.getStmt.query((key.id), onData).errorOption:
if err =? self.db.getStmt.query((key), onData).errorOption:
return failure(err)
if bytes.len <= 0:
@ -80,7 +80,7 @@ proc get*(self: SQLiteDatastore, key: DbKey): ?!seq[byte] =
proc put*(self: SQLiteDatastore, key: DbKey, data: DbVal): ?!void =
when DbVal is seq[byte]:
return self.db.putStmt.exec((key.id, data, timestamp()))
return self.db.putStmt.exec((key, data, timestamp()))
elif DbVal is DataBuffer:
return self.db.putBufferStmt.exec((key.id, data, timestamp()))
else:
@ -199,7 +199,10 @@ proc query*(self: SQLiteDatastore,
discard sqlite3_clear_bindings(s)
s.dispose()
return
proc contains*(self: SQLiteDatastore, key: DbKey): bool =
return self.has(key)
proc new*(T: type SQLiteDatastore,

View File

@ -23,6 +23,7 @@ import pkg/chronicles
import ../key
import ../query
import ../datastore
import ../backend
import ./asyncsemaphore
import ./databuffer
@ -427,8 +428,8 @@ method query*(
iter.finished = childIter.finished
var
res = ThreadResult[ThreadQueryRes]()
ctx = TaskCtx[ThreadQueryRes](
res = ThreadResult[DbQueryResponse]()
ctx = TaskCtx[DbQueryResponse](
ds: self.ds,
res: addr res)

View File

@ -3,87 +3,75 @@ import std/os
import std/sequtils
from std/algorithm import sort, reversed
import pkg/asynctest
import pkg/unittest2
import pkg/chronos
import pkg/stew/results
import pkg/stew/byteutils
import pkg/datastore/sql/sqliteds
import pkg/datastore/key
import ../dscommontests
import ../querycommontests
suite "Test Basic SQLiteDatastore":
let
ds = SQLiteDatastore.new(Memory).tryGet()
key = Key.init("a:b/c/d:e").tryGet()
key = Key.init("a:b/c/d:e").tryGet().id()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
teardownAll:
(await ds.close()).tryGet()
basicStoreTests(ds, key, bytes, otherBytes)
suite "Test Read Only SQLiteDatastore":
let
path = currentSourcePath() # get this file's name
basePath = "tests_data"
basePathAbs = path.parentDir / basePath
filename = "test_store" & DbExt
dbPathAbs = basePathAbs / filename
key = Key.init("a:b/c/d:e").tryGet()
bytes = "some bytes".toBytes
var
dsDb: SQLiteDatastore
readOnlyDb: SQLiteDatastore
setupAll:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
createDir(basePathAbs)
dsDb = SQLiteDatastore.new(path = dbPathAbs).tryGet()
readOnlyDb = SQLiteDatastore.new(path = dbPathAbs, readOnly = true).tryGet()
teardownAll:
(await dsDb.close()).tryGet()
(await readOnlyDb.close()).tryGet()
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
teardown:
ds.close().tryGet()
test "put":
check:
(await readOnlyDb.put(key, bytes)).isErr
(await dsDb.put(key, bytes)).tryGet()
ds.put(key, bytes).tryGet()
test "get":
check:
(await readOnlyDb.get(key)).tryGet() == bytes
(await dsDb.get(key)).tryGet() == bytes
ds.get(key).tryGet() == bytes
test "put update":
ds.put(key, otherBytes).tryGet()
test "get updated":
check:
ds.get(key).tryGet() == otherBytes
test "delete":
check:
(await readOnlyDb.delete(key)).isErr
(await dsDb.delete(key)).tryGet()
ds.delete(key).tryGet()
test "contains":
check:
not (await readOnlyDb.has(key)).tryGet()
not (await dsDb.has(key)).tryGet()
not await (key in ds)
suite "Test Query":
var
ds: SQLiteDatastore
test "put batch":
var
batch: seq[BatchEntry]
setup:
ds = SQLiteDatastore.new(Memory).tryGet()
for k in 0..<100:
batch.add((Key.init(key.id, $k).tryGet, @[k.byte]))
teardown:
(await ds.close()).tryGet
ds.put(batch).tryGet
queryTests(ds)
for k in batch:
check: ds.has(k.key).tryGet
test "delete batch":
var
batch: seq[Key]
for k in 0..<100:
batch.add(Key.init(key.id, $k).tryGet)
ds.delete(batch).tryGet
for k in batch:
check: not ds.has(k).tryGet
test "handle missing key":
let key = Key.init("/missing/key").tryGet()
expect(DatastoreKeyNotFound):
discard ds.get(key).tryGet() # non existing key