mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-08 08:33:10 +00:00
fix occasional deadlock
This commit is contained in:
parent
4ff162e0c5
commit
7169a7d20f
@ -265,7 +265,7 @@ proc queryTask[DB](
|
||||
# otherwise manually an set empty ok result
|
||||
ctx[].res.ok (KeyId.none, DataBuffer(), )
|
||||
discard ctx[].signal.fireSync()
|
||||
if not nextSignal.waitSync(1.seconds).get():
|
||||
if not nextSignal.waitSync(10.seconds).get():
|
||||
raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
|
||||
|
||||
var handle = handleRes.get()
|
||||
@ -287,58 +287,73 @@ proc queryTask[DB](
|
||||
# set final result
|
||||
(?!QResult).ok((KeyId.none, DataBuffer()))
|
||||
|
||||
method query*(
|
||||
self: ThreadDatastore,
|
||||
q: Query): Future[?!QueryIter] {.async.} =
|
||||
|
||||
method query*(self: ThreadDatastore,
|
||||
q: Query
|
||||
): Future[?!QueryIter] {.async.} =
|
||||
## performs async query
|
||||
## keeps one thread running queryTask until finished
|
||||
##
|
||||
await self.semaphore.acquire()
|
||||
without signal =? acquireSignal(), err:
|
||||
return failure err
|
||||
without nextSignal =? acquireSignal(), err:
|
||||
return failure err
|
||||
|
||||
let query = dbQuery(
|
||||
key= KeyId.new q.key.id(),
|
||||
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
||||
try:
|
||||
let query = dbQuery(
|
||||
key= KeyId.new q.key.id(),
|
||||
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
||||
|
||||
# setup initial queryTask
|
||||
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
|
||||
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
||||
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
|
||||
await nextSignal.fire()
|
||||
# setup initial queryTask
|
||||
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
|
||||
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
||||
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
|
||||
await nextSignal.fire()
|
||||
|
||||
var
|
||||
lock = newAsyncLock() # serialize querying under threads
|
||||
iter = QueryIter.new()
|
||||
var
|
||||
lock = newAsyncLock() # serialize querying under threads
|
||||
iter = QueryIter.new()
|
||||
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
let ctx = ctx
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
let ctx = ctx
|
||||
try:
|
||||
trace "About to query"
|
||||
if lock.locked:
|
||||
return failure (ref DatastoreError)(msg: "Should always await query features")
|
||||
if iter.finished == true:
|
||||
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
|
||||
|
||||
trace "About to query"
|
||||
if lock.locked:
|
||||
return failure (ref DatastoreError)(msg: "Should always await query features")
|
||||
if iter.finished == true:
|
||||
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
|
||||
await wait(ctx[].signal)
|
||||
|
||||
await wait(ctx[].signal)
|
||||
if not ctx[].running:
|
||||
iter.finished = true
|
||||
|
||||
if not ctx[].running:
|
||||
iter.finished = true
|
||||
defer:
|
||||
await nextSignal.fire()
|
||||
|
||||
defer:
|
||||
await nextSignal.fire()
|
||||
if ctx[].res.isErr():
|
||||
return err(ctx[].res.error())
|
||||
else:
|
||||
let qres = ctx[].res.get()
|
||||
let key = qres.key.map(proc (k: KeyId): Key = k.toKey())
|
||||
let data = qres.data.toSeq()
|
||||
return (?!QueryResponse).ok((key: key, data: data))
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
ctx.setCancelled()
|
||||
discard ctx[].signal.close()
|
||||
discard nextSignal.close()
|
||||
self.semaphore.release()
|
||||
raise exc
|
||||
|
||||
if ctx[].res.isErr():
|
||||
return err(ctx[].res.error())
|
||||
else:
|
||||
let qres = ctx[].res.get()
|
||||
let key = qres.key.map(proc (k: KeyId): Key = k.toKey())
|
||||
let data = qres.data.toSeq()
|
||||
return (?!QueryResponse).ok((key: key, data: data))
|
||||
|
||||
|
||||
iter.next = next
|
||||
return success iter
|
||||
iter.next = next
|
||||
return success iter
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
discard signal.close()
|
||||
discard nextSignal.close()
|
||||
self.semaphore.release()
|
||||
raise exc
|
||||
|
||||
proc new*[DB](self: type ThreadDatastore,
|
||||
db: DB,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user