Jaremy Creechley 2023-09-28 17:36:05 -07:00
parent 4002ba9507
commit 01ccbb5798
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
3 changed files with 35 additions and 63 deletions


@@ -6,94 +6,68 @@ import pkg/questionable
import pkg/questionable/results
from pkg/stew/results as stewResults import get, isErr
import pkg/upraises
import pkg/chronos
import pkg/taskpools
import ./threads/backend
import ./threads/sqlbackend
import ./threads/threadproxyds
import ./datastore
export datastore
export datastore, Taskpool
push: {.upraises: [].}
type
SQLiteDatastore* = ref object of Datastore
db: SQLiteBackend[KeyId, DataBuffer]
db: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
proc path*(self: SQLiteDatastore): string =
self.db.path()
self.db.backend.path()
proc readOnly*(self: SQLiteDatastore): bool =
self.db.readOnly()
self.db.backend.readOnly()
method has*(self: SQLiteDatastore,
key: Key): Future[?!bool] {.async.} =
return self.db.has(KeyId.new key.id())
await self.db.has(key)
method delete*(self: SQLiteDatastore,
key: Key): Future[?!void] {.async.} =
return self.db.delete(KeyId.new key.id())
await self.db.delete(key)
method delete*(self: SQLiteDatastore,
keys: seq[Key]): Future[?!void] {.async.} =
let dkeys = keys.mapIt(KeyId.new it.id())
return self.db.delete(dkeys)
await self.db.delete(keys)
method get*(self: SQLiteDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]:
d.toSeq()
await self.db.get(key)
method put*(self: SQLiteDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
self.db.put(KeyId.new key.id(), DataBuffer.new data)
await self.db.put(key, data)
method put*(self: SQLiteDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
var dbatch: seq[tuple[key: KeyId, data: DataBuffer]]
for entry in batch:
dbatch.add((KeyId.new entry.key.id(), DataBuffer.new entry.data))
self.db.put(dbatch)
await self.db.put(batch)
method close*(self: SQLiteDatastore): Future[?!void] {.async.} =
self.db.close()
await self.db.close()
method queryIter*(self: SQLiteDatastore,
query: Query
): ?!(iterator(): ?!QueryResponse) =
let dbquery = dbQuery(
key= KeyId.new query.key.id(),
value= query.value,
limit= query.limit,
offset= query.offset,
sort= query.sort,
)
var qhandle = ? self.db.query(dbquery)
let iter = iterator(): ?!QueryResponse =
for resp in qhandle.queryIter():
without qres =? resp, err:
yield QueryResponse.failure err
let k = qres.key.map() do(k: KeyId) -> Key:
Key.init($k).expect("valid key")
let v: seq[byte] = qres.data.toSeq()
yield success (k, v)
success iter
method query*(self: SQLiteDatastore,
q: Query): Future[?!QueryIter] {.async.} =
await self.db.query(q)
proc new*(
T: type SQLiteDatastore,
path: string,
readOnly = false): ?!SQLiteDatastore =
readOnly = false,
tp: Taskpool,
): ?!SQLiteDatastore =
success SQLiteDatastore(
db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly))
proc new*(
T: type SQLiteDatastore,
db: SQLiteBackend[KeyId, DataBuffer]): ?!T =
success T(
db: db,
readOnly: db.readOnly)
let
backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)
db = ? ThreadDatastore.new(backend, tp = tp)
success SQLiteDatastore(db: db)
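
For illustration, a minimal usage sketch of the reworked constructor, which now runs every backend call through a ThreadDatastore on a Taskpool. The import path, the in-memory Memory constant, and the thread count below are assumptions for the sketch, not part of this commit:

import pkg/chronos                  # waitFor
import pkg/taskpools                # Taskpool (also re-exported by this module now)
import pkg/questionable/results     # ?! results and tryGet
import pkg/stew/byteutils           # toBytes
import pkg/datastore/sql            # assumed module path for SQLiteDatastore, Key, Memory

let tp = Taskpool.new(numThreads = 2)
# the constructor wires an SQLite backend through a ThreadDatastore internally
let ds = SQLiteDatastore.new(Memory, tp = tp).tryGet()
let key = Key.init("/a/b").tryGet()
(waitFor ds.put(key, "hello".toBytes)).tryGet()
echo (waitFor ds.get(key)).tryGet()
(waitFor ds.close()).tryGet()
tp.shutdown()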


@@ -16,12 +16,12 @@ import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/taskpools
import std/isolation
import pkg/chronicles
import pkg/threading/smartptrs
import ../key
import ../query
import ../datastore
import ./backend
import ./fsbackend
import ./sqlbackend
@@ -30,7 +30,7 @@ import ./asyncsemaphore
import ./databuffer
import ./threadresult
export threadresult
export threadresult, smartptrs, isolation, chronicles
logScope:
topics = "datastore threadproxyds"
@@ -48,10 +48,10 @@ type
## Task context object.
## This is a SharedPtr to make the query iter simpler
ThreadDatastore*[BT] = ref object of Datastore
tp: Taskpool
backend: BT
semaphore: AsyncSemaphore # semaphore is used for backpressure \
ThreadDatastore*[BT] = object
tp*: Taskpool
backend*: BT
semaphore*: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors
proc newTaskCtx*[T](tp: typedesc[T],
@@ -207,9 +207,9 @@ proc put*[BT](self: ThreadDatastore[BT],
return ctx[].res.toRes()
proc put*[DB](
proc put*[E, DB](
self: ThreadDatastore[DB],
batch: seq[BatchEntry]): Future[?!void] {.async.} =
batch: seq[E]): Future[?!void] {.async.} =
## put batch data
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
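
Since ThreadDatastore[BT] is now a plain generic object with exported tp and backend fields, a wrapper such as SQLiteDatastore can hold it by value and reach the underlying backend directly (as self.db.backend.path() does above). A hedged sketch of direct use, assuming the same imports as the earlier sketch; the thread count is again an assumption:

let tp = Taskpool.new(numThreads = 2)
let backend = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
let ds = ThreadDatastore.new(backend, tp = tp).tryGet()
echo ds.backend.readOnly()   # exported field, mirroring SQLiteDatastore.readOnly
(waitFor ds.put(Key.init("/a").tryGet(), "v".toBytes)).tryGet()
(waitFor ds.close()).tryGet()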


@@ -37,15 +37,13 @@ for i in 1..N:
suite "Test Basic ThreadDatastore with SQLite " & $i:
var
sqlStore: SQLiteBackend[KeyId, DataBuffer]
ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
ds: SQLiteDatastore
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
ds = SQLiteDatastore.new(Memory, tp = taskPool).tryGet()
teardown:
GC_fullCollect()
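
The taskPool passed in setupAll is presumably created above this hunk; a hedged sketch of the usual lifecycle for such a pool in these tests (the thread count is an assumption):

let taskPool = Taskpool.new(numThreads = 4)
# ... suites run against stores created with tp = taskPool ...
taskPool.shutdown()   # joins the pool's worker threads once the tests finish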