From 8356f6cd972d65d2765e48a2b052f3882723cc54 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 21 Sep 2023 17:52:06 -0700 Subject: [PATCH] re-add async frontend --- datastore/sql.nim | 285 ++++++++++--------------------------- datastore/sql/sqliteds.nim | 5 +- 2 files changed, 81 insertions(+), 209 deletions(-) diff --git a/datastore/sql.nim b/datastore/sql.nim index 55a23e0..7e13da7 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -8,240 +8,111 @@ import pkg/sqlite3_abi from pkg/stew/results as stewResults import isErr import pkg/upraises +import std/sequtils import ../datastore -# import ./sqlitedsdb +import ./backend +import ./sql/sqliteds export datastore push: {.upraises: [].} -# type -# SQLiteDatastore* = ref object of Datastore -# readOnly: bool -# db: SQLiteDsDb +type + SQLiteDatastore* = ref object of Datastore + db: SQLiteBackend -# proc path*(self: SQLiteDatastore): string = -# self.db.dbPath +proc path*(self: SQLiteDatastore): string = + self.db.path() -# proc `readOnly=`*(self: SQLiteDatastore): bool -# {.error: "readOnly should not be assigned".} +proc readOnly*(self: SQLiteDatastore): bool = + self.db.readOnly() -# proc timestamp*(t = epochTime()): int64 = -# (t * 1_000_000).int64 +method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = + return self.db.has(KeyId.new key.id()) -# method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = -# var -# exists = false +method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = + return self.db.delete(KeyId.new key.id()) -# proc onData(s: RawStmtPtr) = -# exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool +method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = + let dkeys = keys.mapIt(KeyId.new it.id()) + return self.db.delete(dkeys) -# if err =? self.db.containsStmt.query((key.id), onData).errorOption: -# return failure err +method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = + self.db.get(KeyId.new key.id()) -# return success exists +method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = + self.db.put(KeyId.new key.id(), DataBuffer.new data) -# method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = -# return self.db.deleteStmt.exec((key.id)) +method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = + var dbatch: seq[tuple[key: string, data: seq[byte]]] + for entry in batch: + dbatch.add((entry.key.id(), entry.data)) + self.db.put(dbatch) -# method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = -# if err =? self.db.beginStmt.exec().errorOption: -# return failure(err) +method close*(self: SQLiteDatastore): Future[?!void] {.async.} = + self.db.close() -# for key in keys: -# if err =? self.db.deleteStmt.exec((key.id)).errorOption: -# if err =? self.db.rollbackStmt.exec().errorOption: -# return failure err.msg +method query*( + self: SQLiteDatastore, + query: Query +): Future[?!QueryIter] {.async.} = -# return failure err.msg + var iter = QueryIter() + let dbquery = DbQuery( + key: KeyId.new query.key.id(), + value: query.value, + limit: query.limit, + offset: query.offset, + sort: query.sort, + ) + var queries = ? self.db.query(dbquery) -# if err =? self.db.endStmt.exec().errorOption: -# return failure err.msg + let lock = newAsyncLock() + proc next(): Future[?!QueryResponse] {.async.} = + defer: + if lock.locked: + lock.release() -# return success() + if lock.locked: + return failure (ref DatastoreError)(msg: "Should always await query features") -# method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = -# # see comment in ./filesystem_datastore re: finer control of memory -# # allocation in `method get`, could apply here as well if bytes were read -# # incrementally with `sqlite3_blob_read` + if iter.finished: + return failure((ref QueryEndedError)(msg: "Calling next on a finished query!")) -# var -# bytes: seq[byte] + await lock.acquire() -# proc onData(s: RawStmtPtr) = -# bytes = self.db.getDataCol() + let res = queries() + iter.result = res -# if err =? self.db.getStmt.query((key.id), onData).errorOption: -# return failure(err) + iter.finished = true -# if bytes.len <= 0: -# return failure( -# newException(DatastoreKeyNotFound, "Key doesn't exist")) + iter.dispose = proc(): Future[?!void] {.async.} = + discard sqlite3_reset(s) + discard sqlite3_clear_bindings(s) + s.dispose + return success() -# return success bytes + iter.next = next + return success iter -# method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = -# return self.db.putStmt.exec((key.id, data, timestamp())) +proc new*( + T: type SQLiteDatastore, + path: string, + readOnly = false): ?!T = -# method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = -# if err =? self.db.beginStmt.exec().errorOption: -# return failure err + let + flags = + if readOnly: SQLITE_OPEN_READONLY + else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE -# for entry in batch: -# if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption: -# if err =? self.db.rollbackStmt.exec().errorOption: -# return failure err + success T( + db: ? SQLiteDsDb.open(path, flags), + readOnly: readOnly) -# return failure err +proc new*( + T: type SQLiteDatastore, + db: SQLiteDsDb): ?!T = -# if err =? self.db.endStmt.exec().errorOption: -# return failure err - -# return success() - -# method close*(self: SQLiteDatastore): Future[?!void] {.async.} = -# self.db.close() - -# return success() - -# method query*( -# self: SQLiteDatastore, -# query: Query): Future[?!QueryIter] {.async.} = - -# var -# iter = QueryIter() -# queryStr = if query.value: -# QueryStmtDataIdStr -# else: -# QueryStmtIdStr - -# if query.sort == SortOrder.Descending: -# queryStr &= QueryStmtOrderDescending -# else: -# queryStr &= QueryStmtOrderAscending - -# if query.limit != 0: -# queryStr &= QueryStmtLimit - -# if query.offset != 0: -# queryStr &= QueryStmtOffset - -# let -# queryStmt = QueryStmt.prepare( -# self.db.env, queryStr).expect("should not fail") - -# s = RawStmtPtr(queryStmt) - -# var -# v = sqlite3_bind_text( -# s, 1.cint, (query.key.id & "*").cstring, -1.cint, SQLITE_TRANSIENT_GCSAFE) - -# if not (v == SQLITE_OK): -# return failure newException(DatastoreError, $sqlite3_errstr(v)) - -# if query.limit != 0: -# v = sqlite3_bind_int(s, 2.cint, query.limit.cint) - -# if not (v == SQLITE_OK): -# return failure newException(DatastoreError, $sqlite3_errstr(v)) - -# if query.offset != 0: -# v = sqlite3_bind_int(s, 3.cint, query.offset.cint) - -# if not (v == SQLITE_OK): -# return failure newException(DatastoreError, $sqlite3_errstr(v)) - -# let lock = newAsyncLock() -# proc next(): Future[?!QueryResponse] {.async.} = -# defer: -# if lock.locked: -# lock.release() - -# if lock.locked: -# return failure (ref DatastoreError)(msg: "Should always await query features") - -# if iter.finished: -# return failure((ref QueryEndedError)(msg: "Calling next on a finished query!")) - -# await lock.acquire() - -# let -# v = sqlite3_step(s) - -# case v -# of SQLITE_ROW: -# let -# key = Key.init( -# $sqlite3_column_text_not_null(s, QueryStmtIdCol)) -# .expect("should not fail") - -# blob: ?pointer = -# if query.value: -# sqlite3_column_blob(s, QueryStmtDataCol).some -# else: -# pointer.none - -# # detect out-of-memory error -# # see the conversion table and final paragraph of: -# # https://www.sqlite.org/c3ref/column_blob.html -# # see also https://www.sqlite.org/rescode.html - -# # the "data" column can be NULL so in order to detect an out-of-memory -# # error it is necessary to check that the result is a null pointer and -# # that the result code is an error code -# if blob.isSome and blob.get().isNil: -# let -# v = sqlite3_errcode(sqlite3_db_handle(s)) - -# if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]): -# iter.finished = true -# return failure newException(DatastoreError, $sqlite3_errstr(v)) - -# let -# dataLen = sqlite3_column_bytes(s, QueryStmtDataCol) -# data = if blob.isSome: -# @( -# toOpenArray(cast[ptr UncheckedArray[byte]](blob.get), -# 0, -# dataLen - 1)) -# else: -# @[] - -# return success (key.some, data) -# of SQLITE_DONE: -# iter.finished = true -# return success (Key.none, EmptyBytes) -# else: -# iter.finished = true -# return failure newException(DatastoreError, $sqlite3_errstr(v)) - -# iter.dispose = proc(): Future[?!void] {.async.} = -# discard sqlite3_reset(s) -# discard sqlite3_clear_bindings(s) -# s.dispose -# return success() - -# iter.next = next -# return success iter - -# proc new*( -# T: type SQLiteDatastore, -# path: string, -# readOnly = false): ?!T = - -# let -# flags = -# if readOnly: SQLITE_OPEN_READONLY -# else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE - -# success T( -# db: ? SQLiteDsDb.open(path, flags), -# readOnly: readOnly) - -# proc new*( -# T: type SQLiteDatastore, -# db: SQLiteDsDb): ?!T = - -# success T( -# db: db, -# readOnly: db.readOnly) + success T( + db: db, + readOnly: db.readOnly) diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 1faf799..21f067f 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -19,7 +19,7 @@ type db: SQLiteDsDb proc path*(self: SQLiteBackend): string = - self.db.dbPath + $self.db.dbPath proc readOnly*(self: SQLiteBackend): bool = self.db.readOnly @@ -197,11 +197,12 @@ proc query*(self: SQLiteBackend, return success (key.some, data) of SQLITE_DONE: - return + return success (KeyId.none, DataBuffer.new()) else: return failure newException(DatastoreError, $sqlite3_errstr(v)) finally: + echo "sqlite backend: query: finally close" discard sqlite3_reset(s) discard sqlite3_clear_bindings(s) s.dispose()