mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-05 03:25:04 +00:00
refactor/db-message-store-split (#280)
* fixes * fic * fix * fix * fix * fixed * fix * fixes * fixes, using sqlite lib fully * fix * Update sqlite.nim
This commit is contained in:
parent
54ac399f5f
commit
e875027be4
@ -4,12 +4,14 @@ import
|
||||
chronos, chronicles,
|
||||
../../waku/v2/node/message_store,
|
||||
./utils,
|
||||
../../waku/v2/waku_types
|
||||
../../waku/v2/waku_types,
|
||||
../../waku/v2/node/sqlite
|
||||
|
||||
suite "Message Store":
|
||||
test "set and get works":
|
||||
let
|
||||
store = MessageStore.init("", inMemory = true)[]
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = MessageStore.init(database)[]
|
||||
topic = ContentTopic(1)
|
||||
|
||||
var msgs = @[
|
||||
|
@ -9,7 +9,7 @@ import
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/rpc/message,
|
||||
../../waku/v2/protocol/[waku_store, message_notifier],
|
||||
../../waku/v2/node/message_store,
|
||||
../../waku/v2/node/[message_store, sqlite],
|
||||
../test_helpers, ./utils,
|
||||
../../waku/v2/waku_types
|
||||
|
||||
@ -61,7 +61,8 @@ procSuite "Waku Store":
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
topic = ContentTopic(1)
|
||||
store = MessageStore.init("/foo", inMemory = true)[]
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
store = MessageStore.init(database)[]
|
||||
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
|
||||
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
|
||||
|
||||
|
@ -7,9 +7,8 @@ import
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
stew/results, metrics,
|
||||
../waku_types
|
||||
|
||||
{.push raises: [Defect].}
|
||||
../waku_types,
|
||||
./sqlite
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
@ -17,137 +16,30 @@ import
|
||||
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
||||
|
||||
type
|
||||
RawStmtPtr = ptr sqlite3_stmt
|
||||
|
||||
AutoDisposed[T: ptr|ref] = object
|
||||
val: T
|
||||
|
||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
|
||||
|
||||
template dispose(db: Sqlite) =
|
||||
discard sqlite3_close(db)
|
||||
|
||||
template dispose(db: RawStmtPtr) =
|
||||
discard sqlite3_finalize(db)
|
||||
|
||||
proc release[T](x: var AutoDisposed[T]): T =
|
||||
result = x.val
|
||||
x.val = nil
|
||||
|
||||
proc disposeIfUnreleased[T](x: var AutoDisposed[T]) =
|
||||
mixin dispose
|
||||
if x.val != nil:
|
||||
dispose(x.release)
|
||||
|
||||
template checkErr(op, cleanup: untyped) =
|
||||
if (let v = (op); v != SQLITE_OK):
|
||||
cleanup
|
||||
return err($sqlite3_errstr(v))
|
||||
|
||||
template checkErr(op) =
|
||||
checkErr(op): discard
|
||||
|
||||
proc init*(
|
||||
T: type MessageStore,
|
||||
basePath: string,
|
||||
name: string = "store",
|
||||
readOnly = false,
|
||||
inMemory = false): MessageStoreResult[T] =
|
||||
var env: AutoDisposed[ptr sqlite3]
|
||||
defer: disposeIfUnreleased(env)
|
||||
|
||||
let
|
||||
name =
|
||||
if inMemory: ":memory:"
|
||||
else: basepath / name & ".sqlite3"
|
||||
flags =
|
||||
if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
if not inMemory:
|
||||
try:
|
||||
createDir(basePath)
|
||||
except OSError, IOError:
|
||||
return err("`sqlite: cannot create database directory")
|
||||
|
||||
checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil)
|
||||
|
||||
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
template checkExec(s: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(s); x != SQLITE_DONE):
|
||||
discard sqlite3_finalize(s)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
template checkExec(q: string) =
|
||||
let s = prepare(q): discard
|
||||
checkExec(s)
|
||||
|
||||
template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_text(journalModePragma, 0);
|
||||
x != "memory" and x != "wal"):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err("Invalid pragma result: " & $x)
|
||||
|
||||
# TODO: check current version and implement schema versioning
|
||||
checkExec "PRAGMA user_version = 1;"
|
||||
|
||||
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
||||
checkWalPragmaResult(journalModePragma)
|
||||
checkExec(journalModePragma)
|
||||
DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.}
|
||||
|
||||
proc init*(T: type MessageStore, db: SqliteDatabase): MessageStoreResult[T] =
|
||||
## Table is the SQL query for creating the messages Table.
|
||||
## It contains:
|
||||
## - 4-Byte ContentTopic stored as an Integer
|
||||
## - Payload stored as a blob
|
||||
checkExec """
|
||||
let prepare = db.prepareStmt("""
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id BLOB PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
contentTopic INTEGER NOT NULL,
|
||||
payload BLOB
|
||||
) WITHOUT ROWID;
|
||||
"""
|
||||
""", NoParams, void)
|
||||
|
||||
ok(MessageStore(
|
||||
env: env.release
|
||||
))
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
template prepare(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
let res = prepare.value.exec(())
|
||||
if res.isErr:
|
||||
return err("failed to exec")
|
||||
|
||||
proc bindParam(s: RawStmtPtr, n: int, val: auto): cint =
|
||||
when val is openarray[byte]|seq[byte]:
|
||||
if val.len > 0:
|
||||
sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil)
|
||||
else:
|
||||
sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil)
|
||||
elif val is int32:
|
||||
sqlite3_bind_int(s, n.cint, val)
|
||||
elif val is uint32:
|
||||
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
||||
elif val is int64:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
else:
|
||||
{.fatal: "Please add support for the 'kek' type".}
|
||||
ok(MessageStore(database: db))
|
||||
|
||||
proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] =
|
||||
## Adds a message to the storage.
|
||||
@ -158,28 +50,21 @@ proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreRe
|
||||
## let res = db.put(message)
|
||||
## if res.isErr:
|
||||
## echo "error"
|
||||
let s = prepare(db.env, "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);"): discard
|
||||
checkErr bindParam(s, 1, @(cursor.digest.data))
|
||||
checkErr bindParam(s, 2, int64(cursor.receivedTime))
|
||||
checkErr bindParam(s, 3, message.contentTopic)
|
||||
checkErr bindParam(s, 4, message.payload)
|
||||
##
|
||||
let prepare = db.database.prepareStmt(
|
||||
"INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);",
|
||||
(seq[byte], int64, uint32, seq[byte]),
|
||||
void
|
||||
)
|
||||
|
||||
let res =
|
||||
if (let v = sqlite3_step(s); v != SQLITE_DONE):
|
||||
err($sqlite3_errstr(v))
|
||||
else:
|
||||
ok()
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
# release implict transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic, message.payload))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
res
|
||||
|
||||
proc close*(db: MessageStore) =
|
||||
discard sqlite3_close(db.env)
|
||||
|
||||
db[] = MessageStore()[]
|
||||
ok()
|
||||
|
||||
proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] =
|
||||
## Retreives all messages from the storage.
|
||||
@ -193,30 +78,23 @@ proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] =
|
||||
## let res = db.get(data)
|
||||
## if res.isErr:
|
||||
## echo "error"
|
||||
var gotMessages = false
|
||||
proc msg(s: ptr sqlite3_stmt) =
|
||||
gotMessages = true
|
||||
let
|
||||
timestamp = sqlite3_column_int64(s, 0)
|
||||
topic = sqlite3_column_int(s, 1)
|
||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||
l = sqlite3_column_bytes(s, 2)
|
||||
|
||||
let query = "SELECT timestamp, contentTopic, payload FROM messages"
|
||||
var s = prepare(db.env, query): discard
|
||||
onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1))))
|
||||
|
||||
try:
|
||||
var gotResults = false
|
||||
while true:
|
||||
let v = sqlite3_step(s)
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
let
|
||||
timestamp = sqlite3_column_int64(s, 0)
|
||||
topic = sqlite3_column_int(s, 1)
|
||||
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
|
||||
l = sqlite3_column_bytes(s, 2)
|
||||
let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages", msg)
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1))))
|
||||
gotResults = true
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
return err($sqlite3_errstr(v))
|
||||
return ok gotResults
|
||||
finally:
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
ok gotMessages
|
||||
|
||||
proc close*(db: MessageStore) =
|
||||
## Closes the database.
|
||||
db.database.close()
|
||||
|
207
waku/v2/node/sqlite.nim
Normal file
207
waku/v2/node/sqlite.nim
Normal file
@ -0,0 +1,207 @@
|
||||
import
|
||||
os,
|
||||
sqlite3_abi,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
stew/results, metrics
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
#
|
||||
# Most of it is a direct copy, the only unique functions being `get` and `put`.
|
||||
|
||||
type
|
||||
DatabaseResult*[T] = Result[T, string]
|
||||
|
||||
Sqlite = ptr sqlite3
|
||||
|
||||
NoParams* = tuple
|
||||
RawStmtPtr = ptr sqlite3_stmt
|
||||
SqliteStmt*[Params; Result] = distinct RawStmtPtr
|
||||
|
||||
AutoDisposed[T: ptr|ref] = object
|
||||
val: T
|
||||
|
||||
SqliteDatabase* = ref object of RootObj
|
||||
env*: Sqlite
|
||||
|
||||
template dispose(db: Sqlite) =
|
||||
discard sqlite3_close(db)
|
||||
|
||||
template dispose(db: RawStmtPtr) =
|
||||
discard sqlite3_finalize(db)
|
||||
|
||||
proc release[T](x: var AutoDisposed[T]): T =
|
||||
result = x.val
|
||||
x.val = nil
|
||||
|
||||
proc disposeIfUnreleased[T](x: var AutoDisposed[T]) =
|
||||
mixin dispose
|
||||
if x.val != nil:
|
||||
dispose(x.release)
|
||||
|
||||
template checkErr*(op, cleanup: untyped) =
|
||||
if (let v = (op); v != SQLITE_OK):
|
||||
cleanup
|
||||
return err($sqlite3_errstr(v))
|
||||
|
||||
template checkErr*(op) =
|
||||
checkErr(op): discard
|
||||
|
||||
proc init*(
|
||||
T: type SqliteDatabase,
|
||||
basePath: string,
|
||||
name: string = "store",
|
||||
readOnly = false,
|
||||
inMemory = false): DatabaseResult[T] =
|
||||
var env: AutoDisposed[ptr sqlite3]
|
||||
defer: disposeIfUnreleased(env)
|
||||
|
||||
let
|
||||
name =
|
||||
if inMemory: ":memory:"
|
||||
else: basepath / name & ".sqlite3"
|
||||
flags =
|
||||
if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
if not inMemory:
|
||||
try:
|
||||
createDir(basePath)
|
||||
except OSError, IOError:
|
||||
return err("`sqlite: cannot create database directory")
|
||||
|
||||
checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil)
|
||||
|
||||
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
template checkExec(s: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(s); x != SQLITE_DONE):
|
||||
discard sqlite3_finalize(s)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_finalize(s); x != SQLITE_OK):
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
template checkExec(q: string) =
|
||||
let s = prepare(q): discard
|
||||
checkExec(s)
|
||||
|
||||
template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) =
|
||||
if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err($sqlite3_errstr(x))
|
||||
|
||||
if (let x = sqlite3_column_text(journalModePragma, 0);
|
||||
x != "memory" and x != "wal"):
|
||||
discard sqlite3_finalize(journalModePragma)
|
||||
return err("Invalid pragma result: " & $x)
|
||||
|
||||
# TODO: check current version and implement schema versioning
|
||||
checkExec "PRAGMA user_version = 1;"
|
||||
|
||||
let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard
|
||||
checkWalPragmaResult(journalModePragma)
|
||||
checkExec(journalModePragma)
|
||||
|
||||
ok(SqliteDatabase(
|
||||
env: env.release
|
||||
))
|
||||
|
||||
template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
|
||||
var s: ptr sqlite3_stmt
|
||||
checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil):
|
||||
cleanup
|
||||
s
|
||||
|
||||
proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint =
|
||||
when val is openarray[byte]|seq[byte]:
|
||||
if val.len > 0:
|
||||
sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil)
|
||||
else:
|
||||
sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil)
|
||||
elif val is int32:
|
||||
sqlite3_bind_int(s, n.cint, val)
|
||||
elif val is uint32:
|
||||
sqlite3_bind_int(s, int(n).cint, int(val).cint)
|
||||
elif val is int64:
|
||||
sqlite3_bind_int64(s, n.cint, val)
|
||||
else:
|
||||
{.fatal: "Please add support for the 'kek' type".}
|
||||
|
||||
template bindParams(s: RawStmtPtr, params: auto) =
|
||||
when params is tuple:
|
||||
var i = 1
|
||||
for param in fields(params):
|
||||
checkErr bindParam(s, i, param)
|
||||
inc i
|
||||
else:
|
||||
checkErr bindParam(s, 1, params)
|
||||
|
||||
proc exec*[P](s: SqliteStmt[P, void], params: P): DatabaseResult[void] =
|
||||
let s = RawStmtPtr s
|
||||
bindParams(s, params)
|
||||
|
||||
let res =
|
||||
if (let v = sqlite3_step(s); v != SQLITE_DONE):
|
||||
err($sqlite3_errstr(v))
|
||||
else:
|
||||
ok()
|
||||
|
||||
# release implict transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
res
|
||||
|
||||
type
|
||||
DataProc* = proc(s: ptr sqlite3_stmt) {.closure.}
|
||||
|
||||
proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] =
|
||||
var s = prepare(db.env, query): discard
|
||||
|
||||
try:
|
||||
var gotResults = false
|
||||
while true:
|
||||
let v = sqlite3_step(s)
|
||||
case v
|
||||
of SQLITE_ROW:
|
||||
onData(s)
|
||||
gotResults = true
|
||||
of SQLITE_DONE:
|
||||
break
|
||||
else:
|
||||
return err($sqlite3_errstr(v))
|
||||
return ok gotResults
|
||||
finally:
|
||||
# release implicit transaction
|
||||
discard sqlite3_reset(s) # same return information as step
|
||||
discard sqlite3_clear_bindings(s) # no errors possible
|
||||
|
||||
proc prepareStmt*(
|
||||
db: SqliteDatabase,
|
||||
stmt: string,
|
||||
Params: type,
|
||||
Res: type
|
||||
): DatabaseResult[SqliteStmt[Params, Res]] =
|
||||
var s: RawStmtPtr
|
||||
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
|
||||
ok SqliteStmt[Params, Res](s)
|
||||
|
||||
proc close*(db: SqliteDatabase) =
|
||||
discard sqlite3_close(db.env)
|
||||
|
||||
db[] = SqliteDatabase()[]
|
@ -13,7 +13,8 @@ import
|
||||
../protocol/[waku_relay, waku_store, waku_filter, message_notifier],
|
||||
../protocol/waku_swap/waku_swap,
|
||||
../waku_types,
|
||||
./message_store
|
||||
./message_store,
|
||||
./sqlite
|
||||
|
||||
export waku_types
|
||||
|
||||
@ -402,7 +403,11 @@ when isMainModule:
|
||||
var store: MessageStore
|
||||
|
||||
if conf.dbpath != "":
|
||||
let res = MessageStore.init(conf.dbpath)
|
||||
let dbRes = SqliteDatabase.init(conf.dbpath)
|
||||
if dbRes.isErr:
|
||||
warn "failed to init database", err = dbRes.error
|
||||
|
||||
let res = MessageStore.init(dbRes.value)
|
||||
if res.isErr:
|
||||
warn "failed to init MessageStore", err = res.error
|
||||
else:
|
||||
|
@ -13,7 +13,7 @@ import
|
||||
libp2p/protocols/pubsub/[pubsub, gossipsub],
|
||||
protocol/waku_swap/waku_swap_types,
|
||||
nimcrypto/sha2,
|
||||
sqlite3_abi
|
||||
./node/sqlite
|
||||
|
||||
# Constants required for pagination -------------------------------------------
|
||||
const MaxPageSize* = 100 # Maximum number of waku messages in each page
|
||||
@ -80,10 +80,8 @@ type
|
||||
|
||||
MessageStoreResult*[T] = Result[T, string]
|
||||
|
||||
Sqlite* = ptr sqlite3
|
||||
|
||||
MessageStore* = ref object of RootObj
|
||||
env*: Sqlite
|
||||
database*: SqliteDatabase
|
||||
|
||||
WakuStore* = ref object of LPProtocol
|
||||
switch*: Switch
|
||||
|
Loading…
x
Reference in New Issue
Block a user