mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-08 00:23:10 +00:00
switching up
This commit is contained in:
parent
ef402c4276
commit
73899e7fcf
@ -6,6 +6,7 @@ import pkg/questionable
|
|||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
import pkg/upraises
|
import pkg/upraises
|
||||||
import pkg/taskpools
|
import pkg/taskpools
|
||||||
|
import pkg/stew/results
|
||||||
|
|
||||||
import ./key
|
import ./key
|
||||||
import ./query
|
import ./query
|
||||||
@ -48,9 +49,8 @@ method get*(
|
|||||||
key: Key
|
key: Key
|
||||||
): Future[?!seq[byte]] {.async.} =
|
): Future[?!seq[byte]] {.async.} =
|
||||||
|
|
||||||
var res = newThreadResult(DataBuffer)
|
var res = ?newThreadResult(DataBuffer)
|
||||||
res[].signal = ThreadSignalPtr.new().valueOr:
|
echo "res: ", res
|
||||||
return failure newException(DatastoreError, "error creating signal")
|
|
||||||
|
|
||||||
get(res, self.tds, key)
|
get(res, self.tds, key)
|
||||||
await wait(res[].signal)
|
await wait(res[].signal)
|
||||||
@ -66,18 +66,18 @@ method put*(
|
|||||||
data: seq[byte]
|
data: seq[byte]
|
||||||
): Future[?!void] {.async.} =
|
): Future[?!void] {.async.} =
|
||||||
|
|
||||||
var res = newThreadResult(void)
|
var res = ?newThreadResult(void)
|
||||||
res[].signal = ThreadSignalPtr.new().valueOr:
|
|
||||||
return failure newException(DatastoreError, "error creating signal")
|
|
||||||
|
|
||||||
|
echo "res: ", res
|
||||||
put(res, self.tds, key, data)
|
put(res, self.tds, key, data)
|
||||||
await wait(res[].signal)
|
await wait(res[].signal)
|
||||||
|
echo "closing signal"
|
||||||
res[].signal.close()
|
res[].signal.close()
|
||||||
|
|
||||||
echo "\nSharedDataStore:put:value: ", res[].repr
|
echo "\nSharedDataStore:put:value: ", res[].repr
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|
||||||
method put*(
|
method put*(
|
||||||
self: SharedDatastore,
|
self: SharedDatastore,
|
||||||
batch: seq[BatchEntry]
|
batch: seq[BatchEntry]
|
||||||
@ -98,7 +98,7 @@ proc newSharedDataStore*(
|
|||||||
|
|
||||||
var
|
var
|
||||||
self = SharedDatastore()
|
self = SharedDatastore()
|
||||||
res = newThreadResult(ThreadDatastorePtr)
|
res = ?newThreadResult(ThreadDatastorePtr)
|
||||||
|
|
||||||
res[].value = newSharedPtr(ThreadDatastore)
|
res[].value = newSharedPtr(ThreadDatastore)
|
||||||
res[].signal = ThreadSignalPtr.new().valueOr:
|
res[].signal = ThreadSignalPtr.new().valueOr:
|
||||||
|
|||||||
@ -68,8 +68,14 @@ var
|
|||||||
fsDatastore {.threadvar.}: FSDatastore ##\
|
fsDatastore {.threadvar.}: FSDatastore ##\
|
||||||
## TODO: figure out a better way to capture this?
|
## TODO: figure out a better way to capture this?
|
||||||
|
|
||||||
proc newThreadResult*[T](tp: typedesc[T]): TResult[T] =
|
proc newThreadResult*[T](tp: typedesc[T]): Result[TResult[T], ref CatchableError] =
|
||||||
newSharedPtr(ThreadResult[T])
|
let res = newSharedPtr(ThreadResult[T])
|
||||||
|
let signal = ThreadSignalPtr.new()
|
||||||
|
if signal.isErr:
|
||||||
|
return err((ref CatchableError)(msg: signal.error()))
|
||||||
|
else:
|
||||||
|
res[].signal = signal.get()
|
||||||
|
ok res
|
||||||
|
|
||||||
proc startupDatastore(
|
proc startupDatastore(
|
||||||
ret: TResult[ThreadDatastorePtr],
|
ret: TResult[ThreadDatastorePtr],
|
||||||
@ -134,6 +140,8 @@ proc get*(
|
|||||||
|
|
||||||
tds[].tp.spawn getTask(ret, tds[].backend, bkey)
|
tds[].tp.spawn getTask(ret, tds[].backend, bkey)
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
proc putTask*(
|
proc putTask*(
|
||||||
ret: TResult[void],
|
ret: TResult[void],
|
||||||
backend: ThreadBackendKind,
|
backend: ThreadBackendKind,
|
||||||
@ -144,6 +152,7 @@ proc putTask*(
|
|||||||
print "\nthrbackend: putTask:key: ", key
|
print "\nthrbackend: putTask:key: ", key
|
||||||
print "\nthrbackend: putTask:data: ", data
|
print "\nthrbackend: putTask:data: ", data
|
||||||
|
|
||||||
|
os.sleep(200)
|
||||||
print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
|
print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
|
||||||
|
|
||||||
proc put*(
|
proc put*(
|
||||||
@ -151,7 +160,7 @@ proc put*(
|
|||||||
tds: ThreadDatastorePtr,
|
tds: ThreadDatastorePtr,
|
||||||
key: Key,
|
key: Key,
|
||||||
data: seq[byte]
|
data: seq[byte]
|
||||||
): TResult[void] =
|
) =
|
||||||
echo "thrfrontend:put: "
|
echo "thrfrontend:put: "
|
||||||
let bkey = StringBuffer.new(key.id())
|
let bkey = StringBuffer.new(key.id())
|
||||||
let bval = DataBuffer.new(data)
|
let bval = DataBuffer.new(data)
|
||||||
|
|||||||
@ -31,7 +31,9 @@ suite "Test Basic SharedDatastore":
|
|||||||
|
|
||||||
echo "\n\n=== put ==="
|
echo "\n\n=== put ==="
|
||||||
let key1 = Key.init("/a").tryGet
|
let key1 = Key.init("/a").tryGet
|
||||||
let res1 = await sds.put(key1, "value for 1".toBytes())
|
# let res1 = await sds.put(key1, "value for 1".toBytes())
|
||||||
|
let res1 = sds.put(key1, "value for 1".toBytes())
|
||||||
|
res1.cancel()
|
||||||
echo "res1: ", res1.repr
|
echo "res1: ", res1.repr
|
||||||
|
|
||||||
echo "\n\n=== get ==="
|
echo "\n\n=== get ==="
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user