mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-02 13:43:11 +00:00
split out and cleanup sql lite store
This commit is contained in:
parent
acd77c5385
commit
13bc02f595
156
datastore/sql/sqliteds.nim
Normal file
156
datastore/sql/sqliteds.nim
Normal file
@ -0,0 +1,156 @@
|
||||
import std/os
|
||||
import std/times
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/sqlite3_abi
|
||||
import pkg/stew/byteutils
|
||||
from pkg/stew/results as stewResults import isErr
|
||||
import pkg/upraises
|
||||
|
||||
import ./datastore
|
||||
import ./sqlitedsdb
|
||||
|
||||
export datastore, sqlitedsdb
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
SQLiteDatastore* = ref object of Datastore
|
||||
readOnly: bool
|
||||
db: SQLiteDsDB
|
||||
|
||||
proc path*(self: SQLiteDatastore): string =
|
||||
self.db.dbPath
|
||||
|
||||
proc `readOnly=`*(self: SQLiteDatastore): bool
|
||||
{.error: "readOnly should not be assigned".}
|
||||
|
||||
proc timestamp*(t = epochTime()): int64 =
|
||||
(t * 1_000_000).int64
|
||||
|
||||
method contains*(self: SQLiteDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
var
|
||||
exists = false
|
||||
|
||||
proc onData(s: RawStmtPtr) =
|
||||
exists = sqlite3_column_int64(s, ContainsStmtExistsCol.cint).bool
|
||||
|
||||
let
|
||||
queryRes = self.db.containsStmt.query((key.id), onData)
|
||||
|
||||
if queryRes.isErr: return queryRes
|
||||
return success exists
|
||||
|
||||
method delete*(self: SQLiteDatastore, key: Key): Future[?!void] {.async.} =
|
||||
return self.db.deleteStmt.exec((key.id))
|
||||
|
||||
method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
||||
# see comment in ./filesystem_datastore re: finer control of memory
|
||||
# allocation in `method get`, could apply here as well if bytes were read
|
||||
# incrementally with `sqlite3_blob_read`
|
||||
|
||||
var
|
||||
bytes: seq[byte]
|
||||
|
||||
let
|
||||
dataCol = self.db.getDataCol
|
||||
|
||||
proc onData(s: RawStmtPtr) =
|
||||
bytes = dataCol()
|
||||
|
||||
let
|
||||
queryRes = self.db.getStmt.query((key.id), onData)
|
||||
|
||||
if queryRes.isErr:
|
||||
return failure queryRes.error.msg
|
||||
|
||||
return success bytes
|
||||
|
||||
method put*(self: SQLiteDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} =
|
||||
return self.db.putStmt.exec((key.id, @data, timestamp()))
|
||||
|
||||
iterator query*(
|
||||
self: SQLiteDatastore,
|
||||
query: Query): Future[QueryResponse] =
|
||||
|
||||
let
|
||||
queryStmt = QueryStmt.prepare(
|
||||
self.db.env, QueryStmtStr).expect("should not fail")
|
||||
|
||||
s = RawStmtPtr(queryStmt)
|
||||
|
||||
defer:
|
||||
discard sqlite3_reset(s)
|
||||
discard sqlite3_clear_bindings(s)
|
||||
s.dispose
|
||||
|
||||
let
|
||||
v = sqlite3_bind_text(s, 1.cint, query.key.id.cstring, -1.cint,
|
||||
SQLITE_TRANSIENT_GCSAFE)
|
||||
|
||||
if not (v == SQLITE_OK):
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
while true:
|
||||
let
|
||||
v = sqlite3_step(s)
|
||||
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
let
|
||||
key = Key.init($sqlite3_column_text_not_null(
|
||||
s, QueryStmtIdCol)).expect("should not fail")
|
||||
|
||||
blob = sqlite3_column_blob(s, QueryStmtDataCol)
|
||||
|
||||
# detect out-of-memory error
|
||||
# see the conversion table and final paragraph of:
|
||||
# https://www.sqlite.org/c3ref/column_blob.html
|
||||
# see also https://www.sqlite.org/rescode.html
|
||||
|
||||
# the "data" column can be NULL so in order to detect an out-of-memory
|
||||
# error it is necessary to check that the result is a null pointer and
|
||||
# that the result code is an error code
|
||||
if blob.isNil:
|
||||
let
|
||||
v = sqlite3_errcode(sqlite3_db_handle(s))
|
||||
|
||||
if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
let
|
||||
dataLen = sqlite3_column_bytes(s, QueryStmtDataCol)
|
||||
dataBytes = cast[ptr UncheckedArray[byte]](blob)
|
||||
data = @(toOpenArray(dataBytes, 0, dataLen - 1))
|
||||
fut = newFuture[QueryResponse]()
|
||||
|
||||
fut.complete((key, data))
|
||||
yield fut
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
proc new*(
|
||||
T: type SQLiteDatastore,
|
||||
path: string,
|
||||
readOnly = false): ?!T =
|
||||
|
||||
let
|
||||
flags =
|
||||
if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
success T(
|
||||
db: ? SQLIteDsDb.open(path, flags),
|
||||
readOnly: readOnly)
|
||||
|
||||
proc new*(
|
||||
T: type SQLiteDatastore,
|
||||
db: SQLIteDsDb): ?!T =
|
||||
|
||||
success T(
|
||||
db: db,
|
||||
readOnly: db.readOnly)
|
||||
264
datastore/sql/sqlitedsdb.nim
Normal file
264
datastore/sql/sqlitedsdb.nim
Normal file
@ -0,0 +1,264 @@
|
||||
import std/os
|
||||
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
|
||||
import ./sqliteutils
|
||||
|
||||
export sqliteutils
|
||||
|
||||
type
|
||||
BoundIdCol* = proc (): string {.closure, gcsafe, upraises: [].}
|
||||
|
||||
BoundDataCol* = proc (): seq[byte] {.closure, gcsafe, upraises: [].}
|
||||
|
||||
BoundTimestampCol* = proc (): int64 {.closure, gcsafe, upraises: [].}
|
||||
|
||||
# feels odd to use `void` for prepared statements corresponding to SELECT
|
||||
# queries but it fits with the rest of the SQLite wrapper adapted from
|
||||
# status-im/nwaku, at least in its current form in ./sqlite
|
||||
ContainsStmt* = SQLiteStmt[(string), void]
|
||||
|
||||
DeleteStmt* = SQLiteStmt[(string), void]
|
||||
|
||||
GetStmt* = SQLiteStmt[(string), void]
|
||||
|
||||
PutStmt* = SQLiteStmt[(string, seq[byte], int64), void]
|
||||
|
||||
QueryStmt* = SQLiteStmt[(string), void]
|
||||
|
||||
SQLiteDsDb* = object
|
||||
readOnly*: bool
|
||||
dbPath*: string
|
||||
containsStmt*: ContainsStmt
|
||||
deleteStmt*: DeleteStmt
|
||||
env*: SQLite
|
||||
getDataCol*: BoundDataCol
|
||||
getStmt*: GetStmt
|
||||
putStmt*: PutStmt
|
||||
|
||||
const
|
||||
DbExt* = ".sqlite3"
|
||||
TableName* = "Store"
|
||||
|
||||
IdColName* = "id"
|
||||
DataColName* = "data"
|
||||
TimestampColName* = "timestamp"
|
||||
|
||||
IdColType = "TEXT"
|
||||
DataColType = "BLOB"
|
||||
TimestampColType = "INTEGER"
|
||||
|
||||
Memory* = ":memory:"
|
||||
|
||||
# https://stackoverflow.com/a/9756276
|
||||
# EXISTS returns a boolean value represented by an integer:
|
||||
# https://sqlite.org/datatype3.html#boolean_datatype
|
||||
# https://sqlite.org/lang_expr.html#the_exists_operator
|
||||
ContainsStmtStr* = """
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM """ & TableName & """
|
||||
WHERE """ & IdColName & """ = ?
|
||||
);
|
||||
"""
|
||||
|
||||
ContainsStmtExistsCol* = 0
|
||||
|
||||
CreateStmtStr* = """
|
||||
CREATE TABLE IF NOT EXISTS """ & TableName & """ (
|
||||
""" & IdColName & """ """ & IdColType & """ NOT NULL PRIMARY KEY,
|
||||
""" & DataColName & """ """ & DataColType & """,
|
||||
""" & TimestampColName & """ """ & TimestampColType & """ NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
"""
|
||||
|
||||
DeleteStmtStr* = """
|
||||
DELETE FROM """ & TableName & """
|
||||
WHERE """ & IdColName & """ = ?;
|
||||
"""
|
||||
|
||||
GetStmtStr* = """
|
||||
SELECT """ & DataColName & """ FROM """ & TableName & """
|
||||
WHERE """ & IdColName & """ = ?;
|
||||
"""
|
||||
|
||||
GetStmtDataCol* = 0
|
||||
|
||||
PutStmtStr* = """
|
||||
REPLACE INTO """ & TableName & """ (
|
||||
""" & IdColName & """,
|
||||
""" & DataColName & """,
|
||||
""" & TimestampColName & """
|
||||
) VALUES (?, ?, ?);
|
||||
"""
|
||||
|
||||
QueryStmtStr* = """
|
||||
SELECT """ & IdColName & """, """ & DataColName & """ FROM """ & TableName &
|
||||
""" WHERE """ & IdColName & """ GLOB ?;
|
||||
"""
|
||||
|
||||
QueryStmtIdCol* = 0
|
||||
QueryStmtDataCol* = 1
|
||||
|
||||
proc checkColMetadata(s: RawStmtPtr, i: int, expectedName: string) =
|
||||
let
|
||||
colName = sqlite3_column_origin_name(s, i.cint)
|
||||
|
||||
if colName.isNil:
|
||||
raise (ref Defect)(msg: "no column exists for index " & $i & " in `" &
|
||||
$sqlite3_sql(s) & "`")
|
||||
|
||||
if $colName != expectedName:
|
||||
raise (ref Defect)(msg: "original column name for index " & $i & " was \"" &
|
||||
$colName & "\" in `" & $sqlite3_sql(s) & "` but callee expected \"" &
|
||||
expectedName & "\"")
|
||||
|
||||
proc idCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundIdCol =
|
||||
|
||||
checkColMetadata(s, index, IdColName)
|
||||
|
||||
return proc (): string =
|
||||
$sqlite3_column_text_not_null(s, index.cint)
|
||||
|
||||
proc dataCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundDataCol =
|
||||
|
||||
checkColMetadata(s, index, DataColName)
|
||||
|
||||
return proc (): seq[byte] =
|
||||
let
|
||||
i = index.cint
|
||||
blob = sqlite3_column_blob(s, i)
|
||||
|
||||
# detect out-of-memory error
|
||||
# see the conversion table and final paragraph of:
|
||||
# https://www.sqlite.org/c3ref/column_blob.html
|
||||
# see also https://www.sqlite.org/rescode.html
|
||||
|
||||
# the "data" column can be NULL so in order to detect an out-of-memory error
|
||||
# it is necessary to check that the result is a null pointer and that the
|
||||
# result code is an error code
|
||||
if blob.isNil:
|
||||
let
|
||||
v = sqlite3_errcode(sqlite3_db_handle(s))
|
||||
|
||||
if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
let
|
||||
dataLen = sqlite3_column_bytes(s, i)
|
||||
dataBytes = cast[ptr UncheckedArray[byte]](blob)
|
||||
|
||||
@(toOpenArray(dataBytes, 0, dataLen - 1))
|
||||
|
||||
proc timestampCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundTimestampCol =
|
||||
|
||||
checkColMetadata(s, index, TimestampColName)
|
||||
|
||||
return proc (): int64 =
|
||||
sqlite3_column_int64(s, index.cint)
|
||||
|
||||
proc getDBFilePath*(path: string): ?!string =
|
||||
try:
|
||||
let
|
||||
(parent, name, ext) = path.normalizePathEnd.splitFile
|
||||
dbExt = if ext == "": DbExt else: ext
|
||||
absPath =
|
||||
if parent.isAbsolute: parent
|
||||
else: getCurrentDir() / parent
|
||||
dbPath = absPath / name & dbExt
|
||||
|
||||
return success dbPath
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
|
||||
proc close*(self: SQLiteDsDb) =
|
||||
self.containsStmt.dispose
|
||||
self.getStmt.dispose
|
||||
|
||||
if not RawStmtPtr(self.deleteStmt).isNil:
|
||||
self.deleteStmt.dispose
|
||||
|
||||
if not RawStmtPtr(self.putStmt).isNil:
|
||||
self.putStmt.dispose
|
||||
|
||||
self.env.dispose
|
||||
|
||||
proc open*(
|
||||
T: type SQLiteDsDb,
|
||||
path = Memory,
|
||||
flags = SQLITE_OPEN_READONLY): ?!SQLiteDsDb =
|
||||
|
||||
# make it optional to enable WAL with it enabled being the default?
|
||||
|
||||
# make it possible to specify a custom page size?
|
||||
# https://www.sqlite.org/pragma.html#pragma_page_size
|
||||
# https://www.sqlite.org/intern-v-extern-blob.html
|
||||
|
||||
var
|
||||
env: AutoDisposed[SQLite]
|
||||
|
||||
defer:
|
||||
disposeIfUnreleased(env)
|
||||
|
||||
let
|
||||
isMemory = path == Memory
|
||||
absPath = if isMemory: Memory else: ?path.getDBFilePath
|
||||
readOnly = (SQLITE_OPEN_READONLY and flags).bool
|
||||
|
||||
if not isMemory:
|
||||
if readOnly and not fileExists(absPath):
|
||||
return failure "read-only database does not exist: " & absPath
|
||||
elif not dirExists(absPath.parentDir):
|
||||
return failure "directory does not exist: " & absPath
|
||||
|
||||
open(absPath, env.val, flags)
|
||||
|
||||
let
|
||||
pragmaStmt = journalModePragmaStmt(env.val)
|
||||
|
||||
checkExec(pragmaStmt)
|
||||
|
||||
var
|
||||
containsStmt: ContainsStmt
|
||||
deleteStmt: DeleteStmt
|
||||
getStmt: GetStmt
|
||||
putStmt: PutStmt
|
||||
|
||||
if not readOnly:
|
||||
checkExec(env.val, CreateStmtStr)
|
||||
|
||||
deleteStmt = ? DeleteStmt.prepare(
|
||||
env.val, DeleteStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
putStmt = ? PutStmt.prepare(
|
||||
env.val, PutStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
containsStmt = ? ContainsStmt.prepare(
|
||||
env.val, ContainsStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
getStmt = ? GetStmt.prepare(
|
||||
env.val, GetStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
# if a readOnly/existing database does not satisfy the expected schema
|
||||
# `pepare()` will fail and `new` will return an error with message
|
||||
# "SQL logic error"
|
||||
|
||||
let
|
||||
getDataCol = dataCol(RawStmtPtr(getStmt), GetStmtDataCol)
|
||||
|
||||
success T(
|
||||
readOnly: readOnly,
|
||||
dbPath: path,
|
||||
containsStmt: containsStmt,
|
||||
deleteStmt: deleteStmt,
|
||||
env: env.release,
|
||||
getStmt: getStmt,
|
||||
getDataCol: getDataCol,
|
||||
putStmt: putStmt)
|
||||
@ -3,6 +3,8 @@ import pkg/questionable/results
|
||||
import pkg/sqlite3_abi
|
||||
import pkg/upraises
|
||||
|
||||
export sqlite3_abi
|
||||
|
||||
# Adapted from:
|
||||
# https://github.com/status-im/nwaku/blob/master/waku/v2/node/storage/sqlite.nim
|
||||
|
||||
@ -74,9 +76,7 @@ template bindParams(
|
||||
|
||||
when params is tuple:
|
||||
when params isnot NoParams:
|
||||
var
|
||||
i = 1
|
||||
|
||||
var i = 1
|
||||
for param in fields(params):
|
||||
checkErr bindParam(s, i, param)
|
||||
inc i
|
||||
@ -1,413 +0,0 @@
|
||||
import std/os
|
||||
import std/times
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/sqlite3_abi
|
||||
import pkg/stew/byteutils
|
||||
from pkg/stew/results as stewResults import isErr
|
||||
import pkg/upraises
|
||||
|
||||
import ./datastore
|
||||
import ./sqlite
|
||||
|
||||
export datastore, sqlite
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
BoundIdCol = proc (): string {.closure, gcsafe, upraises: [].}
|
||||
|
||||
BoundDataCol = proc (): seq[byte] {.closure, gcsafe, upraises: [].}
|
||||
|
||||
BoundTimestampCol = proc (): int64 {.closure, gcsafe, upraises: [].}
|
||||
|
||||
# feels odd to use `void` for prepared statements corresponding to SELECT
|
||||
# queries but it fits with the rest of the SQLite wrapper adapted from
|
||||
# status-im/nwaku, at least in its current form in ./sqlite
|
||||
ContainsStmt = SQLiteStmt[(string), void]
|
||||
|
||||
DeleteStmt = SQLiteStmt[(string), void]
|
||||
|
||||
GetStmt = SQLiteStmt[(string), void]
|
||||
|
||||
PutStmt = SQLiteStmt[(string, seq[byte], int64), void]
|
||||
|
||||
QueryStmt = SQLiteStmt[(string), void]
|
||||
|
||||
SQLiteDatastore* = ref object of Datastore
|
||||
dbPath: string
|
||||
containsStmt: ContainsStmt
|
||||
deleteStmt: DeleteStmt
|
||||
env: SQLite
|
||||
getDataCol: BoundDataCol
|
||||
getStmt: GetStmt
|
||||
putStmt: PutStmt
|
||||
readOnly: bool
|
||||
|
||||
const
|
||||
dbExt* = ".sqlite3"
|
||||
tableName* = "Store"
|
||||
|
||||
idColName* = "id"
|
||||
dataColName* = "data"
|
||||
timestampColName* = "timestamp"
|
||||
|
||||
idColType = "TEXT"
|
||||
dataColType = "BLOB"
|
||||
timestampColType = "INTEGER"
|
||||
|
||||
memory* = ":memory:"
|
||||
|
||||
# https://stackoverflow.com/a/9756276
|
||||
# EXISTS returns a boolean value represented by an integer:
|
||||
# https://sqlite.org/datatype3.html#boolean_datatype
|
||||
# https://sqlite.org/lang_expr.html#the_exists_operator
|
||||
containsStmtStr = """
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM """ & tableName & """
|
||||
WHERE """ & idColName & """ = ?
|
||||
);
|
||||
"""
|
||||
|
||||
containsStmtExistsCol = 0
|
||||
|
||||
createStmtStr = """
|
||||
CREATE TABLE IF NOT EXISTS """ & tableName & """ (
|
||||
""" & idColName & """ """ & idColType & """ NOT NULL PRIMARY KEY,
|
||||
""" & dataColName & """ """ & dataColType & """,
|
||||
""" & timestampColName & """ """ & timestampColType & """ NOT NULL
|
||||
) WITHOUT ROWID;
|
||||
"""
|
||||
|
||||
deleteStmtStr = """
|
||||
DELETE FROM """ & tableName & """
|
||||
WHERE """ & idColName & """ = ?;
|
||||
"""
|
||||
|
||||
getStmtStr = """
|
||||
SELECT """ & dataColName & """ FROM """ & tableName & """
|
||||
WHERE """ & idColName & """ = ?;
|
||||
"""
|
||||
|
||||
getStmtDataCol = 0
|
||||
|
||||
putStmtStr = """
|
||||
REPLACE INTO """ & tableName & """ (
|
||||
""" & idColName & """,
|
||||
""" & dataColName & """,
|
||||
""" & timestampColName & """
|
||||
) VALUES (?, ?, ?);
|
||||
"""
|
||||
|
||||
queryStmtStr = """
|
||||
SELECT """ & idColName & """, """ & dataColName & """ FROM """ & tableName &
|
||||
""" WHERE """ & idColName & """ GLOB ?;
|
||||
"""
|
||||
|
||||
queryStmtIdCol = 0
|
||||
queryStmtDataCol = 1
|
||||
|
||||
proc checkColMetadata(s: RawStmtPtr, i: int, expectedName: string) =
|
||||
let
|
||||
colName = sqlite3_column_origin_name(s, i.cint)
|
||||
|
||||
if colName.isNil:
|
||||
raise (ref Defect)(msg: "no column exists for index " & $i & " in `" &
|
||||
$sqlite3_sql(s) & "`")
|
||||
|
||||
if $colName != expectedName:
|
||||
raise (ref Defect)(msg: "original column name for index " & $i & " was \"" &
|
||||
$colName & "\" in `" & $sqlite3_sql(s) & "` but callee expected \"" &
|
||||
expectedName & "\"")
|
||||
|
||||
proc idCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundIdCol =
|
||||
|
||||
checkColMetadata(s, index, idColName)
|
||||
|
||||
return proc (): string =
|
||||
$sqlite3_column_text_not_null(s, index.cint)
|
||||
|
||||
proc dataCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundDataCol =
|
||||
|
||||
checkColMetadata(s, index, dataColName)
|
||||
|
||||
return proc (): seq[byte] =
|
||||
let
|
||||
i = index.cint
|
||||
blob = sqlite3_column_blob(s, i)
|
||||
|
||||
# detect out-of-memory error
|
||||
# see the conversion table and final paragraph of:
|
||||
# https://www.sqlite.org/c3ref/column_blob.html
|
||||
# see also https://www.sqlite.org/rescode.html
|
||||
|
||||
# the "data" column can be NULL so in order to detect an out-of-memory error
|
||||
# it is necessary to check that the result is a null pointer and that the
|
||||
# result code is an error code
|
||||
if blob.isNil:
|
||||
let
|
||||
v = sqlite3_errcode(sqlite3_db_handle(s))
|
||||
|
||||
if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
let
|
||||
dataLen = sqlite3_column_bytes(s, i)
|
||||
dataBytes = cast[ptr UncheckedArray[byte]](blob)
|
||||
|
||||
@(toOpenArray(dataBytes, 0, dataLen - 1))
|
||||
|
||||
proc timestampCol*(
|
||||
s: RawStmtPtr,
|
||||
index: int): BoundTimestampCol =
|
||||
|
||||
checkColMetadata(s, index, timestampColName)
|
||||
|
||||
return proc (): int64 =
|
||||
sqlite3_column_int64(s, index.cint)
|
||||
|
||||
proc new*(
|
||||
T: type SQLiteDatastore,
|
||||
basePath: string,
|
||||
filename = "store" & dbExt,
|
||||
readOnly = false): ?!T =
|
||||
|
||||
# make it optional to enable WAL with it enabled being the default?
|
||||
|
||||
# make it possible to specify a custom page size?
|
||||
# https://www.sqlite.org/pragma.html#pragma_page_size
|
||||
# https://www.sqlite.org/intern-v-extern-blob.html
|
||||
|
||||
var
|
||||
env: AutoDisposed[SQLite]
|
||||
|
||||
defer: disposeIfUnreleased(env)
|
||||
|
||||
var
|
||||
basep, fname, dbPath: string
|
||||
|
||||
if basePath == memory:
|
||||
if readOnly:
|
||||
return failure "SQLiteDatastore cannot be read-only and in-memory"
|
||||
else:
|
||||
dbPath = memory
|
||||
else:
|
||||
try:
|
||||
basep = normalizePathEnd(
|
||||
if basePath.isAbsolute: basePath
|
||||
else: getCurrentDir() / basePath)
|
||||
|
||||
fname = filename.normalizePathEnd
|
||||
dbPath = basep / fname
|
||||
|
||||
if readOnly and not fileExists(dbPath):
|
||||
return failure "read-only database does not exist: " & dbPath
|
||||
elif not dirExists(basep):
|
||||
return failure "directory does not exist: " & basep
|
||||
|
||||
except IOError as e:
|
||||
return failure e
|
||||
|
||||
except OSError as e:
|
||||
return failure e
|
||||
|
||||
let
|
||||
flags =
|
||||
if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
open(dbPath, env.val, flags)
|
||||
|
||||
let
|
||||
pragmaStmt = journalModePragmaStmt(env.val)
|
||||
|
||||
checkExec(pragmaStmt)
|
||||
|
||||
var
|
||||
containsStmt: ContainsStmt
|
||||
deleteStmt: DeleteStmt
|
||||
getStmt: GetStmt
|
||||
putStmt: PutStmt
|
||||
|
||||
if not readOnly:
|
||||
checkExec(env.val, createStmtStr)
|
||||
|
||||
deleteStmt = ? DeleteStmt.prepare(
|
||||
env.val, deleteStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
putStmt = ? PutStmt.prepare(
|
||||
env.val, putStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
containsStmt = ? ContainsStmt.prepare(
|
||||
env.val, containsStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
getStmt = ? GetStmt.prepare(
|
||||
env.val, getStmtStr, SQLITE_PREPARE_PERSISTENT)
|
||||
|
||||
# if a readOnly/existing database does not satisfy the expected schema
|
||||
# `pepare()` will fail and `new` will return an error with message
|
||||
# "SQL logic error"
|
||||
|
||||
let
|
||||
getDataCol = dataCol(RawStmtPtr(getStmt), getStmtDataCol)
|
||||
|
||||
success T(dbPath: dbPath, containsStmt: containsStmt, deleteStmt: deleteStmt,
|
||||
env: env.release, getStmt: getStmt, getDataCol: getDataCol,
|
||||
putStmt: putStmt, readOnly: readOnly)
|
||||
|
||||
proc dbPath*(self: SQLiteDatastore): string =
|
||||
self.dbPath
|
||||
|
||||
proc env*(self: SQLiteDatastore): SQLite =
|
||||
self.env
|
||||
|
||||
proc close*(self: SQLiteDatastore) =
|
||||
self.containsStmt.dispose
|
||||
self.getStmt.dispose
|
||||
|
||||
if not self.readOnly:
|
||||
self.deleteStmt.dispose
|
||||
self.putStmt.dispose
|
||||
|
||||
self.env.dispose
|
||||
self[] = SQLiteDatastore()[]
|
||||
|
||||
proc timestamp*(t = epochTime()): int64 =
|
||||
(t * 1_000_000).int64
|
||||
|
||||
method contains*(
|
||||
self: SQLiteDatastore,
|
||||
key: Key): Future[?!bool] {.async, locks: "unknown".} =
|
||||
|
||||
var
|
||||
exists = false
|
||||
|
||||
proc onData(s: RawStmtPtr) =
|
||||
exists = sqlite3_column_int64(s, containsStmtExistsCol.cint).bool
|
||||
|
||||
let
|
||||
queryRes = self.containsStmt.query((key.id), onData)
|
||||
|
||||
if queryRes.isErr: return queryRes
|
||||
|
||||
return success exists
|
||||
|
||||
method delete*(
|
||||
self: SQLiteDatastore,
|
||||
key: Key): Future[?!void] {.async, locks: "unknown".} =
|
||||
|
||||
if self.readOnly:
|
||||
return failure "database is read-only":
|
||||
else:
|
||||
return self.deleteStmt.exec((key.id))
|
||||
|
||||
method get*(
|
||||
self: SQLiteDatastore,
|
||||
key: Key): Future[?!(?seq[byte])] {.async, locks: "unknown".} =
|
||||
|
||||
# see comment in ./filesystem_datastore re: finer control of memory
|
||||
# allocation in `method get`, could apply here as well if bytes were read
|
||||
# incrementally with `sqlite3_blob_read`
|
||||
|
||||
var
|
||||
bytes: ?seq[byte]
|
||||
|
||||
let
|
||||
dataCol = self.getDataCol
|
||||
|
||||
proc onData(s: RawStmtPtr) =
|
||||
bytes = dataCol().some
|
||||
|
||||
let
|
||||
queryRes = self.getStmt.query((key.id), onData)
|
||||
|
||||
if queryRes.isErr:
|
||||
return failure queryRes.error.msg
|
||||
else:
|
||||
return success bytes
|
||||
|
||||
proc put*(
|
||||
self: SQLiteDatastore,
|
||||
key: Key,
|
||||
data: seq[byte],
|
||||
timestamp: int64): Future[?!void] {.async.} =
|
||||
|
||||
if self.readOnly:
|
||||
return failure "database is read-only"
|
||||
else:
|
||||
return self.putStmt.exec((key.id, @data, timestamp))
|
||||
|
||||
method put*(
|
||||
self: SQLiteDatastore,
|
||||
key: Key,
|
||||
data: seq[byte]): Future[?!void] {.async, locks: "unknown".} =
|
||||
|
||||
return await self.put(key, data, timestamp())
|
||||
|
||||
iterator query*(
|
||||
self: SQLiteDatastore,
|
||||
query: Query): Future[QueryResponse] =
|
||||
|
||||
let
|
||||
queryStmt = QueryStmt.prepare(
|
||||
self.env, queryStmtStr).expect("should not fail")
|
||||
|
||||
s = RawStmtPtr(queryStmt)
|
||||
|
||||
defer:
|
||||
discard sqlite3_reset(s)
|
||||
discard sqlite3_clear_bindings(s)
|
||||
s.dispose
|
||||
|
||||
let
|
||||
v = sqlite3_bind_text(s, 1.cint, query.key.id.cstring, -1.cint,
|
||||
SQLITE_TRANSIENT_GCSAFE)
|
||||
|
||||
if not (v == SQLITE_OK):
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
while true:
|
||||
let
|
||||
v = sqlite3_step(s)
|
||||
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
let
|
||||
key = Key.init($sqlite3_column_text_not_null(
|
||||
s, queryStmtIdCol)).expect("should not fail")
|
||||
|
||||
blob = sqlite3_column_blob(s, queryStmtDataCol)
|
||||
|
||||
# detect out-of-memory error
|
||||
# see the conversion table and final paragraph of:
|
||||
# https://www.sqlite.org/c3ref/column_blob.html
|
||||
# see also https://www.sqlite.org/rescode.html
|
||||
|
||||
# the "data" column can be NULL so in order to detect an out-of-memory
|
||||
# error it is necessary to check that the result is a null pointer and
|
||||
# that the result code is an error code
|
||||
if blob.isNil:
|
||||
let
|
||||
v = sqlite3_errcode(sqlite3_db_handle(s))
|
||||
|
||||
if not (v in [SQLITE_OK, SQLITE_ROW, SQLITE_DONE]):
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
|
||||
let
|
||||
dataLen = sqlite3_column_bytes(s, queryStmtDataCol)
|
||||
dataBytes = cast[ptr UncheckedArray[byte]](blob)
|
||||
data = @(toOpenArray(dataBytes, 0, dataLen - 1))
|
||||
fut = newFuture[QueryResponse]()
|
||||
|
||||
fut.complete((key, data))
|
||||
yield fut
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
raise (ref Defect)(msg: $sqlite3_errstr(v))
|
||||
Loading…
x
Reference in New Issue
Block a user