nim-datastore/datastore/threads/threadproxyds.nim

244 lines
4.9 KiB
Nim
Raw Normal View History

2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
# Fail the build early when thread support is not enabled.
when not compileOption("threads"):
  {.error: "This module requires --threads:on compilation flag".}
2023-09-08 15:09:15 -06:00
import pkg/upraises
push: {.upraises: [].}
import std/atomics
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/stew/ptrops
import pkg/taskpools
import ../key
import ../query
import ../datastore
import ./foreignbuffer
type
  ThreadResults = object
    ## Outcome of a task: `ok` is the success flag, `msg` carries the
    ## error text attached on failure.
    ok: Atomic[bool]
    msg: ForeignBuff[char]

  TaskCtx = object
    ## Pointers and signal handed to a task spawned on the taskpool.
    ds: ptr Datastore
    res: ptr ThreadResults
    signal: ThreadSignalPtr

  ThreadDatastore* = ref object of Datastore
    ## Datastore that forwards each operation to `ds` through a task
    ## spawned on `tp`.
    tp*: Taskpool
    ds*: Datastore
2023-09-11 14:48:53 -06:00
proc success(self: var ThreadResults) {.inline.} =
  ## Flag the task result as successful.
  ##
  store(self.ok, true)
proc failure(self: var ThreadResults, msg: var string) {.inline.} =
  ## Flag the task result as failed and attach the characters of `msg`
  ## to the result's message buffer.
  ##
  store(self.ok, false)
  self.msg.attach(toOpenArray(msg, 0, msg.high))
2023-09-08 15:09:15 -06:00
proc hasTask(
  ctx: ptr TaskCtx,
  key: ptr Key,
  doesHave: ptr bool) =
  ## Run `has` in a thread task
  ##
  ## Writes the lookup answer to `doesHave[]` and the outcome to
  ## `ctx[].res[]`, then fires `ctx[].signal`.
  ##

  # Fire the signal on every exit path. Without this the failure
  # branches return silently and the awaiting caller never resumes.
  defer:
    discard ctx[].signal.fireSync()

  # Outer layer: an exception raised while running the operation.
  without res =? (waitFor ctx[].ds[].has(key[])).catch, error:
    ctx[].res[].failure(error.msg)
    return

  # Inner layer: a failure returned by the datastore itself.
  if err =? res.errorOption:
    ctx[].res[].failure(err.msg)
    return

  doesHave[] = res.get()
  ctx[].res[].success()
2023-09-11 14:48:53 -06:00
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
  ## Check whether `key` exists in the wrapped datastore.
  ##
  ## The lookup is spawned on the taskpool as `hasTask`; this proc then
  ## waits on a signal and translates the task result into a `?!bool`.
  ##
  let signal = ThreadSignalPtr.new().valueOr:
    return failure("Failed to create signal")

  var
    found = false
    taskKey = key # local copy so the task can receive it by pointer
    taskRes = ThreadResults()
    ctx = TaskCtx(
      ds: addr self.ds,
      res: addr taskRes,
      signal: signal)

  proc runTask() =
    self.tp.spawn hasTask(addr ctx, addr taskKey, addr found)

  try:
    runTask()
    await wait(ctx.signal)

    if ctx.res.ok.load():
      return success(found)

    return failure($(ctx.res.msg))
  finally:
    ctx.signal.close()
proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
  ## Run `delete` in a thread task
  ##
  ## Records the outcome in `ctx[].res[]`, then fires `ctx[].signal`.
  ##

  # Fire the signal on every exit path. Without this the failure
  # branches return silently and the awaiting caller never resumes.
  defer:
    discard ctx[].signal.fireSync()

  # Outer layer: an exception raised while running the operation.
  without res =? (waitFor ctx[].ds[].delete(key[])).catch, error:
    ctx[].res[].failure(error.msg)
    return

  # Inner layer: a failure returned by the datastore itself; it must not
  # be reported to the caller as a success.
  if err =? res.errorOption:
    ctx[].res[].failure(err.msg)
    return

  ctx[].res[].success()
2023-09-11 14:48:53 -06:00
method delete*(
  self: ThreadDatastore,
  key: Key): Future[?!void] {.async.} =
  ## Delete `key` from the wrapped datastore.
  ##
  ## The deletion is spawned on the taskpool as `delTask`; this proc then
  ## waits on a signal and returns the recorded outcome. On failure the
  ## message recorded by the task is returned, matching `has`, `put`
  ## and `get`.
  ##
  var
    signal = ThreadSignalPtr.new().valueOr:
      return failure("Failed to create signal")

    key = key # local copy so the task can receive it by pointer
    res = ThreadResults()
    ctx = TaskCtx(
      ds: addr self.ds,
      res: addr res,
      signal: signal)

  proc runTask() =
    self.tp.spawn delTask(addr ctx, addr key)

  try:
    runTask()
    await wait(ctx.signal)

    if not ctx.res[].ok.load():
      # Propagate the task's message instead of a generic "error".
      return failure($(ctx.res[].msg))

    return success()
  finally:
    # NOTE(review): if `close` returns a value in the chronos version in
    # use, it has to be discarded explicitly - confirm.
    ctx.signal.close()
