quick and dirty query

This commit is contained in:
Dmitriy Ryajov 2023-09-13 14:41:16 -06:00
parent d6c4d97d91
commit 19954c6f94
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4

View File

@ -15,7 +15,6 @@ import pkg/questionable
import pkg/questionable/results
import pkg/stew/ptrops
import pkg/taskpools
import pkg/threading/smartptrs
import pkg/stew/byteutils
import ../key
@ -26,7 +25,7 @@ import ./asyncsemaphore
import ./databuffer
type
ThreadTypes = void | bool | SomeInteger | DataBuffer
ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple
ThreadResult[T: ThreadTypes] = Result[T, DataBuffer]
TaskCtx[T: ThreadTypes] = object
@ -68,11 +67,12 @@ proc hasTask(
defer:
discard ctx[].signal.fireSync()
without res =? (waitFor ctx[].ds[].has(key[])).catch, error:
without ret =?
(waitFor ctx[].ds[].has(key[])).catch and res =? ret, error:
ctx[].res[].err(error)
return
ctx[].res[].ok(res.get())
ctx[].res[].ok(res)
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
var
@ -189,14 +189,11 @@ proc getTask(
defer:
discard ctx[].signal.fireSync()
without res =? (waitFor ctx[].ds[].get(key[])).catch, error:
var err = error.msg
without res =?
(waitFor ctx[].ds[].get(key[])).catch and data =? res, error:
ctx[].res[].err(error)
return
var
data = res.get()
ctx[].res[].ok(DataBuffer.new(data))
method get*(
@ -226,6 +223,78 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} =
await self.ds.close()
proc queryTask*(
ctx: ptr TaskCtx,
iter: ptr QueryIter) =
defer:
discard ctx[].signal.fireSync()
without ret =? (waitFor iter[].next()).catch and res =? ret, error:
ctx[].res[].err(error)
return
if res.key.isNone:
ctx[].res[].ok((false, DataBuffer.new(), DataBuffer.new()))
return
var
keyBuf = DataBuffer.new($(res.key.get()))
dataBuf = DataBuffer.new(res.data)
ctx[].res[].ok((true, keyBuf, dataBuf))
method query*(
self: ThreadDatastore,
query: Query): Future[?!QueryIter] {.async.} =
without var childIter =? await self.ds.query(query), error:
return failure error
var
iter = QueryIter.init()
let lock = newAsyncLock()
proc next(): Future[?!QueryResponse] {.async.} =
defer:
if lock.locked:
lock.release()
if iter.finished == true:
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
await lock.acquire()
if iter.finished == true:
return success (Key.none, EmptyBytes)
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
res = ThreadResult[(bool, DataBuffer, DataBuffer)]()
ctx = TaskCtx[(bool, DataBuffer, DataBuffer)](
ds: addr self.ds,
res: addr res,
signal: signal)
proc runTask() =
self.tp.spawn queryTask(addr ctx, addr childIter)
self.dispatchTask(ctx, runTask)
if err =? res.errorOption:
return failure err
let (ok, key, data) = res.get()
if not ok:
iter.finished = true
return success (Key.none, EmptyBytes)
return success (Key.init($key).expect("should not fail").some, @(data))
iter.next = next
return success iter
func new*(
self: type ThreadDatastore,
ds: Datastore,