From fddbb99a9875df0ee656358336bca24f37e6c452 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 00:00:25 -0700 Subject: [PATCH] await all --- datastore/threads/asyncsemaphore.nim | 7 +- datastore/threads/threadproxyds.nim | 106 ++++++++------------------- 2 files changed, 37 insertions(+), 76 deletions(-) diff --git a/datastore/threads/asyncsemaphore.nim b/datastore/threads/asyncsemaphore.nim index c9a7627..1735460 100644 --- a/datastore/threads/asyncsemaphore.nim +++ b/datastore/threads/asyncsemaphore.nim @@ -21,6 +21,7 @@ type AsyncSemaphore* = ref object of RootObj size*: int count: int + exit: bool queue: seq[Future[void]] func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore = @@ -28,12 +29,16 @@ func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore = proc `count`*(s: AsyncSemaphore): int = s.count +proc waitAll*(s: AsyncSemaphore) {.async.} = + s.exit = true + await allFutures(s.queue) + proc tryAcquire*(s: AsyncSemaphore): bool = ## Attempts to acquire a resource, if successful ## returns true, otherwise false ## - if s.count > 0 and s.queue.len == 0: + if s.count > 0 and s.queue.len == 0 and not s.exit: s.count.dec trace "Acquired slot", available = s.count, queue = s.queue.len return true diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 4d089e4..c0fc8a0 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -32,7 +32,6 @@ logScope: topics = "datastore threadproxyds" type - SqliteDB = SQLiteBackend[KeyId,DataBuffer] ThreadBackendKinds* = enum Sqlite @@ -209,92 +208,49 @@ method get*(self: ThreadDatastore, dispatchTask[void](self, signal): self.tp.spawn getTask(addr ctx, ds, key) -# method close*(self: ThreadDatastore): Future[?!void] {.async.} = -# for fut in self.tasks.values.toSeq: -# await fut.cancelAndWait() # probably want to store the signal, instead of the future (or both?) +method close*(self: ThreadDatastore): Future[?!void] {.async.} = -# await self.ds.close() + await self.ds.close() -# proc asyncQueryTask( -# ctx: ptr TaskCtx, -# iter: ptr QueryIter) {.async.} = -# if ctx.isNil or iter.isNil: -# trace "ctx is nil" -# return +method query*( + self: ThreadDatastore, + query: Query): Future[?!QueryIter] {.async.} = + without var childIter =? await self.ds.query(query), error: + return failure error -# let -# fut = iter[].next() + var + iter = QueryIter.new() + lock = newAsyncLock() # serialize querying under threads -# asyncSpawn signalMonitor(ctx, fut) -# without ret =? (await fut).catch and res =? ret, error: -# trace "Error in asyncQueryTask", error = error.msg -# ctx[].res[].err(error) -# return + proc next(): Future[?!QueryResponse] {.async.} = + defer: + if lock.locked: + lock.release() -# if res.key.isNone: -# ctx[].res[].ok((default(DataBuffer), default(DataBuffer))) -# return + trace "About to query" + if lock.locked: + return failure (ref DatastoreError)(msg: "Should always await query features") -# var -# keyBuf = DataBuffer.new($(res.key.get())) -# dataBuf = DataBuffer.new(res.data) + await lock.acquire() -# trace "Got query result", key = $res.key.get(), data = res.data -# ctx[].res[].ok((keyBuf, dataBuf)) + if iter.finished == true: + return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") -# proc queryTask( -# ctx: ptr TaskCtx, -# iter: ptr QueryIter) = + iter.finished = childIter.finished + var + res = ThreadResult[QueryResponse]() + ctx = TaskCtx[QueryResponse]( + ds: self.ds, + res: addr res) -# defer: -# if not ctx.isNil: -# discard ctx[].signal.fireSync() + proc runTask() = + self.tp.spawn queryTask(addr ctx, addr childIter) -# try: -# waitFor asyncQueryTask(ctx, iter) -# except CatchableError as exc: -# trace "Unexpected exception thrown in asyncQueryTask", exc = exc.msg -# raiseAssert exc.msg + return self.dispatchTask(ctx, Key.none, runTask) -# method query*( -# self: ThreadDatastore, -# query: Query): Future[?!QueryIter] {.async.} = -# without var childIter =? await self.ds.query(query), error: -# return failure error - -# var -# iter = QueryIter.new() -# lock = newAsyncLock() # serialize querying under threads - -# proc next(): Future[?!QueryResponse] {.async.} = -# defer: -# if lock.locked: -# lock.release() - -# trace "About to query" -# if lock.locked: -# return failure (ref DatastoreError)(msg: "Should always await query features") - -# await lock.acquire() - -# if iter.finished == true: -# return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") - -# iter.finished = childIter.finished -# var -# res = ThreadResult[QueryResponse]() -# ctx = TaskCtx[QueryResponse]( -# ds: self.ds, -# res: addr res) - -# proc runTask() = -# self.tp.spawn queryTask(addr ctx, addr childIter) - -# return self.dispatchTask(ctx, Key.none, runTask) - -# iter.next = next -# return success iter + iter.next = next + return success iter proc new*( self: type ThreadDatastore,