avoid duplicating code

This commit is contained in:
Dmitriy Ryajov 2023-09-18 13:40:44 -06:00
parent ed09b9c936
commit 75fa37f567
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4

View File

@ -65,7 +65,6 @@ template withLocks(
if self.withLocks: if self.withLocks:
await self.queryLock.acquire() # only lock if it's required (fsds) await self.queryLock.acquire() # only lock if it's required (fsds)
body body
finally: finally:
if self.withLocks: if self.withLocks:
@ -74,29 +73,35 @@ template withLocks(
if self.queryLock.locked: if self.queryLock.locked:
self.queryLock.release() self.queryLock.release()
# TODO: needs rework, we can't use `result` with async
template dispatchTask( template dispatchTask(
self: ThreadDatastore, self: ThreadDatastore,
ctx: TaskCtx, ctx: TaskCtx,
key: ?Key = Key.none, key: ?Key = Key.none,
runTask: proc): untyped = runTask: proc): untyped =
try:
await self.semaphore.acquire()
ctx.signal = ThreadSignalPtr.new().valueOr:
result = failure(error())
return
let let
fut = wait(ctx.signal) fut = wait(ctx.signal)
withLocks(self, ctx, key, fut): withLocks(self, ctx, key, fut):
try:
runTask() runTask()
await fut await fut
if ctx.res[].isErr: if ctx.res[].isErr:
result = failure(ctx.res[].error()) # TODO: fix this, result shouldn't be accessed result = failure(ctx.res[].error()) # TODO: fix this, result shouldn't be accessed from a thread
except CancelledError as exc: except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
ctx.cancelled = true ctx.cancelled = true
await ctx.signal.fire() await ctx.signal.fire()
raise exc raise exc
finally: finally:
discard ctx.signal.close() discard ctx.signal.close()
self.semaphore.release()
proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
## Monitor the signal and cancel the future if ## Monitor the signal and cancel the future if
@ -142,20 +147,11 @@ proc hasTask(ctx: ptr TaskCtx, key: ptr Key) =
raiseAssert exc.msg raiseAssert exc.msg
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
res = ThreadResult[bool]() res = ThreadResult[bool]()
ctx = TaskCtx[bool]( ctx = TaskCtx[bool](
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res)
signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn hasTask(addr ctx, unsafeAddr key) self.tp.spawn hasTask(addr ctx, unsafeAddr key)
@ -189,20 +185,11 @@ proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
method delete*( method delete*(
self: ThreadDatastore, self: ThreadDatastore,
key: Key): Future[?!void] {.async.} = key: Key): Future[?!void] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
res = ThreadResult[void]() res = ThreadResult[void]()
ctx = TaskCtx[void]( ctx = TaskCtx[void](
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res)
signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn delTask(addr ctx, unsafeAddr key) self.tp.spawn delTask(addr ctx, unsafeAddr key)
@ -257,20 +244,11 @@ method put*(
self: ThreadDatastore, self: ThreadDatastore,
key: Key, key: Key,
data: seq[byte]): Future[?!void] {.async.} = data: seq[byte]): Future[?!void] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
res = ThreadResult[void]() res = ThreadResult[void]()
ctx = TaskCtx[void]( ctx = TaskCtx[void](
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res)
signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn putTask( self.tp.spawn putTask(
@ -324,21 +302,11 @@ proc getTask(
method get*( method get*(
self: ThreadDatastore, self: ThreadDatastore,
key: Key): Future[?!seq[byte]] {.async.} = key: Key): Future[?!seq[byte]] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
var var
res = ThreadResult[DataBuffer]() res = ThreadResult[DataBuffer]()
ctx = TaskCtx[DataBuffer]( ctx = TaskCtx[DataBuffer](
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res)
signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn getTask(addr ctx, unsafeAddr key) self.tp.spawn getTask(addr ctx, unsafeAddr key)
@ -403,10 +371,8 @@ method query*(
proc next(): Future[?!QueryResponse] {.async.} = proc next(): Future[?!QueryResponse] {.async.} =
defer: defer:
locked = false locked = false
self.semaphore.release()
trace "About to query" trace "About to query"
await self.semaphore.acquire()
if locked: if locked:
return failure (ref DatastoreError)(msg: "Should always await query features") return failure (ref DatastoreError)(msg: "Should always await query features")
@ -419,21 +385,17 @@ method query*(
return success (Key.none, EmptyBytes) return success (Key.none, EmptyBytes)
var var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
res = ThreadResult[(bool, DataBuffer, DataBuffer)]() res = ThreadResult[(bool, DataBuffer, DataBuffer)]()
ctx = TaskCtx[(bool, DataBuffer, DataBuffer)]( ctx = TaskCtx[(bool, DataBuffer, DataBuffer)](
ds: addr self.ds, ds: addr self.ds,
res: addr res, res: addr res)
signal: signal)
proc runTask() = proc runTask() =
self.tp.spawn queryTask(addr ctx, addr childIter) self.tp.spawn queryTask(addr ctx, addr childIter)
self.dispatchTask(ctx, Key.none, runTask) self.dispatchTask(ctx, Key.none, runTask)
if err =? res.errorOption: if err =? res.errorOption:
trace "Query failed", err = err trace "Query failed", err = $err
return failure err return failure err
let (ok, key, data) = res.get() let (ok, key, data) = res.get()