nim-datastore/datastore/threads/threadproxyds.nim

330 lines
8.5 KiB
Nim
Raw Normal View History

2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
# This module spawns work onto a task pool, so refuse to build
# when thread support has not been enabled on the command line.
when not compileOption("threads"):
  {.error: "This module requires --threads:on compilation flag".}
2023-09-08 15:09:15 -06:00
import pkg/upraises
push: {.upraises: [].}
2023-09-15 13:08:38 -06:00
import std/tables
2023-09-25 22:35:34 -07:00
import std/locks
2023-09-08 15:09:15 -06:00
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/taskpools
2023-09-15 13:08:38 -06:00
import pkg/chronicles
2023-09-26 14:53:12 -07:00
import pkg/threading/smartptrs
2023-09-08 15:09:15 -06:00
import ../key
import ../query
import ../datastore
2023-09-20 22:12:53 -07:00
import ../backend
2023-09-25 21:27:35 -07:00
import ../sql/sqliteds
2023-09-08 15:09:15 -06:00
2023-09-15 13:08:38 -06:00
import ./asyncsemaphore
2023-09-12 13:51:01 -06:00
import ./databuffer
import ./threadresult
2023-09-08 15:09:15 -06:00
export threadresult
2023-09-15 13:08:38 -06:00
# Log topic attached to every log statement emitted from this module.
logScope:
  topics = "datastore threadproxyds"
2023-09-08 15:09:15 -06:00
type
  # Backend kinds that can sit behind a `ThreadDatastore`.
  ThreadBackendKinds* = enum
    Sqlite
    # Filesystem

  # Variant wrapper around the concrete backend selected by `kind`.
  ThreadBackend* = object
    case kind*: ThreadBackendKinds
    of Sqlite:
      sql*: SQLiteBackend[KeyId, DataBuffer]

  # Per-task state handed to a spawned task.
  TaskCtxObj[T: ThreadTypes] = object
    # slot the task writes its outcome into
    res: ThreadResult[T]
    # fired by the task when it finishes, awaited by the dispatcher
    signal: ThreadSignalPtr
    # toggled by `setRunning` / `setDone` while holding `ctxLock`
    running: bool
    # raised by `setCancelled` while holding `ctxLock`
    cancelled: bool

  # Shared-pointer handle to a task context.
  TaskCtx[T] = SharedPtr[TaskCtxObj[T]]

  # Datastore that forwards its operations to a backend through a task pool.
  ThreadDatastore* = ref object of Datastore
    tp: Taskpool
    backend: ThreadBackend
    # semaphore is used for backpressure
    # to avoid exhausting file descriptors
    semaphore: AsyncSemaphore
2023-09-15 16:40:46 -06:00
2023-09-25 22:54:58 -07:00
# Module-wide lock guarding the `running` / `cancelled` flags of task contexts.
var ctxLock: Lock
initLock(ctxLock)
2023-09-25 22:35:34 -07:00
2023-09-26 15:29:35 -07:00
proc setCancelled[T](ctx: TaskCtx[T]) =
  ## Marks the task context as cancelled while holding `ctxLock`.
  withLock ctxLock:
    ctx[].cancelled = true
2023-09-25 22:35:34 -07:00
2023-09-26 14:53:12 -07:00
proc setRunning[T](ctx: TaskCtx[T]): bool =
  ## Tries to move the task context into the running state.
  ##
  ## Returns `false` (leaving `running` untouched) when the context has
  ## already been cancelled, otherwise sets `running` and returns `true`.
  ## The check and the update happen under `ctxLock`.
  withLock(ctxLock):
    if ctx[].cancelled:
      return false
    ctx[].running = true
    # Report success explicitly: without this the implicit `result` stays
    # `false` and callers would treat every task as cancelled.
    return true
proc setDone[T](ctx: TaskCtx[T]) =
  ## Clears the `running` flag of the task context while holding `ctxLock`.
  withLock ctxLock:
    ctx[].running = false
2023-09-26 13:25:15 -07:00
2023-09-25 22:51:34 -07:00
proc acquireSignal(): ?!ThreadSignalPtr =
  ## Creates a fresh `ThreadSignalPtr`, converting a creation error
  ## into a `CatchableError` failure.
  let created = ThreadSignalPtr.new()
  if created.isOk():
    success created.get()
  else:
    failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & created.error())
2023-09-26 14:53:12 -07:00
# Runs `blk` (a backend command yielding a result) on behalf of `ctx`:
# - leaves the enclosing proc early when `setRunning` reports `false`,
# - stores the command's value or error in `ctx[].res`,
# - records any raised exception as an error in `ctx[].res`,
# - always clears the running flag and fires the context signal on exit.
template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
  try:
    if not ctx.setRunning():
      return

    # run backend command
    let outcome = `blk`
    if outcome.isErr():
      ctx[].res.err outcome.error().toThreadErr()
    else:
      when T is void:
        ctx[].res.ok()
      else:
        ctx[].res.ok(outcome.get())

  except CatchableError as exc:
    trace "Unexpected exception thrown in async task", exc = exc.msg
    ctx[].res.err exc.toThreadErr()
  except Exception as exc:
    trace "Unexpected defect thrown in async task", exc = exc.msg
    ctx[].res.err exc.toThreadErr()
  finally:
    # runs on every exit path, including the early `return` above
    ctx.setDone()
    discard ctx[].signal.fireSync()
2023-09-26 15:37:19 -07:00
# Runs `blk` with the backend injected as `ds`, then awaits the signal of
# the `ctx` that is visible at the call site. On cancellation the context is
# flagged as cancelled and the error re-raised. In every case the context's
# signal is closed and one semaphore permit is released afterwards.
# The `signal` parameter itself is not referenced by the body.
template dispatchTaskWrap[T](self: ThreadDatastore,
                             signal: ThreadSignalPtr,
                             blk: untyped
                             ): auto =
  try:
    case self.backend.kind:
    of Sqlite:
      var ds {.inject.} = self.backend.sql
      proc runTask() =
        `blk`
      runTask()
      await wait(ctx[].signal)
  except CancelledError as exc:
    trace "Cancelling thread future!", exc = exc.msg
    ctx.setCancelled()
    raise exc
  finally:
    discard ctx[].signal.close()
    self.semaphore.release()
2023-09-26 15:37:19 -07:00
# Allocates a shared task context around `signal`, injects it into the
# caller's scope as `ctx`, and hands `blk` over to `dispatchTaskWrap`.
template dispatchTask[T](self: ThreadDatastore,
                         signal: ThreadSignalPtr,
                         blk: untyped
                         ): auto =
  let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
  dispatchTaskWrap[T](self, signal, blk)
2023-09-26 14:53:12 -07:00
proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
  ## Task body: runs the backend `has` for `key` and stores the
  ## outcome in `ctx` via `executeTask`.
  executeTask(ctx):
    has(ds, key)
2023-09-15 13:08:38 -06:00
2023-09-25 23:26:59 -07:00
method has*(self: ThreadDatastore,
            key: Key): Future[?!bool] {.async.} =
  ## Checks whether `key` exists by running `hasTask` on the task pool.
  ##
  ## Takes one semaphore permit for the duration of the call. Returns a
  ## failure when no thread signal could be created or when the task
  ## recorded an error; otherwise returns the value the task produced.
  await self.semaphore.acquire()
  without signal =? acquireSignal(), err:
    # The dispatch templates never ran, so nothing else will hand the
    # permit back on this path.
    self.semaphore.release()
    return failure err

  let key = KeyId.new key.id()
  dispatchTask[bool](self, signal):
    self.tp.spawn hasTask(ctx, ds, key)

  # Propagate the task outcome instead of falling off the end of the proc.
  # NOTE(review): mirrors the isErr/error/get handling in `query`'s `next`;
  # confirm `err` accepts the thread error type here.
  if ctx[].res.isErr():
    return err(ctx[].res.error())
  return success(ctx[].res.get())
2023-09-25 21:44:26 -07:00
2023-09-26 14:53:12 -07:00
proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
                       key: KeyId) {.gcsafe.} =
  ## Task body: runs the backend `delete` for `key` and stores the
  ## outcome in `ctx` via `executeTask`.
  executeTask(ctx):
    delete(ds, key)
2023-09-25 21:44:26 -07:00
2023-09-25 23:26:59 -07:00
method delete*(self: ThreadDatastore,
               key: Key): Future[?!void] {.async.} =
  ## Deletes `key` by running `deleteTask` on the task pool.
  ##
  ## Takes one semaphore permit for the duration of the call. Returns a
  ## failure when no thread signal could be created or when the task
  ## recorded an error; otherwise returns success.
  await self.semaphore.acquire()
  without signal =? acquireSignal(), err:
    # The dispatch templates never ran, so nothing else will hand the
    # permit back on this path.
    self.semaphore.release()
    return failure err

  let key = KeyId.new key.id()
  dispatchTask[void](self, signal):
    self.tp.spawn deleteTask(ctx, ds, key)

  # Propagate the task outcome instead of falling off the end of the proc.
  # NOTE(review): mirrors the isErr/error handling in `query`'s `next`;
  # confirm `err` accepts the thread error type here.
  if ctx[].res.isErr():
    return err(ctx[].res.error())
  return success()
2023-09-25 21:44:26 -07:00
2023-09-25 23:26:59 -07:00
method delete*(self: ThreadDatastore,
               keys: seq[Key]): Future[?!void] {.async.} =
  ## Deletes every key in `keys`, one after another, stopping at and
  ## returning the first failure.
  for key in keys:
    let outcome = await self.delete(key)
    if outcome.isErr():
      return failure outcome.error()

  return success()
2023-09-25 21:44:26 -07:00
2023-09-26 14:53:12 -07:00
proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
                    key: KeyId,
                    data: DataBuffer) {.gcsafe, nimcall.} =
  ## Task body: runs the backend `put` of `data` under `key` and stores
  ## the outcome in `ctx` via `executeTask`.
  executeTask(ctx):
    put(ds, key, data)
2023-09-25 21:44:26 -07:00
2023-09-25 23:48:17 -07:00
method put*(self: ThreadDatastore,
            key: Key,
            data: seq[byte]): Future[?!void] {.async.} =
  ## Stores `data` under `key` by running `putTask` on the task pool.
  ##
  ## Takes one semaphore permit for the duration of the call. Returns a
  ## failure when no thread signal could be created or when the task
  ## recorded an error; otherwise returns success.
  await self.semaphore.acquire()
  without signal =? acquireSignal(), err:
    # The dispatch templates never ran, so nothing else will hand the
    # permit back on this path.
    self.semaphore.release()
    return failure err

  let key = KeyId.new key.id()
  let data = DataBuffer.new data
  dispatchTask[void](self, signal):
    self.tp.spawn putTask(ctx, ds, key, data)

  # Propagate the task outcome instead of falling off the end of the proc.
  # NOTE(review): mirrors the isErr/error handling in `query`'s `next`;
  # confirm `err` accepts the thread error type here.
  if ctx[].res.isErr():
    return err(ctx[].res.error())
  return success()
2023-09-25 23:48:17 -07:00
method put*(
    self: ThreadDatastore,
    batch: seq[BatchEntry]): Future[?!void] {.async.} =
  ## Stores every entry of `batch`, one after another, stopping at and
  ## returning the first failure.
  for entry in batch:
    let outcome = await self.put(entry.key, entry.data)
    if outcome.isErr():
      return failure outcome.error()

  return success()
2023-09-25 21:44:26 -07:00
2023-09-26 14:53:12 -07:00
proc getTask[T, DB](ctx: TaskCtx[T], ds: DB;
                    key: KeyId) {.gcsafe, nimcall.} =
  ## Task body: runs the backend `get` for `key` and stores the
  ## outcome in `ctx` via `executeTask`.
  executeTask(ctx):
    get(ds, key)
2023-09-25 21:44:26 -07:00
2023-09-25 23:48:17 -07:00
method get*(self: ThreadDatastore,
            key: Key,
            ): Future[?!seq[byte]] {.async.} =
  ## Fetches the bytes stored under `key` by running `getTask` on the
  ## task pool.
  ##
  ## Takes one semaphore permit for the duration of the call. Returns a
  ## failure when no thread signal could be created or when the task
  ## recorded an error; otherwise returns the fetched data.
  await self.semaphore.acquire()
  without signal =? acquireSignal(), err:
    # The dispatch templates never ran, so nothing else will hand the
    # permit back on this path.
    self.semaphore.release()
    return failure err

  let key = KeyId.new key.id()
  # The context must carry a `DataBuffer`: with `void` the task would
  # take the `T is void` branch of `executeTask` and drop the value.
  dispatchTask[DataBuffer](self, signal):
    self.tp.spawn getTask(ctx, ds, key)

  # Propagate the task outcome instead of falling off the end of the proc.
  # NOTE(review): mirrors the isErr/error/get/toSeq handling in `query`'s
  # `next`; confirm `err` accepts the thread error type here.
  if ctx[].res.isErr():
    return err(ctx[].res.error())
  return success(ctx[].res.get().toSeq())
2023-09-25 21:44:26 -07:00
2023-09-26 00:00:25 -07:00
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
  ## Calls `closeAll` on the semaphore and awaits it, then closes the
  ## wrapped backend.
  await self.semaphore.closeAll()

  case self.backend.kind:
  of Sqlite:
    self.backend.sql.close()
2023-09-25 21:44:26 -07:00
2023-09-26 13:25:15 -07:00
type
  # Shorthand for the query response type exchanged with `queryTask`.
  QResult = DbQueryResponse[KeyId, DataBuffer]
proc queryTask[DB](
    ctx: TaskCtx[QResult],
    ds: DB,
    dq: DbQuery[KeyId]
) {.gcsafe, nimcall.} =
  ## Task body for queries: opens the query on the backend and then hands
  ## out one item per request, using `ctx[].signal` in both directions
  ## (wait for a request, fire when `ctx[].res` holds the next item).
  executeTask(ctx):
    # everything below runs inside `executeTask`,
    # so the block has to evaluate to a final result
    let opened = ds.query(dq)
    if opened.isErr():
      # yield the error; `executeTask` stores it and fires the final signal
      (?!QResult).err(opened.error())
    else:
      # otherwise manually set an empty ok result and notify the waiter
      ctx[].res.ok((KeyId.none, DataBuffer()))
      discard ctx[].signal.fireSync()

      var handle = opened.get()
      for item in handle.iter():
        # wait for next request from async thread
        discard ctx[].signal.waitSync().get()

        if ctx[].cancelled:
          # cancel iter, then run next cycle so it'll finish and close
          handle.cancel = true
          continue

        ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
          exc
        discard ctx[].signal.fireSync()

      # set final result
      (?!QResult).ok((KeyId.none, DataBuffer()))
2023-09-25 21:44:26 -07:00
2023-09-26 00:00:25 -07:00
method query*(
    self: ThreadDatastore,
    q: Query): Future[?!QueryIter] {.async.} =
  ## Starts `queryTask` on the task pool for `q` and returns a `QueryIter`
  ## whose `next` asks the task for one more item per call.
  ##
  ## Returns a failure when no thread signal could be created.

  await self.semaphore.acquire()
  without signal =? acquireSignal(), err:
    # The dispatch templates never ran, so nothing else will hand the
    # permit back on this path.
    self.semaphore.release()
    return failure err
  let dq = dbQuery(
    key= KeyId.new q.key.id(),
    value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)

  dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal):
    self.tp.spawn queryTask(ctx, ds, dq)

  var
    lock = newAsyncLock() # serialize querying under threads
    iter = QueryIter.new()

  proc next(): Future[?!QueryResponse] {.async.} =
    ## Requests the next item from the query task and converts it into a
    ## `QueryResponse`. Fails when called while another call is pending
    ## or after the iterator has finished.
    let ctx = ctx
    defer:
      if lock.locked:
        lock.release()

    trace "About to query"
    if lock.locked:
      return failure (ref DatastoreError)(msg: "Should always await query features")
    if iter.finished == true:
      return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")

    await lock.acquire()

    # NOTE(review): `dispatchTaskWrap` closes the context signal and
    # releases a semaphore permit in its `finally`, so both happen on
    # every `next` call as well as after the initial dispatch - confirm
    # this is intended.
    dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
      # trigger query task to iterate then wait for new result!
      discard ctx[].signal.fireSync()

    # the task clears `running` once it has left `executeTask`
    if not ctx[].running:
      iter.finished = true

    if ctx[].res.isErr():
      return err(ctx[].res.error())
    else:
      let qres = ctx[].res.get()
      let key = qres.key.map(proc (k: KeyId): Key = k.toKey())
      let data = qres.data.toSeq()
      return (?!QueryResponse).ok((key: key, data: data))

  iter.next = next
  return success iter
2023-09-13 14:41:16 -06:00
2023-09-26 16:02:42 -07:00
proc new*(self: type ThreadDatastore,
          backend: ThreadBackendKinds,
          withLocks = static false,
          tp: Taskpool
          ): ?!ThreadDatastore =
  ## Creates a `ThreadDatastore` that dispatches onto `tp`.
  ##
  ## - `backend`: kind of backend the datastore wraps.
  ## - `withLocks`: accepted for compatibility; not used by this proc.
  ## - `tp`: task pool; must provide more than one thread (asserted).
  ##
  ## The semaphore is sized to `tp.numThreads - 1`.
  doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"

  # Only fields that `ThreadDatastore` actually declares are set here.
  # NOTE(review): the signature carries just the backend *kind*, so the
  # variant's backend handle is left default-initialised - confirm how
  # callers are expected to supply the opened backend.
  success ThreadDatastore(
    tp: tp,
    backend: ThreadBackend(kind: backend),
    semaphore: AsyncSemaphore.new(tp.numThreads - 1)
  )