From 5e424262c3cb39863d259fc8bcf7c683144d2158 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 11:51:55 -0700 Subject: [PATCH] rework tuple types --- datastore/threads/threadproxyds.nim | 94 ++++++++++++----------------- 1 file changed, 40 insertions(+), 54 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 772dc72..f9d1189 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -36,16 +36,6 @@ logScope: type - ThreadBackendKinds* = enum - Sqlite - # Filesystem - - ThreadBackend* = object - ## backend case type to avoid needing to make ThreadDatastore generic - case kind*: ThreadBackendKinds - of Sqlite: - sql*: SQLiteBackend[KeyId,DataBuffer] - TaskCtxObj*[T: ThreadTypes] = object res: ThreadResult[T] signal: ThreadSignalPtr @@ -56,15 +46,18 @@ type ## Task context object. ## This is a SharedPtr to make the query iter simpler - ThreadDatastore* = ref object of Datastore + ThreadDatastore*[BT] = ref object of Datastore tp: Taskpool - backend: ThreadBackend + backend: BT semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors var ctxLock: Lock ctxLock.initLock() +proc newTaskCtx*[T](signal: ThreadSignalPtr): TaskCtx[T] = + newSharedPtr(TaskCtxObj[T](signal: signal)) + proc setCancelled[T](ctx: TaskCtx[T]) = # withLock(ctxLock): ctx[].cancelled = true @@ -113,28 +106,26 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) = ctx.setDone() discard ctx[].signal.fireSync() -template dispatchTaskWrap[T](self: ThreadDatastore, - signal: ThreadSignalPtr, - blk: untyped - ): auto = - case self.backend.kind: - of Sqlite: - var ds {.used, inject.} = self.backend.sql - proc runTask() = - `blk` - runTask() - await wait(ctx[].signal) +template dispatchTaskWrap[BT](self: ThreadDatastore[BT], + signal: ThreadSignalPtr, + blk: untyped + ): auto = + var ds {.used, inject.} = self.backend + proc runTask() = + `blk` + runTask() + await wait(ctx[].signal) -template dispatchTask[T](self: ThreadDatastore, +template dispatchTask[BT](self: ThreadDatastore[BT], signal: ThreadSignalPtr, blk: untyped - ): auto = + ): auto = ## handles dispatching a task from an async context ## `blk` is the actions, it has `ctx` and `ds` variables in scope. ## note that `ds` is a generic - let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) + # let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) try: - dispatchTaskWrap[T](self, signal, blk) + dispatchTaskWrap[BT](self, signal, blk) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() @@ -149,38 +140,39 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = executeTask(ctx): has(ds, key) -method has*(self: ThreadDatastore, +method has*[BT](self: ThreadDatastore[BT], key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[bool](self, signal): + let ctx = newTaskCtx(bool, signal: signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn hasTask(ctx, ds, key) return ctx[].res.toRes(v => v) - proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): delete(ds, key) -method delete*(self: ThreadDatastore, +method delete*[BT](self: ThreadDatastore[BT], key: Key): Future[?!void] {.async.} = ## delete key await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[void](self, signal): + let ctx = newTaskCtx[void](signal: signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn deleteTask(ctx, ds, key) return ctx[].res.toRes() -method delete*(self: ThreadDatastore, +method delete*[BT](self: ThreadDatastore[BT], keys: seq[Key]): Future[?!void] {.async.} = ## delete batch for key in keys: @@ -196,7 +188,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): put(ds, key, data) -method put*(self: ThreadDatastore, +method put*[BT](self: ThreadDatastore[BT], key: Key, data: seq[byte]): Future[?!void] {.async.} = ## put key with data @@ -204,7 +196,8 @@ method put*(self: ThreadDatastore, without signal =? acquireSignal(), err: return failure err - dispatchTask[void](self, signal): + let ctx = newTaskCtx[void](signal: signal) + dispatchTask(self, signal): let key = KeyId.new key.id() let data = DataBuffer.new data self.tp.spawn putTask(ctx, ds, key, data) @@ -229,24 +222,22 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; let res = get(ds, key) res -method get*(self: ThreadDatastore, - key: Key, - ): Future[?!seq[byte]] {.async.} = +method get*[BT](self: ThreadDatastore[BT], + key: Key, + ): Future[?!seq[byte]] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() + let ctx = newTaskCtx[void](signal: signal) dispatchTask[DataBuffer](self, signal): self.tp.spawn getTask(ctx, ds, key) return ctx[].res.toRes(v => v.toSeq()) -method close*(self: ThreadDatastore): Future[?!void] {.async.} = +method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = await self.semaphore.closeAll() - case self.backend.kind: - of Sqlite: - self.backend.sql.close() + self.backend.close() type QResult = DbQueryResponse[KeyId, DataBuffer] @@ -293,7 +284,7 @@ proc queryTask[DB]( # set final result (?!QResult).ok((KeyId.none, DataBuffer())) -method query*(self: ThreadDatastore, +method query*[BT](self: ThreadDatastore[BT], q: Query ): Future[?!QueryIter] {.async.} = ## performs async query @@ -311,8 +302,8 @@ method query*(self: ThreadDatastore, value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): + let ctx = newTaskCtx[QResult](signal: signal) + dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer], BT](self, signal): self.tp.spawn queryTask(ctx, ds, query, nextSignal) await nextSignal.fire() @@ -368,14 +359,9 @@ proc new*[DB](self: type ThreadDatastore, ): ?!ThreadDatastore = doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" - when DB is SQLiteBackend[KeyId,DataBuffer]: - let backend = ThreadBackend(kind: Sqlite, sql: db) - else: - {.error: "unsupported backend: " & $typeof(db).} - success ThreadDatastore( tp: tp, - backend: backend, + backend: db, # TODO: are these needed anymore?? # withLocks: withLocks, # queryLock: newAsyncLock(),