implementing query type

This commit is contained in:
Jaremy Creechley 2023-08-29 15:55:42 -07:00
parent e4ace8a759
commit eb79fad131
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 25 additions and 23 deletions

View File

@ -17,8 +17,8 @@ export key, query, smartptrs, databuffer
push: {.upraises: [].}
type
ThreadResult*[T: DataBuffer | void | bool | ThreadDatastorePtr] = object
ThreadSafeTypes* = DataBuffer | void | bool | ThreadDatastorePtr | QueryResponseBuffer
ThreadResult*[T: ThreadSafeTypes] = object
signal*: ThreadSignalPtr
results*: Result[T, CatchableErrorBuffer]
@ -30,6 +30,10 @@ type
ThreadDatastorePtr* = SharedPtr[ThreadDatastore]
QueryIterStore* = object
it*: QueryIter
QueryIterPtr* = SharedPtr[QueryIterStore]
proc newThreadResult*[T](
tp: typedesc[T]
): Result[TResult[T], ref CatchableError] =
@ -176,14 +180,14 @@ proc delete*(
let bkey = StringBuffer.new(key.id())
tds[].tp.spawn deleteTask(ret, tds, bkey)
method queryTask*(
proc queryTask*(
ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr,
qiter: QueryIter,
qiter: QueryIterPtr,
) =
try:
without res =? waitFor(qiter.next()), err:
without res =? waitFor(qiter[].it.next()), err:
ret.failure(err)
let qrb = res.toBuffer()
@ -191,18 +195,3 @@ method queryTask*(
except Exception:
echo "failure"
proc query*(
ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr,
q: Query,
) =
try:
without iter =? waitFor(tds[].ds.query(q)), err:
ret.failure(err)
while not iter.finished:
tds[].tp.spawn queryTask(ret, tds, iter)
except Exception as exc:
ret.failure(exc)

View File

@ -129,12 +129,24 @@ method query*(
return failure(err)
try:
delete(ret, self.tds, key)
await wait(ret[].signal)
## we need to setup the query iter on the main thread
## to keep it's lifetime associated with this async Future
without it =? await self.tds[].ds.query(query), err:
ret.failure(err)
var iter = newSharedPtr(QueryIterStore())
## note that bypasses SharedPtr isolation - may need `protect` here?
iter[].it = it
while not iter[].it.finished:
self.tds[].tp.spawn queryTask(ret, self.tds, iter)
await wait(ret[].signal)
iter[].it = nil # ensure our sharedptr doesn't try and dealloc
finally:
ret[].signal.close()
return ret.convert(void)
# return ret.convert(void)
method close*(
self: ThreadProxyDatastore
@ -150,6 +162,7 @@ method close*(
## this can block... how to handle? maybe just leak?
self.tds[].tp.shutdown()
self[].tds[].ds = nil # ensure our sharedptr doesn't try and dealloc
proc newThreadProxyDatastore*(
ds: Datastore,