mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-05 15:13:14 +00:00
switch to custom sharedptr
This commit is contained in:
parent
97d4e68f5d
commit
0b5534f3f9
@ -30,7 +30,7 @@ method has*(
|
||||
|
||||
try:
|
||||
has(ret, self.tds, key)
|
||||
await wait(ret[].signal)
|
||||
await wait(ret)
|
||||
return ret.convert(bool)
|
||||
finally:
|
||||
ret.release()
|
||||
@ -44,7 +44,7 @@ method delete*(
|
||||
|
||||
try:
|
||||
delete(ret, self.tds, key)
|
||||
await wait(ret[].signal)
|
||||
await wait(ret)
|
||||
finally:
|
||||
ret.release()
|
||||
|
||||
@ -75,7 +75,7 @@ method get*(
|
||||
|
||||
try:
|
||||
get(ret, self.tds, key)
|
||||
await wait(ret[].signal)
|
||||
await wait(ret)
|
||||
finally:
|
||||
ret.release()
|
||||
|
||||
@ -91,7 +91,7 @@ method put*(
|
||||
|
||||
try:
|
||||
put(ret, self.tds, key, data)
|
||||
await wait(ret[].signal)
|
||||
await wait(ret)
|
||||
|
||||
return ret.convert(void)
|
||||
finally:
|
||||
@ -143,7 +143,7 @@ method query*(
|
||||
if not iter[].it.finished:
|
||||
iterWrapper.readyForNext = false
|
||||
query(ret, self.tds, iter)
|
||||
await wait(ret[].signal)
|
||||
await wait(ret)
|
||||
iterWrapper.readyForNext = true
|
||||
# echo ""
|
||||
# print "query:post: ", ret[].results
|
||||
|
||||
@ -32,7 +32,7 @@ proc decr*[T](x: var SharedPtr[T]) =
|
||||
if x.val != nil and x.cnt != nil:
|
||||
let res = atomicSubFetch(x.cnt, 1, ATOMIC_ACQUIRE)
|
||||
if res == 0:
|
||||
echo "SharedPtr: FREE: ", repr x.val.pointer, " ", x.cnt[], " tp: ", $(typeof(T))
|
||||
echo "SharedPtr: FREE: ", repr x.val.pointer, " ", x[].cnt[], " tp: ", $(typeof(T))
|
||||
when compiles(`=destroy`(x.val)):
|
||||
echo "DECR FREE"
|
||||
`=destroy`(x.val)
|
||||
@ -50,7 +50,7 @@ proc release*[T](x: var SharedPtr[T]) =
|
||||
|
||||
proc `=destroy`*[T](x: var SharedPtr[T]) =
|
||||
if x.val != nil:
|
||||
echo "SharedPtr: destroy: ", repr x.val.pointer, " cnt: ", x.cnt.repr, " tp: ", $(typeof(T))
|
||||
echo "SharedPtr: destroy: ", repr x.val.pointer.repr, " cnt: ", x.cnt.pointer.repr, " tp: ", $(typeof(T))
|
||||
decr(x)
|
||||
|
||||
proc `=dup`*[T](src: SharedPtr[T]): SharedPtr[T] =
|
||||
|
||||
@ -81,7 +81,7 @@ proc hasTask*(
|
||||
ret.failure(res.error())
|
||||
else:
|
||||
ret.success(res.get())
|
||||
discard ret[].signal.fireSync()
|
||||
discard ret.fireSync()
|
||||
except CatchableError as err:
|
||||
ret.failure(err)
|
||||
|
||||
@ -108,7 +108,7 @@ proc getTask*(
|
||||
let db = DataBuffer.new res.get()
|
||||
ret.success(db)
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
discard ret.fireSync()
|
||||
except CatchableError as err:
|
||||
ret.failure(err)
|
||||
|
||||
@ -139,7 +139,7 @@ proc putTask*(
|
||||
else:
|
||||
ret.success()
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
discard ret.fireSync()
|
||||
|
||||
proc put*(
|
||||
ret: TResult[void],
|
||||
@ -169,7 +169,7 @@ proc deleteTask*(
|
||||
else:
|
||||
ret.success()
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
discard ret.fireSync()
|
||||
|
||||
# import pretty
|
||||
|
||||
@ -204,7 +204,7 @@ proc queryTask*(
|
||||
except Exception as exc:
|
||||
ret.failure(exc)
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
discard ret.fireSync()
|
||||
|
||||
proc query*(
|
||||
ret: TResult[QueryResponseBuffer],
|
||||
|
||||
@ -8,6 +8,7 @@ import ./sharedptr
|
||||
import ./databuffer
|
||||
import ./threadsignalpool
|
||||
|
||||
export threadsignalpool
|
||||
export databuffer
|
||||
export sharedptr
|
||||
export threadsync
|
||||
@ -22,7 +23,7 @@ type
|
||||
## Encapsulates both the results from a thread but also the cross
|
||||
## thread signaling mechanism. This makes it easier to keep them
|
||||
## together.
|
||||
signal*: ThreadSignalPtr
|
||||
sig*: SharedSignal
|
||||
results*: Result[T, CatchableErrorBuffer]
|
||||
|
||||
TResult*[T] = SharedPtr[ThreadResult[T]] ##\
|
||||
@ -57,13 +58,17 @@ proc newThreadResult*[T](
|
||||
{.error: "only thread safe types can be used".}
|
||||
|
||||
let res = newSharedPtr(ThreadResult[T])
|
||||
res[].signal = await getThreadSignal()
|
||||
res[].sig = await SharedSignal.new()
|
||||
res
|
||||
|
||||
proc release*[T](res: var TResult[T]) {.raises: [].} =
|
||||
## release TResult and it's ThreadSignal
|
||||
res[].signal.release()
|
||||
# res[].signal.release()
|
||||
sharedptr.release(res)
|
||||
proc wait*[T](res: TResult[T]): Future[void] =
|
||||
res[].sig.wait()
|
||||
proc fireSync*[T](res: TResult[T]): Result[bool, string] =
|
||||
res[].sig.fireSync()
|
||||
|
||||
proc success*[T](ret: TResult[T], value: T) =
|
||||
## convenience wrapper for `TResult` to replicate
|
||||
|
||||
@ -76,3 +76,26 @@ proc release*(sig: ThreadSignalPtr) {.raises: [].} =
|
||||
signalPoolUsed.excl(sig)
|
||||
signalPoolFree.incl(sig)
|
||||
# echo "free:signalPoolUsed:size: ", signalPoolUsed.len()
|
||||
|
||||
|
||||
type
|
||||
SharedSignalObj* = object
|
||||
sigptr*: ThreadSignalPtr
|
||||
|
||||
SharedSignal* = SharedPtr[SharedSignalObj]
|
||||
|
||||
proc `=destroy`*[T](x: var SharedSignalObj) =
|
||||
if x.sigptr != nil:
|
||||
echo "ThreadSignalObj: destroy "
|
||||
release(x.sigptr)
|
||||
x.sigptr = nil
|
||||
|
||||
proc new*(tp: typedesc[SharedSignal]): Future[SharedSignal] {.async.} =
|
||||
result = newSharedPtr[SharedSignalObj](SharedSignalObj)
|
||||
result[].sigptr = await getThreadSignal()
|
||||
|
||||
proc wait*(sig: SharedSignal): Future[void] =
|
||||
sig[].sigptr.wait()
|
||||
|
||||
proc fireSync*(sig: SharedSignal): Result[bool, string] =
|
||||
sig[].sigptr.fireSync()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user