refactor to callbacks

This commit is contained in:
Jaremy Creechley 2023-09-19 20:16:27 -07:00
parent f204207606
commit ccd8575a7d
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
5 changed files with 76 additions and 66 deletions

View File

@ -6,10 +6,22 @@ import threads/databuffer
push: {.upraises: [].}
type
Datastore2* = object of RootObj
has*: proc(self: Datastore2, key: KeyBuffer): ?!bool {.nimcall.}
delete*: proc(self: Datastore2, key: KeyBuffer): ?!void {.nimcall.}
get*: proc(self: Datastore2, key: KeyBuffer): ?!ValueBuffer {.nimcall.}
put*: proc(self: Datastore2, key: KeyBuffer, data: ValueBuffer): ?!void {.nimcall.}
close*: proc(self: Datastore2): ?!void {.nimcall.}
Datastore2*[T] = object of RootObj
has*: proc(self: var T, key: KeyBuffer): ?!bool {.nimcall, gcsafe, raises: [].}
delete*: proc(self: var T, key: KeyBuffer): ?!void {.nimcall.}
get*: proc(self: var T, key: KeyBuffer): ?!ValueBuffer {.nimcall.}
put*: proc(self: var T, key: KeyBuffer, data: ValueBuffer): ?!void {.nimcall.}
close*: proc(self: var T): ?!void {.nimcall.}
ds*: T
proc has*[T](self: var Datastore2[T], key: KeyBuffer): ?!bool =
self.has(self.ds, key)
proc delete*[T](self: var Datastore2[T], key: KeyBuffer): ?!void {.nimcall.} =
self.delete(self.ds, key)
proc get*[T](self: var Datastore2[T], key: KeyBuffer): ?!ValueBuffer {.nimcall.} =
self.get(self.ds, key)
proc put*[T](self: var Datastore2[T], key: KeyBuffer, data: ValueBuffer): ?!void {.nimcall.} =
self.put(self.ds, key, data)
proc close*[T](self: var Datastore2[T]): ?!void {.nimcall.} =
self.close(self.ds)

View File

@ -21,14 +21,11 @@ export key, query
push: {.upraises: [].}
type
MemoryDatastore* = object of Datastore2
MemoryDatastore* = object
lock*: Lock
store*: SimpleTable[10_000]
proc has*(
self: var MemoryDatastore,
key: KeyBuffer
): ?!bool =
proc has*(self: var MemoryDatastore, key: KeyBuffer): ?!bool =
withLock(self.lock):
let res: bool = self.store.hasKey(key)
@ -58,7 +55,7 @@ proc put*(
self: var MemoryDatastore,
key: KeyBuffer,
data: ValueBuffer
): Future[?!void] {.async.} =
): ?!void =
withLock(self.lock):
self.store[key] = data
@ -68,9 +65,9 @@ proc close*(self: var MemoryDatastore): ?!void =
self.store.clear()
return success()
func new*(tp: typedesc[MemoryDatastore]): MemoryDatastore =
var self = tp()
self.lock.initLock()
func initMemoryDatastore*(): Datastore2[MemoryDatastore] =
var self = Datastore2[MemoryDatastore]()
self.ds.lock.initLock()
self.has = has
self.delete = delete
self.get = get

View File

