mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-03-17 01:23:07 +00:00
manually allocat TaskCtx
This commit is contained in:
parent
1a4b9076b3
commit
5d259aca1d
@ -31,10 +31,10 @@ type
|
||||
ErrorEnum {.pure.} = enum
|
||||
DatastoreErr, DatastoreKeyNotFoundErr, CatchableErr
|
||||
|
||||
ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple | Atomic
|
||||
ThreadResult[T: ThreadTypes] = Result[T, DataBuffer]
|
||||
ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic
|
||||
ThreadResult*[T: ThreadTypes] = Result[T, DataBuffer]
|
||||
|
||||
TaskCtx[T: ThreadTypes] = ref object
|
||||
TaskCtx*[T: ThreadTypes] = object
|
||||
ds*: ptr Datastore
|
||||
res*: ThreadResult[T]
|
||||
cancelled: Atomic[bool]
|
||||
@ -52,9 +52,21 @@ type
|
||||
queryLock: AsyncLock # global query lock, this is only really \
|
||||
# needed for the fsds, but it is expensive!
|
||||
|
||||
template addrOf*[T](ctx: ptr TaskCtx[T]): ptr TaskCtx[T] =
|
||||
ctx
|
||||
|
||||
proc new*[T](
|
||||
ctx: typedesc[TaskCtx[T]],
|
||||
ds: Datastore,
|
||||
signal: ThreadSignalPtr,
|
||||
): ptr TaskCtx[T] =
|
||||
let res = cast[ptr TaskCtx[T]](allocShared0(sizeof(TaskCtx[T])))
|
||||
res[].ds = unsafeAddr ds
|
||||
res
|
||||
|
||||
template withLocks(
|
||||
self: ThreadDatastore,
|
||||
ctx: TaskCtx,
|
||||
ctx: ptr TaskCtx,
|
||||
key: ?Key = Key.none,
|
||||
fut: Future[void],
|
||||
body: untyped) =
|
||||
@ -77,7 +89,7 @@ template withLocks(
|
||||
|
||||
template dispatchTask(
|
||||
self: ThreadDatastore,
|
||||
ctx: TaskCtx,
|
||||
ctx: ptr TaskCtx,
|
||||
key: ?Key = Key.none,
|
||||
runTask: proc): untyped =
|
||||
|
||||
@ -97,7 +109,6 @@ template dispatchTask(
|
||||
# could do a spinlock here until the other side cancels,
|
||||
# but for now it'd at least be better to leak than possibly
|
||||
# corrupt memory since it's easier to detect and fix leaks
|
||||
GC_ref(ctx)
|
||||
warn "request was cancelled while thread task is running", exc = exc.msg
|
||||
ctx.cancelled.store(true, moAcquireRelease)
|
||||
await ctx.signal.fire()
|
||||
@ -161,12 +172,12 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure(error())
|
||||
|
||||
ctx = TaskCtx[bool](
|
||||
ds: addr self.ds,
|
||||
signal: signal)
|
||||
ctx = TaskCtx[bool].new(
|
||||
ds = self.ds,
|
||||
signal = signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn hasTask(addr ctx, unsafeAddr key)
|
||||
self.tp.spawn hasTask(addrOf(ctx), unsafeAddr key)
|
||||
|
||||
self.dispatchTask(ctx, key.some, runTask)
|
||||
return success(ctx.res.get())
|
||||
@ -206,12 +217,12 @@ method delete*(
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure(error())
|
||||
|
||||
ctx = TaskCtx[void](
|
||||
ds: addr self.ds,
|
||||
signal: signal)
|
||||
ctx = TaskCtx[void].new(
|
||||
ds= self.ds,
|
||||
signal= signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn delTask(addr ctx, unsafeAddr key)
|
||||
self.tp.spawn delTask(addrOf(ctx), unsafeAddr key)
|
||||
|
||||
self.dispatchTask(ctx, key.some, runTask)
|
||||
return success()
|
||||
@ -272,13 +283,13 @@ method put*(
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure(error())
|
||||
|
||||
ctx = TaskCtx[void](
|
||||
ds: addr self.ds,
|
||||
signal: signal)
|
||||
ctx = TaskCtx[void].new(
|
||||
ds= self.ds,
|
||||
signal= signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn putTask(
|
||||
addr ctx,
|
||||
addrOf(ctx),
|
||||
unsafeAddr key,
|
||||
makeUncheckedArray(baseAddr data),
|
||||
data.len)
|
||||
@ -338,12 +349,12 @@ method get*(
|
||||
return failure(error())
|
||||
|
||||
var
|
||||
ctx = TaskCtx[DataBuffer](
|
||||
ds: addr self.ds,
|
||||
signal: signal)
|
||||
ctx = TaskCtx[DataBuffer].new(
|
||||
ds= self.ds,
|
||||
signal= signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn getTask(addr ctx, unsafeAddr key)
|
||||
self.tp.spawn getTask(addrOf(ctx), unsafeAddr key)
|
||||
|
||||
self.dispatchTask(ctx, key.some, runTask)
|
||||
if err =? ctx.res.errorOption:
|
||||
@ -424,12 +435,12 @@ method query*(
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure("Failed to create signal")
|
||||
|
||||
ctx = TaskCtx[(bool, DataBuffer, DataBuffer)](
|
||||
ds: addr self.ds,
|
||||
signal: signal)
|
||||
ctx = TaskCtx[(bool, DataBuffer, DataBuffer)].new(
|
||||
ds= self.ds,
|
||||
signal= signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn queryTask(addr ctx, addr childIter)
|
||||
self.tp.spawn queryTask(addrOf(ctx), addr childIter)
|
||||
|
||||
self.dispatchTask(ctx, Key.none, runTask)
|
||||
if err =? ctx.res.errorOption:
|
||||
|
||||
@ -150,7 +150,7 @@ suite "Test ThreadDatastore cancelations":
|
||||
ds: addr sqlStore,
|
||||
signal: signal)
|
||||
fut = newFuture[void]("signalMonitor")
|
||||
threadArgs: (ptr TaskCtx, ptr Future[void]) = (unsafeAddr ctx[], addr fut)
|
||||
threadArgs = (cast[ptr TaskCtx[void]](ctx), addr fut)
|
||||
|
||||
var
|
||||
thread: Thread[type threadArgs]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user