mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-05 23:23:10 +00:00
switch to custom sharedptr
This commit is contained in:
parent
9c7c8393bd
commit
fed5a906eb
@ -26,22 +26,21 @@ method has*(
|
||||
key: Key
|
||||
): Future[?!bool] {.async.} =
|
||||
|
||||
let ret = await newThreadResult(bool)
|
||||
var ret = await newThreadResult(bool)
|
||||
|
||||
try:
|
||||
has(ret, self.tds, key)
|
||||
await wait(ret[].signal)
|
||||
return ret.convert(bool)
|
||||
finally:
|
||||
ret.release()
|
||||
|
||||
return ret.convert(bool)
|
||||
|
||||
method delete*(
|
||||
self: ThreadProxyDatastore,
|
||||
key: Key
|
||||
): Future[?!void] {.async.} =
|
||||
|
||||
let ret = await newThreadResult(void)
|
||||
var ret = await newThreadResult(void)
|
||||
|
||||
try:
|
||||
delete(ret, self.tds, key)
|
||||
@ -72,7 +71,7 @@ method get*(
|
||||
## probably be switched to use a single ThreadSignal
|
||||
## for the entire batch
|
||||
|
||||
let ret = await newThreadResult(ValueBuffer)
|
||||
var ret = await newThreadResult(ValueBuffer)
|
||||
|
||||
try:
|
||||
get(ret, self.tds, key)
|
||||
@ -88,15 +87,16 @@ method put*(
|
||||
data: seq[byte]
|
||||
): Future[?!void] {.async.} =
|
||||
|
||||
let ret = await newThreadResult(void)
|
||||
var ret = await newThreadResult(void)
|
||||
|
||||
try:
|
||||
put(ret, self.tds, key, data)
|
||||
await wait(ret[].signal)
|
||||
finally:
|
||||
ret.release()
|
||||
|
||||
return ret.convert(void)
|
||||
finally:
|
||||
echo "PUT RELEASE"
|
||||
ret.release()
|
||||
|
||||
method put*(
|
||||
self: ThreadProxyDatastore,
|
||||
@ -121,7 +121,7 @@ method query*(
|
||||
query: Query
|
||||
): Future[?!QueryIter] {.async.} =
|
||||
|
||||
let ret = await newThreadResult(QueryResponseBuffer)
|
||||
var ret = await newThreadResult(QueryResponseBuffer)
|
||||
|
||||
# echo "\n\n=== Query Start === "
|
||||
|
||||
@ -185,7 +185,7 @@ proc newThreadProxyDatastore*(
|
||||
## create a new
|
||||
|
||||
var self = ThreadProxyDatastore()
|
||||
let value = newSharedPtr(ThreadDatastore)
|
||||
var value = newSharedPtr(ThreadDatastore)
|
||||
# GC_ref(ds) ## TODO: is this needed?
|
||||
|
||||
try:
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
import threading/smartptrs
|
||||
import std/hashes
|
||||
|
||||
import ./sharedptr
|
||||
|
||||
export hashes
|
||||
export sharedptr
|
||||
|
||||
type
|
||||
DataBufferHolder* = object
|
||||
@ -30,7 +32,7 @@ proc `=destroy`*(x: var DataBufferHolder) =
|
||||
|
||||
proc len*(a: DataBuffer): int = a[].size
|
||||
|
||||
proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a)
|
||||
proc isNil*(a: DataBuffer): bool = sharedptr.isNil(a)
|
||||
|
||||
proc hash*(a: DataBuffer): Hash =
|
||||
a[].buf.toOpenArray(0, a[].size-1).hash()
|
||||
|
||||
@ -23,24 +23,29 @@ type
|
||||
cnt: ptr int
|
||||
val*: ptr T
|
||||
|
||||
proc incr*[T](a: SharedPtr[T]) =
|
||||
proc incr*[T](a: var SharedPtr[T]) =
|
||||
if a.val != nil and a.cnt != nil:
|
||||
let res = atomicAddFetch(a.cnt, 1, ATOMIC_RELAXED)
|
||||
echo "SharedPtr: manual incr: ", res
|
||||
|
||||
proc decr*[T](x: SharedPtr[T]) =
|
||||
proc decr*[T](x: var SharedPtr[T]) =
|
||||
if x.val != nil and x.cnt != nil:
|
||||
let res = atomicSubFetch(x.cnt, 1, ATOMIC_ACQUIRE)
|
||||
if res == 0:
|
||||
echo "SharedPtr: FREE: ", repr x.val.pointer, " ", x.cnt[], " tp: ", $(typeof(T))
|
||||
when compiles(`=destroy`(x.val)):
|
||||
echo "DECR FREE"
|
||||
`=destroy`(x.val)
|
||||
deallocShared(x.val)
|
||||
deallocShared(x.cnt)
|
||||
x.val = nil
|
||||
x.cnt = nil
|
||||
else:
|
||||
echo "SharedPtr: decr: ", repr x.val.pointer, " ", x.cnt[], " tp: ", $(typeof(T))
|
||||
|
||||
proc `=destroy`*[T](x: var SharedPtr[T]) =
|
||||
echo "SharedPtr: destroy: ", repr x.val.pointer, " ", x.cnt.repr, " tp: ", $(typeof(T))
|
||||
# echo "SharedPtr: destroy:st: ", ($getStackTrace()).split("\n").join(";")
|
||||
if x.val != nil:
|
||||
echo "SharedPtr: destroy: ", repr x.val.pointer, " cnt: ", x.cnt.repr, " tp: ", $(typeof(T))
|
||||
decr(x)
|
||||
|
||||
proc `=dup`*[T](src: SharedPtr[T]): SharedPtr[T] =
|
||||
@ -60,8 +65,8 @@ proc newSharedPtr*[T](val: sink Isolated[T]): SharedPtr[T] {.nodestroy.} =
|
||||
## ownership of the object by reference counting.
|
||||
result.cnt = cast[ptr int](allocShared0(sizeof(result.cnt)))
|
||||
result.val = cast[typeof(result.val)](allocShared(sizeof(result.val[])))
|
||||
result.cnt[] = 1
|
||||
result.val.value = extract val
|
||||
result.cnt[] = 0
|
||||
result.val[] = extract val
|
||||
echo "SharedPtr: alloc: ", result.val.pointer.repr, " tp: ", $(typeof(T))
|
||||
|
||||
template newSharedPtr*[T](val: T): SharedPtr[T] =
|
||||
@ -72,8 +77,8 @@ proc newSharedPtr*[T](t: typedesc[T]): SharedPtr[T] =
|
||||
## so reading from it before writing to it is undefined behaviour!
|
||||
result.cnt = cast[ptr int](allocShared0(sizeof(result.cnt)))
|
||||
result.val = cast[typeof(result.val)](allocShared0(sizeof(result.val[])))
|
||||
result.cnt[] = 1
|
||||
echo "SharedPtr: allocT: ", result.val.pointer.repr, " tp: ", $(typeof(T))
|
||||
result.cnt[] = 0
|
||||
echo "SharedPtr: alloc: ", result.val.pointer.repr, " tp: ", $(typeof(T))
|
||||
|
||||
proc isNil*[T](p: SharedPtr[T]): bool {.inline.} =
|
||||
p.val == nil
|
||||
|
||||
@ -60,9 +60,10 @@ proc newThreadResult*[T](
|
||||
res[].signal = await getThreadSignal()
|
||||
res
|
||||
|
||||
proc release*[T](res: TResult[T]) {.raises: [].} =
|
||||
proc release*[T](res: var TResult[T]) {.raises: [].} =
|
||||
## release TResult and it's ThreadSignal
|
||||
res[].signal.release()
|
||||
res.decr()
|
||||
|
||||
proc success*[T](ret: TResult[T], value: T) =
|
||||
## convenience wrapper for `TResult` to replicate
|
||||
|
||||
@ -16,7 +16,8 @@ import ./querycommontests
|
||||
|
||||
# import pretty
|
||||
|
||||
suite "Test Basic ThreadProxyDatastore":
|
||||
proc threadTest() =
|
||||
suite "Test Basic ThreadProxyDatastore":
|
||||
var
|
||||
sds: ThreadProxyDatastore
|
||||
mem: MemoryDatastore
|
||||
@ -34,6 +35,11 @@ suite "Test Basic ThreadProxyDatastore":
|
||||
let res1 = await sds.put(key1, data)
|
||||
check res1.isOk
|
||||
# print "res1: ", res1
|
||||
GC_fullCollect()
|
||||
|
||||
threadTest()
|
||||
GC_fullCollect()
|
||||
|
||||
|
||||
# test "check get":
|
||||
# # echo "\n\n=== get ==="
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user