This commit is contained in:
Jaremy Creechley 2023-09-18 14:15:47 -07:00
commit bbfc319624
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 29 additions and 47 deletions

View File

@ -1,5 +1,6 @@
import threading/smartptrs
import std/hashes
import pkg/stew/ptrops
export hashes
@ -50,7 +51,9 @@ proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T]): DataBuffer =
##
result = DataBuffer.new(data.len)
if data.len() > 0:
copyMem(result[].buf, unsafeAddr data[0], data.len)
# TODO: we might want to copy data, otherwise the GC might
# release it on stack-unwind
copyMem(result[].buf, baseAddr data, data.len)
converter toSeq*(self: DataBuffer): seq[byte] =
## convert buffer to a seq type using copy and either a byte or char
@ -58,7 +61,7 @@ converter toSeq*(self: DataBuffer): seq[byte] =
result = newSeq[byte](self.len)
if self.len() > 0:
copyMem(addr result[0], unsafeAddr self[].buf[0], self.len)
copyMem(addr result[0], addr self[].buf[0], self.len)
proc `@`*(self: DataBuffer): seq[byte] =
## Convert a buffer to a seq type using copy and
@ -74,7 +77,7 @@ converter toString*(data: DataBuffer): string =
if data.isNil: return ""
result = newString(data.len())
if data.len() > 0:
copyMem(addr result[0], unsafeAddr data[].buf[0], data.len)
copyMem(addr result[0], addr data[].buf[0], data.len)
proc `$`*(data: DataBuffer): string =
## convert buffer to string type using copy

View File

@ -81,7 +81,6 @@ template withLocks(
if self.withLocks:
await self.queryLock.acquire() # only lock if it's required (fsds)
body
finally:
if self.withLocks:
@ -90,35 +89,41 @@ template withLocks(
if self.queryLock.locked:
self.queryLock.release()
# TODO: needs rework, we can't use `result` with async
template dispatchTask(
self: ThreadDatastore,
ctx: ref TaskCtx,
key: ?Key = Key.none,
runTask: proc): untyped =
try:
await self.semaphore.acquire()
ctx.signal = ThreadSignalPtr.new().valueOr:
result = failure(error())
return
let
fut = wait(ctx.signal)
let
fut = wait(ctx.signal)
withLocks(self, ctx, key, fut):
try:
withLocks(self, ctx, key, fut):
runTask()
await fut
if ctx.res.isErr:
result = failure(ctx.res.error()) # TODO: fix this, result shouldn't be accessed
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
if ctx.isActive.load(moAcquireRelease):
# could do a spinlock here until the other side cancels,
# but for now it'd at least be better to leak than possibly
# corrupt memory since it's easier to detect and fix leaks
warn "request was cancelled while thread task is running", exc = exc.msg
GC_ref(ctx)
ctx.cancelled.store(true, moAcquireRelease)
await ctx.signal.fire()
raise exc
finally:
discard ctx.signal.close()
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
if ctx.isActive.load(moAcquireRelease):
# could do a spinlock here until the other side cancels,
# but for now it'd at least be better to leak than possibly
# corrupt memory since it's easier to detect and fix leaks
warn "request was cancelled while thread task is running", exc = exc.msg
GC_ref(ctx)
ctx.cancelled.store(true, moAcquireRelease)
await ctx.signal.fire()
raise exc
finally:
discard ctx.signal.close()
self.semaphore.release()
proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
## Monitor the signal and cancel the future if
@ -167,11 +172,6 @@ proc hasTask(ctx: ptr TaskCtx, key: ptr Key) =
raiseAssert exc.msg
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
@ -212,11 +212,6 @@ proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
method delete*(
self: ThreadDatastore,
key: Key): Future[?!void] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
@ -278,11 +273,6 @@ method put*(
self: ThreadDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
@ -343,15 +333,6 @@ proc getTask(
method get*(
self: ThreadDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
defer:
self.semaphore.release()
await self.semaphore.acquire()
var
signal = ThreadSignalPtr.new().valueOr:
return failure(error())
var
ctx = TaskCtx[DataBuffer].new(
ds= self.ds,
@ -420,10 +401,8 @@ method query*(
proc next(): Future[?!QueryResponse] {.async.} =
defer:
locked = false
self.semaphore.release()
trace "About to query"
await self.semaphore.acquire()
if locked:
return failure (ref DatastoreError)(msg: "Should always await query features")