From 49d846f468cf846b81da3d264c7e0a66fe0fb04f Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 12:02:04 -0700 Subject: [PATCH] make Threadproxyds generic --- datastore/threads/threadproxyds.nim | 24 +++++++++++------------ tests/datastore/testthreadproxyds.nim | 28 +++++++-------------------- 2 files changed, 19 insertions(+), 33 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index f9d1189..c8e98ad 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -55,7 +55,7 @@ type var ctxLock: Lock ctxLock.initLock() -proc newTaskCtx*[T](signal: ThreadSignalPtr): TaskCtx[T] = +proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] = newSharedPtr(TaskCtxObj[T](signal: signal)) proc setCancelled[T](ctx: TaskCtx[T]) = @@ -123,7 +123,6 @@ template dispatchTask[BT](self: ThreadDatastore[BT], ## handles dispatching a task from an async context ## `blk` is the actions, it has `ctx` and `ds` variables in scope. ## note that `ds` is a generic - # let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) try: dispatchTaskWrap[BT](self, signal, blk) except CancelledError as exc: @@ -146,13 +145,13 @@ method has*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx(bool, signal: signal) + let ctx = newTaskCtx(bool, signal=signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn hasTask(ctx, ds, key) return ctx[].res.toRes(v => v) -proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; +method deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): @@ -165,7 +164,7 @@ method delete*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx[void](signal: signal) + let ctx = newTaskCtx(void, signal=signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn deleteTask(ctx, ds, key) @@ -196,7 +195,7 @@ method put*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx[void](signal: signal) + let ctx = newTaskCtx(void, signal=signal) dispatchTask(self, signal): let key = KeyId.new key.id() let data = DataBuffer.new data @@ -229,8 +228,9 @@ method get*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx[void](signal: signal) - dispatchTask[DataBuffer](self, signal): + let ctx = newTaskCtx(DataBuffer, signal=signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn getTask(ctx, ds, key) return ctx[].res.toRes(v => v.toSeq()) @@ -302,8 +302,8 @@ method query*[BT](self: ThreadDatastore[BT], value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx = newTaskCtx[QResult](signal: signal) - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer], BT](self, signal): + let ctx = newTaskCtx(QResult, signal=signal) + dispatchTaskWrap(self, signal): self.tp.spawn queryTask(ctx, ds, query, nextSignal) await nextSignal.fire() @@ -356,10 +356,10 @@ proc new*[DB](self: type ThreadDatastore, db: DB, withLocks = static false, tp: Taskpool - ): ?!ThreadDatastore = + ): ?!ThreadDatastore[DB] = doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" - success ThreadDatastore( + success ThreadDatastore[DB]( tp: tp, backend: db, # TODO: are these needed anymore?? diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 14a41b6..72758b0 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -25,14 +25,10 @@ import ./querycommontests const NumThreads = 20 # IO threads aren't attached to CPU count suite "Test Basic ThreadProxyDatastore": + var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool key = Key.init("/a").tryGet() data = "some bytes".toBytes - - setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() @@ -61,17 +57,12 @@ suite "Test Basic ThreadProxyDatastore": suite "Test Basic ThreadDatastore with SQLite": var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes teardown: GC_fullCollect() @@ -85,17 +76,12 @@ suite "Test Basic ThreadDatastore with SQLite": suite "Test Query ThreadDatastore with SQLite": var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes teardown: GC_fullCollect()