diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 312db40..d3ae21a 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -15,7 +15,6 @@ import pkg/questionable import pkg/questionable/results import pkg/stew/ptrops import pkg/taskpools -import pkg/threading/smartptrs import pkg/stew/byteutils import ../key @@ -26,7 +25,7 @@ import ./asyncsemaphore import ./databuffer type - ThreadTypes = void | bool | SomeInteger | DataBuffer + ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple ThreadResult[T: ThreadTypes] = Result[T, DataBuffer] TaskCtx[T: ThreadTypes] = object @@ -68,11 +67,12 @@ proc hasTask( defer: discard ctx[].signal.fireSync() - without res =? (waitFor ctx[].ds[].has(key[])).catch, error: + without ret =? + (waitFor ctx[].ds[].has(key[])).catch and res =? ret, error: ctx[].res[].err(error) return - ctx[].res[].ok(res.get()) + ctx[].res[].ok(res) method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = var @@ -189,14 +189,11 @@ proc getTask( defer: discard ctx[].signal.fireSync() - without res =? (waitFor ctx[].ds[].get(key[])).catch, error: - var err = error.msg + without res =? + (waitFor ctx[].ds[].get(key[])).catch and data =? res, error: ctx[].res[].err(error) return - var - data = res.get() - ctx[].res[].ok(DataBuffer.new(data)) method get*( @@ -226,6 +223,78 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} = await self.ds.close() +proc queryTask*( + ctx: ptr TaskCtx, + iter: ptr QueryIter) = + + defer: + discard ctx[].signal.fireSync() + + without ret =? (waitFor iter[].next()).catch and res =? ret, error: + ctx[].res[].err(error) + return + + if res.key.isNone: + ctx[].res[].ok((false, DataBuffer.new(), DataBuffer.new())) + return + + var + keyBuf = DataBuffer.new($(res.key.get())) + dataBuf = DataBuffer.new(res.data) + + ctx[].res[].ok((true, keyBuf, dataBuf)) + +method query*( + self: ThreadDatastore, + query: Query): Future[?!QueryIter] {.async.} = + + without var childIter =? await self.ds.query(query), error: + return failure error + + var + iter = QueryIter.init() + + let lock = newAsyncLock() + proc next(): Future[?!QueryResponse] {.async.} = + defer: + if lock.locked: + lock.release() + + if iter.finished == true: + return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") + + await lock.acquire() + + if iter.finished == true: + return success (Key.none, EmptyBytes) + + var + signal = ThreadSignalPtr.new().valueOr: + return failure("Failed to create signal") + + res = ThreadResult[(bool, DataBuffer, DataBuffer)]() + ctx = TaskCtx[(bool, DataBuffer, DataBuffer)]( + ds: addr self.ds, + res: addr res, + signal: signal) + + proc runTask() = + self.tp.spawn queryTask(addr ctx, addr childIter) + + self.dispatchTask(ctx, runTask) + if err =? res.errorOption: + return failure err + + let (ok, key, data) = res.get() + if not ok: + iter.finished = true + return success (Key.none, EmptyBytes) + + return success (Key.init($key).expect("should not fail").some, @(data)) + + iter.next = next + return success iter + func new*( self: type ThreadDatastore, ds: Datastore,