cleanup other tasks

This commit is contained in:
Jaremy Creechley 2023-09-14 19:06:05 -07:00
parent 13e994c986
commit 2026310dd6
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 37 additions and 72 deletions

View File

@ -26,16 +26,17 @@ method has*(
key: Key
): Future[?!bool] {.async.} =
var ret = newThreadResult(bool)
let sig = SharedSignal.new(0)
await sig.acquireSig()
try:
await sig.acquireSig()
sig.has(ret, self.tds, key)
await sig.wait()
return ret.convert(bool)
finally:
ret.release()
var ret = newThreadResult(bool)
proc submitHas() =
let bkey = KeyBuffer.new(key)
self.tds[].tp.spawn hasTask(sig, ret, self.tds, bkey)
submitHas()
await sig.wait()
return ret.convert(bool)
method delete*(
self: ThreadProxyDatastore,
@ -43,15 +44,15 @@ method delete*(
): Future[?!void] {.async.} =
let sig = SharedSignal.new(0)
await sig.acquireSig()
var ret = newThreadResult(void)
proc submitDelete() =
let bkey = KeyBuffer.new(key)
self.tds[].tp.spawn deleteTask(sig, ret, self.tds, bkey)
try:
await sig.acquireSig()
sig.delete(ret, self.tds, key)
await sig.wait()
finally:
ret.release()
submitDelete()
await sig.wait()
return ret.convert(void)
method delete*(
@ -76,18 +77,17 @@ method get*(
## for the entire batch
let sig = SharedSignal.new(0)
await sig.acquireSig()
var ret = newThreadResult(ValueBuffer)
proc submitGet() =
let bkey = KeyBuffer.new(key)
self.tds[].tp.spawn getTask(sig, ret, self.tds, bkey)
try:
sig.get(ret, self.tds, key)
await sig.wait()
finally:
ret.release()
submitGet()
await sig.wait()
return ret.convert(seq[byte])
import ./threads/then
import std/os
method put*(
self: ThreadProxyDatastore,
@ -127,8 +127,6 @@ method put*(
return success()
import pretty
method query*(
self: ThreadProxyDatastore,
query: Query

View File

@ -86,14 +86,23 @@ proc hasTask*(
except CatchableError as err:
ret.failure(err)
proc has*(
proc deleteTask*(
sig: SharedSignal,
ret: TResult[bool],
ret: TResult[void],
tds: ThreadDatastorePtr,
key: Key,
kb: KeyBuffer,
) =
let bkey = KeyBuffer.new(key)
tds[].tp.spawn hasTask(sig, ret, tds, bkey)
let key = kb.toKey()
let res = (waitFor tds[].ds.delete(key)).catch
# print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
if res.isErr:
ret.failure(res.error())
else:
ret.success()
discard sig.fireSync()
proc getTask*(
sig: SharedSignal,
@ -114,15 +123,6 @@ proc getTask*(
except CatchableError as err:
ret.failure(err)
proc get*(
sig: SharedSignal,
ret: TResult[DataBuffer],
tds: ThreadDatastorePtr,
key: Key,
) =
let bkey = KeyBuffer.new(key)
tds[].tp.spawn getTask(sig, ret, tds, bkey)
import std/os
proc putTask*(
@ -154,39 +154,6 @@ proc putTask*(
sig.decr()
echoed "putTask: FINISH\n"
proc deleteTask*(
sig: SharedSignal,
ret: TResult[void],
tds: ThreadDatastorePtr,
kb: KeyBuffer,
) =
let key = kb.toKey()
let res = (waitFor tds[].ds.delete(key)).catch
# print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
if res.isErr:
ret.failure(res.error())
else:
ret.success()
discard sig.fireSync()
# import pretty
proc delete*(
sig: SharedSignal,
ret: TResult[void],
tds: ThreadDatastorePtr,
key: Key,
) =
let bkey = KeyBuffer.new(key)
tds[].tp.spawn deleteTask(sig, ret, tds, bkey)
# import os
proc queryTask*(
sig: SharedSignal,
ret: TResult[QueryResponseBuffer],