check for nil ctx and set iter.finished correctly

This commit is contained in:
Dmitriy Ryajov 2023-09-19 21:10:38 -06:00
parent 181168d073
commit 14f8c3a71c
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4

View File

@ -115,6 +115,10 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
## the cancellation flag is set ## the cancellation flag is set
## ##
if ctx.isNil:
trace "ctx is nil"
return
try: try:
await ctx[].signal.wait() await ctx[].signal.wait()
trace "Received signal" trace "Received signal"
@ -134,7 +138,12 @@ proc asyncHasTask(
ctx: ptr TaskCtx[bool], ctx: ptr TaskCtx[bool],
key: ptr Key) {.async.} = key: ptr Key) {.async.} =
defer: defer:
discard ctx[].signal.fireSync() if not ctx.isNil:
discard ctx[].signal.fireSync()
if ctx.isNil:
trace "ctx is nil"
return
let let
fut = ctx[].ds.has(key[]) fut = ctx[].ds.has(key[])
@ -167,7 +176,12 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
defer: defer:
discard ctx[].signal.fireSync() if not ctx.isNil:
discard ctx[].signal.fireSync()
if ctx.isNil:
trace "ctx is nil"
return
let let
fut = ctx[].ds.delete(key[]) fut = ctx[].ds.delete(key[])
@ -218,7 +232,12 @@ proc asyncPutTask(
data: ptr UncheckedArray[byte], data: ptr UncheckedArray[byte],
len: int) {.async.} = len: int) {.async.} =
defer: defer:
discard ctx[].signal.fireSync() if not ctx.isNil:
discard ctx[].signal.fireSync()
if ctx.isNil:
trace "ctx is nil"
return
let let
fut = ctx[].ds.put(key[], @(data.toOpenArray(0, len - 1))) fut = ctx[].ds.put(key[], @(data.toOpenArray(0, len - 1)))
@ -278,7 +297,12 @@ proc asyncGetTask(
ctx: ptr TaskCtx[DataBuffer], ctx: ptr TaskCtx[DataBuffer],
key: ptr Key) {.async.} = key: ptr Key) {.async.} =
defer: defer:
discard ctx[].signal.fireSync() if not ctx.isNil:
discard ctx[].signal.fireSync()
if ctx.isNil:
trace "ctx is nil"
return
let let
fut = ctx[].ds.get(key[]) fut = ctx[].ds.get(key[])
@ -328,7 +352,12 @@ proc asyncQueryTask(
ctx: ptr TaskCtx, ctx: ptr TaskCtx,
iter: ptr QueryIter) {.async.} = iter: ptr QueryIter) {.async.} =
defer: defer:
discard ctx[].signal.fireSync() if not ctx.isNil:
discard ctx[].signal.fireSync()
if ctx.isNil or iter.isNil:
trace "ctx is nil"
return
let let
fut = iter[].next() fut = iter[].next()
@ -368,24 +397,23 @@ method query*(
var var
iter = QueryIter.new() iter = QueryIter.new()
locked = false lock = newAsyncLock() # serialize querying under threads
proc next(): Future[?!QueryResponse] {.async.} = proc next(): Future[?!QueryResponse] {.async.} =
defer: defer:
locked = false if lock.locked:
lock.release()
trace "About to query" trace "About to query"
if locked: if lock.locked:
return failure (ref DatastoreError)(msg: "Should always await query features") return failure (ref DatastoreError)(msg: "Should always await query features")
locked = true await lock.acquire()
if iter.finished == true: if iter.finished == true:
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
if iter.finished == true: iter.finished = childIter.finished
return success (Key.none, EmptyBytes)
var var
res = ThreadResult[ThreadQueryRes]() res = ThreadResult[ThreadQueryRes]()
ctx = TaskCtx[ThreadQueryRes]( ctx = TaskCtx[ThreadQueryRes](