reworked with less copying

This commit is contained in:
Dmitriy Ryajov 2023-09-12 13:51:01 -06:00
parent 5adc7c9611
commit 4c48383b88
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4

View File

@ -7,6 +7,7 @@ import pkg/upraises
push: {.upraises: [].} push: {.upraises: [].}
import std/atomics import std/atomics
import std/strutils
import pkg/chronos import pkg/chronos
import pkg/chronos/threadsync import pkg/chronos/threadsync
@ -14,81 +15,91 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/ptrops import pkg/stew/ptrops
import pkg/taskpools import pkg/taskpools
import pkg/threading/smartptrs
import pkg/stew/byteutils
import ../key import ../key
import ../query import ../query
import ../datastore import ../datastore
import ./foreignbuffer import ./asyncsemaphore
import ./databuffer
type type
ThreadResults = object ThreadTypes = void | bool | SomeInteger | DataBuffer
ok: Atomic[bool] ThreadResult[T: ThreadTypes] = Result[T, DataBuffer]
msg: ForeignBuff[char]
TaskCtx = object TaskCtx[T: ThreadTypes] = object
ds: ptr Datastore ds: ptr Datastore
res: ptr ThreadResults res: ptr ThreadResult[T]
signal: ThreadSignalPtr signal: ThreadSignalPtr
ThreadDatastore* = ref object of Datastore ThreadDatastore* = ref object of Datastore
tp*: Taskpool tp: Taskpool
ds*: Datastore ds: Datastore
semaphore: AsyncSemaphore
tasks: seq[Future[void]]
proc success(self: var ThreadResults) {.inline.} = template dispatchTask(self: ThreadDatastore, ctx: TaskCtx, runTask: proc): untyped =
self.ok.store(true) let
fut = wait(ctx.signal)
proc failure(self: var ThreadResults, msg: var string) {.inline.} = try:
self.ok.store(false) await self.semaphore.acquire()
self.msg.attach(msg.toOpenArray(0, msg.high)) runTask()
self.tasks.add(fut)
await fut
if ctx.res[].isErr:
result = failure(ctx.res[].error())
finally:
discard ctx.signal.close()
if (
let idx = self.tasks.find(fut);
idx != -1):
self.tasks.del(idx)
self.semaphore.release()
proc hasTask( proc hasTask(
ctx: ptr TaskCtx, ctx: ptr TaskCtx,
key: ptr Key, key: ptr Key) =
doesHave: ptr bool) =
defer:
discard ctx[].signal.fireSync()
without res =? (waitFor ctx[].ds[].has(key[])).catch, error: without res =? (waitFor ctx[].ds[].has(key[])).catch, error:
ctx[].res[].failure(error.msg) ctx[].res[].err(error)
return return
doesHave[] = res.get() ctx[].res[].ok(res.get())
ctx[].res[].success()
discard ctx[].signal.fireSync()
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
var var
signal = ThreadSignalPtr.new().valueOr: signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal") return failure("Failed to create signal")
key = key res = ThreadResult[bool]()
res = ThreadResults() ctx = TaskCtx[bool](
ctx = TaskCtx(
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res,
signal: signal) signal: signal)
doesHave = false
proc runTask() = proc runTask() =
self.tp.spawn hasTask(addr ctx, addr key, addr doesHave) self.tp.spawn hasTask(addr ctx, unsafeAddr key)
try: self.dispatchTask(ctx, runTask)
runTask() return success(res.get())
await wait(ctx.signal)
if ctx.res.ok.load() == false:
return failure($(ctx.res.msg))
return success(doesHave)
finally:
ctx.signal.close()
proc delTask(ctx: ptr TaskCtx, key: ptr Key) = proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
defer:
discard ctx[].signal.fireSync()
without res =? (waitFor ctx[].ds[].delete(key[])).catch, error: without res =? (waitFor ctx[].ds[].delete(key[])).catch, error:
ctx[].res[].failure(error.msg) ctx[].res[].err(error)
return return
ctx[].res[].ok.store(true) ctx[].res[].ok()
discard ctx[].signal.fireSync()
method delete*( method delete*(
self: ThreadDatastore, self: ThreadDatastore,
@ -98,26 +109,17 @@ method delete*(
signal = ThreadSignalPtr.new().valueOr: signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal") return failure("Failed to create signal")
key = key res = ThreadResult[void]()
res = ThreadResults() ctx = TaskCtx[void](
ctx = TaskCtx(
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res,
signal: signal) signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn delTask(addr ctx, addr key) self.tp.spawn delTask(addr ctx, unsafeAddr key)
try:
runTask()
await wait(ctx.signal)
if ctx.res.ok.load() == false:
return failure("error")
self.dispatchTask(ctx, runTask)
return success() return success()
finally:
ctx.signal.close()
method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} = method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} =
for key in keys: for key in keys:
@ -129,19 +131,19 @@ method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.}
proc putTask( proc putTask(
ctx: ptr TaskCtx, ctx: ptr TaskCtx,
key: ptr Key, key: ptr Key,
data: ptr UncheckedArray[byte], data: DataBuffer,
len: int) = len: int) =
## run put in a thread task ## run put in a thread task
## ##
without res =? (waitFor ctx[].ds[].put( defer:
key[], discard ctx[].signal.fireSync()
@(toOpenArray(data, 0, len - 1)))).catch, error:
ctx[].res[].failure(error.msg) without res =? (waitFor ctx[].ds[].put(key[], @data)).catch, error:
ctx[].res[].err(error)
return return
ctx[].res[].ok.store(true) ctx[].res[].ok()
discard ctx[].signal.fireSync()
method put*( method put*(
self: ThreadDatastore, self: ThreadDatastore,
@ -151,10 +153,9 @@ method put*(
var var
signal = ThreadSignalPtr.new().valueOr: signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal") return failure("Failed to create signal")
key = key
data = data res = ThreadResult[void]()
res = ThreadResults() ctx = TaskCtx[void](
ctx = TaskCtx(
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res,
signal: signal) signal: signal)
@ -162,19 +163,11 @@ method put*(
proc runTask() = proc runTask() =
self.tp.spawn putTask( self.tp.spawn putTask(
addr ctx, addr ctx,
addr key, unsafeAddr key,
makeUncheckedArray(baseAddr data), DataBuffer.new(data),
data.len) data.len)
try: self.dispatchTask(ctx, runTask)
runTask()
await wait(ctx.signal)
finally:
ctx.signal.close()
if ctx.res[].ok.load() == false:
return failure($(ctx.res[].msg))
return success() return success()
method put*( method put*(
@ -189,22 +182,22 @@ method put*(
proc getTask( proc getTask(
ctx: ptr TaskCtx, ctx: ptr TaskCtx,
key: ptr Key, key: ptr Key) =
buf: ptr ForeignBuff[byte]) =
## Run get in a thread task ## Run get in a thread task
## ##
defer:
discard ctx[].signal.fireSync()
without res =? (waitFor ctx[].ds[].get(key[])).catch, error: without res =? (waitFor ctx[].ds[].get(key[])).catch, error:
var err = error.msg var err = error.msg
ctx[].res[].failure(error.msg) ctx[].res[].err(error)
return return
var var
data = res.get() data = res.get()
buf[].attach(data) ctx[].res[].ok(DataBuffer.new(data))
ctx[].res[].ok.store(res.isOk)
discard ctx[].signal.fireSync()
method get*( method get*(
self: ThreadDatastore, self: ThreadDatastore,
@ -214,30 +207,32 @@ method get*(
signal = ThreadSignalPtr.new().valueOr: signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal") return failure("Failed to create signal")
key = key var
buf = ForeignBuff[byte].init() res = ThreadResult[DataBuffer]()
res = ThreadResults() ctx = TaskCtx[DataBuffer](
ctx = TaskCtx(
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res,
signal: signal) signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn getTask(addr ctx, addr key, addr buf) self.tp.spawn getTask(addr ctx, unsafeAddr key)
try: self.dispatchTask(ctx, runTask)
runTask() return success(@(res.get()))
await wait(ctx.signal)
if ctx.res.ok.load() == false: method close*(self: ThreadDatastore): Future[?!void] {.async.} =
return failure($(ctx.res[].msg)) for task in self.tasks:
await task.cancelAndWait()
return success(buf.toSeq()) await self.ds.close()
finally:
ctx.signal.close()
func new*( func new*(
self: type ThreadDatastore, self: type ThreadDatastore,
ds: Datastore, ds: Datastore,
tp: Taskpool): ?!ThreadDatastore = tp: Taskpool): ?!ThreadDatastore =
success ThreadDatastore(tp: tp, ds: ds) doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
success ThreadDatastore(
tp: tp,
ds: ds,
semaphore: AsyncSemaphore.new(tp.numThreads - 1)) # one thread is needed for the task dispatcher