From 2026310dd69d5ad8019d6ab90dd4c306d22ce06a Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 14 Sep 2023 19:06:05 -0700 Subject: [PATCH] cleanup other tasks --- datastore/threadproxyds.nim | 48 +++++++++++------------ datastore/threads/threadbackend.nim | 61 +++++++---------------------- 2 files changed, 37 insertions(+), 72 deletions(-) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 205418e..67a1842 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -26,16 +26,17 @@ method has*( key: Key ): Future[?!bool] {.async.} = - var ret = newThreadResult(bool) let sig = SharedSignal.new(0) + await sig.acquireSig() - try: - await sig.acquireSig() - sig.has(ret, self.tds, key) - await sig.wait() - return ret.convert(bool) - finally: - ret.release() + var ret = newThreadResult(bool) + proc submitHas() = + let bkey = KeyBuffer.new(key) + self.tds[].tp.spawn hasTask(sig, ret, self.tds, bkey) + + submitHas() + await sig.wait() + return ret.convert(bool) method delete*( self: ThreadProxyDatastore, @@ -43,15 +44,15 @@ method delete*( ): Future[?!void] {.async.} = let sig = SharedSignal.new(0) + await sig.acquireSig() + var ret = newThreadResult(void) + proc submitDelete() = + let bkey = KeyBuffer.new(key) + self.tds[].tp.spawn deleteTask(sig, ret, self.tds, bkey) - try: - await sig.acquireSig() - sig.delete(ret, self.tds, key) - await sig.wait() - finally: - ret.release() - + submitDelete() + await sig.wait() return ret.convert(void) method delete*( @@ -76,18 +77,17 @@ method get*( ## for the entire batch let sig = SharedSignal.new(0) + await sig.acquireSig() + var ret = newThreadResult(ValueBuffer) + proc submitGet() = + let bkey = KeyBuffer.new(key) + self.tds[].tp.spawn getTask(sig, ret, self.tds, bkey) - try: - sig.get(ret, self.tds, key) - await sig.wait() - finally: - ret.release() - + submitGet() + await sig.wait() return ret.convert(seq[byte]) -import ./threads/then -import std/os method put*( self: ThreadProxyDatastore, @@ -127,8 +127,6 @@ method put*( return success() -import pretty - method query*( self: ThreadProxyDatastore, query: Query diff --git a/datastore/threads/threadbackend.nim b/datastore/threads/threadbackend.nim index bbd7c06..48cc8a6 100644 --- a/datastore/threads/threadbackend.nim +++ b/datastore/threads/threadbackend.nim @@ -86,14 +86,23 @@ proc hasTask*( except CatchableError as err: ret.failure(err) -proc has*( +proc deleteTask*( sig: SharedSignal, - ret: TResult[bool], + ret: TResult[void], tds: ThreadDatastorePtr, - key: Key, + kb: KeyBuffer, ) = - let bkey = KeyBuffer.new(key) - tds[].tp.spawn hasTask(sig, ret, tds, bkey) + + let key = kb.toKey() + + let res = (waitFor tds[].ds.delete(key)).catch + # print "thrbackend: putTask: fire", ret[].signal.fireSync().get() + if res.isErr: + ret.failure(res.error()) + else: + ret.success() + + discard sig.fireSync() proc getTask*( sig: SharedSignal, @@ -114,15 +123,6 @@ proc getTask*( except CatchableError as err: ret.failure(err) -proc get*( - sig: SharedSignal, - ret: TResult[DataBuffer], - tds: ThreadDatastorePtr, - key: Key, -) = - let bkey = KeyBuffer.new(key) - tds[].tp.spawn getTask(sig, ret, tds, bkey) - import std/os proc putTask*( @@ -154,39 +154,6 @@ proc putTask*( sig.decr() echoed "putTask: FINISH\n" - - -proc deleteTask*( - sig: SharedSignal, - ret: TResult[void], - tds: ThreadDatastorePtr, - kb: KeyBuffer, -) = - - let key = kb.toKey() - - let res = (waitFor tds[].ds.delete(key)).catch - # print "thrbackend: putTask: fire", ret[].signal.fireSync().get() - if res.isErr: - ret.failure(res.error()) - else: - ret.success() - - discard sig.fireSync() - -# import pretty - -proc delete*( - sig: SharedSignal, - ret: TResult[void], - tds: ThreadDatastorePtr, - key: Key, -) = - let bkey = KeyBuffer.new(key) - tds[].tp.spawn deleteTask(sig, ret, tds, bkey) - -# import os - proc queryTask*( sig: SharedSignal, ret: TResult[QueryResponseBuffer],