diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index dcff5e0..4e2a429 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -265,7 +265,7 @@ proc queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - if not nextSignal.waitSync(1.seconds).get(): + if not nextSignal.waitSync(10.seconds).get(): raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") var handle = handleRes.get() @@ -287,58 +287,73 @@ proc queryTask[DB]( # set final result (?!QResult).ok((KeyId.none, DataBuffer())) -method query*( - self: ThreadDatastore, - q: Query): Future[?!QueryIter] {.async.} = - +method query*(self: ThreadDatastore, + q: Query + ): Future[?!QueryIter] {.async.} = + ## performs async query + ## keeps one thread running queryTask until finished + ## await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err without nextSignal =? acquireSignal(), err: return failure err - let query = dbQuery( - key= KeyId.new q.key.id(), - value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) + try: + let query = dbQuery( + key= KeyId.new q.key.id(), + value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) - # setup initial queryTask - let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): - self.tp.spawn queryTask(ctx, ds, query, nextSignal) - await nextSignal.fire() + # setup initial queryTask + let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) + dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): + self.tp.spawn queryTask(ctx, ds, query, nextSignal) + await nextSignal.fire() - var - lock = newAsyncLock() # serialize querying under threads - iter = QueryIter.new() + var + lock = newAsyncLock() # serialize querying under threads + iter = QueryIter.new() - proc next(): Future[?!QueryResponse] {.async.} = - let ctx = ctx + proc next(): Future[?!QueryResponse] {.async.} = + let ctx = ctx + try: + trace "About to query" + if lock.locked: + return failure (ref DatastoreError)(msg: "Should always await query features") + if iter.finished == true: + return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") - trace "About to query" - if lock.locked: - return failure (ref DatastoreError)(msg: "Should always await query features") - if iter.finished == true: - return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") + await wait(ctx[].signal) - await wait(ctx[].signal) + if not ctx[].running: + iter.finished = true - if not ctx[].running: - iter.finished = true + defer: + await nextSignal.fire() - defer: - await nextSignal.fire() + if ctx[].res.isErr(): + return err(ctx[].res.error()) + else: + let qres = ctx[].res.get() + let key = qres.key.map(proc (k: KeyId): Key = k.toKey()) + let data = qres.data.toSeq() + return (?!QueryResponse).ok((key: key, data: data)) + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + ctx.setCancelled() + discard ctx[].signal.close() + discard nextSignal.close() + self.semaphore.release() + raise exc - if ctx[].res.isErr(): - return err(ctx[].res.error()) - else: - let qres = ctx[].res.get() - let key = qres.key.map(proc (k: KeyId): Key = k.toKey()) - let data = qres.data.toSeq() - return (?!QueryResponse).ok((key: key, data: data)) - - - iter.next = next - return success iter + iter.next = next + return success iter + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + discard signal.close() + discard nextSignal.close() + self.semaphore.release() + raise exc proc new*[DB](self: type ThreadDatastore, db: DB,