From 5d259aca1d40c9d29297c1e799854b1b71cc3318 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 18 Sep 2023 13:54:56 -0700 Subject: [PATCH] manually allocat TaskCtx --- datastore/threads/threadproxyds.nim | 63 ++++++++++++++++----------- tests/datastore/testthreadproxyds.nim | 2 +- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index b3647dd..e34accf 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -31,10 +31,10 @@ type ErrorEnum {.pure.} = enum DatastoreErr, DatastoreKeyNotFoundErr, CatchableErr - ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple | Atomic - ThreadResult[T: ThreadTypes] = Result[T, DataBuffer] + ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic + ThreadResult*[T: ThreadTypes] = Result[T, DataBuffer] - TaskCtx[T: ThreadTypes] = ref object + TaskCtx*[T: ThreadTypes] = object ds*: ptr Datastore res*: ThreadResult[T] cancelled: Atomic[bool] @@ -52,9 +52,21 @@ type queryLock: AsyncLock # global query lock, this is only really \ # needed for the fsds, but it is expensive! +template addrOf*[T](ctx: ptr TaskCtx[T]): ptr TaskCtx[T] = + ctx + +proc new*[T]( + ctx: typedesc[TaskCtx[T]], + ds: Datastore, + signal: ThreadSignalPtr, +): ptr TaskCtx[T] = + let res = cast[ptr TaskCtx[T]](allocShared0(sizeof(TaskCtx[T]))) + res[].ds = unsafeAddr ds + res + template withLocks( self: ThreadDatastore, - ctx: TaskCtx, + ctx: ptr TaskCtx, key: ?Key = Key.none, fut: Future[void], body: untyped) = @@ -77,7 +89,7 @@ template withLocks( template dispatchTask( self: ThreadDatastore, - ctx: TaskCtx, + ctx: ptr TaskCtx, key: ?Key = Key.none, runTask: proc): untyped = @@ -97,7 +109,6 @@ template dispatchTask( # could do a spinlock here until the other side cancels, # but for now it'd at least be better to leak than possibly # corrupt memory since it's easier to detect and fix leaks - GC_ref(ctx) warn "request was cancelled while thread task is running", exc = exc.msg ctx.cancelled.store(true, moAcquireRelease) await ctx.signal.fire() @@ -161,12 +172,12 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = signal = ThreadSignalPtr.new().valueOr: return failure(error()) - ctx = TaskCtx[bool]( - ds: addr self.ds, - signal: signal) + ctx = TaskCtx[bool].new( + ds = self.ds, + signal = signal) proc runTask() = - self.tp.spawn hasTask(addr ctx, unsafeAddr key) + self.tp.spawn hasTask(addrOf(ctx), unsafeAddr key) self.dispatchTask(ctx, key.some, runTask) return success(ctx.res.get()) @@ -206,12 +217,12 @@ method delete*( signal = ThreadSignalPtr.new().valueOr: return failure(error()) - ctx = TaskCtx[void]( - ds: addr self.ds, - signal: signal) + ctx = TaskCtx[void].new( + ds= self.ds, + signal= signal) proc runTask() = - self.tp.spawn delTask(addr ctx, unsafeAddr key) + self.tp.spawn delTask(addrOf(ctx), unsafeAddr key) self.dispatchTask(ctx, key.some, runTask) return success() @@ -272,13 +283,13 @@ method put*( signal = ThreadSignalPtr.new().valueOr: return failure(error()) - ctx = TaskCtx[void]( - ds: addr self.ds, - signal: signal) + ctx = TaskCtx[void].new( + ds= self.ds, + signal= signal) proc runTask() = self.tp.spawn putTask( - addr ctx, + addrOf(ctx), unsafeAddr key, makeUncheckedArray(baseAddr data), data.len) @@ -338,12 +349,12 @@ method get*( return failure(error()) var - ctx = TaskCtx[DataBuffer]( - ds: addr self.ds, - signal: signal) + ctx = TaskCtx[DataBuffer].new( + ds= self.ds, + signal= signal) proc runTask() = - self.tp.spawn getTask(addr ctx, unsafeAddr key) + self.tp.spawn getTask(addrOf(ctx), unsafeAddr key) self.dispatchTask(ctx, key.some, runTask) if err =? ctx.res.errorOption: @@ -424,12 +435,12 @@ method query*( signal = ThreadSignalPtr.new().valueOr: return failure("Failed to create signal") - ctx = TaskCtx[(bool, DataBuffer, DataBuffer)]( - ds: addr self.ds, - signal: signal) + ctx = TaskCtx[(bool, DataBuffer, DataBuffer)].new( + ds= self.ds, + signal= signal) proc runTask() = - self.tp.spawn queryTask(addr ctx, addr childIter) + self.tp.spawn queryTask(addrOf(ctx), addr childIter) self.dispatchTask(ctx, Key.none, runTask) if err =? ctx.res.errorOption: diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 02f9626..8cf4d3d 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -150,7 +150,7 @@ suite "Test ThreadDatastore cancelations": ds: addr sqlStore, signal: signal) fut = newFuture[void]("signalMonitor") - threadArgs: (ptr TaskCtx, ptr Future[void]) = (unsafeAddr ctx[], addr fut) + threadArgs = (cast[ptr TaskCtx[void]](ctx), addr fut) var thread: Thread[type threadArgs]