diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index b0b44e9..312db40 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -7,6 +7,7 @@ import pkg/upraises push: {.upraises: [].} import std/atomics +import std/strutils import pkg/chronos import pkg/chronos/threadsync @@ -14,81 +15,91 @@ import pkg/questionable import pkg/questionable/results import pkg/stew/ptrops import pkg/taskpools +import pkg/threading/smartptrs +import pkg/stew/byteutils import ../key import ../query import ../datastore -import ./foreignbuffer +import ./asyncsemaphore +import ./databuffer type - ThreadResults = object - ok: Atomic[bool] - msg: ForeignBuff[char] + ThreadTypes = void | bool | SomeInteger | DataBuffer + ThreadResult[T: ThreadTypes] = Result[T, DataBuffer] - TaskCtx = object + TaskCtx[T: ThreadTypes] = object ds: ptr Datastore - res: ptr ThreadResults + res: ptr ThreadResult[T] signal: ThreadSignalPtr ThreadDatastore* = ref object of Datastore - tp*: Taskpool - ds*: Datastore + tp: Taskpool + ds: Datastore + semaphore: AsyncSemaphore + tasks: seq[Future[void]] -proc success(self: var ThreadResults) {.inline.} = - self.ok.store(true) +template dispatchTask(self: ThreadDatastore, ctx: TaskCtx, runTask: proc): untyped = + let + fut = wait(ctx.signal) -proc failure(self: var ThreadResults, msg: var string) {.inline.} = - self.ok.store(false) - self.msg.attach(msg.toOpenArray(0, msg.high)) + try: + await self.semaphore.acquire() + runTask() + self.tasks.add(fut) + await fut + + if ctx.res[].isErr: + result = failure(ctx.res[].error()) + finally: + discard ctx.signal.close() + if ( + let idx = self.tasks.find(fut); + idx != -1): + self.tasks.del(idx) + + self.semaphore.release() proc hasTask( ctx: ptr TaskCtx, - key: ptr Key, - doesHave: ptr bool) = + key: ptr Key) = + + defer: + discard ctx[].signal.fireSync() without res =? (waitFor ctx[].ds[].has(key[])).catch, error: - ctx[].res[].failure(error.msg) + ctx[].res[].err(error) return - doesHave[] = res.get() - ctx[].res[].success() - discard ctx[].signal.fireSync() + ctx[].res[].ok(res.get()) method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = var signal = ThreadSignalPtr.new().valueOr: return failure("Failed to create signal") - key = key - res = ThreadResults() - ctx = TaskCtx( + res = ThreadResult[bool]() + ctx = TaskCtx[bool]( ds: addr self.ds, res: addr res, signal: signal) - doesHave = false proc runTask() = - self.tp.spawn hasTask(addr ctx, addr key, addr doesHave) + self.tp.spawn hasTask(addr ctx, unsafeAddr key) - try: - runTask() - await wait(ctx.signal) - - if ctx.res.ok.load() == false: - return failure($(ctx.res.msg)) - - return success(doesHave) - finally: - ctx.signal.close() + self.dispatchTask(ctx, runTask) + return success(res.get()) proc delTask(ctx: ptr TaskCtx, key: ptr Key) = + defer: + discard ctx[].signal.fireSync() + without res =? (waitFor ctx[].ds[].delete(key[])).catch, error: - ctx[].res[].failure(error.msg) + ctx[].res[].err(error) return - ctx[].res[].ok.store(true) - discard ctx[].signal.fireSync() + ctx[].res[].ok() method delete*( self: ThreadDatastore, @@ -98,26 +109,17 @@ method delete*( signal = ThreadSignalPtr.new().valueOr: return failure("Failed to create signal") - key = key - res = ThreadResults() - ctx = TaskCtx( + res = ThreadResult[void]() + ctx = TaskCtx[void]( ds: addr self.ds, res: addr res, signal: signal) proc runTask() = - self.tp.spawn delTask(addr ctx, addr key) + self.tp.spawn delTask(addr ctx, unsafeAddr key) - try: - runTask() - await wait(ctx.signal) - - if ctx.res.ok.load() == false: - return failure("error") - - return success() - finally: - ctx.signal.close() + self.dispatchTask(ctx, runTask) + return success() method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} = for key in keys: @@ -129,19 +131,19 @@ method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} proc putTask( ctx: ptr TaskCtx, key: ptr Key, - data: ptr UncheckedArray[byte], + data: DataBuffer, len: int) = ## run put in a thread task ## - without res =? (waitFor ctx[].ds[].put( - key[], - @(toOpenArray(data, 0, len - 1)))).catch, error: - ctx[].res[].failure(error.msg) + defer: + discard ctx[].signal.fireSync() + + without res =? (waitFor ctx[].ds[].put(key[], @data)).catch, error: + ctx[].res[].err(error) return - ctx[].res[].ok.store(true) - discard ctx[].signal.fireSync() + ctx[].res[].ok() method put*( self: ThreadDatastore, @@ -151,10 +153,9 @@ method put*( var signal = ThreadSignalPtr.new().valueOr: return failure("Failed to create signal") - key = key - data = data - res = ThreadResults() - ctx = TaskCtx( + + res = ThreadResult[void]() + ctx = TaskCtx[void]( ds: addr self.ds, res: addr res, signal: signal) @@ -162,19 +163,11 @@ method put*( proc runTask() = self.tp.spawn putTask( addr ctx, - addr key, - makeUncheckedArray(baseAddr data), + unsafeAddr key, + DataBuffer.new(data), data.len) - try: - runTask() - await wait(ctx.signal) - finally: - ctx.signal.close() - - if ctx.res[].ok.load() == false: - return failure($(ctx.res[].msg)) - + self.dispatchTask(ctx, runTask) return success() method put*( @@ -189,22 +182,22 @@ method put*( proc getTask( ctx: ptr TaskCtx, - key: ptr Key, - buf: ptr ForeignBuff[byte]) = + key: ptr Key) = ## Run get in a thread task ## + defer: + discard ctx[].signal.fireSync() + without res =? (waitFor ctx[].ds[].get(key[])).catch, error: var err = error.msg - ctx[].res[].failure(error.msg) + ctx[].res[].err(error) return var data = res.get() - buf[].attach(data) - ctx[].res[].ok.store(res.isOk) - discard ctx[].signal.fireSync() + ctx[].res[].ok(DataBuffer.new(data)) method get*( self: ThreadDatastore, @@ -214,30 +207,32 @@ method get*( signal = ThreadSignalPtr.new().valueOr: return failure("Failed to create signal") - key = key - buf = ForeignBuff[byte].init() - res = ThreadResults() - ctx = TaskCtx( + var + res = ThreadResult[DataBuffer]() + ctx = TaskCtx[DataBuffer]( ds: addr self.ds, res: addr res, signal: signal) proc runTask() = - self.tp.spawn getTask(addr ctx, addr key, addr buf) + self.tp.spawn getTask(addr ctx, unsafeAddr key) - try: - runTask() - await wait(ctx.signal) + self.dispatchTask(ctx, runTask) + return success(@(res.get())) - if ctx.res.ok.load() == false: - return failure($(ctx.res[].msg)) +method close*(self: ThreadDatastore): Future[?!void] {.async.} = + for task in self.tasks: + await task.cancelAndWait() - return success(buf.toSeq()) - finally: - ctx.signal.close() + await self.ds.close() func new*( self: type ThreadDatastore, ds: Datastore, tp: Taskpool): ?!ThreadDatastore = - success ThreadDatastore(tp: tp, ds: ds) + doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" + + success ThreadDatastore( + tp: tp, + ds: ds, + semaphore: AsyncSemaphore.new(tp.numThreads - 1)) # one thread is needed for the task dispatcher