mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-04 06:33:11 +00:00
Merge pull request #58 from codex-storage/concurrent-datastore
Concurrent datastore interface & sqlite implementation
This commit is contained in:
commit
8f99eb06bd
2
.github/workflows/tests.yml
vendored
2
.github/workflows/tests.yml
vendored
@ -13,7 +13,7 @@ jobs:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
cache_nonce: [ 1 ]
|
||||
nim_version: [ 1.6.14 ]
|
||||
nim_version: [ 1.6.18 ]
|
||||
platform:
|
||||
- {
|
||||
icon: 🐧,
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
|
||||
@ -12,6 +13,9 @@ push: {.upraises: [].}
|
||||
|
||||
type
|
||||
BatchEntry* = tuple[key: Key, data: seq[byte]]
|
||||
Function*[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe, closure.}
|
||||
Modify* = Function[?seq[byte], Future[?seq[byte]]]
|
||||
ModifyGet* = Function[?seq[byte], Future[(?seq[byte], seq[byte])]]
|
||||
|
||||
method has*(self: Datastore, key: Key): Future[?!bool] {.base, locks: "unknown".} =
|
||||
raiseAssert("Not implemented!")
|
||||
@ -42,3 +46,34 @@ method query*(
|
||||
|
||||
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
|
||||
return (await self.has(key)) |? false
|
||||
|
||||
method modify*(self: Datastore, key: Key, fn: Modify): Future[?!void] {.base, locks: "unknown".} =
|
||||
## Concurrently safe way of modifying the value associated with the `key`.
|
||||
##
|
||||
## Same as `modifyGet`, but this takes `fn` that doesn't produce any auxillary value.
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
|
||||
## Concurrently safe way of updating value associated with the `key`. Returns auxillary value on
|
||||
## successful update.
|
||||
##
|
||||
## This method first reads a value stored under the `key`, if such value exists it's wrapped into `some`
|
||||
## and passed as the only arg to the `fn`, otherwise `none` is passed.
|
||||
##
|
||||
## Table below presents four possibilities of execution. `curr` represents a value passed to `fn`,
|
||||
## while `fn(curr)` represents a value returned by calling `fn` (auxillary value is omitted for clarity).
|
||||
##
|
||||
## | curr | fn(curr) | action |
|
||||
## |---------|----------|------------------------------|
|
||||
## | none | none | no action |
|
||||
## | none | some(v) | insert v |
|
||||
## | some(u) | none | delete u |
|
||||
## | some(u) | some(v) | replace u with v (if u != v) |
|
||||
##
|
||||
## Note that `fn` can be called multiple times (when concurrent modification was detected). In such case
|
||||
## only the last auxillary value is returned.
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
63
datastore/defaultimpl.nim
Normal file
63
datastore/defaultimpl.nim
Normal file
@ -0,0 +1,63 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./datastore
|
||||
|
||||
proc defaultModifyGetImpl*(
|
||||
self: Datastore,
|
||||
lock: AsyncLock,
|
||||
key: Key,
|
||||
fn: ModifyGet
|
||||
): Future[?!seq[byte]] {.async.} =
|
||||
# Default implementation, serializes all modify operations using provided lock
|
||||
#
|
||||
|
||||
await lock.acquire()
|
||||
|
||||
try:
|
||||
without data =? await self.get(key), err:
|
||||
if not (err of DatastoreKeyNotFound):
|
||||
return failure(err)
|
||||
|
||||
let maybeCurrentData =
|
||||
if data.len == 0:
|
||||
seq[byte].none
|
||||
else:
|
||||
data.some
|
||||
|
||||
var
|
||||
maybeNewData: ?seq[byte]
|
||||
aux: seq[byte]
|
||||
|
||||
try:
|
||||
(maybeNewData, aux) = await fn(maybeCurrentData)
|
||||
except CatchableError as err:
|
||||
return failure(err)
|
||||
|
||||
if newData =? maybeNewData:
|
||||
if err =? (await self.put(key, newData)).errorOption:
|
||||
return failure(err)
|
||||
elif currentData =? maybeCurrentData:
|
||||
if err =? (await self.delete(key)).errorOption:
|
||||
return failure(err)
|
||||
|
||||
return aux.success
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
proc defaultModifyImpl*(
|
||||
self: Datastore,
|
||||
lock: AsyncLock,
|
||||
key: Key,
|
||||
fn: Modify
|
||||
): Future[?!void] {.async.} =
|
||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
let res = await fn(maybeValue)
|
||||
let ignoredAux = newSeq[byte]()
|
||||
return (res, ignoredAux)
|
||||
|
||||
if err =? (await self.defaultModifyGetImpl(lock, key, wrappedFn)).errorOption:
|
||||
return failure(err)
|
||||
else:
|
||||
return success()
|
||||
@ -1,5 +1,6 @@
|
||||
import std/os
|
||||
import std/options
|
||||
import std/tables
|
||||
import std/strutils
|
||||
|
||||
import pkg/chronos
|
||||
@ -8,6 +9,7 @@ import pkg/questionable/results
|
||||
from pkg/stew/results as stewResults import get, isErr
|
||||
import pkg/upraises
|
||||
|
||||
import ./defaultimpl
|
||||
import ./datastore
|
||||
|
||||
export datastore
|
||||
@ -19,6 +21,7 @@ type
|
||||
root*: string
|
||||
ignoreProtected: bool
|
||||
depth: int
|
||||
locks: TableRef[Key, AsyncLock]
|
||||
|
||||
proc validDepth*(self: FSDatastore, key: Key): bool =
|
||||
key.len <= self.depth
|
||||
@ -217,6 +220,30 @@ method query*(
|
||||
iter.next = next
|
||||
return success iter
|
||||
|
||||
method modifyGet*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
return await defaultModifyGetImpl(self, lock, key, fn)
|
||||
finally:
|
||||
if not lock.locked:
|
||||
self.locks.del(key)
|
||||
|
||||
method modify*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async.} =
|
||||
var lock: AsyncLock
|
||||
try:
|
||||
lock = self.locks.mgetOrPut(key, newAsyncLock())
|
||||
return await defaultModifyImpl(self, lock, key, fn)
|
||||
finally:
|
||||
if not lock.locked:
|
||||
self.locks.del(key)
|
||||
|
||||
proc new*(
|
||||
T: type FSDatastore,
|
||||
root: string,
|
||||
@ -235,4 +262,6 @@ proc new*(
|
||||
success T(
|
||||
root: root,
|
||||
ignoreProtected: ignoreProtected,
|
||||
depth: depth)
|
||||
depth: depth,
|
||||
locks: newTable[Key, AsyncLock]()
|
||||
)
|
||||
|
||||
@ -119,6 +119,26 @@ method put*(
|
||||
|
||||
return success()
|
||||
|
||||
method modifyGet*(
|
||||
self: MountedDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
|
||||
without mounted =? self.dispatch(key), error:
|
||||
return failure(error)
|
||||
|
||||
return await mounted.store.store.modifyGet(mounted.relative, fn)
|
||||
|
||||
method modify*(
|
||||
self: MountedDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async.} =
|
||||
|
||||
without mounted =? self.dispatch(key), error:
|
||||
return failure(error)
|
||||
|
||||
return await mounted.store.store.modify(mounted.relative, fn)
|
||||
|
||||
method close*(self: MountedDatastore): Future[?!void] {.async.} =
|
||||
for s in self.stores.values:
|
||||
discard await s.store.close()
|
||||
|
||||
@ -29,6 +29,106 @@ proc `readOnly=`*(self: SQLiteDatastore): bool
|
||||
proc timestamp*(t = epochTime()): int64 =
|
||||
(t * 1_000_000).int64
|
||||
|
||||
const initVersion* = 0.int64
|
||||
|
||||
method modifyGet*(self: SQLiteDatastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
var
|
||||
retriesLeft = 100 # allows reasonable concurrency, avoids infinite loop
|
||||
aux: seq[byte]
|
||||
|
||||
while retriesLeft > 0:
|
||||
var
|
||||
currentData: seq[byte]
|
||||
currentVersion: int64
|
||||
|
||||
proc onData(s: RawStmtPtr) =
|
||||
currentData = dataCol(s, GetVersionedStmtDataCol)()
|
||||
currentVersion = versionCol(s, GetVersionedStmtVersionCol)()
|
||||
|
||||
if err =? self.db.getVersionedStmt.query((key.id), onData).errorOption:
|
||||
return failure(err)
|
||||
|
||||
let maybeCurrentData = if currentData.len > 0: some(currentData) else: seq[byte].none
|
||||
var maybeNewData: ?seq[byte]
|
||||
|
||||
try:
|
||||
(maybeNewData, aux) = await fn(maybeCurrentData)
|
||||
except CatchableError as err:
|
||||
return failure(err)
|
||||
|
||||
if maybeCurrentData == maybeNewData:
|
||||
# no need to change currently stored value
|
||||
break
|
||||
|
||||
if err =? self.db.beginStmt.exec().errorOption:
|
||||
return failure(err)
|
||||
if currentData =? maybeCurrentData and newData =? maybeNewData:
|
||||
let updateParams = (
|
||||
newData,
|
||||
currentVersion + 1,
|
||||
timestamp(),
|
||||
key.id,
|
||||
currentVersion
|
||||
)
|
||||
if err =? (self.db.updateVersionedStmt.exec(updateParams)).errorOption:
|
||||
return failure(err)
|
||||
elif currentData =? maybeCurrentData:
|
||||
let deleteParams = (
|
||||
key.id,
|
||||
currentVersion
|
||||
)
|
||||
if err =? (self.db.deleteVersionedStmt.exec(deleteParams)).errorOption:
|
||||
return failure(err)
|
||||
elif newData =? maybeNewData:
|
||||
let insertParams = (
|
||||
key.id,
|
||||
newData,
|
||||
initVersion,
|
||||
timestamp()
|
||||
)
|
||||
if err =? (self.db.insertVersionedStmt.exec(insertParams)).errorOption:
|
||||
return failure(err)
|
||||
|
||||
var changes = 0.int64
|
||||
proc onChangesResult(s: RawStmtPtr) =
|
||||
changes = changesCol(s, 0)()
|
||||
|
||||
if err =? self.db.getChangesStmt.query((), onChangesResult).errorOption:
|
||||
if err =? self.db.rollbackStmt.exec().errorOption:
|
||||
return failure(err)
|
||||
return failure(err)
|
||||
|
||||
if changes == 1:
|
||||
if err =? self.db.endStmt.exec().errorOption:
|
||||
return failure(err)
|
||||
break
|
||||
elif changes == 0:
|
||||
# race condition detected
|
||||
if err =? self.db.rollbackStmt.exec().errorOption:
|
||||
return failure(err)
|
||||
retriesLeft.dec
|
||||
else:
|
||||
if err =? self.db.rollbackStmt.exec().errorOption:
|
||||
return failure(err)
|
||||
return failure("Unexpected number of changes, expected either 0 or 1, was " & $changes)
|
||||
|
||||
if retriesLeft == 0:
|
||||
return failure("Retry limit exceeded")
|
||||
|
||||
return success(aux)
|
||||
|
||||
|
||||
method modify*(self: SQLiteDatastore, key: Key, fn: Modify): Future[?!void] {.async.} =
|
||||
proc wrappedFn(maybeValue: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
let res = await fn(maybeValue)
|
||||
let ignoredAux = newSeq[byte]()
|
||||
return (res, ignoredAux)
|
||||
|
||||
if err =? (await self.modifyGet(key, wrappedFn)).errorOption:
|
||||
return failure(err)
|
||||
else:
|
||||
return success()
|
||||
|
||||
method has*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
var
|
||||
exists = false
|
||||
@ -81,14 +181,14 @@ method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
||||
return success bytes
|
||||
|
||||
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
|
||||
return self.db.putStmt.exec((key.id, data, timestamp()))
|
||||
return self.db.putStmt.exec((key.id, data, initVersion, timestamp()))
|
||||
|
||||
method put*(self: SQLiteDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
||||
if err =? self.db.beginStmt.exec().errorOption:
|
||||
return failure err
|
||||
|
||||
for entry in batch:
|
||||
if err =? self.db.putStmt.exec((entry.key.id, entry.data, timestamp())).errorOption:
|
||||
if err =? self.db.putStmt.exec((entry.key.id, entry.data, initVersion, timestamp())).errorOption:
|
||||
if err =? self.db.rollbackStmt.exec().errorOption:
|
||||
return failure err
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import std/os
|
||||
import std/strformat
|
||||
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
@ -10,6 +11,7 @@ export sqliteutils
|
||||
|
||||
type
|
||||
BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].}
|
||||
BoundVersionCol* = proc (): int64 {.closure, gcsafe, upraises: [].}
|
||||
BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].}
|
||||
BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].}
|
||||
|
||||
@ -19,8 +21,13 @@ type
|
||||
ContainsStmt* = SQLiteStmt[(string), void]
|
||||
DeleteStmt* = SQLiteStmt[(string), void]
|
||||
GetStmt* = SQLiteStmt[(string), void]
|
||||
PutStmt* = SQLiteStmt[(string, seq[byte], int64), void]
|
||||
PutStmt* = SQLiteStmt[(string, seq[byte], int64, int64), void]
|
||||
QueryStmt* = SQLiteStmt[(string), void]
|
||||
GetVersionedStmt* = SQLiteStmt[(string), void]
|
||||
InsertVersionedStmt* = SQLiteStmt[(string, seq[byte], int64, int64), void]
|
||||
UpdateVersionedStmt* = SQLiteStmt[(seq[byte], int64, int64, string, int64), void]
|
||||
DeleteVersionedStmt* = SQLiteStmt[(string, int64), void]
|
||||
GetChangesStmt* = NoParamsStmt
|
||||
BeginStmt* = NoParamsStmt
|
||||
EndStmt* = NoParamsStmt
|
||||
RollbackStmt* = NoParamsStmt
|
||||
@ -34,6 +41,11 @@ type
|
||||
getDataCol*: BoundDataCol
|
||||
getStmt*: GetStmt
|
||||
putStmt*: PutStmt
|
||||
getVersionedStmt*: GetVersionedStmt
|
||||
updateVersionedStmt*: UpdateVersionedStmt
|
||||
insertVersionedStmt*: InsertVersionedStmt
|
||||
deleteVersionedStmt*: DeleteVersionedStmt
|
||||
getChangesStmt*: GetChangesStmt
|
||||
beginStmt*: BeginStmt
|
||||
endStmt*: EndStmt
|
||||
rollbackStmt*: RollbackStmt
|
||||
@ -44,10 +56,12 @@ const
|
||||
|
||||
IdColName* = "id"
|
||||
DataColName* = "data"
|
||||
VersionColName* = "version"
|
||||
TimestampColName* = "timestamp"
|
||||
|
||||
IdColType = "TEXT"
|
||||
DataColType = "BLOB"
|
||||
VersionColType = "INTEGER"
|
||||
TimestampColType = "INTEGER"
|
||||
|
||||
Memory* = ":memory:"
|
||||
@ -69,6 +83,7 @@ const
|
||||
CREATE TABLE IF NOT EXISTS """ & TableName & """ (
|
||||
""" & IdColName & """ """ & IdColType & """ NOT NULL PRIMARY KEY,
|
||||
""" & DataColName & """ """ & DataColType & """,
|
||||
""" & VersionColName & """ """ & VersionColType & """ NOT NULL,
|
||||
""" & TimestampColName & """ """ & TimestampColType & """ NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
"""
|
||||
@ -89,8 +104,9 @@ const
|
||||
REPLACE INTO """ & TableName & """ (
|
||||
""" & IdColName & """,
|
||||
""" & DataColName & """,
|
||||
""" & VersionColName & """,
|
||||
""" & TimestampColName & """
|
||||
) VALUES (?, ?, ?)
|
||||
) VALUES (?, ?, ?, ?)
|
||||
"""
|
||||
|
||||
QueryStmtIdStr* = """
|
||||
@ -119,6 +135,43 @@ const
|
||||
ORDER BY """ & IdColName & """ DESC
|
||||
"""
|
||||
|
||||
GetVersionedStmtStr* = fmt"""
|
||||
SELECT {DataColName}, {VersionColName} FROM {TableName}
|
||||
WHERE {IdColName} = ?
|
||||
"""
|
||||
|
||||
GetVersionedStmtDataCol* = 0
|
||||
GetVersionedStmtVersionCol* = 1
|
||||
|
||||
InsertVersionedStmtStr* = fmt"""
|
||||
INSERT INTO {TableName}
|
||||
(
|
||||
{IdColName},
|
||||
{DataColName},
|
||||
{VersionColName},
|
||||
{TimestampColName}
|
||||
)
|
||||
VALUES (?, ?, ?, ?)
|
||||
"""
|
||||
|
||||
UpdateVersionedStmtStr* = fmt"""
|
||||
UPDATE {TableName}
|
||||
SET
|
||||
{DataColName} = ?,
|
||||
{VersionColName} = ?,
|
||||
{TimestampColName} = ?
|
||||
WHERE {IdColName} = ? AND {VersionColName} = ?
|
||||
"""
|
||||
|
||||
DeleteVersionedStmtStr* = fmt"""
|
||||
DELETE FROM {TableName}
|
||||
WHERE {IdColName} = ? AND {VersionColName} = ?
|
||||
"""
|
||||
|
||||
GetChangesStmtStr* = fmt"""
|
||||
SELECT changes()
|
||||
"""
|
||||
|
||||
BeginTransactionStr* = """
|
||||
BEGIN;
|
||||
"""
|
||||
@ -197,6 +250,21 @@ proc timestampCol*(
|
||||
return proc (): int64 =
|
||||
sqlite3_column_int64(s, index.cint)
|
||||
|
||||
proc versionCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundVersionCol =
|
||||
|
||||
checkColMetadata(s, index, VersionColName)
|
||||
|
||||
return proc (): int64 =
|
||||
sqlite3_column_int64(s, index.cint)
|
||||
|
||||
proc changesCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundVersionCol =
|
||||
return proc (): int64 =
|
||||
sqlite3_column_int64(s, index.cint)
|
||||
|
||||
proc getDBFilePath*(path: string): ?!string =
|
||||
try:
|
||||
let
|
||||
@ -217,6 +285,11 @@ proc close*(self: SQLiteDsDb) =
|
||||
self.beginStmt.dispose
|
||||
self.endStmt.dispose
|
||||
self.rollbackStmt.dispose
|
||||
self.getVersionedStmt.dispose
|
||||
self.updateVersionedStmt.dispose
|
||||
self.insertVersionedStmt.dispose
|
||||
self.deleteVersionedStmt.dispose
|
||||
self.getChangesStmt.dispose
|
||||
|
||||
if not RawStmtPtr(self.deleteStmt).isNil:
|
||||
self.deleteStmt.dispose
|
||||
@ -266,6 +339,11 @@ proc open*(
|
||||
deleteStmt: DeleteStmt
|
||||
getStmt: GetStmt
|
||||
putStmt: PutStmt
|
||||
getVersionedStmt: GetVersionedStmt
|
||||
updateVersionedStmt: UpdateVersionedStmt
|
||||
insertVersionedStmt: InsertVersionedStmt
|
||||
deleteVersionedStmt: DeleteVersionedStmt
|
||||
getChangesStmt: GetChangesStmt
|
||||
beginStmt: BeginStmt
|
||||
endStmt: EndStmt
|
||||
rollbackStmt: RollbackStmt
|
||||
@ -279,6 +357,18 @@ proc open*(
|
||||
putStmt = ? PutStmt.prepare(
|
||||
env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
insertVersionedStmt = ? InsertVersionedStmt.prepare(
|
||||
env.val, InsertVersionedStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
updateVersionedStmt = ? UpdateVersionedStmt.prepare(
|
||||
env.val, UpdateVersionedStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
deleteVersionedStmt = ? DeleteVersionedStmt.prepare(
|
||||
env.val, DeleteVersionedStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
getChangesStmt = ? GetChangesStmt.prepare(
|
||||
env.val, GetChangesStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
beginStmt = ? BeginStmt.prepare(
|
||||
env.val, BeginTransactionStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
@ -294,6 +384,9 @@ proc open*(
|
||||
getStmt = ? GetStmt.prepare(
|
||||
env.val, GetStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
getVersionedStmt = ? GetVersionedStmt.prepare(
|
||||
env.val, GetVersionedStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
# if a readOnly/existing database does not satisfy the expected schema
|
||||
# `pepare()` will fail and `new` will return an error with message
|
||||
# "SQL logic error"
|
||||
@ -310,6 +403,11 @@ proc open*(
|
||||
getStmt: getStmt,
|
||||
getDataCol: getDataCol,
|
||||
putStmt: putStmt,
|
||||
getVersionedStmt: getVersionedStmt,
|
||||
updateVersionedStmt: updateVersionedStmt,
|
||||
insertVersionedStmt: insertVersionedStmt,
|
||||
deleteVersionedStmt: deleteVersionedStmt,
|
||||
getChangesStmt: getChangesStmt,
|
||||
beginStmt: beginStmt,
|
||||
endStmt: endStmt,
|
||||
rollbackStmt: rollbackStmt)
|
||||
|
||||
@ -116,6 +116,37 @@ method put*(
|
||||
|
||||
return success()
|
||||
|
||||
method modifyGet*(
|
||||
self: TieredDatastore,
|
||||
key: Key,
|
||||
fn: ModifyGet): Future[?!seq[byte]] {.async.} =
|
||||
|
||||
let
|
||||
pending = await allFinished(self.stores.mapIt(it.modifyGet(key, fn)))
|
||||
|
||||
var aux = newSeq[byte]()
|
||||
|
||||
for fut in pending:
|
||||
if fut.read().isErr:
|
||||
return fut.read()
|
||||
else:
|
||||
aux.add(fut.read().get)
|
||||
|
||||
return success(aux)
|
||||
|
||||
method modify*(
|
||||
self: TieredDatastore,
|
||||
key: Key,
|
||||
fn: Modify): Future[?!void] {.async.} =
|
||||
|
||||
let
|
||||
pending = await allFinished(self.stores.mapIt(it.modify(key, fn)))
|
||||
|
||||
for fut in pending:
|
||||
if fut.read().isErr: return fut.read()
|
||||
|
||||
return success()
|
||||
|
||||
# method query*(
|
||||
# self: TieredDatastore,
|
||||
# query: ...): Future[?!(?...)] {.async.} =
|
||||
|
||||
130
tests/datastore/modifycommontests.nim
Normal file
130
tests/datastore/modifycommontests.nim
Normal file
@ -0,0 +1,130 @@
|
||||
import std/options
|
||||
import std/sugar
|
||||
import std/random
|
||||
import std/sequtils
|
||||
import std/strutils
|
||||
|
||||
import pkg/asynctest
|
||||
import pkg/chronos
|
||||
import pkg/stew/endians2
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import pkg/datastore
|
||||
|
||||
proc modifyTests*(
|
||||
ds: Datastore,
|
||||
key: Key,
|
||||
multiAux: bool = false) =
|
||||
|
||||
randomize()
|
||||
|
||||
let
|
||||
processCount = 100
|
||||
timeout = (1 + processCount div 10).seconds
|
||||
|
||||
proc withRandDelay(op: Future[?!void]): Future[void] {.async: (raises: [Exception]).} =
|
||||
await sleepAsync(rand(processCount).millis)
|
||||
|
||||
let errMsg = (await op).errorOption.map((err) => err.msg)
|
||||
|
||||
require none(string) == errMsg
|
||||
|
||||
proc incAsyncFn(maybeBytes: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
await sleepAsync(2.millis) # allows interleaving
|
||||
if bytes =? maybeBytes:
|
||||
let value = uint64.fromBytes(bytes)
|
||||
return some(@((value + 1).toBytes))
|
||||
else:
|
||||
return seq[byte].none
|
||||
|
||||
test "unsafe increment - demo":
|
||||
# this test demonstrates non synchronized read-modify-write sequence
|
||||
(await ds.put(key, @(0.uint64.toBytes))).tryGet
|
||||
|
||||
proc getIncAndPut(): Future[?!void] {.async.} =
|
||||
without bytes =? (await ds.get(key)), err:
|
||||
return failure(err)
|
||||
|
||||
let value = uint64.fromBytes(bytes)
|
||||
await sleepAsync(2.millis) # allows interleaving
|
||||
|
||||
if err =? (await ds.put(key, @((value + 1).toBytes))).errorOption:
|
||||
return failure(err)
|
||||
else:
|
||||
return success()
|
||||
|
||||
let futs = newSeqWith(processCount, withRandDelay(getIncAndPut()))
|
||||
await allFutures(futs).wait(timeout)
|
||||
|
||||
let finalValue = uint64.fromBytes((await ds.get(key)).tryGet)
|
||||
|
||||
check finalValue.int < processCount
|
||||
|
||||
test "safe increment":
|
||||
(await ds.put(key, @(0.uint64.toBytes))).tryGet
|
||||
|
||||
let futs = newSeqWith(processCount, withRandDelay(ds.modify(key, incAsyncFn)))
|
||||
await allFutures(futs).wait(timeout)
|
||||
|
||||
let finalValue = uint64.fromBytes((await ds.get(key)).tryGet)
|
||||
|
||||
check finalValue.int == processCount
|
||||
|
||||
test "should update value":
|
||||
(await ds.put(key, @((0.uint64).toBytes))).tryGet
|
||||
|
||||
(await ds.modify(key, incAsyncFn)).tryGet
|
||||
|
||||
let finalValue = uint64.fromBytes((await ds.get(key)).tryGet)
|
||||
|
||||
check finalValue.int == 1
|
||||
|
||||
test "should put value":
|
||||
(await ds.delete(key)).tryGet()
|
||||
|
||||
proc returningSomeValue(_: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
return @(123.uint64.toBytes).some
|
||||
|
||||
(await ds.modify(key, returningSomeValue)).tryGet
|
||||
|
||||
let finalValue = uint64.fromBytes((await ds.get(key)).tryGet)
|
||||
|
||||
check finalValue.int == 123
|
||||
|
||||
test "should delete value":
|
||||
(await ds.put(key, @(0.uint64.toBytes))).tryGet
|
||||
|
||||
proc returningNone(_: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
return seq[byte].none
|
||||
|
||||
(await ds.modify(key, returningNone)).tryGet
|
||||
|
||||
let hasKey = (await ds.has(key)).tryGet
|
||||
|
||||
check not hasKey
|
||||
|
||||
test "should return correct auxillary value":
|
||||
proc returningAux(_: ?seq[byte]): Future[(?seq[byte], seq[byte])] {.async.} =
|
||||
return (seq[byte].none, @[byte 123])
|
||||
|
||||
let res = await ds.modifyGet(key, returningAux)
|
||||
|
||||
if multiAux:
|
||||
check:
|
||||
res.errorOption.map((err) => err.msg) == none(string)
|
||||
for b in res |? @[]:
|
||||
check:
|
||||
b == 123.byte
|
||||
else:
|
||||
check:
|
||||
res == success(@[byte 123])
|
||||
|
||||
test "should propagate exception as failure":
|
||||
proc throwing(a: ?seq[byte]): Future[?seq[byte]] {.async.} =
|
||||
raise newException(CatchableError, "some error msg")
|
||||
|
||||
let res = await ds.modify(key, throwing)
|
||||
|
||||
check:
|
||||
res.errorOption.map((err) => err.msg) == some("some error msg")
|
||||
@ -11,6 +11,7 @@ import pkg/stew/byteutils
|
||||
import pkg/datastore/sql/sqliteds
|
||||
|
||||
import ../dscommontests
|
||||
import ../modifycommontests
|
||||
import ../querycommontests
|
||||
|
||||
suite "Test Basic SQLiteDatastore":
|
||||
@ -24,6 +25,7 @@ suite "Test Basic SQLiteDatastore":
|
||||
(await ds.close()).tryGet()
|
||||
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
modifyTests(ds, key)
|
||||
|
||||
suite "Test Read Only SQLiteDatastore":
|
||||
let
|
||||
|
||||
@ -106,9 +106,9 @@ suite "Test SQLite Datastore DB operations":
|
||||
|
||||
test "Should insert key":
|
||||
check:
|
||||
readOnlyDb.putStmt.exec((key.id, data, timestamp())).isErr()
|
||||
readOnlyDb.putStmt.exec((key.id, data, initVersion, timestamp())).isErr()
|
||||
|
||||
dsDb.putStmt.exec((key.id, data, timestamp())).tryGet()
|
||||
dsDb.putStmt.exec((key.id, data, initVersion, timestamp())).tryGet()
|
||||
|
||||
test "Should select key":
|
||||
let
|
||||
@ -124,9 +124,9 @@ suite "Test SQLite Datastore DB operations":
|
||||
|
||||
test "Should update key":
|
||||
check:
|
||||
readOnlyDb.putStmt.exec((key.id, otherData, timestamp())).isErr()
|
||||
readOnlyDb.putStmt.exec((key.id, otherData, initVersion, timestamp())).isErr()
|
||||
|
||||
dsDb.putStmt.exec((key.id, otherData, timestamp())).tryGet()
|
||||
dsDb.putStmt.exec((key.id, otherData, initVersion, timestamp())).tryGet()
|
||||
|
||||
test "Should select updated key":
|
||||
let
|
||||
|
||||
@ -11,6 +11,7 @@ import pkg/stew/byteutils
|
||||
import pkg/datastore/fsds
|
||||
|
||||
import ./dscommontests
|
||||
import ./modifycommontests
|
||||
import ./querycommontests
|
||||
|
||||
suite "Test Basic FSDatastore":
|
||||
@ -37,6 +38,7 @@ suite "Test Basic FSDatastore":
|
||||
require(not dirExists(basePathAbs))
|
||||
|
||||
basicStoreTests(fsStore, key, bytes, otherBytes)
|
||||
modifyTests(fsStore, key)
|
||||
|
||||
suite "Test Misc FSDatastore":
|
||||
let
|
||||
|
||||
@ -12,6 +12,7 @@ import pkg/datastore/sql
|
||||
import pkg/datastore/fsds
|
||||
|
||||
import ./dscommontests
|
||||
import ./modifycommontests
|
||||
|
||||
suite "Test Basic Mounted Datastore":
|
||||
let
|
||||
@ -50,10 +51,12 @@ suite "Test Basic Mounted Datastore":
|
||||
suite "Mounted sql":
|
||||
let namespace = Key.init(sqlKey, key).tryGet
|
||||
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
|
||||
modifyTests(mountedDs, namespace)
|
||||
|
||||
suite "Mounted fs":
|
||||
let namespace = Key.init(fsKey, key).tryGet
|
||||
basicStoreTests(mountedDs, namespace, bytes, otherBytes)
|
||||
modifyTests(mountedDs, namespace)
|
||||
|
||||
suite "Test Mounted Datastore":
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ import pkg/datastore/fsds
|
||||
import pkg/datastore/sql
|
||||
import pkg/datastore/tieredds
|
||||
|
||||
import ./modifycommontests
|
||||
import ./dscommontests
|
||||
|
||||
suite "Test Basic Tired Datastore":
|
||||
@ -40,6 +41,7 @@ suite "Test Basic Tired Datastore":
|
||||
require(not dirExists(rootAbs))
|
||||
|
||||
basicStoreTests(tiredDs, key, bytes, otherBytes)
|
||||
modifyTests(tiredDs, key, multiAux = true)
|
||||
|
||||
suite "TieredDatastore":
|
||||
# assumes tests/test_all is run from project root, e.g. with `nimble test`
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user