nim-datastore/datastore/sharedds.nim

124 lines
2.4 KiB
Nim
Raw Normal View History

2023-08-24 15:51:25 -07:00
import std/tables
import pkg/chronos
2023-08-24 17:33:32 -07:00
import pkg/chronos/threadsync
2023-08-24 15:51:25 -07:00
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
2023-08-24 18:28:30 -07:00
import pkg/taskpools
2023-08-25 14:34:11 -07:00
import pkg/stew/results
2023-08-24 15:51:25 -07:00
import ./key
import ./query
import ./datastore
2023-08-24 18:28:30 -07:00
import ./threadbackend
2023-08-24 19:55:44 -07:00
import threading/smartptrs
2023-08-24 15:51:25 -07:00
2023-08-24 21:55:53 -07:00
import pretty
2023-08-24 19:50:58 -07:00
export key, query, ThreadBackend
2023-08-24 15:51:25 -07:00
# Module-wide exception tracking through `pkg/upraises` (imported above).
# NOTE(review): presumably this applies an empty raises list to the
# declarations that follow -- confirm against the upraises documentation.
push: {.upraises: [].}
type
  SharedDatastore* = ref object of Datastore
    ## `Datastore` subtype whose `get` and `put` forward their requests to
    ## the thread datastore referenced by `tds`.
    # stores*: Table[Key, SharedDatastore]
    tds: ThreadDatastorePtr ## assigned once by `newSharedDataStore`
2023-08-24 15:51:25 -07:00
method has*(
    self: SharedDatastore, key: Key
): Future[?!bool] {.async.} =
  ## Placeholder: reports `true` for every `key`; neither `self` nor `key`
  ## is consulted.
  return success(true)
2023-08-24 15:51:25 -07:00
method delete*(
    self: SharedDatastore, key: Key
): Future[?!void] {.async.} =
  ## Placeholder: reports success without touching `self` or `key`.
  return success()
2023-08-24 15:51:25 -07:00
method delete*(
    self: SharedDatastore, keys: seq[Key]
): Future[?!void] {.async.} =
  ## Placeholder for the multi-key overload: reports success without
  ## touching `self` or `keys`.
  return success()
method get*(
    self: SharedDatastore,
    key: Key
): Future[?!seq[byte]] {.async.} =
  ## Reads the value stored under `key` through the thread datastore
  ## `self.tds`.
  ##
  ## Returns the bytes copied out of the result's `DataBuffer`, or the
  ## failure produced by `newThreadResult` when the result slot cannot be
  ## created.

  without ret =? newThreadResult(DataBuffer), err:
    return failure(err)

  try:
    # Submit the request, then suspend until the result's signal fires.
    get(ret, self.tds, key)
    await wait(ret[].signal)
  finally:
    # Runs on every exit path, so the signal is closed even when the
    # `await` raises.
    ret[].signal.close()

  # NOTE(review): only `ret[].value` is read; the outcome reported by the
  # backend is never inspected, so a failed lookup would still be returned
  # as success -- confirm whether that is intended.
  return success(ret[].value.toSeq(byte))
2023-08-24 15:51:25 -07:00
method put*(
    self: SharedDatastore,
    key: Key,
    data: seq[byte]
): Future[?!void] {.async.} =
  ## Stores `data` under `key` through the thread datastore `self.tds`.
  ##
  ## Returns success once the result's signal has fired, or the failure
  ## produced by `newThreadResult` when the result slot cannot be created.

  without ret =? newThreadResult(void), err:
    return failure(err)

  try:
    # Submit the request, then suspend until the result's signal fires.
    put(ret, self.tds, key, data)
    await wait(ret[].signal)
  finally:
    # Runs on every exit path, so the signal is closed even when the
    # `await` raises.
    ret[].signal.close()

  # NOTE(review): the outcome reported by the backend is never inspected,
  # so a failed write would still be returned as success -- confirm whether
  # that is intended.
  return success()
2023-08-24 15:51:25 -07:00
method put*(
    self: SharedDatastore, batch: seq[BatchEntry]
): Future[?!void] {.async.} =
  ## Batch writes are not supported yet; calling this overload trips an
  ## assertion instead of returning a result.
  raiseAssert "Not implemented!"
2023-08-24 15:51:25 -07:00
2023-08-24 15:57:15 -07:00
method close*(self: SharedDatastore): Future[?!void] {.async.} =
  ## Currently a no-op: reports success without releasing anything held by
  ## `self`.
  # TODO: how to handle failed close?
  return success()
2023-08-24 18:57:06 -07:00
proc newSharedDataStore*(
    backend: ThreadBackend
): Future[?!SharedDatastore] {.async.} =
  ## Creates a `SharedDatastore` backed by a thread datastore built from
  ## `backend`.
  ##
  ## Returns the new datastore, or the failure produced by
  ## `newThreadResult` when the result slot cannot be created.

  without res =? newThreadResult(ThreadDatastorePtr), err:
    return failure(err)

  try:
    # Allocate the shared pointer up front; `createThreadDatastore` receives
    # it through `res`.
    res[].value = newSharedPtr(ThreadDatastore)
    res.createThreadDatastore(backend)
    await wait(res[].signal)
  finally:
    # Runs on every exit path, so the signal is closed even when the
    # `await` raises.
    res[].signal.close()

  # NOTE(review): `res[].state` is not inspected before the pointer is
  # handed out -- confirm whether a failed creation must be surfaced here.
  return success(SharedDatastore(tds: res[].value))