diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index c9b0408..42ed8d0 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -65,7 +65,6 @@ template withLocks( if self.withLocks: await self.queryLock.acquire() # only lock if it's required (fsds) - body finally: if self.withLocks: @@ -74,29 +73,35 @@ template withLocks( if self.queryLock.locked: self.queryLock.release() +# TODO: needs rework, we can't use `result` with async template dispatchTask( self: ThreadDatastore, ctx: TaskCtx, key: ?Key = Key.none, runTask: proc): untyped = + try: + await self.semaphore.acquire() + ctx.signal = ThreadSignalPtr.new().valueOr: + result = failure(error()) + return - let - fut = wait(ctx.signal) + let + fut = wait(ctx.signal) - withLocks(self, ctx, key, fut): - try: + withLocks(self, ctx, key, fut): runTask() await fut if ctx.res[].isErr: - result = failure(ctx.res[].error()) # TODO: fix this, result shouldn't be accessed - except CancelledError as exc: - trace "Cancelling thread future!", exc = exc.msg - ctx.cancelled = true - await ctx.signal.fire() - raise exc - finally: - discard ctx.signal.close() + result = failure(ctx.res[].error()) # TODO: fix this, result shouldn't be accessed from a thread + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + ctx.cancelled = true + await ctx.signal.fire() + raise exc + finally: + discard ctx.signal.close() + self.semaphore.release() proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = ## Monitor the signal and cancel the future if @@ -142,20 +147,11 @@ proc hasTask(ctx: ptr TaskCtx, key: ptr Key) = raiseAssert exc.msg method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) - res = ThreadResult[bool]() ctx = TaskCtx[bool]( ds: addr self.ds, - res: addr res, - signal: signal) + res: addr res) proc runTask() = self.tp.spawn hasTask(addr ctx, unsafeAddr key) @@ -189,20 +185,11 @@ proc delTask(ctx: ptr TaskCtx, key: ptr Key) = method delete*( self: ThreadDatastore, key: Key): Future[?!void] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) - res = ThreadResult[void]() ctx = TaskCtx[void]( ds: addr self.ds, - res: addr res, - signal: signal) + res: addr res) proc runTask() = self.tp.spawn delTask(addr ctx, unsafeAddr key) @@ -257,20 +244,11 @@ method put*( self: ThreadDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) - res = ThreadResult[void]() ctx = TaskCtx[void]( ds: addr self.ds, - res: addr res, - signal: signal) + res: addr res) proc runTask() = self.tp.spawn putTask( @@ -324,21 +302,11 @@ proc getTask( method get*( self: ThreadDatastore, key: Key): Future[?!seq[byte]] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - - var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) - var res = ThreadResult[DataBuffer]() ctx = TaskCtx[DataBuffer]( ds: addr self.ds, - res: addr res, - signal: signal) + res: addr res) proc runTask() = self.tp.spawn getTask(addr ctx, unsafeAddr key) @@ -403,10 +371,8 @@ method query*( proc next(): Future[?!QueryResponse] {.async.} = defer: locked = false - self.semaphore.release() trace "About to query" - await self.semaphore.acquire() if locked: return failure (ref DatastoreError)(msg: "Should always await query features") @@ -419,21 +385,17 @@ method query*( return success (Key.none, EmptyBytes) var - signal = ThreadSignalPtr.new().valueOr: - return failure("Failed to create signal") - res = ThreadResult[(bool, DataBuffer, DataBuffer)]() ctx = TaskCtx[(bool, DataBuffer, DataBuffer)]( ds: addr self.ds, - res: addr res, - signal: signal) + res: addr res) proc runTask() = self.tp.spawn queryTask(addr ctx, addr childIter) self.dispatchTask(ctx, Key.none, runTask) if err =? res.errorOption: - trace "Query failed", err = err + trace "Query failed", err = $err return failure err let (ok, key, data) = res.get()