From ae91c6a07d7c3cdff61bd0a9a28b20648548741e Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 14 Sep 2023 18:18:14 -0700 Subject: [PATCH] split out sig acquire --- datastore/threadproxyds.nim | 31 ++++++++++++++++++----------- datastore/threads/threadbackend.nim | 24 ++++++++++++++-------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index cd789e0..a7da3f2 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -26,11 +26,13 @@ method has*( key: Key ): Future[?!bool] {.async.} = - var ret = await newThreadResult(bool) + var ret = newThreadResult(bool) + let sig = SharedSignal.new(0) try: - has(ret, self.tds, key) - await wait(ret) + await sig.acquireSig() + sig.has(ret, self.tds, key) + await sig.wait() return ret.convert(bool) finally: ret.release() @@ -40,11 +42,13 @@ method delete*( key: Key ): Future[?!void] {.async.} = - var ret = await newThreadResult(void) + let sig = SharedSignal.new(0) + var ret = newThreadResult(void) try: - delete(ret, self.tds, key) - await wait(ret) + await sig.acquireSig() + sig.delete(ret, self.tds, key) + await sig.wait() finally: ret.release() @@ -71,11 +75,12 @@ method get*( ## probably be switched to use a single ThreadSignal ## for the entire batch - var ret = await newThreadResult(ValueBuffer) + let sig = SharedSignal.new(0) + var ret = newThreadResult(ValueBuffer) try: - get(ret, self.tds, key) - await wait(ret) + sig.get(ret, self.tds, key) + await sig.wait() finally: ret.release() @@ -148,7 +153,9 @@ method query*( query: Query ): Future[?!QueryIter] {.async.} = - var ret = await newThreadResult(QueryResponseBuffer) + let sig = SharedSignal.new(0) + await sig.acquireSig() + var ret = newThreadResult(QueryResponseBuffer) # echo "\n\n=== Query Start === " @@ -169,8 +176,8 @@ method query*( iterWrapper.finished = iter[].it.finished if not iter[].it.finished: iterWrapper.readyForNext = false - query(ret, self.tds, iter) - await wait(ret) + sig.query(ret, self.tds, iter) + await sig.wait() iterWrapper.readyForNext = true # echo "" # print "query:post: ", ret[].results diff --git a/datastore/threads/threadbackend.nim b/datastore/threads/threadbackend.nim index 402d0b5..bbd7c06 100644 --- a/datastore/threads/threadbackend.nim +++ b/datastore/threads/threadbackend.nim @@ -68,6 +68,7 @@ type QueryIterPtr* = SharedPtr[QueryIterStore] proc hasTask*( + sig: SharedSignal, ret: TResult[bool], tds: ThreadDatastorePtr, kb: KeyBuffer, @@ -81,19 +82,21 @@ proc hasTask*( ret.failure(res.error()) else: ret.success(res.get()) - discard ret.fireSync() + discard sig.fireSync() except CatchableError as err: ret.failure(err) proc has*( + sig: SharedSignal, ret: TResult[bool], tds: ThreadDatastorePtr, key: Key, ) = let bkey = KeyBuffer.new(key) - tds[].tp.spawn hasTask(ret, tds, bkey) + tds[].tp.spawn hasTask(sig, ret, tds, bkey) proc getTask*( + sig: SharedSignal, ret: TResult[DataBuffer], tds: ThreadDatastorePtr, kb: KeyBuffer, @@ -107,17 +110,18 @@ proc getTask*( let db = DataBuffer.new res.get() ret.success(db) - discard ret.fireSync() + discard sig.fireSync() except CatchableError as err: ret.failure(err) proc get*( + sig: SharedSignal, ret: TResult[DataBuffer], tds: ThreadDatastorePtr, key: Key, ) = let bkey = KeyBuffer.new(key) - tds[].tp.spawn getTask(ret, tds, bkey) + tds[].tp.spawn getTask(sig, ret, tds, bkey) import std/os @@ -153,6 +157,7 @@ proc putTask*( proc deleteTask*( + sig: SharedSignal, ret: TResult[void], tds: ThreadDatastorePtr, kb: KeyBuffer, @@ -167,21 +172,23 @@ proc deleteTask*( else: ret.success() - discard ret.fireSync() + discard sig.fireSync() # import pretty proc delete*( + sig: SharedSignal, ret: TResult[void], tds: ThreadDatastorePtr, key: Key, ) = let bkey = KeyBuffer.new(key) - tds[].tp.spawn deleteTask(ret, tds, bkey) + tds[].tp.spawn deleteTask(sig, ret, tds, bkey) # import os proc queryTask*( + sig: SharedSignal, ret: TResult[QueryResponseBuffer], tds: ThreadDatastorePtr, qiter: QueryIterPtr, @@ -202,11 +209,12 @@ proc queryTask*( except Exception as exc: ret.failure(exc) - discard ret.fireSync() + discard sig.fireSync() proc query*( + sig: SharedSignal, ret: TResult[QueryResponseBuffer], tds: ThreadDatastorePtr, qiter: QueryIterPtr, ) = - tds[].tp.spawn queryTask(ret, tds, qiter) + tds[].tp.spawn queryTask(sig, ret, tds, qiter)