diff --git a/datastore/sql.nim b/datastore/sql.nim index 5b68b53..28b1f3d 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -13,7 +13,7 @@ import ../datastore import ./backend import ./sql/sqliteds -export datastore +export datastore, sqliteds push: {.upraises: [].} @@ -42,7 +42,8 @@ method delete*(self: SQLiteDatastore, method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = - self.db.get(KeyId.new key.id()) + self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]: + d.toSeq() method put*(self: SQLiteDatastore, key: Key, @@ -79,7 +80,7 @@ method queryIter*( yield QueryResponse.failure err let k = qres.key.map() do(k: KeyId) -> Key: Key.init($k).expect("valid key") - let v: seq[byte] = qres.data + let v: seq[byte] = qres.data.toSeq() yield success (k, v) success iter @@ -87,20 +88,14 @@ method queryIter*( proc new*( T: type SQLiteDatastore, path: string, - readOnly = false): ?!T = + readOnly = false): ?!SQLiteDatastore = - let - flags = - if readOnly: SQLITE_OPEN_READONLY - else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE - - success T( - db: ? SQLiteDsDb.open(path, flags), - readOnly: readOnly) + success SQLiteDatastore( + db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)) proc new*( T: type SQLiteDatastore, - db: SQLiteDsDb): ?!T = + db: SQLiteBackend[KeyId, DataBuffer]): ?!T = success T( db: db, diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index ae14782..795274e 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -50,6 +50,7 @@ type signal: ThreadSignalPtr running: bool cancelled: bool + nextSignal: (Lock, Cond) TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] @@ -251,7 +252,6 @@ proc queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, query: DbQuery[KeyId], - nextSignal: ThreadSignalPtr ) {.gcsafe, nimcall.} = ## run query command executeTask(ctx): @@ -265,8 +265,11 @@ proc queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - if not nextSignal.waitSync(10.seconds).get(): - raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") + + withLock(ctx[].nextSignal[0]): + wait(ctx[].nextSignal[1], ctx[].nextSignal[0]) + # if not nextSignal.waitSync(10.seconds).get(): + # raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") var handle = handleRes.get() for item in handle.iter(): @@ -282,7 +285,9 @@ proc queryTask[DB]( discard ctx[].signal.fireSync() - discard nextSignal.waitSync().get() + # discard nextSignal.waitSync().get() + withLock(ctx[].nextSignal[0]): + wait(ctx[].nextSignal[1], ctx[].nextSignal[0]) # set final result (?!QResult).ok((KeyId.none, DataBuffer())) @@ -296,8 +301,11 @@ method query*(self: ThreadDatastore, await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - without nextSignal =? acquireSignal(), err: - return failure err + # without nextSignal =? acquireSignal(), err: + # return failure err + let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) + ctx[].nextSignal[0].initLock() + ctx[].nextSignal[1].initCond() try: let query = dbQuery( @@ -305,10 +313,10 @@ method query*(self: ThreadDatastore, value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): - self.tp.spawn queryTask(ctx, ds, query, nextSignal) - await nextSignal.fire() + self.tp.spawn queryTask(ctx, ds, query) + withLock(ctx[].nextSignal[0]): + signal(ctx[].nextSignal[1]) var lock = newAsyncLock() # serialize querying under threads @@ -329,7 +337,8 @@ method query*(self: ThreadDatastore, iter.finished = true defer: - await nextSignal.fire() + withLock(ctx[].nextSignal[0]): + signal(ctx[].nextSignal[1]) if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -342,7 +351,7 @@ method query*(self: ThreadDatastore, trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() discard ctx[].signal.close() - discard nextSignal.close() + # discard nextSignal.close() self.semaphore.release() raise exc @@ -351,7 +360,7 @@ method query*(self: ThreadDatastore, except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg discard signal.close() - discard nextSignal.close() + # discard nextSignal.close() self.semaphore.release() raise exc diff --git a/tests/datastore/sql/testsqlite.nim b/tests/datastore/sql/testsqlite.nim index c629eb0..3e15c43 100644 --- a/tests/datastore/sql/testsqlite.nim +++ b/tests/datastore/sql/testsqlite.nim @@ -8,7 +8,7 @@ import pkg/chronos import pkg/stew/results import pkg/stew/byteutils -import pkg/datastore/sql/sqliteds +import pkg/datastore/sql import ../dscommontests import ../querycommontests diff --git a/tests/datastore/testasyncsemaphore.nim b/tests/datastore/testasyncsemaphore.nim index 3c8bb84..aefff19 100644 --- a/tests/datastore/testasyncsemaphore.nim +++ b/tests/datastore/testasyncsemaphore.nim @@ -98,7 +98,6 @@ suite "AsyncSemaphore": resource.inc() check resource > 0 and resource <= 3 let sleep = rand(0..10).millis - # echo sleep await sleepAsync(sleep) finally: resource.dec()