mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-06 15:43:10 +00:00
setup flags
This commit is contained in:
parent
cf4eb38d4f
commit
c96ae7c016
@ -37,7 +37,8 @@ type
|
||||
TaskCtx[T: ThreadTypes] = ref object
|
||||
ds: ptr Datastore
|
||||
res: ThreadResult[T]
|
||||
cancelled: bool
|
||||
cancelled: Atomic[bool]
|
||||
hasStarted: Atomic[bool]
|
||||
semaphore: AsyncSemaphore
|
||||
signal: ThreadSignalPtr
|
||||
|
||||
@ -85,7 +86,6 @@ template dispatchTask(
|
||||
|
||||
withLocks(self, ctx, key, fut):
|
||||
try:
|
||||
GC_ref(ctx)
|
||||
runTask()
|
||||
await fut
|
||||
|
||||
@ -93,7 +93,12 @@ template dispatchTask(
|
||||
result = failure(ctx.res.error()) # TODO: fix this, result shouldn't be accessed
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
ctx.cancelled = true
|
||||
if ctx.hasStarted.load(moAcquireRelease):
|
||||
# could do a spinlock here, but for now
|
||||
# it'd be better to leak than possibly corrupt memory
|
||||
# since it's easier to detect and fix leaks
|
||||
GC_ref(ctx)
|
||||
ctx.cancelled.store(true, moAcquireRelease)
|
||||
await ctx.signal.fire()
|
||||
raise exc
|
||||
finally:
|
||||
@ -105,10 +110,11 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
|
||||
##
|
||||
|
||||
try:
|
||||
ctx[].hasStarted.store(true, moAcquireRelease)
|
||||
await ctx[].signal.wait()
|
||||
trace "Received signal"
|
||||
|
||||
if ctx[].cancelled: # there could eventually be other flags
|
||||
if ctx[].cancelled.load(moAcquireRelease): # there could eventually be other flags
|
||||
trace "Cancelling future"
|
||||
if not fut.finished:
|
||||
await fut.cancelAndWait() # cancel the `has` future
|
||||
@ -337,10 +343,10 @@ method get*(
|
||||
self.tp.spawn getTask(addr ctx, unsafeAddr key)
|
||||
|
||||
self.dispatchTask(ctx, key.some, runTask)
|
||||
if err =? res.errorOption:
|
||||
if err =? ctx.res.errorOption:
|
||||
return failure err
|
||||
|
||||
return success(@(res.get()))
|
||||
return success(@(ctx.res.get()))
|
||||
|
||||
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
|
||||
for fut in self.tasks.values.toSeq:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user