From ccd8575a7d8a73113e88b70f215d9cded5d7b4f6 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 19 Sep 2023 20:16:27 -0700 Subject: [PATCH] refactor to callbacks --- datastore/datastore2.nim | 24 ++++++++++---- datastore/memoryds.nim | 15 ++++----- datastore/threadproxyds.nim | 18 +++++----- datastore/threads/threadbackend.nim | 38 +++++++++++----------- tests/datastore/testthreadproxyds.nim | 47 +++++++++++++-------------- 5 files changed, 76 insertions(+), 66 deletions(-) diff --git a/datastore/datastore2.nim b/datastore/datastore2.nim index d6f564c..f9e9f99 100644 --- a/datastore/datastore2.nim +++ b/datastore/datastore2.nim @@ -6,10 +6,22 @@ import threads/databuffer push: {.upraises: [].} type - Datastore2* = object of RootObj - has*: proc(self: Datastore2, key: KeyBuffer): ?!bool {.nimcall.} - delete*: proc(self: Datastore2, key: KeyBuffer): ?!void {.nimcall.} - get*: proc(self: Datastore2, key: KeyBuffer): ?!ValueBuffer {.nimcall.} - put*: proc(self: Datastore2, key: KeyBuffer, data: ValueBuffer): ?!void {.nimcall.} - close*: proc(self: Datastore2): ?!void {.nimcall.} + Datastore2*[T] = object of RootObj + has*: proc(self: var T, key: KeyBuffer): ?!bool {.nimcall, gcsafe, raises: [].} + delete*: proc(self: var T, key: KeyBuffer): ?!void {.nimcall.} + get*: proc(self: var T, key: KeyBuffer): ?!ValueBuffer {.nimcall.} + put*: proc(self: var T, key: KeyBuffer, data: ValueBuffer): ?!void {.nimcall.} + close*: proc(self: var T): ?!void {.nimcall.} + ds*: T +proc has*[T](self: var Datastore2[T], key: KeyBuffer): ?!bool = + self.has(self.ds, key) + +proc delete*[T](self: var Datastore2[T], key: KeyBuffer): ?!void {.nimcall.} = + self.delete(self.ds, key) +proc get*[T](self: var Datastore2[T], key: KeyBuffer): ?!ValueBuffer {.nimcall.} = + self.get(self.ds, key) +proc put*[T](self: var Datastore2[T], key: KeyBuffer, data: ValueBuffer): ?!void {.nimcall.} = + self.put(self.ds, key, data) +proc close*[T](self: var Datastore2[T]): ?!void {.nimcall.} = + self.close(self.ds) diff --git a/datastore/memoryds.nim b/datastore/memoryds.nim index 4437028..a93b3c7 100644 --- a/datastore/memoryds.nim +++ b/datastore/memoryds.nim @@ -21,14 +21,11 @@ export key, query push: {.upraises: [].} type - MemoryDatastore* = object of Datastore2 + MemoryDatastore* = object lock*: Lock store*: SimpleTable[10_000] -proc has*( - self: var MemoryDatastore, - key: KeyBuffer -): ?!bool = +proc has*(self: var MemoryDatastore, key: KeyBuffer): ?!bool = withLock(self.lock): let res: bool = self.store.hasKey(key) @@ -58,7 +55,7 @@ proc put*( self: var MemoryDatastore, key: KeyBuffer, data: ValueBuffer -): Future[?!void] {.async.} = +): ?!void = withLock(self.lock): self.store[key] = data @@ -68,9 +65,9 @@ proc close*(self: var MemoryDatastore): ?!void = self.store.clear() return success() -func new*(tp: typedesc[MemoryDatastore]): MemoryDatastore = - var self = tp() - self.lock.initLock() +func initMemoryDatastore*(): Datastore2[MemoryDatastore] = + var self = Datastore2[MemoryDatastore]() + self.ds.lock.initLock() self.has = has self.delete = delete self.get = get diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index bbc1850..fac83b5 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -11,15 +11,17 @@ import pkg/stew/results import ./key import ./query import ./datastore +import ./datastore2 +import ./threads/sharedptr import ./threads/threadbackend -export key, query +export key, query, sharedptr push: {.upraises: [].} type - ThreadProxyDatastore* = ref object of Datastore - tds: ThreadDatastorePtr + ThreadProxyDatastore*[T] = ref object of Datastore + tds: SharedPtr[ThreadDatastore[T]] method has*( self: ThreadProxyDatastore, @@ -199,13 +201,13 @@ method close*( echo "close done" # dispose(dsCell) -proc newThreadProxyDatastore*( - ds: Datastore, -): ?!ThreadProxyDatastore = +proc newThreadProxyDatastore*[T]( + ds: Datastore2[T], +): ?!ThreadProxyDatastore[T] = ## create a new - var self = ThreadProxyDatastore() - var value = newSharedPtr(ThreadDatastore) + var self = ThreadProxyDatastore[T]() + var value = newSharedPtr(ThreadDatastore[T]) # let dsCell = protect(cast[pointer](ds)) # GC_ref(ds) ## TODO: is this needed? diff --git a/datastore/threads/threadbackend.nim b/datastore/threads/threadbackend.nim index 4dc2f22..26bcf47 100644 --- a/datastore/threads/threadbackend.nim +++ b/datastore/threads/threadbackend.nim @@ -9,7 +9,8 @@ import pkg/taskpools import ./sharedptr import ../key import ../query -import ./datastore +import ../datastore +import ../datastore2 import ./databuffer import ./threadresults @@ -57,20 +58,19 @@ push: {.upraises: [].} ## type - ThreadDatastore* = object + ThreadDatastore*[T] = object tp*: Taskpool - ds*: Datastore + ds*: Datastore2[T] - ThreadDatastorePtr* = SharedPtr[ThreadDatastore] QueryIterStore* = object it*: QueryIter QueryIterPtr* = SharedPtr[QueryIterStore] -proc hasTask*( +proc hasTask*[T]( sig: SharedSignal, ret: TResult[bool], - tds: ThreadDatastorePtr, + tds: SharedPtr[ThreadDatastore[T]], kb: KeyBuffer, ) = @@ -86,10 +86,10 @@ proc hasTask*( except CatchableError as err: ret.failure(err) -proc deleteTask*( +proc deleteTask*[T]( sig: SharedSignal, ret: TResult[void], - tds: ThreadDatastorePtr, + tds: SharedPtr[ThreadDatastore[T]], kb: KeyBuffer, ) = @@ -104,10 +104,10 @@ proc deleteTask*( discard sig.fireSync() -proc getTask*( +proc getTask*[T]( sig: SharedSignal, ret: TResult[DataBuffer], - tds: ThreadDatastorePtr, + tds: SharedPtr[ThreadDatastore[T]], kb: KeyBuffer, ) = echoed "getTask: ", $getThreadId(), " kb: ", kb.repr @@ -127,10 +127,10 @@ proc getTask*( import std/os -proc putTask*( +proc putTask*[T]( sig: SharedSignal, ret: TResult[void], - tds: ThreadDatastorePtr, + tds: SharedPtr[ThreadDatastore[T]], kb: KeyBuffer, db: DataBuffer, ) = @@ -141,10 +141,10 @@ proc putTask*( # echo "putTask:kb: ", kb.toString # echo "putTask:db: ", db.toString - let key = kb.toKey() + let key = kb - let data = db.toSeq(byte) - let res = (waitFor tds[].ds.put(key, data)).catch + let data = db + let res = tds[].ds.put(tds[].ds, key, data) # print "thrbackend: putTask: fire", ret[].signal.fireSync().get() if res.isErr: ret.failure(res.error()) @@ -155,10 +155,10 @@ proc putTask*( sig.decr() echoed "putTask: FINISH\n" -proc queryTask*( +proc queryTask*[T]( sig: SharedSignal, ret: TResult[QueryResponseBuffer], - tds: ThreadDatastorePtr, + tds: SharedPtr[ThreadDatastore[T]], qiter: QueryIterPtr, ) = @@ -179,10 +179,10 @@ proc queryTask*( discard sig.fireSync() -proc query*( +proc query*[T]( sig: SharedSignal, ret: TResult[QueryResponseBuffer], - tds: ThreadDatastorePtr, + tds: SharedPtr[ThreadDatastore[T]], qiter: QueryIterPtr, ) = tds[].tp.spawn queryTask(sig, ret, tds, qiter) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 2290639..e651ce8 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -16,17 +16,16 @@ import ./querycommontests import pretty - proc testThreadProxy() = suite "Test Basic ThreadProxyDatastore": var - sds: ThreadProxyDatastore + sds: ThreadProxyDatastore[MemoryDatastore] mem: MemoryDatastore key1: Key data: seq[byte] setupAll: - mem = MemoryDatastore.new() + mem = initMemoryDatastore() sds = newThreadProxyDatastore(mem).expect("should work") key1 = Key.init("/a").tryGet data = "value for 1".toBytes() @@ -73,34 +72,34 @@ proc testThreadProxyBasics() = basicStoreTests(sds, key, bytes, otherBytes) -proc testThreadProxyQuery() = - suite "Test Query": - var - mem: MemoryDatastore - sds: ThreadProxyDatastore +# proc testThreadProxyQuery() = +# suite "Test Query": +# var +# mem: MemoryDatastore +# sds: ThreadProxyDatastore - setup: - mem = MemoryDatastore.new() - sds = newThreadProxyDatastore(mem).expect("should work") +# setup: +# mem = MemoryDatastore.new() +# sds = newThreadProxyDatastore(mem).expect("should work") - queryTests(sds, false) +# queryTests(sds, false) - test "query iter fails": +# test "query iter fails": - expect FutureDefect: - let q = Query.init(key1) +# expect FutureDefect: +# let q = Query.init(key1) - (await sds.put(key1, val1)).tryGet - (await sds.put(key2, val2)).tryGet - (await sds.put(key3, val3)).tryGet +# (await sds.put(key1, val1)).tryGet +# (await sds.put(key2, val2)).tryGet +# (await sds.put(key3, val3)).tryGet - let - iter = (await sds.query(q)).tryGet - res = (await allFinished(toSeq(iter))) - .mapIt(it.read.tryGet) - .filterIt(it.key.isSome) +# let +# iter = (await sds.query(q)).tryGet +# res = (await allFinished(toSeq(iter))) +# .mapIt(it.read.tryGet) +# .filterIt(it.key.isSome) - check res.len() > 0 +# check res.len() > 0 when isMainModule: for i in 1..100: