diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 6cce0c1..091e572 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -30,7 +30,7 @@ method has*( try: has(ret, self.tds, key) - await wait(ret[].signal) + await wait(ret) return ret.convert(bool) finally: ret.release() @@ -44,7 +44,7 @@ method delete*( try: delete(ret, self.tds, key) - await wait(ret[].signal) + await wait(ret) finally: ret.release() @@ -75,7 +75,7 @@ method get*( try: get(ret, self.tds, key) - await wait(ret[].signal) + await wait(ret) finally: ret.release() @@ -91,7 +91,7 @@ method put*( try: put(ret, self.tds, key, data) - await wait(ret[].signal) + await wait(ret) return ret.convert(void) finally: @@ -143,7 +143,7 @@ method query*( if not iter[].it.finished: iterWrapper.readyForNext = false query(ret, self.tds, iter) - await wait(ret[].signal) + await wait(ret) iterWrapper.readyForNext = true # echo "" # print "query:post: ", ret[].results diff --git a/datastore/threads/sharedptr.nim b/datastore/threads/sharedptr.nim index a2af3cb..ee3c301 100644 --- a/datastore/threads/sharedptr.nim +++ b/datastore/threads/sharedptr.nim @@ -32,7 +32,7 @@ proc decr*[T](x: var SharedPtr[T]) = if x.val != nil and x.cnt != nil: let res = atomicSubFetch(x.cnt, 1, ATOMIC_ACQUIRE) if res == 0: - echo "SharedPtr: FREE: ", repr x.val.pointer, " ", x.cnt[], " tp: ", $(typeof(T)) + echo "SharedPtr: FREE: ", repr x.val.pointer, " ", x[].cnt[], " tp: ", $(typeof(T)) when compiles(`=destroy`(x.val)): echo "DECR FREE" `=destroy`(x.val) @@ -50,7 +50,7 @@ proc release*[T](x: var SharedPtr[T]) = proc `=destroy`*[T](x: var SharedPtr[T]) = if x.val != nil: - echo "SharedPtr: destroy: ", repr x.val.pointer, " cnt: ", x.cnt.repr, " tp: ", $(typeof(T)) + echo "SharedPtr: destroy: ", repr x.val.pointer.repr, " cnt: ", x.cnt.pointer.repr, " tp: ", $(typeof(T)) decr(x) proc `=dup`*[T](src: SharedPtr[T]): SharedPtr[T] = diff --git a/datastore/threads/threadbackend.nim b/datastore/threads/threadbackend.nim index 483dba7..f58f7c3 100644 --- a/datastore/threads/threadbackend.nim +++ b/datastore/threads/threadbackend.nim @@ -81,7 +81,7 @@ proc hasTask*( ret.failure(res.error()) else: ret.success(res.get()) - discard ret[].signal.fireSync() + discard ret.fireSync() except CatchableError as err: ret.failure(err) @@ -108,7 +108,7 @@ proc getTask*( let db = DataBuffer.new res.get() ret.success(db) - discard ret[].signal.fireSync() + discard ret.fireSync() except CatchableError as err: ret.failure(err) @@ -139,7 +139,7 @@ proc putTask*( else: ret.success() - discard ret[].signal.fireSync() + discard ret.fireSync() proc put*( ret: TResult[void], @@ -169,7 +169,7 @@ proc deleteTask*( else: ret.success() - discard ret[].signal.fireSync() + discard ret.fireSync() # import pretty @@ -204,7 +204,7 @@ proc queryTask*( except Exception as exc: ret.failure(exc) - discard ret[].signal.fireSync() + discard ret.fireSync() proc query*( ret: TResult[QueryResponseBuffer], diff --git a/datastore/threads/threadresults.nim b/datastore/threads/threadresults.nim index cbec039..6591eab 100644 --- a/datastore/threads/threadresults.nim +++ b/datastore/threads/threadresults.nim @@ -8,6 +8,7 @@ import ./sharedptr import ./databuffer import ./threadsignalpool +export threadsignalpool export databuffer export sharedptr export threadsync @@ -22,7 +23,7 @@ type ## Encapsulates both the results from a thread but also the cross ## thread signaling mechanism. This makes it easier to keep them ## together. - signal*: ThreadSignalPtr + sig*: SharedSignal results*: Result[T, CatchableErrorBuffer] TResult*[T] = SharedPtr[ThreadResult[T]] ##\ @@ -57,13 +58,17 @@ proc newThreadResult*[T]( {.error: "only thread safe types can be used".} let res = newSharedPtr(ThreadResult[T]) - res[].signal = await getThreadSignal() + res[].sig = await SharedSignal.new() res proc release*[T](res: var TResult[T]) {.raises: [].} = ## release TResult and it's ThreadSignal - res[].signal.release() + # res[].signal.release() sharedptr.release(res) +proc wait*[T](res: TResult[T]): Future[void] = + res[].sig.wait() +proc fireSync*[T](res: TResult[T]): Result[bool, string] = + res[].sig.fireSync() proc success*[T](ret: TResult[T], value: T) = ## convenience wrapper for `TResult` to replicate diff --git a/datastore/threads/threadsignalpool.nim b/datastore/threads/threadsignalpool.nim index 875efe1..57a8d38 100644 --- a/datastore/threads/threadsignalpool.nim +++ b/datastore/threads/threadsignalpool.nim @@ -76,3 +76,26 @@ proc release*(sig: ThreadSignalPtr) {.raises: [].} = signalPoolUsed.excl(sig) signalPoolFree.incl(sig) # echo "free:signalPoolUsed:size: ", signalPoolUsed.len() + + +type + SharedSignalObj* = object + sigptr*: ThreadSignalPtr + + SharedSignal* = SharedPtr[SharedSignalObj] + +proc `=destroy`*[T](x: var SharedSignalObj) = + if x.sigptr != nil: + echo "ThreadSignalObj: destroy " + release(x.sigptr) + x.sigptr = nil + +proc new*(tp: typedesc[SharedSignal]): Future[SharedSignal] {.async.} = + result = newSharedPtr[SharedSignalObj](SharedSignalObj) + result[].sigptr = await getThreadSignal() + +proc wait*(sig: SharedSignal): Future[void] = + sig[].sigptr.wait() + +proc fireSync*(sig: SharedSignal): Result[bool, string] = + sig[].sigptr.fireSync()