diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 207ce62..28789be 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -35,6 +35,7 @@ type ThreadResult*[T: ThreadTypes] = Result[T, DataBuffer] TaskCtx*[T: ThreadTypes] = object + head: int ds*: ptr Datastore res*: ThreadResult[T] cancelled: Atomic[bool] @@ -61,9 +62,13 @@ proc new*[T]( ds: Datastore, ): ref TaskCtx[T] = # let res = cast[ptr TaskCtx[T]](allocShared0(sizeof(TaskCtx[T]))) - let res = (ref TaskCtx[T])() - res.ds = unsafeAddr ds - res + result = (ref TaskCtx[T])() + result.ds = unsafeAddr ds + echo "" + echo "TaskCtx:new: ", "addrOf: ", addrOf(result).pointer.repr + echo "TaskCtx:new: ", "head:ptr: ", unsafeAddr(result.head).pointer.repr + echo "TaskCtx:new: ", " result:repr:\n", result.repr + echo "" template withLocks( self: ThreadDatastore, @@ -94,6 +99,7 @@ template dispatchTask( key: ?Key = Key.none, runTask: proc): untyped = try: + # GC_ref(ctx) await self.semaphore.acquire() ctx[].signal = ThreadSignalPtr.new().valueOr: result = failure(error()) @@ -115,11 +121,12 @@ template dispatchTask( # but for now it'd at least be better to leak than possibly # corrupt memory since it's easier to detect and fix leaks warn "request was cancelled while thread task is running", exc = exc.msg - GC_ref(ctx) + # GC_ref(ctx) ctx.cancelled.store(true, moAcquireRelease) await ctx.signal.fire() raise exc finally: + # GC_unref(ctx) discard ctx.signal.close() self.semaphore.release() @@ -171,8 +178,6 @@ proc hasTask(ctx: ptr TaskCtx, key: ptr Key) = method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) ctx = TaskCtx[bool].new( ds = self.ds) @@ -209,8 +214,6 @@ method delete*( self: ThreadDatastore, key: Key): Future[?!void] {.async.} = var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) ctx = TaskCtx[void].new( ds= self.ds) @@ -236,12 +239,21 @@ proc asyncPutTask( data: ptr UncheckedArray[byte], len: int) {.async.} = - echo "PUT TASK: ", ctx.repr + echo "" + echo "ASYNC PUT TASK:ptr: ", ctx.pointer.repr + # echo "PUT TASK:\n", ctx[].repr + echo "ASYNC PUT TASK:ds: ", ctx[].ds.pointer.repr + echo "ASYNC PUT TASK:res: ", ctx[].res.repr + echo "ASYNC PUT TASK:signal: ", ctx[].signal.pointer.repr + echo "ASYNC PUT TASK:cancelled: ", ctx[].cancelled.repr + echo "ASYNC PUT TASK:semaphore: ", ctx[].semaphore.repr + echo "ASYNC PUT TASK:ds: ", ctx[].ds[].repr + defer: - discard ctx[].signal.fireSync() + discard ctx.signal.fireSync() let - fut = ctx[].ds[].put(key[], @(data.toOpenArray(0, len - 1))) + fut = ctx.ds[].put(key[], @(data.toOpenArray(0, len - 1))) asyncSpawn signalMonitor(ctx, fut) without res =? (await fut).catch, error: @@ -259,6 +271,15 @@ proc putTask( ## run put in a thread task ## + echo "" + echo "PUT TASK:ptr: ", ctx.pointer.repr + # echo "PUT TASK:\n", ctx[].repr + echo "PUT TASK:ds: ", ctx[].ds.pointer.repr + echo "PUT TASK:res: ", ctx[].res.repr + echo "PUT TASK:signal: ", ctx[].signal.pointer.repr + echo "PUT TASK:cancelled: ", ctx[].cancelled.repr + echo "PUT TASK:semaphore: ", ctx[].semaphore.repr + try: waitFor asyncPutTask(ctx, key, data, len) except CatchableError as exc: @@ -269,10 +290,8 @@ method put*( self: ThreadDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) + var ctx = TaskCtx[void].new( ds= self.ds) proc runTask() = @@ -407,8 +426,6 @@ method query*( return success (Key.none, EmptyBytes) var - signal = ThreadSignalPtr.new().valueOr: - return failure("Failed to create signal") ctx = TaskCtx[(bool, DataBuffer, DataBuffer)].new( ds= self.ds)