mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-26 01:13:07 +00:00
remove ctx locks
This commit is contained in:
parent
ca5695f2c8
commit
fbc00613c4
@ -289,6 +289,13 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
echo "nextSignal:OPEN!"
|
||||
ctx[].nextSignal.init()
|
||||
|
||||
proc iterDispose() =
|
||||
echo "signal:CLOSE!"
|
||||
discard signal.close()
|
||||
echo "nextSignal:CLOSE!"
|
||||
ctx[].nextSignal.close()
|
||||
self.semaphore.release()
|
||||
|
||||
try:
|
||||
let query = dbQuery(
|
||||
key= KeyId.new q.key.id(),
|
||||
@ -299,11 +306,13 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
self.tp.spawn queryTask(ctx, ds, query)
|
||||
ctx[].nextSignal.fire()
|
||||
|
||||
var
|
||||
lock = newAsyncLock() # serialize querying under threads
|
||||
iter = QueryIter.new()
|
||||
var lock = newAsyncLock() # serialize querying under threads
|
||||
var iter = QueryIter.new()
|
||||
iter.dispose = proc (): Future[?!void] {.async.} =
|
||||
iterDispose()
|
||||
success()
|
||||
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
iter.next = proc(): Future[?!QueryResponse] {.async.} =
|
||||
let ctx = ctx
|
||||
try:
|
||||
trace "About to query"
|
||||
@ -313,7 +322,6 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
|
||||
|
||||
await wait(ctx[].signal)
|
||||
|
||||
if not ctx[].running:
|
||||
iter.finished = true
|
||||
|
||||
@ -330,22 +338,13 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
ctx.setCancelled()
|
||||
echo "signal:CLOSE!"
|
||||
discard ctx[].signal.close()
|
||||
echo "nextSignal:CLOSE!"
|
||||
ctx[].nextSignal.close()
|
||||
self.semaphore.release()
|
||||
iterDispose()
|
||||
raise exc
|
||||
|
||||
iter.next = next
|
||||
return success iter
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
echo "signal:CLOSE!"
|
||||
discard signal.close()
|
||||
echo "nextSignal:CLOSE!"
|
||||
ctx[].nextSignal.close()
|
||||
self.semaphore.release()
|
||||
iterDispose()
|
||||
raise exc
|
||||
|
||||
proc new*[DB](self: type ThreadDatastore,
|
||||
|
||||
@ -52,8 +52,8 @@ for i in 1..N:
|
||||
(await ds.close()).tryGet()
|
||||
taskPool.shutdown()
|
||||
|
||||
for i in 1..M:
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
# for i in 1..M:
|
||||
# basicStoreTests(ds, key, bytes, otherBytes)
|
||||
GC_fullCollect()
|
||||
|
||||
for i in 1..N:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user