@ -11,15 +11,17 @@ import pkg/stew/results
import ./key
import ./query
import ./datastore
import ./datastore2
import ./threads/sharedptr
import ./threads/threadbackend
export key, query
export key, query, sharedptr
push: {.upraises: [].}
type
ThreadProxyDatastore* = ref object of Datastore
tds: ThreadDatastorePtr
ThreadProxyDatastore*[T] = ref object of Datastore
tds: SharedPtr[ThreadDatastore[T]]
method has*(
self: ThreadProxyDatastore,
@ -199,13 +201,13 @@ method close*(
echo "close done"
# dispose(dsCell)
proc newThreadProxyDatastore*(
ds: Datastore,
): ?!ThreadProxyDatastore =
proc newThreadProxyDatastore*[T](
ds: Datastore2[T],
): ?!ThreadProxyDatastore[T] =
## create a new
var self = ThreadProxyDatastore()
var value = newSharedPtr(ThreadDatastore)
var self = ThreadProxyDatastore[T]()
var value = newSharedPtr(ThreadDatastore[T])
# let dsCell = protect(cast[pointer](ds))
# GC_ref(ds) ## TODO: is this needed?

View File

@ -9,7 +9,8 @@ import pkg/taskpools
import ./sharedptr
import ../key
import ../query
import ./datastore
import ../datastore
import ../datastore2
import ./databuffer
import ./threadresults
@ -57,20 +58,19 @@ push: {.upraises: [].}
##
type
ThreadDatastore* = object
ThreadDatastore*[T] = object
tp*: Taskpool
ds*: Datastore
ds*: Datastore2[T]
ThreadDatastorePtr* = SharedPtr[ThreadDatastore]
QueryIterStore* = object
it*: QueryIter
QueryIterPtr* = SharedPtr[QueryIterStore]
proc hasTask*(
proc hasTask*[T](
sig: SharedSignal,
ret: TResult[bool],
tds: ThreadDatastorePtr,
tds: SharedPtr[ThreadDatastore[T]],
kb: KeyBuffer,
) =
@ -86,10 +86,10 @@ proc hasTask*(
except CatchableError as err:
ret.failure(err)
proc deleteTask*(
proc deleteTask*[T](
sig: SharedSignal,
ret: TResult[void],
tds: ThreadDatastorePtr,
tds: SharedPtr[ThreadDatastore[T]],
kb: KeyBuffer,
) =
@ -104,10 +104,10 @@ proc deleteTask*(
discard sig.fireSync()
proc getTask*(
proc getTask*[T](
sig: SharedSignal,
ret: TResult[DataBuffer],
tds: ThreadDatastorePtr,
tds: SharedPtr[ThreadDatastore[T]],
kb: KeyBuffer,
) =
echoed "getTask: ", $getThreadId(), " kb: ", kb.repr
@ -127,10 +127,10 @@ proc getTask*(
import std/os
proc putTask*(
proc putTask*[T](
sig: SharedSignal,
ret: TResult[void],
tds: ThreadDatastorePtr,
tds: SharedPtr[ThreadDatastore[T]],
kb: KeyBuffer,
db: DataBuffer,
) =
@ -141,10 +141,10 @@ proc putTask*(
# echo "putTask:kb: ", kb.toString
# echo "putTask:db: ", db.toString
let key = kb.toKey()
let key = kb
let data = db.toSeq(byte)
let res = (waitFor tds[].ds.put(key, data)).catch
let data = db
let res = tds[].ds.put(tds[].ds, key, data)
# print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
if res.isErr:
ret.failure(res.error())
@ -155,10 +155,10 @@ proc putTask*(
sig.decr()
echoed "putTask: FINISH\n"
proc queryTask*(
proc queryTask*[T](
sig: SharedSignal,
ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr,
tds: SharedPtr[ThreadDatastore[T]],
qiter: QueryIterPtr,
) =
@ -179,10 +179,10 @@ proc queryTask*(
discard sig.fireSync()
proc query*(
proc query*[T](
sig: SharedSignal,
ret: TResult[QueryResponseBuffer],
tds: ThreadDatastorePtr,
tds: SharedPtr[ThreadDatastore[T]],
qiter: QueryIterPtr,
) =
tds[].tp.spawn queryTask(sig, ret, tds, qiter)

View File

@ -16,17 +16,16 @@ import ./querycommontests
import pretty
proc testThreadProxy() =
suite "Test Basic ThreadProxyDatastore":
var
sds: ThreadProxyDatastore
sds: ThreadProxyDatastore[MemoryDatastore]
mem: MemoryDatastore
key1: Key
data: seq[byte]
setupAll:
mem = MemoryDatastore.new()
mem = initMemoryDatastore()
sds = newThreadProxyDatastore(mem).expect("should work")
key1 = Key.init("/a").tryGet
data = "value for 1".toBytes()
@ -73,34 +72,34 @@ proc testThreadProxyBasics() =
basicStoreTests(sds, key, bytes, otherBytes)
proc testThreadProxyQuery() =
suite "Test Query":
var
mem: MemoryDatastore
sds: ThreadProxyDatastore
# proc testThreadProxyQuery() =
# suite "Test Query":
# var
# mem: MemoryDatastore
# sds: ThreadProxyDatastore
setup:
mem = MemoryDatastore.new()
sds = newThreadProxyDatastore(mem).expect("should work")
# setup:
# mem = MemoryDatastore.new()
# sds = newThreadProxyDatastore(mem).expect("should work")
queryTests(sds, false)
# queryTests(sds, false)
test "query iter fails":
# test "query iter fails":
expect FutureDefect:
let q = Query.init(key1)
# expect FutureDefect:
# let q = Query.init(key1)
(await sds.put(key1, val1)).tryGet
(await sds.put(key2, val2)).tryGet
(await sds.put(key3, val3)).tryGet
# (await sds.put(key1, val1)).tryGet
# (await sds.put(key2, val2)).tryGet
# (await sds.put(key3, val3)).tryGet
let
iter = (await sds.query(q)).tryGet
res = (await allFinished(toSeq(iter)))
.mapIt(it.read.tryGet)
.filterIt(it.key.isSome)
# let
# iter = (await sds.query(q)).tryGet
# res = (await allFinished(toSeq(iter)))
# .mapIt(it.read.tryGet)
# .filterIt(it.key.isSome)
check res.len() > 0
# check res.len() > 0
when isMainModule:
for i in 1..100: