From 5e424262c3cb39863d259fc8bcf7c683144d2158 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 11:51:55 -0700 Subject: [PATCH 01/20] rework tuple types --- datastore/threads/threadproxyds.nim | 94 ++++++++++++----------------- 1 file changed, 40 insertions(+), 54 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 772dc72..f9d1189 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -36,16 +36,6 @@ logScope: type - ThreadBackendKinds* = enum - Sqlite - # Filesystem - - ThreadBackend* = object - ## backend case type to avoid needing to make ThreadDatastore generic - case kind*: ThreadBackendKinds - of Sqlite: - sql*: SQLiteBackend[KeyId,DataBuffer] - TaskCtxObj*[T: ThreadTypes] = object res: ThreadResult[T] signal: ThreadSignalPtr @@ -56,15 +46,18 @@ type ## Task context object. ## This is a SharedPtr to make the query iter simpler - ThreadDatastore* = ref object of Datastore + ThreadDatastore*[BT] = ref object of Datastore tp: Taskpool - backend: ThreadBackend + backend: BT semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors var ctxLock: Lock ctxLock.initLock() +proc newTaskCtx*[T](signal: ThreadSignalPtr): TaskCtx[T] = + newSharedPtr(TaskCtxObj[T](signal: signal)) + proc setCancelled[T](ctx: TaskCtx[T]) = # withLock(ctxLock): ctx[].cancelled = true @@ -113,28 +106,26 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) = ctx.setDone() discard ctx[].signal.fireSync() -template dispatchTaskWrap[T](self: ThreadDatastore, - signal: ThreadSignalPtr, - blk: untyped - ): auto = - case self.backend.kind: - of Sqlite: - var ds {.used, inject.} = self.backend.sql - proc runTask() = - `blk` - runTask() - await wait(ctx[].signal) +template dispatchTaskWrap[BT](self: ThreadDatastore[BT], + signal: ThreadSignalPtr, + blk: untyped + ): auto = + var ds {.used, inject.} = self.backend + proc runTask() = + `blk` + runTask() + await wait(ctx[].signal) -template dispatchTask[T](self: ThreadDatastore, +template dispatchTask[BT](self: ThreadDatastore[BT], signal: ThreadSignalPtr, blk: untyped - ): auto = + ): auto = ## handles dispatching a task from an async context ## `blk` is the actions, it has `ctx` and `ds` variables in scope. ## note that `ds` is a generic - let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) + # let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) try: - dispatchTaskWrap[T](self, signal, blk) + dispatchTaskWrap[BT](self, signal, blk) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() @@ -149,38 +140,39 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = executeTask(ctx): has(ds, key) -method has*(self: ThreadDatastore, +method has*[BT](self: ThreadDatastore[BT], key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[bool](self, signal): + let ctx = newTaskCtx(bool, signal: signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn hasTask(ctx, ds, key) return ctx[].res.toRes(v => v) - proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): delete(ds, key) -method delete*(self: ThreadDatastore, +method delete*[BT](self: ThreadDatastore[BT], key: Key): Future[?!void] {.async.} = ## delete key await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[void](self, signal): + let ctx = newTaskCtx[void](signal: signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn deleteTask(ctx, ds, key) return ctx[].res.toRes() -method delete*(self: ThreadDatastore, +method delete*[BT](self: ThreadDatastore[BT], keys: seq[Key]): Future[?!void] {.async.} = ## delete batch for key in keys: @@ -196,7 +188,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): put(ds, key, data) -method put*(self: ThreadDatastore, +method put*[BT](self: ThreadDatastore[BT], key: Key, data: seq[byte]): Future[?!void] {.async.} = ## put key with data @@ -204,7 +196,8 @@ method put*(self: ThreadDatastore, without signal =? acquireSignal(), err: return failure err - dispatchTask[void](self, signal): + let ctx = newTaskCtx[void](signal: signal) + dispatchTask(self, signal): let key = KeyId.new key.id() let data = DataBuffer.new data self.tp.spawn putTask(ctx, ds, key, data) @@ -229,24 +222,22 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; let res = get(ds, key) res -method get*(self: ThreadDatastore, - key: Key, - ): Future[?!seq[byte]] {.async.} = +method get*[BT](self: ThreadDatastore[BT], + key: Key, + ): Future[?!seq[byte]] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() + let ctx = newTaskCtx[void](signal: signal) dispatchTask[DataBuffer](self, signal): self.tp.spawn getTask(ctx, ds, key) return ctx[].res.toRes(v => v.toSeq()) -method close*(self: ThreadDatastore): Future[?!void] {.async.} = +method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = await self.semaphore.closeAll() - case self.backend.kind: - of Sqlite: - self.backend.sql.close() + self.backend.close() type QResult = DbQueryResponse[KeyId, DataBuffer] @@ -293,7 +284,7 @@ proc queryTask[DB]( # set final result (?!QResult).ok((KeyId.none, DataBuffer())) -method query*(self: ThreadDatastore, +method query*[BT](self: ThreadDatastore[BT], q: Query ): Future[?!QueryIter] {.async.} = ## performs async query @@ -311,8 +302,8 @@ method query*(self: ThreadDatastore, value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): + let ctx = newTaskCtx[QResult](signal: signal) + dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer], BT](self, signal): self.tp.spawn queryTask(ctx, ds, query, nextSignal) await nextSignal.fire() @@ -368,14 +359,9 @@ proc new*[DB](self: type ThreadDatastore, ): ?!ThreadDatastore = doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" - when DB is SQLiteBackend[KeyId,DataBuffer]: - let backend = ThreadBackend(kind: Sqlite, sql: db) - else: - {.error: "unsupported backend: " & $typeof(db).} - success ThreadDatastore( tp: tp, - backend: backend, + backend: db, # TODO: are these needed anymore?? # withLocks: withLocks, # queryLock: newAsyncLock(), From 49d846f468cf846b81da3d264c7e0a66fe0fb04f Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 12:02:04 -0700 Subject: [PATCH 02/20] make Threadproxyds generic --- datastore/threads/threadproxyds.nim | 24 +++++++++++------------ tests/datastore/testthreadproxyds.nim | 28 +++++++-------------------- 2 files changed, 19 insertions(+), 33 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index f9d1189..c8e98ad 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -55,7 +55,7 @@ type var ctxLock: Lock ctxLock.initLock() -proc newTaskCtx*[T](signal: ThreadSignalPtr): TaskCtx[T] = +proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] = newSharedPtr(TaskCtxObj[T](signal: signal)) proc setCancelled[T](ctx: TaskCtx[T]) = @@ -123,7 +123,6 @@ template dispatchTask[BT](self: ThreadDatastore[BT], ## handles dispatching a task from an async context ## `blk` is the actions, it has `ctx` and `ds` variables in scope. ## note that `ds` is a generic - # let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) try: dispatchTaskWrap[BT](self, signal, blk) except CancelledError as exc: @@ -146,13 +145,13 @@ method has*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx(bool, signal: signal) + let ctx = newTaskCtx(bool, signal=signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn hasTask(ctx, ds, key) return ctx[].res.toRes(v => v) -proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; +method deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): @@ -165,7 +164,7 @@ method delete*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx[void](signal: signal) + let ctx = newTaskCtx(void, signal=signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn deleteTask(ctx, ds, key) @@ -196,7 +195,7 @@ method put*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx[void](signal: signal) + let ctx = newTaskCtx(void, signal=signal) dispatchTask(self, signal): let key = KeyId.new key.id() let data = DataBuffer.new data @@ -229,8 +228,9 @@ method get*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx[void](signal: signal) - dispatchTask[DataBuffer](self, signal): + let ctx = newTaskCtx(DataBuffer, signal=signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn getTask(ctx, ds, key) return ctx[].res.toRes(v => v.toSeq()) @@ -302,8 +302,8 @@ method query*[BT](self: ThreadDatastore[BT], value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx = newTaskCtx[QResult](signal: signal) - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer], BT](self, signal): + let ctx = newTaskCtx(QResult, signal=signal) + dispatchTaskWrap(self, signal): self.tp.spawn queryTask(ctx, ds, query, nextSignal) await nextSignal.fire() @@ -356,10 +356,10 @@ proc new*[DB](self: type ThreadDatastore, db: DB, withLocks = static false, tp: Taskpool - ): ?!ThreadDatastore = + ): ?!ThreadDatastore[DB] = doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" - success ThreadDatastore( + success ThreadDatastore[DB]( tp: tp, backend: db, # TODO: are these needed anymore?? diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 14a41b6..72758b0 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -25,14 +25,10 @@ import ./querycommontests const NumThreads = 20 # IO threads aren't attached to CPU count suite "Test Basic ThreadProxyDatastore": + var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool key = Key.init("/a").tryGet() data = "some bytes".toBytes - - setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() @@ -61,17 +57,12 @@ suite "Test Basic ThreadProxyDatastore": suite "Test Basic ThreadDatastore with SQLite": var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes teardown: GC_fullCollect() @@ -85,17 +76,12 @@ suite "Test Basic ThreadDatastore with SQLite": suite "Test Query ThreadDatastore with SQLite": var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes teardown: GC_fullCollect() From 6a4e460e58002448286830c1b544b1e3270fcd3d Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 12:41:27 -0700 Subject: [PATCH 03/20] queryLocks can be done at FsDs backend level --- datastore/threads/threadproxyds.nim | 3 --- 1 file changed, 3 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 772dc72..dcc1680 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -376,8 +376,5 @@ proc new*[DB](self: type ThreadDatastore, success ThreadDatastore( tp: tp, backend: backend, - # TODO: are these needed anymore?? - # withLocks: withLocks, - # queryLock: newAsyncLock(), semaphore: AsyncSemaphore.new(tp.numThreads - 1) ) From b4b534bf67d8ac1928492a70fece524759148ef2 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 12:54:22 -0700 Subject: [PATCH 04/20] loop tests --- datastore/threads/threadproxyds.nim | 2 + tests/datastore/testthreadproxyds.nim | 112 ++++++++++---------------- 2 files changed, 44 insertions(+), 70 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index dcc1680..432eb6f 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -348,6 +348,7 @@ method query*(self: ThreadDatastore, trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() discard ctx[].signal.close() + echo "nextSignal:CLOSE!" discard nextSignal.close() self.semaphore.release() raise exc @@ -357,6 +358,7 @@ method query*(self: ThreadDatastore, except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg discard signal.close() + echo "nextSignal:CLOSE!" discard nextSignal.close() self.semaphore.release() raise exc diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 14a41b6..27c8e82 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -22,88 +22,60 @@ import pkg/datastore/threads/threadproxyds {.all.} import ./dscommontests import ./querycommontests -const NumThreads = 20 # IO threads aren't attached to CPU count +const + NumThreads = 20 # IO threads aren't attached to CPU count + N = 100 -suite "Test Basic ThreadProxyDatastore": - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a").tryGet() - data = "some bytes".toBytes +for i in 1..N: + suite "Test Basic ThreadDatastore with SQLite": - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + var + sqlStore: SQLiteBackend[KeyId,DataBuffer] + ds: ThreadDatastore + taskPool: Taskpool + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes - teardownAll: - echo "teardown done" + setupAll: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - test "check put": - echo "\n\n=== put ===" - let res1 = await ds.put(key, data) - echo "res1: ", res1.repr - check res1.isOk + teardown: + GC_fullCollect() - test "check get": - echo "\n\n=== get ===" - echo "get send key: ", key.repr - let res2 = await ds.get(key) - echo "get key post: ", key.repr - echo "get res2: ", res2.repr - echo res2.get() == data - var val = "" - for c in res2.get(): - val &= char(c) - echo "get res2: ", $val + teardownAll: + (await ds.close()).tryGet() + taskPool.shutdown() -suite "Test Basic ThreadDatastore with SQLite": + basicStoreTests(ds, key, bytes, otherBytes) + GC_fullCollect() - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes +for i in 1..N: + suite "Test Query ThreadDatastore with SQLite": - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + var + sqlStore: SQLiteBackend[KeyId,DataBuffer] + ds: ThreadDatastore + taskPool: Taskpool + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes - teardown: - GC_fullCollect() + setup: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - teardownAll: - (await ds.close()).tryGet() - taskPool.shutdown() + teardown: + GC_fullCollect() - basicStoreTests(ds, key, bytes, otherBytes) + (await ds.close()).tryGet() + taskPool.shutdown() -suite "Test Query ThreadDatastore with SQLite": - - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - setup: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() - - (await ds.close()).tryGet() - taskPool.shutdown() - - queryTests(ds, true) + queryTests(ds, true) + GC_fullCollect() # suite "Test Basic ThreadDatastore with fsds": # let From 030084186ef68472f4e0c7f60d962f95aee9ad93 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 12:55:48 -0700 Subject: [PATCH 05/20] loop tests --- tests/datastore/testthreadproxyds.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 27c8e82..b42cbb0 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -24,7 +24,8 @@ import ./querycommontests const NumThreads = 20 # IO threads aren't attached to CPU count - N = 100 + ThreadTestLoops {.intdefine.} = 10 + N = ThreadTestLoops for i in 1..N: suite "Test Basic ThreadDatastore with SQLite": From 1fd80c6f2be318b9da26a5ffea34326cc3f236fb Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:12:32 -0700 Subject: [PATCH 06/20] fix merge --- datastore/threads/threadproxyds.nim | 2 +- tests/datastore/testthreadproxyds.nim | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index b573bf1..422c548 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -204,7 +204,7 @@ method put*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes() method put*[DB]( - self: ThreadDatastore, + self: ThreadDatastore[DB], batch: seq[BatchEntry]): Future[?!void] {.async.} = ## put batch data for entry in batch: diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index f415ba6..000c52d 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -63,7 +63,6 @@ for i in 1..N: key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes - setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() From c51d354f72a40ea7c0b9b09c126ab80e2294b6a9 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:16:30 -0700 Subject: [PATCH 07/20] add nextSignal using mutex --- datastore/threads/threadproxyds.nim | 8 ++++++++ tests/datastore/testthreadproxyds.nim | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 422c548..e273e06 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -41,6 +41,7 @@ type signal: ThreadSignalPtr running: bool ## used to mark when a task worker is running cancelled: bool ## used to cancel a task before it's started + nextSignal: (Lock, Cond) TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] ## Task context object. @@ -72,6 +73,13 @@ proc setDone[T](ctx: TaskCtx[T]) = # withLock(ctxLock): ctx[].running = false +proc waitSync*(sig: var (Lock, Cond)) = + withLock(sig[0]): + wait(sig[1], sig[0]) +proc fireSync*(sig: var (Lock, Cond)) = + withLock(sig[0]): + signal(sig[1]) + proc acquireSignal(): ?!ThreadSignalPtr = let signal = ThreadSignalPtr.new() if signal.isErr(): diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 000c52d..57fea88 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -24,7 +24,7 @@ import ./querycommontests const NumThreads = 20 # IO threads aren't attached to CPU count - ThreadTestLoops {.intdefine.} = 10 + ThreadTestLoops {.intdefine.} = 100 N = ThreadTestLoops for i in 1..N: From 30728c1d0f0ae85068bd774f8e0ca420eaee293c Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:24:32 -0700 Subject: [PATCH 08/20] add nextSignal using mutex --- datastore/threads/threadproxyds.nim | 17 ++++------------- datastore/threads/threadresult.nim | 22 ++++++++++++++++++++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index e273e06..1c5528b 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -41,7 +41,7 @@ type signal: ThreadSignalPtr running: bool ## used to mark when a task worker is running cancelled: bool ## used to cancel a task before it's started - nextSignal: (Lock, Cond) + nextSignal: MutexSignal TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] ## Task context object. @@ -73,13 +73,6 @@ proc setDone[T](ctx: TaskCtx[T]) = # withLock(ctxLock): ctx[].running = false -proc waitSync*(sig: var (Lock, Cond)) = - withLock(sig[0]): - wait(sig[1], sig[0]) -proc fireSync*(sig: var (Lock, Cond)) = - withLock(sig[0]): - signal(sig[1]) - proc acquireSignal(): ?!ThreadSignalPtr = let signal = ThreadSignalPtr.new() if signal.isErr(): @@ -254,7 +247,6 @@ method queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, query: DbQuery[KeyId], - nextSignal: ThreadSignalPtr ) {.gcsafe, nimcall.} = ## run query command executeTask(ctx): @@ -299,8 +291,8 @@ method query*[BT](self: ThreadDatastore[BT], await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - without nextSignal =? acquireSignal(), err: - return failure err + let ctx = newTaskCtx(QResult, signal=signal) + ctx[].nextSignal.init() try: let query = dbQuery( @@ -308,9 +300,8 @@ method query*[BT](self: ThreadDatastore[BT], value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx = newTaskCtx(QResult, signal=signal) dispatchTaskWrap(self, signal): - self.tp.spawn queryTask(ctx, ds, query, nextSignal) + self.tp.spawn queryTask(ctx, ds, query) await nextSignal.fire() var diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index da0df6d..9caa70c 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -1,5 +1,6 @@ import std/atomics import std/options +import std/locks import pkg/questionable/results import pkg/results @@ -51,3 +52,24 @@ proc toRes*[T,S](res: ThreadResult[T], result.err res.error().toExc() else: result.ok m(res.get()) + +type + MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool] + +proc open*(sig: var MutexSignal) = + sig.lock.initLock() + sig.cond.initCond() + sig.open = true + +proc waitSync*(sig: var MutexSignal) = + withLock(sig.lock): + wait(sig.cond, sig.lock) + +proc fireSync*(sig: var MutexSignal) = + withLock(sig.lock): + signal(sig.cond) + +proc close*(sig: var MutexSignal) = + if sig.open: + sig.lock.deinitLock() + sig.cond.deinitCond() From e571d2116a986fb7f2233e084170a182afee9b05 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:28:58 -0700 Subject: [PATCH 09/20] add nextSignal using mutex --- datastore/threads/threadproxyds.nim | 14 ++++++-------- datastore/threads/threadresult.nim | 6 +++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 1c5528b..8a543b8 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -260,8 +260,7 @@ method queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - if not nextSignal.waitSync(10.seconds).get(): - raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") + ctx[].nextSignal.wait() var handle = handleRes.get() for item in handle.iter(): @@ -276,8 +275,7 @@ method queryTask[DB]( exc discard ctx[].signal.fireSync() - - discard nextSignal.waitSync().get() + ctx[].nextSignal.wait() # set final result (?!QResult).ok((KeyId.none, DataBuffer())) @@ -302,7 +300,7 @@ method query*[BT](self: ThreadDatastore[BT], # setup initial queryTask dispatchTaskWrap(self, signal): self.tp.spawn queryTask(ctx, ds, query) - await nextSignal.fire() + ctx[].nextSignal.fire() var lock = newAsyncLock() # serialize querying under threads @@ -323,7 +321,7 @@ method query*[BT](self: ThreadDatastore[BT], iter.finished = true defer: - await nextSignal.fire() + ctx[].nextSignal.fire() if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -337,7 +335,7 @@ method query*[BT](self: ThreadDatastore[BT], ctx.setCancelled() discard ctx[].signal.close() echo "nextSignal:CLOSE!" - discard nextSignal.close() + ctx[].nextSignal.close() self.semaphore.release() raise exc @@ -347,7 +345,7 @@ method query*[BT](self: ThreadDatastore[BT], trace "Cancelling thread future!", exc = exc.msg discard signal.close() echo "nextSignal:CLOSE!" - discard nextSignal.close() + ctx[].nextSignal.close() self.semaphore.release() raise exc diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index 9caa70c..3508e9e 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -56,16 +56,16 @@ proc toRes*[T,S](res: ThreadResult[T], type MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool] -proc open*(sig: var MutexSignal) = +proc init*(sig: var MutexSignal) = sig.lock.initLock() sig.cond.initCond() sig.open = true -proc waitSync*(sig: var MutexSignal) = +proc wait*(sig: var MutexSignal) = withLock(sig.lock): wait(sig.cond, sig.lock) -proc fireSync*(sig: var MutexSignal) = +proc fire*(sig: var MutexSignal) = withLock(sig.lock): signal(sig.cond) From 3cc21b3a59279cc01b971b12e6258c5e0dcaa56c Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:30:14 -0700 Subject: [PATCH 10/20] add nextSignal using mutex --- tests/datastore/testthreadproxyds.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 57fea88..c5b6ab0 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -24,7 +24,7 @@ import ./querycommontests const NumThreads = 20 # IO threads aren't attached to CPU count - ThreadTestLoops {.intdefine.} = 100 + ThreadTestLoops {.intdefine.} = 1000 N = ThreadTestLoops for i in 1..N: @@ -60,9 +60,6 @@ for i in 1..N: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() From 1da59ba730c03e1dae30bd2607b2c29ab0991782 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:35:14 -0700 Subject: [PATCH 11/20] add nextSignal using mutex --- datastore/threads/threadproxyds.nim | 3 +++ tests/datastore/testthreadproxyds.nim | 10 +++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 8a543b8..ae9f64c 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -131,6 +131,7 @@ template dispatchTask[BT](self: ThreadDatastore[BT], ctx.setCancelled() raise exc finally: + echo "signal:CLOSE!" discard ctx[].signal.close() self.semaphore.release() @@ -333,6 +334,7 @@ method query*[BT](self: ThreadDatastore[BT], except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() + echo "signal:CLOSE!" discard ctx[].signal.close() echo "nextSignal:CLOSE!" ctx[].nextSignal.close() @@ -343,6 +345,7 @@ method query*[BT](self: ThreadDatastore[BT], return success iter except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg + echo "signal:CLOSE!" discard signal.close() echo "nextSignal:CLOSE!" ctx[].nextSignal.close() diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index c5b6ab0..e0cb822 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -24,8 +24,10 @@ import ./querycommontests const NumThreads = 20 # IO threads aren't attached to CPU count - ThreadTestLoops {.intdefine.} = 1000 + ThreadTestLoops {.intdefine.} = 1 N = ThreadTestLoops + ThreadTestInnerLoops {.intdefine.} = 1 + M = ThreadTestInnerLoops for i in 1..N: suite "Test Basic ThreadDatastore with SQLite": @@ -50,7 +52,8 @@ for i in 1..N: (await ds.close()).tryGet() taskPool.shutdown() - basicStoreTests(ds, key, bytes, otherBytes) + for i in 1..M: + basicStoreTests(ds, key, bytes, otherBytes) GC_fullCollect() for i in 1..N: @@ -72,7 +75,8 @@ for i in 1..N: (await ds.close()).tryGet() taskPool.shutdown() - queryTests(ds, true) + for i in 1..M: + queryTests(ds, true) GC_fullCollect() # suite "Test Basic ThreadDatastore with fsds": From ac77917146c3bdeed72dc2bc9e4d7e75f61f6da6 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:36:16 -0700 Subject: [PATCH 12/20] remove ctx locks --- datastore/threads/threadproxyds.nim | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index ae9f64c..8c84fc7 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -53,27 +53,22 @@ type semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors -var ctxLock: Lock -ctxLock.initLock() - proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] = newSharedPtr(TaskCtxObj[T](signal: signal)) proc setCancelled[T](ctx: TaskCtx[T]) = - # withLock(ctxLock): ctx[].cancelled = true proc setRunning[T](ctx: TaskCtx[T]): bool = - # withLock(ctxLock): if ctx[].cancelled: return false ctx[].running = true return true proc setDone[T](ctx: TaskCtx[T]) = - # withLock(ctxLock): ctx[].running = false proc acquireSignal(): ?!ThreadSignalPtr = + echo "signal:OPEN!" let signal = ThreadSignalPtr.new() if signal.isErr(): failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error()) From ca5695f2c8fc87dd2d5e8803ddc35184e1d34111 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:37:12 -0700 Subject: [PATCH 13/20] remove ctx locks --- datastore/threads/threadproxyds.nim | 1 + 1 file changed, 1 insertion(+) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 8c84fc7..dbe2554 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -286,6 +286,7 @@ method query*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err let ctx = newTaskCtx(QResult, signal=signal) + echo "nextSignal:OPEN!" ctx[].nextSignal.init() try: From fbc00613c477b9c603316f43c2116094843f5d11 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:50:21 -0700 Subject: [PATCH 14/20] remove ctx locks --- datastore/threads/threadproxyds.nim | 31 +++++++++++++-------------- tests/datastore/testthreadproxyds.nim | 4 ++-- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index dbe2554..4a6e514 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -289,6 +289,13 @@ method query*[BT](self: ThreadDatastore[BT], echo "nextSignal:OPEN!" ctx[].nextSignal.init() + proc iterDispose() = + echo "signal:CLOSE!" + discard signal.close() + echo "nextSignal:CLOSE!" + ctx[].nextSignal.close() + self.semaphore.release() + try: let query = dbQuery( key= KeyId.new q.key.id(), @@ -299,11 +306,13 @@ method query*[BT](self: ThreadDatastore[BT], self.tp.spawn queryTask(ctx, ds, query) ctx[].nextSignal.fire() - var - lock = newAsyncLock() # serialize querying under threads - iter = QueryIter.new() + var lock = newAsyncLock() # serialize querying under threads + var iter = QueryIter.new() + iter.dispose = proc (): Future[?!void] {.async.} = + iterDispose() + success() - proc next(): Future[?!QueryResponse] {.async.} = + iter.next = proc(): Future[?!QueryResponse] {.async.} = let ctx = ctx try: trace "About to query" @@ -313,7 +322,6 @@ method query*[BT](self: ThreadDatastore[BT], return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") await wait(ctx[].signal) - if not ctx[].running: iter.finished = true @@ -330,22 +338,13 @@ method query*[BT](self: ThreadDatastore[BT], except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() - echo "signal:CLOSE!" - discard ctx[].signal.close() - echo "nextSignal:CLOSE!" - ctx[].nextSignal.close() - self.semaphore.release() + iterDispose() raise exc - iter.next = next return success iter except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - echo "signal:CLOSE!" - discard signal.close() - echo "nextSignal:CLOSE!" - ctx[].nextSignal.close() - self.semaphore.release() + iterDispose() raise exc proc new*[DB](self: type ThreadDatastore, diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index e0cb822..0926f07 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -52,8 +52,8 @@ for i in 1..N: (await ds.close()).tryGet() taskPool.shutdown() - for i in 1..M: - basicStoreTests(ds, key, bytes, otherBytes) + # for i in 1..M: + # basicStoreTests(ds, key, bytes, otherBytes) GC_fullCollect() for i in 1..N: From efd2e1d19d9f792fe7b5e4642e737634c2d34191 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 14:01:21 -0700 Subject: [PATCH 15/20] running 1000+ outer loops --- datastore/threads/threadproxyds.nim | 10 +++++----- tests/datastore/testthreadproxyds.nim | 16 ++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 4a6e514..3085d6c 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -68,7 +68,7 @@ proc setDone[T](ctx: TaskCtx[T]) = ctx[].running = false proc acquireSignal(): ?!ThreadSignalPtr = - echo "signal:OPEN!" + # echo "signal:OPEN!" let signal = ThreadSignalPtr.new() if signal.isErr(): failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error()) @@ -126,7 +126,7 @@ template dispatchTask[BT](self: ThreadDatastore[BT], ctx.setCancelled() raise exc finally: - echo "signal:CLOSE!" + # echo "signal:CLOSE!" discard ctx[].signal.close() self.semaphore.release() @@ -286,13 +286,13 @@ method query*[BT](self: ThreadDatastore[BT], without signal =? acquireSignal(), err: return failure err let ctx = newTaskCtx(QResult, signal=signal) - echo "nextSignal:OPEN!" + # echo "nextSignal:OPEN!" ctx[].nextSignal.init() proc iterDispose() = - echo "signal:CLOSE!" + # echo "signal:CLOSE!" discard signal.close() - echo "nextSignal:CLOSE!" + # echo "nextSignal:CLOSE!" ctx[].nextSignal.close() self.semaphore.release() diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 0926f07..8ab5339 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -33,9 +33,9 @@ for i in 1..N: suite "Test Basic ThreadDatastore with SQLite": var - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + sqlStore: SQLiteBackend[KeyId, DataBuffer] + taskPool: Taskpool + ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes @@ -52,17 +52,17 @@ for i in 1..N: (await ds.close()).tryGet() taskPool.shutdown() - # for i in 1..M: - # basicStoreTests(ds, key, bytes, otherBytes) + for i in 1..M: + basicStoreTests(ds, key, bytes, otherBytes) GC_fullCollect() for i in 1..N: suite "Test Query ThreadDatastore with SQLite": var - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + sqlStore: SQLiteBackend[KeyId, DataBuffer] + taskPool: Taskpool + ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() From a4748ef4c6585164734e56643fe284522c832f21 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 15:30:21 -0700 Subject: [PATCH 16/20] running 1000+ outer loops --- tests/datastore/testthreadproxyds.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 8ab5339..841dfcd 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -30,7 +30,7 @@ const M = ThreadTestInnerLoops for i in 1..N: - suite "Test Basic ThreadDatastore with SQLite": + suite "Test Basic ThreadDatastore with SQLite " & $i: var sqlStore: SQLiteBackend[KeyId, DataBuffer] @@ -57,7 +57,7 @@ for i in 1..N: GC_fullCollect() for i in 1..N: - suite "Test Query ThreadDatastore with SQLite": + suite "Test Query ThreadDatastore with SQLite " & $i: var sqlStore: SQLiteBackend[KeyId, DataBuffer] From 78ea3b117e9916904e513425b1fb77c42480649f Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 16:01:40 -0700 Subject: [PATCH 17/20] global taskpool --- datastore/threads/threadproxyds.nim | 2 ++ tests/datastore/testthreadproxyds.nim | 17 ++++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 3085d6c..c192b06 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -291,6 +291,8 @@ method query*[BT](self: ThreadDatastore[BT], proc iterDispose() = # echo "signal:CLOSE!" + ctx.setCancelled() + ctx[].nextSignal.fire() discard signal.close() # echo "nextSignal:CLOSE!" ctx[].nextSignal.close() diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 841dfcd..9bb4849 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -29,12 +29,14 @@ const ThreadTestInnerLoops {.intdefine.} = 1 M = ThreadTestInnerLoops -for i in 1..N: +var + taskPool: Taskpool = Taskpool.new(NumThreads) + +for i in 1..1: suite "Test Basic ThreadDatastore with SQLite " & $i: var sqlStore: SQLiteBackend[KeyId, DataBuffer] - taskPool: Taskpool ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes @@ -42,7 +44,7 @@ for i in 1..N: setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) + # taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() teardown: @@ -50,30 +52,31 @@ for i in 1..N: teardownAll: (await ds.close()).tryGet() - taskPool.shutdown() + # taskPool.shutdown() for i in 1..M: basicStoreTests(ds, key, bytes, otherBytes) GC_fullCollect() + for i in 1..N: suite "Test Query ThreadDatastore with SQLite " & $i: var sqlStore: SQLiteBackend[KeyId, DataBuffer] - taskPool: Taskpool + # taskPool: Taskpool ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) + # taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() teardown: GC_fullCollect() (await ds.close()).tryGet() - taskPool.shutdown() + # taskPool.shutdown() for i in 1..M: queryTests(ds, true) From 5afec2b3d8cfefb74216ee8ac3c9cad21ed3b011 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 16:16:18 -0700 Subject: [PATCH 18/20] change nextSignal back to ThreadSignalPtr for timeouts --- datastore/threads/threadproxyds.nim | 38 ++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index c192b06..8be8a8a 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -41,7 +41,7 @@ type signal: ThreadSignalPtr running: bool ## used to mark when a task worker is running cancelled: bool ## used to cancel a task before it's started - nextSignal: MutexSignal + nextSignal: ThreadSignalPtr TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] ## Task context object. @@ -53,8 +53,10 @@ type semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors -proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] = - newSharedPtr(TaskCtxObj[T](signal: signal)) +proc newTaskCtx*[T](tp: typedesc[T], + signal: ThreadSignalPtr, + nextSignal: ThreadSignalPtr = nil): TaskCtx[T] = + newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal)) proc setCancelled[T](ctx: TaskCtx[T]) = ctx[].cancelled = true @@ -137,7 +139,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = has(ds, key) method has*[BT](self: ThreadDatastore[BT], - key: Key): Future[?!bool] {.async.} = + key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err @@ -256,7 +258,8 @@ method queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - ctx[].nextSignal.wait() + if not ctx[].nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "queryTask timed out") var handle = handleRes.get() for item in handle.iter(): @@ -271,7 +274,8 @@ method queryTask[DB]( exc discard ctx[].signal.fireSync() - ctx[].nextSignal.wait() + if not ctx[].nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "queryTask timed out") # set final result (?!QResult).ok((KeyId.none, DataBuffer())) @@ -285,17 +289,17 @@ method query*[BT](self: ThreadDatastore[BT], await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx(QResult, signal=signal) - # echo "nextSignal:OPEN!" - ctx[].nextSignal.init() + without nextSignal =? acquireSignal(), err: + return failure err + let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal) - proc iterDispose() = + proc iterDispose() {.async.} = # echo "signal:CLOSE!" ctx.setCancelled() - ctx[].nextSignal.fire() - discard signal.close() + await ctx[].nextSignal.fire() + discard ctx[].signal.close() # echo "nextSignal:CLOSE!" - ctx[].nextSignal.close() + discard ctx[].nextSignal.close() self.semaphore.release() try: @@ -306,7 +310,7 @@ method query*[BT](self: ThreadDatastore[BT], # setup initial queryTask dispatchTaskWrap(self, signal): self.tp.spawn queryTask(ctx, ds, query) - ctx[].nextSignal.fire() + await ctx[].nextSignal.fire() var lock = newAsyncLock() # serialize querying under threads var iter = QueryIter.new() @@ -328,7 +332,7 @@ method query*[BT](self: ThreadDatastore[BT], iter.finished = true defer: - ctx[].nextSignal.fire() + await ctx[].nextSignal.fire() if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -340,13 +344,13 @@ method query*[BT](self: ThreadDatastore[BT], except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() - iterDispose() + await iterDispose() # todo: is this valid? raise exc return success iter except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - iterDispose() + await iterDispose() raise exc proc new*[DB](self: type ThreadDatastore, From 69529876268f305651f0e65eca300395e7bfa66b Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 16:17:02 -0700 Subject: [PATCH 19/20] running 1000+ outer loops --- tests/datastore/testthreadproxyds.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 9bb4849..d59992c 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -32,7 +32,7 @@ const var taskPool: Taskpool = Taskpool.new(NumThreads) -for i in 1..1: +for i in 1..N: suite "Test Basic ThreadDatastore with SQLite " & $i: var From 74953e175b58dc5a7e8d9d09661c2e1a10ece2c4 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 16:30:47 -0700 Subject: [PATCH 20/20] try newer questionable? --- datastore.nimble | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datastore.nimble b/datastore.nimble index 3b5b28e..bcd3139 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -9,7 +9,7 @@ license = "Apache License 2.0 or MIT" requires "nim >= 1.6.14", "asynctest >= 0.3.1 & < 0.4.0", "chronos#0277b65be2c7a365ac13df002fba6e172be55537", - "questionable >= 0.10.3 & < 0.11.0", + "questionable", "sqlite3_abi", "stew", "unittest2",