diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim index be1f0ca..66bb7e6 100644 --- a/datastore/threads/databuffer.nim +++ b/datastore/threads/databuffer.nim @@ -1,5 +1,6 @@ import threading/smartptrs import std/hashes +import pkg/stew/ptrops export hashes @@ -50,7 +51,9 @@ proc new*[T: byte | char](tp: type DataBuffer, data: openArray[T]): DataBuffer = ## result = DataBuffer.new(data.len) if data.len() > 0: - copyMem(result[].buf, unsafeAddr data[0], data.len) + # TODO: we might want to copy data, otherwise the GC might + # release it on stack-unwind + copyMem(result[].buf, baseAddr data, data.len) converter toSeq*(self: DataBuffer): seq[byte] = ## convert buffer to a seq type using copy and either a byte or char @@ -58,7 +61,7 @@ converter toSeq*(self: DataBuffer): seq[byte] = result = newSeq[byte](self.len) if self.len() > 0: - copyMem(addr result[0], unsafeAddr self[].buf[0], self.len) + copyMem(addr result[0], addr self[].buf[0], self.len) proc `@`*(self: DataBuffer): seq[byte] = ## Convert a buffer to a seq type using copy and @@ -74,7 +77,7 @@ converter toString*(data: DataBuffer): string = if data.isNil: return "" result = newString(data.len()) if data.len() > 0: - copyMem(addr result[0], unsafeAddr data[].buf[0], data.len) + copyMem(addr result[0], addr data[].buf[0], data.len) proc `$`*(data: DataBuffer): string = ## convert buffer to string type using copy diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 2439e8c..f8c972b 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -81,7 +81,6 @@ template withLocks( if self.withLocks: await self.queryLock.acquire() # only lock if it's required (fsds) - body finally: if self.withLocks: @@ -90,35 +89,41 @@ template withLocks( if self.queryLock.locked: self.queryLock.release() +# TODO: needs rework, we can't use `result` with async template dispatchTask( self: ThreadDatastore, ctx: ref TaskCtx, key: ?Key = Key.none, runTask: proc): untyped = + try: + await self.semaphore.acquire() + ctx.signal = ThreadSignalPtr.new().valueOr: + result = failure(error()) + return - let - fut = wait(ctx.signal) + let + fut = wait(ctx.signal) - withLocks(self, ctx, key, fut): - try: + withLocks(self, ctx, key, fut): runTask() await fut if ctx.res.isErr: result = failure(ctx.res.error()) # TODO: fix this, result shouldn't be accessed - except CancelledError as exc: - trace "Cancelling thread future!", exc = exc.msg - if ctx.isActive.load(moAcquireRelease): - # could do a spinlock here until the other side cancels, - # but for now it'd at least be better to leak than possibly - # corrupt memory since it's easier to detect and fix leaks - warn "request was cancelled while thread task is running", exc = exc.msg - GC_ref(ctx) - ctx.cancelled.store(true, moAcquireRelease) - await ctx.signal.fire() - raise exc - finally: - discard ctx.signal.close() + except CancelledError as exc: + trace "Cancelling thread future!", exc = exc.msg + if ctx.isActive.load(moAcquireRelease): + # could do a spinlock here until the other side cancels, + # but for now it'd at least be better to leak than possibly + # corrupt memory since it's easier to detect and fix leaks + warn "request was cancelled while thread task is running", exc = exc.msg + GC_ref(ctx) + ctx.cancelled.store(true, moAcquireRelease) + await ctx.signal.fire() + raise exc + finally: + discard ctx.signal.close() + self.semaphore.release() proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = ## Monitor the signal and cancel the future if @@ -167,11 +172,6 @@ proc hasTask(ctx: ptr TaskCtx, key: ptr Key) = raiseAssert exc.msg method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - var signal = ThreadSignalPtr.new().valueOr: return failure(error()) @@ -212,11 +212,6 @@ proc delTask(ctx: ptr TaskCtx, key: ptr Key) = method delete*( self: ThreadDatastore, key: Key): Future[?!void] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - var signal = ThreadSignalPtr.new().valueOr: return failure(error()) @@ -278,11 +273,6 @@ method put*( self: ThreadDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - var signal = ThreadSignalPtr.new().valueOr: return failure(error()) @@ -343,15 +333,6 @@ proc getTask( method get*( self: ThreadDatastore, key: Key): Future[?!seq[byte]] {.async.} = - defer: - self.semaphore.release() - - await self.semaphore.acquire() - - var - signal = ThreadSignalPtr.new().valueOr: - return failure(error()) - var ctx = TaskCtx[DataBuffer].new( ds= self.ds, @@ -420,10 +401,8 @@ method query*( proc next(): Future[?!QueryResponse] {.async.} = defer: locked = false - self.semaphore.release() trace "About to query" - await self.semaphore.acquire() if locked: return failure (ref DatastoreError)(msg: "Should always await query features")