diff --git a/datastore.nim b/datastore.nim index 9e42c47..6d43a20 100644 --- a/datastore.nim +++ b/datastore.nim @@ -3,6 +3,5 @@ import ./datastore/fsds import ./datastore/sql import ./datastore/mountedds import ./datastore/tieredds -import ./datastore/threads/threadproxyds -export datastore, fsds, mountedds, tieredds, sql, threadproxyds +export datastore, fsds, mountedds, tieredds, sql diff --git a/datastore/sql.nim b/datastore/sql.nim index e5fad22..55a23e0 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -9,239 +9,239 @@ from pkg/stew/results as stewResults import isErr import pkg/upraises import ../datastore -import ./sqlitedsdb +# import ./sqlitedsdb -export datastore, sqlitedsdb +export datastore push: {.upraises: [].} -type - SQLiteDatastore* = ref object of Datastore - readOnly: bool - db: SQLiteDsDb +# type +# SQLiteDatastore* = ref object of Datastore +# readOnly: bool +# db: SQLiteDsDb -proc path*(self: SQLiteDatastore): string = - self.db.dbPath +# proc path*(self: SQLiteDatastore): string = +# self.db.dbPath -proc `readOnly=`*(self: SQLiteDatastore): bool - {.error: "readOnly should not be assigned".} +# proc `readOnly=`*(self: SQLiteDatastore): bool +# {.error: "readOnly should not be assigned".} -proc timestamp*(t = epochTime()): int64 = - (t * 1_000_000).int64 +# proc timestamp*(t = epochTime()): int64 = +# (t * 1_000_000).int64 -method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = - var - exists = false +# method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = +# var +# exists = false - proc onData(s: RawStmtPtr) = - exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool +# proc onData(s: RawStmtPtr) = +# exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool - if err =? self.db.containsStmt.query((key.id), onData).errorOption: - return failure err +# if err =? self.db.containsStmt.query((key.id), onData).errorOption: +# return failure err - return success exists +# return success exists -method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = - return self.db.deleteStmt.exec((key.id)) +# method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = +# return self.db.deleteStmt.exec((key.id)) -method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = - if err =? self.db.beginStmt.exec().errorOption: - return failure(err) +# method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = +# if err =? self.db.beginStmt.exec().errorOption: +# return failure(err) - for key in keys: - if err =? self.db.deleteStmt.exec((key.id)).errorOption: - if err =? self.db.rollbackStmt.exec().errorOption: - return failure err.msg +# for key in keys: +# if err =? self.db.deleteStmt.exec((key.id)).errorOption: +# if err =? self.db.rollbackStmt.exec().errorOption: +# return failure err.msg - return failure err.msg +# return failure err.msg - if err =? self.db.endStmt.exec().errorOption: - return failure err.msg +# if err =? self.db.endStmt.exec().errorOption: +# return failure err.msg - return success() +# return success() -method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = - # see comment in ./filesystem_datastore re: finer control of memory - # allocation in `method get`, could apply here as well if bytes were read - # incrementally with `sqlite3_blob_read` +# method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = +# # see comment in ./filesystem_datastore re: finer control of memory +# # allocation in `method get`, could apply here as well if bytes were read +# # incrementally with `sqlite3_blob_read` - var - bytes: seq[byte] +# var +# bytes: seq[byte] - proc onData(s: RawStmtPtr) = - bytes = self.db.getDataCol() +# proc onData(s: RawStmtPtr) = +# bytes = self.db.getDataCol() - if err =? self.db.getStmt.query((key.id), onData).errorOption: - return failure(err) +# if err =? self.db.getStmt.query((key.id), onData).errorOption: +# return failure(err) - if bytes.len <= 0: - return failure( - newException(DatastoreKeyNotFound, "Key doesn't exist")) +# if bytes.len <= 0: +# return failure( +# newException(DatastoreKeyNotFound, "Key doesn't exist")) - return success bytes +# return success bytes -method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - return self.db.putStmt.exec((key.id, data, timestamp())) +# method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = +# return self.db.putStmt.exec((key.id, data, timestamp())) -method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = - if err =? self.db.beginStmt.exec().errorOption: - return failure err +# method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = +# if err =? self.db.beginStmt.exec().errorOption: +# return failure err - for entry in batch: - if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: - if err =? self.db.rollbackStmt.exec().errorOption: - return failure err +# for entry in batch: +# if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: +# if err =? self.db.rollbackStmt.exec().errorOption: +# return failure err - return failure err +# return failure err - if err =? self.db.endStmt.exec().errorOption: - return failure err +# if err =? self.db.endStmt.exec().errorOption: +# return failure err - return success() +# return success() -method close*(self: SQLiteDatastore): Future[?!void] {.async.} = - self.db.close() +# method close*(self: SQLiteDatastore): Future[?!void] {.async.} = +# self.db.close() - return success() +# return success() -method query*( - self: SQLiteDatastore, - query: Query): Future[?!QueryIter] {.async.} = +# method query*( +# self: SQLiteDatastore, +# query: Query): Future[?!QueryIter] {.async.} = - var - iter = QueryIter() - queryStr = if query.value: - QueryStmtDataIdStr - else: - QueryStmtIdStr +# var +# iter = QueryIter() +# queryStr = if query.value: +# QueryStmtDataIdStr +# else: +# QueryStmtIdStr - if query.sort == SortOrder.Descending: - queryStr &= QueryStmtOrderDescending - else: - queryStr &= QueryStmtOrderAscending +# if query.sort == SortOrder.Descending: +# queryStr &= QueryStmtOrderDescending +# else: +# queryStr &= QueryStmtOrderAscending - if query.limit != 0: - queryStr &= QueryStmtLimit +# if query.limit != 0: +# queryStr &= QueryStmtLimit - if query.offset != 0: - queryStr &= QueryStmtOffset +# if query.offset != 0: +# queryStr &= QueryStmtOffset - let - queryStmt = QueryStmt.prepare( - self.db.env, queryStr).expect("should not fail") +# let +# queryStmt = QueryStmt.prepare( +# self.db.env, queryStr).expect("should not fail") - s = RawStmtPtr(queryStmt) +# s = RawStmtPtr(queryStmt) - var - v = sqlite3_bind_text( - s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE) +# var +# v = sqlite3_bind_text( +# s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE) - if not (v == SQLITE_OK): - return failure newException(DatastoreError, $sqlite3_errstr(v)) +# if not (v == SQLITE_OK): +# return failure newException(DatastoreError, $sqlite3_errstr(v)) - if query.limit != 0: - v = sqlite3_bind_int(s, 2.cint, query.limit.cint) +# if query.limit != 0: +# v = sqlite3_bind_int(s, 2.cint, query.limit.cint) - if not (v == SQLITE_OK): - return failure newException(DatastoreError, $sqlite3_errstr(v)) +# if not (v == SQLITE_OK): +# return failure newException(DatastoreError, $sqlite3_errstr(v)) - if query.offset != 0: - v = sqlite3_bind_int(s, 3.cint, query.offset.cint) +# if query.offset != 0: +# v = sqlite3_bind_int(s, 3.cint, query.offset.cint) - if not (v == SQLITE_OK): - return failure newException(DatastoreError, $sqlite3_errstr(v)) +# if not (v == SQLITE_OK): +# return failure newException(DatastoreError, $sqlite3_errstr(v)) - let lock = newAsyncLock() - proc next(): Future[?!QueryResponse] {.async.} = - defer: - if lock.locked: - lock.release() +# let lock = newAsyncLock() +# proc next(): Future[?!QueryResponse] {.async.} = +# defer: +# if lock.locked: +# lock.release() - if lock.locked: - return failure (ref DatastoreError)(msg: "Should always await query features") +# if lock.locked: +# return failure (ref DatastoreError)(msg: "Should always await query features") - if iter.finished: - return failure((ref QueryEndedError)(msg: "Calling next on a finished query!")) +# if iter.finished: +# return failure((ref QueryEndedError)(msg: "Calling next on a finished query!")) - await lock.acquire() +# await lock.acquire() - let - v = sqlite3_step(s) +# let +# v = sqlite3_step(s) - case v - of SQLITE_ROW: - let - key = Key.init( - $sqlite3_column_text_not_null(s, QueryStmtIdCol)) - .expect("should not fail") +# case v +# of SQLITE_ROW: +# let +# key = Key.init( +# $sqlite3_column_text_not_null(s, QueryStmtIdCol)) +# .expect("should not fail") - blob: ?pointer = - if query.value: - sqlite3_column_blob(s, QueryStmtDataCol).some - else: - pointer.none +# blob: ?pointer = +# if query.value: +# sqlite3_column_blob(s, QueryStmtDataCol).some +# else: +# pointer.none - # detect out-of-memory error - # see the conversion table and final paragraph of: - # https://www.sqlite.org/c3ref/column_blob.html - # see also https://www.sqlite.org/rescode.html +# # detect out-of-memory error +# # see the conversion table and final paragraph of: +# # https://www.sqlite.org/c3ref/column_blob.html +# # see also https://www.sqlite.org/rescode.html - # the "data" column can be NULL so in order to detect an out-of-memory - # error it is necessary to check that the result is a null pointer and - # that the result code is an error code - if blob.isSome and blob.get().isNil: - let - v = sqlite3_errcode(sqlite3_db_handle(s)) +# # the "data" column can be NULL so in order to detect an out-of-memory +# # error it is necessary to check that the result is a null pointer and +# # that the result code is an error code +# if blob.isSome and blob.get().isNil: +# let +# v = sqlite3_errcode(sqlite3_db_handle(s)) - if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): - iter.finished = true - return failure newException(DatastoreError, $sqlite3_errstr(v)) +# if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): +# iter.finished = true +# return failure newException(DatastoreError, $sqlite3_errstr(v)) - let - dataLen = sqlite3_column_bytes(s, QueryStmtDataCol) - data = if blob.isSome: - @( - toOpenArray(cast[ptr UncheckedArray[byte]](blob.get), - 0, - dataLen - 1)) - else: - @[] +# let +# dataLen = sqlite3_column_bytes(s, QueryStmtDataCol) +# data = if blob.isSome: +# @( +# toOpenArray(cast[ptr UncheckedArray[byte]](blob.get), +# 0, +# dataLen - 1)) +# else: +# @[] - return success (key.some, data) - of SQLITE_DONE: - iter.finished = true - return success (Key.none, EmptyBytes) - else: - iter.finished = true - return failure newException(DatastoreError, $sqlite3_errstr(v)) +# return success (key.some, data) +# of SQLITE_DONE: +# iter.finished = true +# return success (Key.none, EmptyBytes) +# else: +# iter.finished = true +# return failure newException(DatastoreError, $sqlite3_errstr(v)) - iter.dispose = proc(): Future[?!void] {.async.} = - discard sqlite3_reset(s) - discard sqlite3_clear_bindings(s) - s.dispose - return success() +# iter.dispose = proc(): Future[?!void] {.async.} = +# discard sqlite3_reset(s) +# discard sqlite3_clear_bindings(s) +# s.dispose +# return success() - iter.next = next - return success iter +# iter.next = next +# return success iter -proc new*( - T: type SQLiteDatastore, - path: string, - readOnly = false): ?!T = +# proc new*( +# T: type SQLiteDatastore, +# path: string, +# readOnly = false): ?!T = - let - flags = - if readOnly: SQLITE_OPEN_READONLY - else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE +# let +# flags = +# if readOnly: SQLITE_OPEN_READONLY +# else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE - success T( - db: ? SQLiteDsDb.open(path, flags), - readOnly: readOnly) +# success T( +# db: ? SQLiteDsDb.open(path, flags), +# readOnly: readOnly) -proc new*( - T: type SQLiteDatastore, - db: SQLiteDsDb): ?!T = +# proc new*( +# T: type SQLiteDatastore, +# db: SQLiteDsDb): ?!T = - success T( - db: db, - readOnly: db.readOnly) +# success T( +# db: db, +# readOnly: db.readOnly) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index b2f6fc3..a823485 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -69,7 +69,7 @@ proc get*(self: SQLiteDatastore, key: DbKey): ?!seq[byte] = proc onData(s: RawStmtPtr) = bytes = self.db.getDataCol() - if err =? self.db.getStmt.query((key.id), onData).errorOption: + if err =? self.db.getStmt.query((key), onData).errorOption: return failure(err) if bytes.len <= 0: @@ -80,7 +80,7 @@ proc get*(self: SQLiteDatastore, key: DbKey): ?!seq[byte] = proc put*(self: SQLiteDatastore, key: DbKey, data: DbVal): ?!void = when DbVal is seq[byte]: - return self.db.putStmt.exec((key.id, data, timestamp())) + return self.db.putStmt.exec((key, data, timestamp())) elif DbVal is DataBuffer: return self.db.putBufferStmt.exec((key.id, data, timestamp())) else: @@ -199,7 +199,10 @@ proc query*(self: SQLiteDatastore, discard sqlite3_clear_bindings(s) s.dispose() return - + + +proc contains*(self: SQLiteDatastore, key: DbKey): bool = + return self.has(key) proc new*(T: type SQLiteDatastore, diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 31a65ba..2a92056 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -23,6 +23,7 @@ import pkg/chronicles import ../key import ../query import ../datastore +import ../backend import ./asyncsemaphore import ./databuffer @@ -427,8 +428,8 @@ method query*( iter.finished = childIter.finished var - res = ThreadResult[ThreadQueryRes]() - ctx = TaskCtx[ThreadQueryRes]( + res = ThreadResult[DbQueryResponse]() + ctx = TaskCtx[DbQueryResponse]( ds: self.ds, res: addr res) diff --git a/tests/datastore/sql/testsqliteds.nim b/tests/datastore/sql/testsqliteds.nim index c629eb0..6e4b585 100644 --- a/tests/datastore/sql/testsqliteds.nim +++ b/tests/datastore/sql/testsqliteds.nim @@ -3,87 +3,75 @@ import std/os import std/sequtils from std/algorithm import sort, reversed -import pkg/asynctest +import pkg/unittest2 import pkg/chronos import pkg/stew/results import pkg/stew/byteutils import pkg/datastore/sql/sqliteds +import pkg/datastore/key import ../dscommontests import ../querycommontests suite "Test Basic SQLiteDatastore": + let ds = SQLiteDatastore.new(Memory).tryGet() - key = Key.init("a:b/c/d:e").tryGet() + key = Key.init("a:b/c/d:e").tryGet().id() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes - teardownAll: - (await ds.close()).tryGet() - - basicStoreTests(ds, key, bytes, otherBytes) - -suite "Test Read Only SQLiteDatastore": - let - path = currentSourcePath() # get this file's name - basePath = "tests_data" - basePathAbs = path.parentDir / basePath - filename = "test_store" & DbExt - dbPathAbs = basePathAbs / filename - key = Key.init("a:b/c/d:e").tryGet() - bytes = "some bytes".toBytes - - var - dsDb: SQLiteDatastore - readOnlyDb: SQLiteDatastore - - setupAll: - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) - createDir(basePathAbs) - - dsDb = SQLiteDatastore.new(path = dbPathAbs).tryGet() - readOnlyDb = SQLiteDatastore.new(path = dbPathAbs, readOnly = true).tryGet() - - teardownAll: - (await dsDb.close()).tryGet() - (await readOnlyDb.close()).tryGet() - - removeDir(basePathAbs) - require(not dirExists(basePathAbs)) + teardown: + ds.close().tryGet() test "put": - check: - (await readOnlyDb.put(key, bytes)).isErr - - (await dsDb.put(key, bytes)).tryGet() + ds.put(key, bytes).tryGet() test "get": check: - (await readOnlyDb.get(key)).tryGet() == bytes - (await dsDb.get(key)).tryGet() == bytes + ds.get(key).tryGet() == bytes + + test "put update": + ds.put(key, otherBytes).tryGet() + + test "get updated": + check: + ds.get(key).tryGet() == otherBytes test "delete": - check: - (await readOnlyDb.delete(key)).isErr - - (await dsDb.delete(key)).tryGet() + ds.delete(key).tryGet() test "contains": check: - not (await readOnlyDb.has(key)).tryGet() - not (await dsDb.has(key)).tryGet() + not await (key in ds) -suite "Test Query": - var - ds: SQLiteDatastore + test "put batch": + var + batch: seq[BatchEntry] - setup: - ds = SQLiteDatastore.new(Memory).tryGet() + for k in 0..<100: + batch.add((Key.init(key.id, $k).tryGet, @[k.byte])) - teardown: - (await ds.close()).tryGet + ds.put(batch).tryGet - queryTests(ds) + for k in batch: + check: ds.has(k.key).tryGet + + test "delete batch": + var + batch: seq[Key] + + for k in 0..<100: + batch.add(Key.init(key.id, $k).tryGet) + + ds.delete(batch).tryGet + + for k in batch: + check: not ds.has(k).tryGet + + test "handle missing key": + let key = Key.init("/missing/key").tryGet() + + expect(DatastoreKeyNotFound): + discard ds.get(key).tryGet() # non existing key