From e945e3062641bd339b5474509234d4ab640e0cae Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 28 Sep 2023 18:19:05 -0700 Subject: [PATCH] reorg --- datastore/fsds.nim | 4 +- datastore/sql.nim | 4 +- datastore/threads/threadproxy.nim | 37 +++++++++---------- tests/datastore/threads/testthreadproxyds.nim | 4 +- 4 files changed, 24 insertions(+), 25 deletions(-) diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 333e653..8e9703a 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -19,7 +19,7 @@ push: {.upraises: [].} type FSDatastore* = ref object of Datastore - db: ThreadDatastore[FSBackend[KeyId, DataBuffer]] + db: ThreadProxy[FSBackend[KeyId, DataBuffer]] method has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} = @@ -65,5 +65,5 @@ proc new*( let backend = ? newFSBackend[KeyId, DataBuffer]( root=root, depth=depth, caseSensitive=caseSensitive, ignoreProtected=ignoreProtected) - db = ? ThreadDatastore.new(backend, tp = tp) + db = ? ThreadProxy.new(backend, tp = tp) success FSDatastore(db: db) diff --git a/datastore/sql.nim b/datastore/sql.nim index 0a860b6..5946ba6 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -20,7 +20,7 @@ push: {.upraises: [].} type SQLiteDatastore* = ref object of Datastore - db: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] + db: ThreadProxy[SQLiteBackend[KeyId, DataBuffer]] proc path*(self: SQLiteDatastore): string = self.db.backend.path() @@ -68,6 +68,6 @@ proc new*( let backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly) - db = ? ThreadDatastore.new(backend, tp = tp) + db = ? ThreadProxy.new(backend, tp = tp) success SQLiteDatastore(db: db) diff --git a/datastore/threads/threadproxy.nim b/datastore/threads/threadproxy.nim index 2181b6a..85be32e 100644 --- a/datastore/threads/threadproxy.nim +++ b/datastore/threads/threadproxy.nim @@ -48,10 +48,10 @@ type ## Task context object. ## This is a SharedPtr to make the query iter simpler - ThreadDatastore*[BT] = object - tp*: Taskpool + ThreadProxy*[BT] = object + tp: Taskpool backend*: BT - semaphore*: AsyncSemaphore # semaphore is used for backpressure \ + semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors proc newTaskCtx*[T](tp: typedesc[T], @@ -105,7 +105,7 @@ template executeTask*[T](ctx: TaskCtx[T], blk: untyped) = ctx.setDone() discard ctx[].signal.fireSync() -template dispatchTaskWrap[BT](self: ThreadDatastore[BT], +template dispatchTaskWrap[BT](self: ThreadProxy[BT], signal: ThreadSignalPtr, blk: untyped ): auto = @@ -115,7 +115,7 @@ template dispatchTaskWrap[BT](self: ThreadDatastore[BT], runTask() await wait(ctx[].signal) -template dispatchTask*[BT](self: ThreadDatastore[BT], +template dispatchTask*[BT](self: ThreadProxy[BT], signal: ThreadSignalPtr, blk: untyped ): auto = @@ -141,7 +141,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = executeTask(ctx): has(ds, key) -proc has*[BT](self: ThreadDatastore[BT], +proc has*[BT](self: ThreadProxy[BT], key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() let signal = acquireSignal().get() @@ -161,7 +161,7 @@ proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): delete(ds, key) -proc delete*[BT](self: ThreadDatastore[BT], +proc delete*[BT](self: ThreadProxy[BT], key: Key): Future[?!void] {.async.} = ## delete key await self.semaphore.acquire() @@ -176,7 +176,7 @@ proc delete*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes() -proc delete*[BT](self: ThreadDatastore[BT], +proc delete*[BT](self: ThreadProxy[BT], keys: seq[Key]): Future[?!void] {.async.} = ## delete batch for key in keys: @@ -193,7 +193,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): put(ds, key, data) -proc put*[BT](self: ThreadDatastore[BT], +proc put*[BT](self: ThreadProxy[BT], key: Key, data: seq[byte]): Future[?!void] {.async.} = ## put key with data @@ -210,9 +210,8 @@ proc put*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes() -proc put*[E, DB]( - self: ThreadDatastore[DB], - batch: seq[E]): Future[?!void] {.async.} = +proc put*[E, DB](self: ThreadProxy[DB], + batch: seq[E]): Future[?!void] {.async.} = ## put batch data for entry in batch: if err =? (await self.put(entry.key, entry.data)).errorOption: @@ -229,7 +228,7 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; let res = get(ds, key) res -proc get*[BT](self: ThreadDatastore[BT], +proc get*[BT](self: ThreadProxy[BT], key: Key, ): Future[?!seq[byte]] {.async.} = await self.semaphore.acquire() @@ -244,7 +243,7 @@ proc get*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes(v => v.toSeq()) -proc close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = +proc close*[BT](self: ThreadProxy[BT]): Future[?!void] {.async.} = await self.semaphore.closeAll() self.backend.close() @@ -295,7 +294,7 @@ proc queryTask[DB]( # set final result (?!QResult).ok((KeyId.none, DataBuffer())) -proc query*[BT](self: ThreadDatastore[BT], +proc query*[BT](self: ThreadProxy[BT], q: Query ): Future[?!QueryIter] {.async.} = ## performs async query @@ -369,14 +368,14 @@ proc query*[BT](self: ThreadDatastore[BT], await iterDispose() raise exc -proc new*[DB](self: type ThreadDatastore, +proc new*[DB](self: type ThreadProxy, db: DB, withLocks = static false, tp: Taskpool - ): ?!ThreadDatastore[DB] = - doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" + ): ?!ThreadProxy[DB] = + doAssert tp.numThreads > 1, "ThreadProxy requires at least 2 threads" - success ThreadDatastore[DB]( + success ThreadProxy[DB]( tp: tp, backend: db, semaphore: AsyncSemaphore.new(tp.numThreads - 1) diff --git a/tests/datastore/threads/testthreadproxyds.nim b/tests/datastore/threads/testthreadproxyds.nim index 000a3e1..75e3d85 100644 --- a/tests/datastore/threads/testthreadproxyds.nim +++ b/tests/datastore/threads/testthreadproxyds.nim @@ -131,8 +131,8 @@ for i in 1..N: suite "Test ThreadDatastore cancelations": privateAccess(SQLiteDatastore) # expose private fields - privateAccess(ThreadDatastore) # expose private fields - # privateAccess(TaskCtx) # expose private fields + privateAccess(ThreadProxy) # expose private fields + privateAccess(TaskCtx) # expose private fields var sds: SQLiteDatastore