From 14f8c3a71cbc7682dc4c61bf386f18825e395003 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 19 Sep 2023 21:10:38 -0600 Subject: [PATCH] check for nil ctx and set iter.finished correctly --- datastore/threads/threadproxyds.nim | 52 ++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index fc69a69..cf33160 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -115,6 +115,10 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = ## the cancellation flag is set ## + if ctx.isNil: + trace "ctx is nil" + return + try: await ctx[].signal.wait() trace "Received signal" @@ -134,7 +138,12 @@ proc asyncHasTask( ctx: ptr TaskCtx[bool], key: ptr Key) {.async.} = defer: - discard ctx[].signal.fireSync() + if not ctx.isNil: + discard ctx[].signal.fireSync() + + if ctx.isNil: + trace "ctx is nil" + return let fut = ctx[].ds.has(key[]) @@ -167,7 +176,12 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = defer: - discard ctx[].signal.fireSync() + if not ctx.isNil: + discard ctx[].signal.fireSync() + + if ctx.isNil: + trace "ctx is nil" + return let fut = ctx[].ds.delete(key[]) @@ -218,7 +232,12 @@ proc asyncPutTask( data: ptr UncheckedArray[byte], len: int) {.async.} = defer: - discard ctx[].signal.fireSync() + if not ctx.isNil: + discard ctx[].signal.fireSync() + + if ctx.isNil: + trace "ctx is nil" + return let fut = ctx[].ds.put(key[], @(data.toOpenArray(0, len - 1))) @@ -278,7 +297,12 @@ proc asyncGetTask( ctx: ptr TaskCtx[DataBuffer], key: ptr Key) {.async.} = defer: - discard ctx[].signal.fireSync() + if not ctx.isNil: + discard ctx[].signal.fireSync() + + if ctx.isNil: + trace "ctx is nil" + return let fut = ctx[].ds.get(key[]) @@ -328,7 +352,12 @@ proc asyncQueryTask( ctx: ptr TaskCtx, iter: ptr QueryIter) {.async.} = defer: - discard ctx[].signal.fireSync() + if not ctx.isNil: + discard ctx[].signal.fireSync() + + if ctx.isNil or iter.isNil: + trace "ctx is nil" + return let fut = iter[].next() @@ -368,24 +397,23 @@ method query*( var iter = QueryIter.new() - locked = false + lock = newAsyncLock() # serialize querying under threads proc next(): Future[?!QueryResponse] {.async.} = defer: - locked = false + if lock.locked: + lock.release() trace "About to query" - if locked: + if lock.locked: return failure (ref DatastoreError)(msg: "Should always await query features") - locked = true + await lock.acquire() if iter.finished == true: return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") - if iter.finished == true: - return success (Key.none, EmptyBytes) - + iter.finished = childIter.finished var res = ThreadResult[ThreadQueryRes]() ctx = TaskCtx[ThreadQueryRes](