diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 178de6e..ac9817a 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -6,94 +6,68 @@ import pkg/questionable import pkg/questionable/results from pkg/stew/results as stewResults import get, isErr import pkg/upraises +import pkg/chronos +import pkg/taskpools -import ./threads/backend +import ./threads/sqlbackend +import ./threads/threadproxyds import ./datastore -export datastore +export datastore, Taskpool push: {.upraises: [].} type SQLiteDatastore* = ref object of Datastore - db: SQLiteBackend[KeyId, DataBuffer] + db: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] proc path*(self: SQLiteDatastore): string = - self.db.path() + self.db.backend.path() proc readOnly*(self: SQLiteDatastore): bool = - self.db.readOnly() + self.db.backend.readOnly() method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} = - return self.db.has(KeyId.new key.id()) + await self.db.has(key) method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} = - return self.db.delete(KeyId.new key.id()) + await self.db.delete(key) method delete*(self: SQLiteDatastore, keys: seq[Key]): Future[?!void] {.async.} = - let dkeys = keys.mapIt(KeyId.new it.id()) - return self.db.delete(dkeys) + await self.db.delete(keys) method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = - self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]: - d.toSeq() + await self.db.get(key) method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - self.db.put(KeyId.new key.id(), DataBuffer.new data) + await self.db.put(key, data) method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = - var dbatch: seq[tuple[key: KeyId, data: DataBuffer]] - for entry in batch: - dbatch.add((KeyId.new entry.key.id(), DataBuffer.new entry.data)) - self.db.put(dbatch) + await self.db.put(batch) method close*(self: SQLiteDatastore): Future[?!void] {.async.} = - self.db.close() + await self.db.close() -method queryIter*(self: SQLiteDatastore, - query: Query - ): ?!(iterator(): ?!QueryResponse) = - - let dbquery = dbQuery( - key= KeyId.new query.key.id(), - value= query.value, - limit= query.limit, - offset= query.offset, - sort= query.sort, - ) - var qhandle = ? self.db.query(dbquery) - - let iter = iterator(): ?!QueryResponse = - for resp in qhandle.queryIter(): - without qres =? resp, err: - yield QueryResponse.failure err - let k = qres.key.map() do(k: KeyId) -> Key: - Key.init($k).expect("valid key") - let v: seq[byte] = qres.data.toSeq() - yield success (k, v) - - success iter +method query*(self: SQLiteDatastore, + q: Query): Future[?!QueryIter] {.async.} = + await self.db.query(q) proc new*( T: type SQLiteDatastore, path: string, - readOnly = false): ?!SQLiteDatastore = + readOnly = false, + tp: Taskpool, +): ?!SQLiteDatastore = - success SQLiteDatastore( - db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)) - -proc new*( - T: type SQLiteDatastore, - db: SQLiteBackend[KeyId, DataBuffer]): ?!T = - - success T( - db: db, - readOnly: db.readOnly) + let + backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly) + db = ? ThreadDatastore.new(backend, tp = tp) + success SQLiteDatastore(db: db) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 7c7dc03..41e5b88 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -16,12 +16,12 @@ import pkg/chronos/threadsync import pkg/questionable import pkg/questionable/results import pkg/taskpools +import std/isolation import pkg/chronicles import pkg/threading/smartptrs import ../key import ../query -import ../datastore import ./backend import ./fsbackend import ./sqlbackend @@ -30,7 +30,7 @@ import ./asyncsemaphore import ./databuffer import ./threadresult -export threadresult +export threadresult, smartptrs, isolation, chronicles logScope: topics = "datastore threadproxyds" @@ -48,10 +48,10 @@ type ## Task context object. ## This is a SharedPtr to make the query iter simpler - ThreadDatastore*[BT] = ref object of Datastore - tp: Taskpool - backend: BT - semaphore: AsyncSemaphore # semaphore is used for backpressure \ + ThreadDatastore*[BT] = object + tp*: Taskpool + backend*: BT + semaphore*: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors proc newTaskCtx*[T](tp: typedesc[T], @@ -207,9 +207,9 @@ proc put*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes() -proc put*[DB]( +proc put*[E, DB]( self: ThreadDatastore[DB], - batch: seq[BatchEntry]): Future[?!void] {.async.} = + batch: seq[E]): Future[?!void] {.async.} = ## put batch data for entry in batch: if err =? (await self.put(entry.key, entry.data)).errorOption: diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index d6a2b83..e3217a4 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -37,15 +37,13 @@ for i in 1..N: suite "Test Basic ThreadDatastore with SQLite " & $i: var - sqlStore: SQLiteBackend[KeyId, DataBuffer] - ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] + ds: SQLiteDatastore key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + ds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet() teardown: GC_fullCollect()