2023-09-11 14:48:53 -06:00
method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} =
  ## Delete every key in `keys`, one after another, stopping at and
  ## returning the first failure.
  ##
  for key in keys:
    let outcome = await self.delete(key)
    if err =? outcome.errorOption:
      return failure err

  return success()
2023-09-08 15:09:15 -06:00
proc putTask(
  ctx: ptr TaskCtx,
  key: ptr Key,
  data: ptr UncheckedArray[byte],
  len: int) =
  ## run put in a thread task
  ##
  ## The first `len` bytes at `data` are copied into a fresh `seq` that
  ## is passed to the datastore. The outcome is recorded in
  ## `ctx[].res[]`, then `ctx[].signal` is fired.
  ##

  # Fire the signal on every exit path. Without this the failure
  # branches return silently and the awaiting caller never resumes.
  defer:
    discard ctx[].signal.fireSync()

  # Outer layer: an exception raised while running the operation.
  without res =? (waitFor ctx[].ds[].put(
    key[],
    @(toOpenArray(data, 0, len - 1)))).catch, error:
    ctx[].res[].failure(error.msg)
    return

  # Inner layer: a failure returned by the datastore itself; it must not
  # be reported to the caller as a success.
  if err =? res.errorOption:
    ctx[].res[].failure(err.msg)
    return

  ctx[].res[].success()
2023-09-11 14:48:53 -06:00
method put*(
  self: ThreadDatastore,
  key: Key,
  data: seq[byte]): Future[?!void] {.async.} =
  ## Store `data` under `key` in the wrapped datastore.
  ##
  ## The write is spawned on the taskpool as `putTask`, which receives a
  ## pointer to a local copy of the bytes plus their length; this proc
  ## then waits on a signal and returns the recorded outcome.
  ##
  let signal = ThreadSignalPtr.new().valueOr:
    return failure("Failed to create signal")

  var
    taskKey = key   # local copies so the task can receive them by pointer
    payload = data
    taskRes = ThreadResults()
    ctx = TaskCtx(
      ds: addr self.ds,
      res: addr taskRes,
      signal: signal)

  proc runTask() =
    self.tp.spawn putTask(
      addr ctx,
      addr taskKey,
      makeUncheckedArray(baseAddr payload),
      payload.len)

  try:
    runTask()
    await wait(ctx.signal)
  finally:
    ctx.signal.close()

  if ctx.res[].ok.load():
    return success()

  return failure($(ctx.res[].msg))
method put*(
  self: ThreadDatastore,
  batch: seq[BatchEntry]): Future[?!void] {.async.} =
  ## Store every entry of `batch`, one after another, stopping at and
  ## returning the first failure.
  ##
  for entry in batch:
    let outcome = await self.put(entry.key, entry.data)
    if err =? outcome.errorOption:
      return failure err

  return success()
proc getTask(
  ctx: ptr TaskCtx,
  key: ptr Key,
  buf: ptr ForeignBuff[byte]) =
  ## Run get in a thread task
  ##
  ## On success the fetched bytes are attached to `buf[]`. The outcome is
  ## recorded in `ctx[].res[]`, then `ctx[].signal` is fired.
  ##

  # Fire the signal on every exit path. Without this the failure
  # branches return silently and the awaiting caller never resumes.
  defer:
    discard ctx[].signal.fireSync()

  # Outer layer: an exception raised while running the operation.
  without res =? (waitFor ctx[].ds[].get(key[])).catch, error:
    ctx[].res[].failure(error.msg)
    return

  # Inner layer: a failure returned by the datastore itself. Check it
  # before unwrapping the value with `get()`.
  if err =? res.errorOption:
    ctx[].res[].failure(err.msg)
    return

  var
    data = res.get()

  buf[].attach(data)
  ctx[].res[].success()
2023-09-11 14:48:53 -06:00
method get*(
  self: ThreadDatastore,
  key: Key): Future[?!seq[byte]] {.async.} =
  ## Fetch the bytes stored under `key` in the wrapped datastore.
  ##
  ## The read is spawned on the taskpool as `getTask`, which attaches the
  ## data to a `ForeignBuff`; after the signal fires the buffer is
  ## converted to a `seq[byte]` and returned.
  ##
  let signal = ThreadSignalPtr.new().valueOr:
    return failure("Failed to create signal")

  var
    taskKey = key # local copy so the task can receive it by pointer
    fetched = ForeignBuff[byte].init()
    taskRes = ThreadResults()
    ctx = TaskCtx(
      ds: addr self.ds,
      res: addr taskRes,
      signal: signal)

  proc runTask() =
    self.tp.spawn getTask(addr ctx, addr taskKey, addr fetched)

  try:
    runTask()
    await wait(ctx.signal)

    if ctx.res.ok.load():
      return success(fetched.toSeq())

    return failure($(ctx.res[].msg))
  finally:
    ctx.signal.close()
2023-09-11 14:48:53 -06:00
func new*(
  self: type ThreadDatastore,
  ds: Datastore,
  tp: Taskpool): ?!ThreadDatastore =
  ## Create a `ThreadDatastore` that wraps `ds` and spawns its tasks
  ## on `tp`.
  ##
  let proxy = ThreadDatastore(tp: tp, ds: ds)
  success proxy