diff --git a/datastore/datastore.nim b/datastore/datastore.nim index c0f1f0d..4be2598 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -8,6 +8,8 @@ import ./types export key, query, types +const datastoreUseSync* {.booldefine.} = false + push: {.upraises: [].} type diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 7e1ba33..7420eff 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -67,6 +67,7 @@ proc new*( let backend = ? newFSBackend[KeyId, DataBuffer]( - root=root, depth=depth, caseSensitive=caseSensitive, ignoreProtected=ignoreProtected) + root = root, depth = depth, caseSensitive = caseSensitive, + ignoreProtected = ignoreProtected) db = ? ThreadProxy.new(backend, tp = tp) success FSDatastore(db: db) diff --git a/datastore/threads/threadproxy.nim b/datastore/threads/threadproxy.nim index a3f9a2d..a553894 100644 --- a/datastore/threads/threadproxy.nim +++ b/datastore/threads/threadproxy.nim @@ -40,7 +40,7 @@ type TaskCtxObj*[T: ThreadTypes] = object res*: ThreadResult[T] signal: ThreadSignalPtr - running*: bool ## used to mark when a task worker is running + running*: bool ## used to mark when a task worker is running cancelled*: bool ## used to cancel a task before it's started nextSignal: ThreadSignalPtr @@ -60,31 +60,32 @@ proc newTaskCtx*[T](tp: typedesc[T], newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal)) proc setCancelled[T](ctx: TaskCtx[T]) = - ctx[].cancelled = true + ctx[].cancelled = true proc setRunning[T](ctx: TaskCtx[T]): bool = - if ctx[].cancelled: - return false - ctx[].running = true - return true + if ctx[].cancelled: + return false + ctx[].running = true + return true proc setDone[T](ctx: TaskCtx[T]) = - ctx[].running = false + ctx[].running = false proc acquireSignal(): ?!ThreadSignalPtr = # echo "signal:OPEN!" let signal = ThreadSignalPtr.new() if signal.isErr(): - failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error()) + failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & + signal.error()) else: success signal.get() template executeTask*[T](ctx: TaskCtx[T], blk: untyped) = ## executes a task on a thread work and handles cleanup after cancels/errors - ## + ## try: if not ctx.setRunning(): return - + ## run backend command let res = `blk` if res.isOk(): @@ -148,7 +149,7 @@ proc has*[BT](self: ThreadProxy[BT], # without signal =? acquireSignal(), err: # return failure err - let ctx = newTaskCtx(bool, signal=signal) + let ctx = newTaskCtx(bool, signal = signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn hasTask(ctx, ds, key) @@ -169,7 +170,7 @@ proc delete*[BT](self: ThreadProxy[BT], # without signal =? acquireSignal(), err: # return failure err - let ctx = newTaskCtx(void, signal=signal) + let ctx = newTaskCtx(void, signal = signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn deleteTask(ctx, ds, key) @@ -202,7 +203,7 @@ proc put*[BT](self: ThreadProxy[BT], # without signal =? acquireSignal(), err: # return failure err - let ctx = newTaskCtx(void, signal=signal) + let ctx = newTaskCtx(void, signal = signal) dispatchTask(self, signal): let key = KeyId.new key.id() let data = DataBuffer.new data @@ -216,7 +217,7 @@ proc put*[E, DB](self: ThreadProxy[DB], for entry in batch: if err =? (await self.put(entry.key, entry.data)).errorOption: return failure err - + return success() @@ -236,7 +237,7 @@ proc get*[BT](self: ThreadProxy[BT], # without signal =? acquireSignal(), err: # return failure err - let ctx = newTaskCtx(DataBuffer, signal=signal) + let ctx = newTaskCtx(DataBuffer, signal = signal) dispatchTask(self, signal): let key = KeyId.new key.id() self.tp.spawn getTask(ctx, ds, key) @@ -261,8 +262,6 @@ proc queryTask[DB]( # we execute this all inside `executeTask` # so we need to return a final result let handleRes = query(ds, query) - static: - echo "HANDLE_RES: ", typeof(handleRes) if handleRes.isErr(): # set error and exit executeTask, which will fire final signal (?!QResult).err(handleRes.error()) @@ -274,8 +273,6 @@ proc queryTask[DB]( raise newException(DeadThreadDefect, "queryTask timed out") var handle = handleRes.get() - static: - echo "HANDLE: ", typeof(handle) for item in handle.queryIter(): # wait for next request from async thread @@ -298,8 +295,8 @@ proc query*[BT](self: ThreadProxy[BT], q: Query ): Future[?!QueryIter] {.async.} = ## performs async query - ## keeps one thread running queryTask until finished - ## + ## keeps one thread running queryTask until finished + ## await self.semaphore.acquire() let signal = acquireSignal().get() # without signal =? acquireSignal(), err: @@ -307,7 +304,7 @@ proc query*[BT](self: ThreadProxy[BT], let nextSignal = acquireSignal().get() # without nextSignal =? acquireSignal(), err: # return failure err - let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal) + let ctx = newTaskCtx(QResult, signal = signal, nextSignal = nextSignal) proc iterDispose() {.async.} = ctx.setCancelled() @@ -318,8 +315,8 @@ proc query*[BT](self: ThreadProxy[BT], try: let query = dbQuery( - key= KeyId.new q.key.id(), - value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) + key = KeyId.new q.key.id(), + value = q.value, limit = q.limit, offset = q.offset, sort = q.sort) # setup initial queryTask dispatchTaskWrap(self, signal): @@ -337,9 +334,11 @@ proc query*[BT](self: ThreadProxy[BT], try: trace "About to query" if lock.locked: - return failure (ref DatastoreError)(msg: "Should always await query features") + return failure (ref DatastoreError)( + msg: "Should always await query features") if iter.finished == true: - return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") + return failure (ref QueryEndedError)( + msg: "Calling next on a finished query!") await wait(ctx[].signal) if not ctx[].running: