From fed5a906ebb1368717eb62d4ee180e897f5f128b Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 12 Sep 2023 20:09:55 -0700 Subject: [PATCH] switch to custom sharedptr --- datastore/threadproxyds.nim | 22 ++++++++-------- datastore/threads/databuffer.nim | 6 +++-- datastore/threads/sharedptr.nim | 21 +++++++++------ datastore/threads/threadresults.nim | 3 ++- tests/datastore/testthreadproxyds.nim | 38 ++++++++++++++++----------- 5 files changed, 52 insertions(+), 38 deletions(-) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 41e86cb..6cce0c1 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -26,22 +26,21 @@ method has*( key: Key ): Future[?!bool] {.async.} = - let ret = await newThreadResult(bool) + var ret = await newThreadResult(bool) try: has(ret, self.tds, key) await wait(ret[].signal) + return ret.convert(bool) finally: ret.release() - return ret.convert(bool) - method delete*( self: ThreadProxyDatastore, key: Key ): Future[?!void] {.async.} = - let ret = await newThreadResult(void) + var ret = await newThreadResult(void) try: delete(ret, self.tds, key) @@ -72,7 +71,7 @@ method get*( ## probably be switched to use a single ThreadSignal ## for the entire batch - let ret = await newThreadResult(ValueBuffer) + var ret = await newThreadResult(ValueBuffer) try: get(ret, self.tds, key) @@ -88,15 +87,16 @@ method put*( data: seq[byte] ): Future[?!void] {.async.} = - let ret = await newThreadResult(void) + var ret = await newThreadResult(void) try: put(ret, self.tds, key, data) await wait(ret[].signal) - finally: - ret.release() - return ret.convert(void) + return ret.convert(void) + finally: + echo "PUT RELEASE" + ret.release() method put*( self: ThreadProxyDatastore, @@ -121,7 +121,7 @@ method query*( query: Query ): Future[?!QueryIter] {.async.} = - let ret = await newThreadResult(QueryResponseBuffer) + var ret = await newThreadResult(QueryResponseBuffer) # echo "\n\n=== Query Start === " @@ -185,7 +185,7 @@ proc newThreadProxyDatastore*( ## create a new var self = ThreadProxyDatastore() - let value = newSharedPtr(ThreadDatastore) + var value = newSharedPtr(ThreadDatastore) # GC_ref(ds) ## TODO: is this needed? try: diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim index 9cdf5aa..269ec3b 100644 --- a/datastore/threads/databuffer.nim +++ b/datastore/threads/databuffer.nim @@ -1,7 +1,9 @@ -import threading/smartptrs import std/hashes +import ./sharedptr + export hashes +export sharedptr type DataBufferHolder* = object @@ -30,7 +32,7 @@ proc `=destroy`*(x: var DataBufferHolder) = proc len*(a: DataBuffer): int = a[].size -proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a) +proc isNil*(a: DataBuffer): bool = sharedptr.isNil(a) proc hash*(a: DataBuffer): Hash = a[].buf.toOpenArray(0, a[].size-1).hash() diff --git a/datastore/threads/sharedptr.nim b/datastore/threads/sharedptr.nim index 2393902..e46e70f 100644 --- a/datastore/threads/sharedptr.nim +++ b/datastore/threads/sharedptr.nim @@ -23,24 +23,29 @@ type cnt: ptr int val*: ptr T -proc incr*[T](a: SharedPtr[T]) = +proc incr*[T](a: var SharedPtr[T]) = if a.val != nil and a.cnt != nil: let res = atomicAddFetch(a.cnt, 1, ATOMIC_RELAXED) echo "SharedPtr: manual incr: ", res -proc decr*[T](x: SharedPtr[T]) = +proc decr*[T](x: var SharedPtr[T]) = if x.val != nil and x.cnt != nil: let res = atomicSubFetch(x.cnt, 1, ATOMIC_ACQUIRE) if res == 0: echo "SharedPtr: FREE: ", repr x.val.pointer, " ", x.cnt[], " tp: ", $(typeof(T)) + when compiles(`=destroy`(x.val)): + echo "DECR FREE" + `=destroy`(x.val) deallocShared(x.val) deallocShared(x.cnt) + x.val = nil + x.cnt = nil else: echo "SharedPtr: decr: ", repr x.val.pointer, " ", x.cnt[], " tp: ", $(typeof(T)) proc `=destroy`*[T](x: var SharedPtr[T]) = - echo "SharedPtr: destroy: ", repr x.val.pointer, " ", x.cnt.repr, " tp: ", $(typeof(T)) - # echo "SharedPtr: destroy:st: ", ($getStackTrace()).split("\n").join(";") + if x.val != nil: + echo "SharedPtr: destroy: ", repr x.val.pointer, " cnt: ", x.cnt.repr, " tp: ", $(typeof(T)) decr(x) proc `=dup`*[T](src: SharedPtr[T]): SharedPtr[T] = @@ -60,8 +65,8 @@ proc newSharedPtr*[T](val: sink Isolated[T]): SharedPtr[T] {.nodestroy.} = ## ownership of the object by reference counting. result.cnt = cast[ptr int](allocShared0(sizeof(result.cnt))) result.val = cast[typeof(result.val)](allocShared(sizeof(result.val[]))) - result.cnt[] = 1 - result.val.value = extract val + result.cnt[] = 0 + result.val[] = extract val echo "SharedPtr: alloc: ", result.val.pointer.repr, " tp: ", $(typeof(T)) template newSharedPtr*[T](val: T): SharedPtr[T] = @@ -72,8 +77,8 @@ proc newSharedPtr*[T](t: typedesc[T]): SharedPtr[T] = ## so reading from it before writing to it is undefined behaviour! result.cnt = cast[ptr int](allocShared0(sizeof(result.cnt))) result.val = cast[typeof(result.val)](allocShared0(sizeof(result.val[]))) - result.cnt[] = 1 - echo "SharedPtr: allocT: ", result.val.pointer.repr, " tp: ", $(typeof(T)) + result.cnt[] = 0 + echo "SharedPtr: alloc: ", result.val.pointer.repr, " tp: ", $(typeof(T)) proc isNil*[T](p: SharedPtr[T]): bool {.inline.} = p.val == nil diff --git a/datastore/threads/threadresults.nim b/datastore/threads/threadresults.nim index b6e5ea8..12213eb 100644 --- a/datastore/threads/threadresults.nim +++ b/datastore/threads/threadresults.nim @@ -60,9 +60,10 @@ proc newThreadResult*[T]( res[].signal = await getThreadSignal() res -proc release*[T](res: TResult[T]) {.raises: [].} = +proc release*[T](res: var TResult[T]) {.raises: [].} = ## release TResult and it's ThreadSignal res[].signal.release() + res.decr() proc success*[T](ret: TResult[T], value: T) = ## convenience wrapper for `TResult` to replicate diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 74db804..1821703 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -16,24 +16,30 @@ import ./querycommontests # import pretty -suite "Test Basic ThreadProxyDatastore": - var - sds: ThreadProxyDatastore - mem: MemoryDatastore - key1: Key - data: seq[byte] +proc threadTest() = + suite "Test Basic ThreadProxyDatastore": + var + sds: ThreadProxyDatastore + mem: MemoryDatastore + key1: Key + data: seq[byte] - setupAll: - mem = MemoryDatastore.new() - sds = newThreadProxyDatastore(mem).expect("should work") - key1 = Key.init("/a").tryGet - data = "value for 1".toBytes() + setupAll: + mem = MemoryDatastore.new() + sds = newThreadProxyDatastore(mem).expect("should work") + key1 = Key.init("/a").tryGet + data = "value for 1".toBytes() + + test "check put": + # echo "\n\n=== put ===" + let res1 = await sds.put(key1, data) + check res1.isOk + # print "res1: ", res1 + GC_fullCollect() + +threadTest() +GC_fullCollect() - test "check put": - # echo "\n\n=== put ===" - let res1 = await sds.put(key1, data) - check res1.isOk - # print "res1: ", res1 # test "check get": # # echo "\n\n=== get ==="