diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 9e5ce85..4d089e4 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -43,7 +43,7 @@ type of Sqlite: sql*: SQLiteBackend[KeyId,DataBuffer] - TaskCtx[D; T: ThreadTypes] = object + TaskCtx[T: ThreadTypes] = object res: ThreadResult[T] signal: ThreadSignalPtr running: bool @@ -78,7 +78,7 @@ template dispatchTask[T](self: ThreadDatastore, blk: untyped ): auto = var - ctx {.inject.} = TaskCtx[SqliteDB, T](signal: signal) + ctx {.inject.} = TaskCtx[T](signal: signal) try: case self.backend.kind: of Sqlite: @@ -98,7 +98,7 @@ template dispatchTask[T](self: ThreadDatastore, discard ctx.signal.close() self.semaphore.release() -template executeTask(ctx: ptr TaskCtx, blk: untyped) = +template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = try: withLock(ctxLock): if ctx.cancelled: @@ -107,18 +107,23 @@ template executeTask(ctx: ptr TaskCtx, blk: untyped) = ## run backend command let res = `blk` + if res.isOk(): + when T is void: + ctx.res.ok() + else: + ctx.res.ok(res.get()) + else: + ctx.res.err res.error().toThreadErr() withLock(ctxLock): ctx.running = false - ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr: - e.toThreadErr() except CatchableError as exc: trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg raiseAssert exc.msg finally: discard ctx[].signal.fireSync() -proc hasTask[DB](ctx: ptr TaskCtx, ds: DB, key: KeyId) {.gcsafe.} = +proc hasTask[T, DB](ctx: ptr TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): has(ds, key) @@ -133,8 +138,8 @@ method has*(self: ThreadDatastore, dispatchTask[bool](self, signal): self.tp.spawn hasTask(addr ctx, ds, key) -proc deleteTask[DB](ctx: ptr TaskCtx, ds: DB; - key: KeyId) {.gcsafe.} = +proc deleteTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; + key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): delete(ds, key) @@ -158,128 +163,51 @@ method delete*(self: ThreadDatastore, return success() -# proc asyncPutTask( -# ctx: ptr TaskCtx[void], -# key: ptr Key, -# data: ptr UncheckedArray[byte], -# len: int) {.async.} = +proc putTask[DB](ctx: ptr TaskCtx, ds: DB; + key: KeyId, + data: DataBuffer) {.gcsafe, nimcall.} = + ## run backend command + executeTask(ctx): + put(ds, key, data) -# if ctx.isNil: -# trace "ctx is nil" -# return +method put*(self: ThreadDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = + await self.semaphore.acquire() + without signal =? acquireSignal(), err: + return failure err -# let -# key = key[] -# data = @(data.toOpenArray(0, len - 1)) -# fut = ctx[].ds.put(key, data) + let key = KeyId.new key.id() + let data = DataBuffer.new data + dispatchTask[void](self, signal): + self.tp.spawn putTask(addr ctx, ds, key, data) + +method put*( + self: ThreadDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = -# asyncSpawn signalMonitor(ctx, fut) -# without res =? (await fut).catch, error: -# trace "Error in asyncPutTask", error = error.msg -# ctx[].res[].err(error) -# return + for entry in batch: + if err =? (await self.put(entry.key, entry.data)).errorOption: + return failure err -# ctx[].res[].ok() + return success() -# proc putTask( -# ctx: ptr TaskCtx, -# key: ptr Key, -# data: ptr UncheckedArray[byte], -# len: int) = -# ## run put in a thread task -# ## +proc getTask[DB](ctx: ptr TaskCtx, ds: DB; + key: KeyId) {.gcsafe, nimcall.} = + ## run backend command + executeTask(ctx): + get(ds, key) -# defer: -# if not ctx.isNil: -# discard ctx[].signal.fireSync() +method get*(self: ThreadDatastore, + key: Key, + ): Future[?!seq[byte]] {.async.} = + await self.semaphore.acquire() + without signal =? acquireSignal(), err: + return failure err -# try: -# waitFor asyncPutTask(ctx, key, data, len) -# except CatchableError as exc: -# trace "Unexpected exception thrown in asyncPutTask", exc = exc.msg -# raiseAssert exc.msg - -# method put*( -# self: ThreadDatastore, -# key: Key, -# data: seq[byte]): Future[?!void] {.async.} = -# var -# key = key -# data = data -# res = ThreadResult[void]() -# ctx = TaskCtx[void]( -# ds: self.ds, -# res: addr res) - -# proc runTask() = -# self.tp.spawn putTask( -# addr ctx, -# addr key, -# makeUncheckedArray(addr data[0]), -# data.len) - -# return self.dispatchTask(ctx, key.some, runTask) - -# method put*( -# self: ThreadDatastore, -# batch: seq[BatchEntry]): Future[?!void] {.async.} = - -# for entry in batch: -# if err =? (await self.put(entry.key, entry.data)).errorOption: -# return failure err - -# return success() - -# proc asyncGetTask( -# ctx: ptr TaskCtx[DataBuffer], -# key: ptr Key) {.async.} = -# if ctx.isNil: -# trace "ctx is nil" -# return - -# let -# key = key[] -# fut = ctx[].ds.get(key) - -# asyncSpawn signalMonitor(ctx, fut) -# without res =? (await fut).catch and data =? res, error: -# trace "Error in asyncGetTask", error = error.msg -# ctx[].res[].err(error) -# return - -# trace "Got data in get" -# ctx[].res[].ok(DataBuffer.new(data)) - -# proc getTask( -# ctx: ptr TaskCtx, -# key: ptr Key) = -# ## Run get in a thread task -# ## - -# defer: -# if not ctx.isNil: -# discard ctx[].signal.fireSync() - -# try: -# waitFor asyncGetTask(ctx, key) -# except CatchableError as exc: -# trace "Unexpected exception thrown in asyncGetTask", exc = exc.msg -# raiseAssert exc.msg - -# method get*( -# self: ThreadDatastore, -# key: Key): Future[?!seq[byte]] {.async.} = -# var -# key = key -# res = ThreadResult[DataBuffer]() -# ctx = TaskCtx[DataBuffer]( -# ds: self.ds, -# res: addr res) - -# proc runTask() = -# self.tp.spawn getTask(addr ctx, addr key) - -# return self.dispatchTask(ctx, key.some, runTask) + let key = KeyId.new key.id() + dispatchTask[void](self, signal): + self.tp.spawn getTask(addr ctx, ds, key) # method close*(self: ThreadDatastore): Future[?!void] {.async.} = # for fut in self.tasks.values.toSeq: