Mirror of https://github.com/logos-storage/nim-datastore.git (synced 2026-01-07 16:13:07 +00:00)

Commit a626a03a8c ("cleanup"), parent 7d98f5ed9d
@@ -8,6 +8,8 @@ import ./types
 
 export key, query, types
 
+const datastoreUseSync* {.booldefine.} = false
+
 push: {.upraises: [].}
 
 type
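
A note on the added `datastoreUseSync` constant: a `{.booldefine.}` const keeps its default unless overridden with `-d:` at compile time, so the synchronous code path stays off by default. A minimal sketch of how such a flag is typically consumed (the `useSyncPath` proc is invented for illustration; the hunk above only introduces the constant):

# flag_demo.nim -- build with `nim c -d:datastoreUseSync flag_demo.nim` to flip the default
const datastoreUseSync {.booldefine.} = false

proc useSyncPath(): string =
  # `when` is resolved at compile time, so only one branch ends up in the binary
  when datastoreUseSync:
    "sync backend selected"
  else:
    "threaded backend selected"

echo useSyncPath()
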
@@ -67,6 +67,7 @@ proc new*(
 
   let
     backend = ? newFSBackend[KeyId, DataBuffer](
-      root=root, depth=depth, caseSensitive=caseSensitive, ignoreProtected=ignoreProtected)
+      root = root, depth = depth, caseSensitive = caseSensitive,
+      ignoreProtected = ignoreProtected)
     db = ? ThreadProxy.new(backend, tp = tp)
   success FSDatastore(db: db)
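
For readers unfamiliar with the `?!`, `?`, `success`, and `failure` notation used in this constructor: it comes from the questionable/results error-handling packages that nim-datastore depends on. `?!T` is a Result holding either a `T` or an error, and a leading `?` propagates a failure to the caller, which is what `? newFSBackend[...]` and `? ThreadProxy.new(...)` do above. A self-contained sketch of the idiom, with invented `parsePort`/`connect` helpers:

import pkg/questionable
import pkg/questionable/results

proc parsePort(s: string): ?!int =
  # `failure` builds the error case of a ?! result
  if s.len == 0:
    return failure("no port given")
  success(8080)  # stand-in for real parsing

proc connect(address: string): ?!string =
  # `?` unwraps the ok value or returns the failure to our own caller
  let port = ? parsePort(address)
  success("connected on port " & $port)

let res = connect("")
if res.isOk:
  echo res.get()
else:
  echo "error: ", res.error.msg   # prints "error: no port given"
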
@@ -40,7 +40,7 @@ type
   TaskCtxObj*[T: ThreadTypes] = object
     res*: ThreadResult[T]
     signal: ThreadSignalPtr
     running*: bool ## used to mark when a task worker is running
     cancelled*: bool ## used to cancel a task before it's started
     nextSignal: ThreadSignalPtr
 
@@ -60,31 +60,32 @@ proc newTaskCtx*[T](tp: typedesc[T],
   newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal))
 
 proc setCancelled[T](ctx: TaskCtx[T]) =
   ctx[].cancelled = true
 
 proc setRunning[T](ctx: TaskCtx[T]): bool =
   if ctx[].cancelled:
     return false
   ctx[].running = true
   return true
 proc setDone[T](ctx: TaskCtx[T]) =
   ctx[].running = false
 
 proc acquireSignal(): ?!ThreadSignalPtr =
   # echo "signal:OPEN!"
   let signal = ThreadSignalPtr.new()
   if signal.isErr():
-    failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error())
+    failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " &
+      signal.error())
   else:
     success signal.get()
 
 template executeTask*[T](ctx: TaskCtx[T], blk: untyped) =
   ## executes a task on a thread work and handles cleanup after cancels/errors
   ##
   try:
     if not ctx.setRunning():
       return
 
     ## run backend command
     let res = `blk`
     if res.isOk():
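
The `acquireSignal` helper above exists because `ThreadSignalPtr.new()` itself returns a result that can fail (for example when the process runs out of descriptors), and that string error needs lifting into the `?!` world. Below is a minimal sketch of the signal lifecycle it manages, assuming chronos's threadsync API (`new`, `fireSync`, `wait`, `close`); in the real code `fireSync` runs on the worker thread and `wait` on the async side:

import pkg/chronos
import pkg/chronos/threadsync
import pkg/results

proc demo() {.async.} =
  # allocation can fail, which is what acquireSignal converts into a ?! failure
  let signal = ThreadSignalPtr.new().expect("a free descriptor")
  discard signal.fireSync()   # worker side: wake whoever is waiting on the signal
  await signal.wait()         # async side: resumes once the signal has been fired
  discard signal.close()      # ThreadSignalPtrs hold OS handles and must be released

waitFor demo()
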
@@ -148,7 +149,7 @@ proc has*[BT](self: ThreadProxy[BT],
   # without signal =? acquireSignal(), err:
   #   return failure err
 
-  let ctx = newTaskCtx(bool, signal=signal)
+  let ctx = newTaskCtx(bool, signal = signal)
   dispatchTask(self, signal):
     let key = KeyId.new key.id()
     self.tp.spawn hasTask(ctx, ds, key)
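
The `has`/`delete`/`put`/`get` wrappers all share one shape: build a `TaskCtx`, hand the work to the shared taskpool with `tp.spawn`, and let `dispatchTask` await the completion signal. The sketch below shows only the taskpool half of that pattern, assuming the upstream nim-taskpools API (`Taskpool.new`, `spawn`, `sync`, `shutdown`); `slowSquare` is an invented stand-in for `hasTask` and friends, and the result comes back through a Flowvar here rather than through a `TaskCtx`/`ThreadSignalPtr` pair as in the real code:

import pkg/taskpools

proc slowSquare(x: int): int =
  # stand-in for a backend task such as hasTask(ctx, ds, key)
  x * x

let tp = Taskpool.new(numThreads = 2)
let pending = tp.spawn slowSquare(12)   # queued onto a pool thread
echo pending.sync()                     # blocks until the pool thread finishes: 144
tp.shutdown()

Funnelling results back through the shared `TaskCtx` plus a signal, as this module does, lets the async side await completion instead of blocking the event loop the way `sync` would.
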
@@ -169,7 +170,7 @@ proc delete*[BT](self: ThreadProxy[BT],
   # without signal =? acquireSignal(), err:
   #   return failure err
 
-  let ctx = newTaskCtx(void, signal=signal)
+  let ctx = newTaskCtx(void, signal = signal)
   dispatchTask(self, signal):
     let key = KeyId.new key.id()
     self.tp.spawn deleteTask(ctx, ds, key)
@@ -202,7 +203,7 @@ proc put*[BT](self: ThreadProxy[BT],
   # without signal =? acquireSignal(), err:
   #   return failure err
 
-  let ctx = newTaskCtx(void, signal=signal)
+  let ctx = newTaskCtx(void, signal = signal)
   dispatchTask(self, signal):
     let key = KeyId.new key.id()
     let data = DataBuffer.new data
@@ -216,7 +217,7 @@ proc put*[E, DB](self: ThreadProxy[DB],
   for entry in batch:
     if err =? (await self.put(entry.key, entry.data)).errorOption:
       return failure err
 
   return success()
 
 
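
The batch `put` above leans on another questionable idiom: `.errorOption` turns a `?!T` into an Option that is `some(error)` on failure and `none` on success, and `err =?` binds it inside the `if` only in the failure case. A small self-contained sketch with invented `store`/`storeAll` helpers:

import pkg/questionable
import pkg/questionable/results

proc store(value: int): ?!void =
  # hypothetical fallible step, standing in for self.put(entry.key, entry.data)
  if value < 0:
    return failure("negative values not allowed")
  success()

proc storeAll(values: seq[int]): ?!void =
  for v in values:
    # bail out with the first error, mirroring the batch put above
    if err =? store(v).errorOption:
      return failure(err)
  success()

echo storeAll(@[1, 2, 3]).isOk    # true
echo storeAll(@[1, -2, 3]).isOk   # false
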
@@ -236,7 +237,7 @@ proc get*[BT](self: ThreadProxy[BT],
   # without signal =? acquireSignal(), err:
   #   return failure err
 
-  let ctx = newTaskCtx(DataBuffer, signal=signal)
+  let ctx = newTaskCtx(DataBuffer, signal = signal)
   dispatchTask(self, signal):
     let key = KeyId.new key.id()
     self.tp.spawn getTask(ctx, ds, key)
@@ -261,8 +262,6 @@ proc queryTask[DB](
   # we execute this all inside `executeTask`
   # so we need to return a final result
   let handleRes = query(ds, query)
-  static:
-    echo "HANDLE_RES: ", typeof(handleRes)
   if handleRes.isErr():
     # set error and exit executeTask, which will fire final signal
     (?!QResult).err(handleRes.error())
@@ -274,8 +273,6 @@ proc queryTask[DB](
     raise newException(DeadThreadDefect, "queryTask timed out")
 
   var handle = handleRes.get()
-  static:
-    echo "HANDLE: ", typeof(handle)
   for item in handle.queryIter():
     # wait for next request from async thread
 
@@ -298,8 +295,8 @@ proc query*[BT](self: ThreadProxy[BT],
     q: Query
 ): Future[?!QueryIter] {.async.} =
   ## performs async query
   ## keeps one thread running queryTask until finished
   ##
   await self.semaphore.acquire()
   let signal = acquireSignal().get()
   # without signal =? acquireSignal(), err:
@@ -307,7 +304,7 @@ proc query*[BT](self: ThreadProxy[BT],
   let nextSignal = acquireSignal().get()
   # without nextSignal =? acquireSignal(), err:
   #   return failure err
-  let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal)
+  let ctx = newTaskCtx(QResult, signal = signal, nextSignal = nextSignal)
 
   proc iterDispose() {.async.} =
     ctx.setCancelled()
@@ -318,8 +315,8 @@ proc query*[BT](self: ThreadProxy[BT],
 
   try:
     let query = dbQuery(
-      key= KeyId.new q.key.id(),
-      value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
+      key = KeyId.new q.key.id(),
+      value = q.value, limit = q.limit, offset = q.offset, sort = q.sort)
 
     # setup initial queryTask
     dispatchTaskWrap(self, signal):
@@ -337,9 +334,11 @@ proc query*[BT](self: ThreadProxy[BT],
     try:
       trace "About to query"
       if lock.locked:
-        return failure (ref DatastoreError)(msg: "Should always await query features")
+        return failure (ref DatastoreError)(
+          msg: "Should always await query features")
       if iter.finished == true:
-        return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
+        return failure (ref QueryEndedError)(
+          msg: "Calling next on a finished query!")
 
       await wait(ctx[].signal)
       if not ctx[].running:
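
The query path differs from the one-shot operations: `queryTask` stays on its pool thread for the whole iteration and the context carries two signals, with the async side awaiting `signal` for each result (`await wait(ctx[].signal)` above) while the worker, per its "wait for next request from async thread" comment, blocks on `nextSignal` until the `QueryIter` asks for more. The sketch below simulates that handshake on a single event loop with chronos `AsyncEvent`s purely to show the ordering; the real code crosses a thread boundary with `ThreadSignalPtr`s, and the names `itemReady`/`wantNext` are invented:

import pkg/chronos

proc demo() {.async.} =
  let
    itemReady = newAsyncEvent()   # plays the role of ctx.signal
    wantNext  = newAsyncEvent()   # plays the role of ctx.nextSignal
  var item = 0

  proc producer() {.async.} =
    for i in 1 .. 3:
      item = i
      itemReady.fire()            # "a result is waiting"
      await wantNext.wait()       # block until the consumer asks for the next one
      wantNext.clear()

  proc consumer() {.async.} =
    for _ in 1 .. 3:
      await itemReady.wait()      # wait for the worker's result
      itemReady.clear()
      echo "got item ", item
      wantNext.fire()             # request the next item

  await allFutures(producer(), consumer())

waitFor demo()
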