diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index a2072159b..34f3da07e 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -4,12 +4,14 @@ import chronos, chronicles, ../../waku/v2/node/message_store, ./utils, - ../../waku/v2/waku_types + ../../waku/v2/waku_types, + ../../waku/v2/node/sqlite suite "Message Store": test "set and get works": let - store = MessageStore.init("", inMemory = true)[] + database = SqliteDatabase.init("", inMemory = true)[] + store = MessageStore.init(database)[] topic = ContentTopic(1) var msgs = @[ diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 06ac20e44..4e22630aa 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -9,7 +9,7 @@ import libp2p/crypto/crypto, libp2p/protocols/pubsub/rpc/message, ../../waku/v2/protocol/[waku_store, message_notifier], - ../../waku/v2/node/message_store, + ../../waku/v2/node/[message_store, sqlite], ../test_helpers, ./utils, ../../waku/v2/waku_types @@ -61,7 +61,8 @@ procSuite "Waku Store": key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) topic = ContentTopic(1) - store = MessageStore.init("/foo", inMemory = true)[] + database = SqliteDatabase.init("", inMemory = true)[] + store = MessageStore.init(database)[] msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic) msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2)) diff --git a/waku/v2/node/message_store.nim b/waku/v2/node/message_store.nim index c359d72cb..f0cb9e53b 100644 --- a/waku/v2/node/message_store.nim +++ b/waku/v2/node/message_store.nim @@ -7,9 +7,8 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/connection, stew/results, metrics, - ../waku_types - -{.push raises: [Defect].} + ../waku_types, + ./sqlite # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim @@ -17,137 +16,30 @@ import # Most of it is a direct copy, the only unique functions being `get` and `put`. type - RawStmtPtr = ptr sqlite3_stmt - - AutoDisposed[T: ptr|ref] = object - val: T - - DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.} - -template dispose(db: Sqlite) = - discard sqlite3_close(db) - -template dispose(db: RawStmtPtr) = - discard sqlite3_finalize(db) - -proc release[T](x: var AutoDisposed[T]): T = - result = x.val - x.val = nil - -proc disposeIfUnreleased[T](x: var AutoDisposed[T]) = - mixin dispose - if x.val != nil: - dispose(x.release) - -template checkErr(op, cleanup: untyped) = - if (let v = (op); v != SQLITE_OK): - cleanup - return err($sqlite3_errstr(v)) - -template checkErr(op) = - checkErr(op): discard - -proc init*( - T: type MessageStore, - basePath: string, - name: string = "store", - readOnly = false, - inMemory = false): MessageStoreResult[T] = - var env: AutoDisposed[ptr sqlite3] - defer: disposeIfUnreleased(env) - - let - name = - if inMemory: ":memory:" - else: basepath / name & ".sqlite3" - flags = - if readOnly: SQLITE_OPEN_READONLY - else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE - - if not inMemory: - try: - createDir(basePath) - except OSError, IOError: - return err("`sqlite: cannot create database directory") - - checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil) - - template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt = - var s: ptr sqlite3_stmt - checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil): - cleanup - s - - template checkExec(s: ptr sqlite3_stmt) = - if (let x = sqlite3_step(s); x != SQLITE_DONE): - discard sqlite3_finalize(s) - return err($sqlite3_errstr(x)) - - if (let x = sqlite3_finalize(s); x != SQLITE_OK): - return err($sqlite3_errstr(x)) - - template checkExec(q: string) = - let s = prepare(q): discard - checkExec(s) - - template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) = - if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW): - discard sqlite3_finalize(journalModePragma) - return err($sqlite3_errstr(x)) - - if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT): - discard sqlite3_finalize(journalModePragma) - return err($sqlite3_errstr(x)) - - if (let x = sqlite3_column_text(journalModePragma, 0); - x != "memory" and x != "wal"): - discard sqlite3_finalize(journalModePragma) - return err("Invalid pragma result: " & $x) - - # TODO: check current version and implement schema versioning - checkExec "PRAGMA user_version = 1;" - - let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard - checkWalPragmaResult(journalModePragma) - checkExec(journalModePragma) + DataProc* = proc(timestamp: uint64, msg: WakuMessage) {.closure.} +proc init*(T: type MessageStore, db: SqliteDatabase): MessageStoreResult[T] = ## Table is the SQL query for creating the messages Table. ## It contains: ## - 4-Byte ContentTopic stored as an Integer ## - Payload stored as a blob - checkExec """ + let prepare = db.prepareStmt(""" CREATE TABLE IF NOT EXISTS messages ( id BLOB PRIMARY KEY, timestamp INTEGER NOT NULL, contentTopic INTEGER NOT NULL, payload BLOB ) WITHOUT ROWID; - """ + """, NoParams, void) - ok(MessageStore( - env: env.release - )) + if prepare.isErr: + return err("failed to prepare") -template prepare(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt = - var s: ptr sqlite3_stmt - checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil): - cleanup - s + let res = prepare.value.exec(()) + if res.isErr: + return err("failed to exec") -proc bindParam(s: RawStmtPtr, n: int, val: auto): cint = - when val is openarray[byte]|seq[byte]: - if val.len > 0: - sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil) - else: - sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil) - elif val is int32: - sqlite3_bind_int(s, n.cint, val) - elif val is uint32: - sqlite3_bind_int(s, int(n).cint, int(val).cint) - elif val is int64: - sqlite3_bind_int64(s, n.cint, val) - else: - {.fatal: "Please add support for the 'kek' type".} + ok(MessageStore(database: db)) proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreResult[void] = ## Adds a message to the storage. @@ -158,28 +50,21 @@ proc put*(db: MessageStore, cursor: Index, message: WakuMessage): MessageStoreRe ## let res = db.put(message) ## if res.isErr: ## echo "error" - let s = prepare(db.env, "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);"): discard - checkErr bindParam(s, 1, @(cursor.digest.data)) - checkErr bindParam(s, 2, int64(cursor.receivedTime)) - checkErr bindParam(s, 3, message.contentTopic) - checkErr bindParam(s, 4, message.payload) + ## + let prepare = db.database.prepareStmt( + "INSERT INTO messages (id, timestamp, contentTopic, payload) VALUES (?, ?, ?, ?);", + (seq[byte], int64, uint32, seq[byte]), + void + ) - let res = - if (let v = sqlite3_step(s); v != SQLITE_DONE): - err($sqlite3_errstr(v)) - else: - ok() + if prepare.isErr: + return err("failed to prepare") - # release implict transaction - discard sqlite3_reset(s) # same return information as step - discard sqlite3_clear_bindings(s) # no errors possible + let res = prepare.value.exec((@(cursor.digest.data), int64(cursor.receivedTime), message.contentTopic, message.payload)) + if res.isErr: + return err("failed") - res - -proc close*(db: MessageStore) = - discard sqlite3_close(db.env) - - db[] = MessageStore()[] + ok() proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] = ## Retreives all messages from the storage. @@ -193,30 +78,23 @@ proc getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] = ## let res = db.get(data) ## if res.isErr: ## echo "error" + var gotMessages = false + proc msg(s: ptr sqlite3_stmt) = + gotMessages = true + let + timestamp = sqlite3_column_int64(s, 0) + topic = sqlite3_column_int(s, 1) + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) + l = sqlite3_column_bytes(s, 2) - let query = "SELECT timestamp, contentTopic, payload FROM messages" - var s = prepare(db.env, query): discard + onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1)))) - try: - var gotResults = false - while true: - let v = sqlite3_step(s) - case v - of SQLITE_ROW: - let - timestamp = sqlite3_column_int64(s, 0) - topic = sqlite3_column_int(s, 1) - p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2)) - l = sqlite3_column_bytes(s, 2) + let res = db.database.query("SELECT timestamp, contentTopic, payload FROM messages", msg) + if res.isErr: + return err("failed") - onData(uint64(timestamp), WakuMessage(contentTopic: ContentTopic(int(topic)), payload: @(toOpenArray(p, 0, l-1)))) - gotResults = true - of SQLITE_DONE: - break - else: - return err($sqlite3_errstr(v)) - return ok gotResults - finally: - # release implicit transaction - discard sqlite3_reset(s) # same return information as step - discard sqlite3_clear_bindings(s) # no errors possible + ok gotMessages + +proc close*(db: MessageStore) = + ## Closes the database. + db.database.close() diff --git a/waku/v2/node/sqlite.nim b/waku/v2/node/sqlite.nim new file mode 100644 index 000000000..1d2127e36 --- /dev/null +++ b/waku/v2/node/sqlite.nim @@ -0,0 +1,207 @@ +import + os, + sqlite3_abi, + chronos, chronicles, metrics, stew/results, + libp2p/crypto/crypto, + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection, + stew/results, metrics + +{.push raises: [Defect].} + +# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. +# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim +# +# Most of it is a direct copy, the only unique functions being `get` and `put`. + +type + DatabaseResult*[T] = Result[T, string] + + Sqlite = ptr sqlite3 + + NoParams* = tuple + RawStmtPtr = ptr sqlite3_stmt + SqliteStmt*[Params; Result] = distinct RawStmtPtr + + AutoDisposed[T: ptr|ref] = object + val: T + + SqliteDatabase* = ref object of RootObj + env*: Sqlite + +template dispose(db: Sqlite) = + discard sqlite3_close(db) + +template dispose(db: RawStmtPtr) = + discard sqlite3_finalize(db) + +proc release[T](x: var AutoDisposed[T]): T = + result = x.val + x.val = nil + +proc disposeIfUnreleased[T](x: var AutoDisposed[T]) = + mixin dispose + if x.val != nil: + dispose(x.release) + +template checkErr*(op, cleanup: untyped) = + if (let v = (op); v != SQLITE_OK): + cleanup + return err($sqlite3_errstr(v)) + +template checkErr*(op) = + checkErr(op): discard + +proc init*( + T: type SqliteDatabase, + basePath: string, + name: string = "store", + readOnly = false, + inMemory = false): DatabaseResult[T] = + var env: AutoDisposed[ptr sqlite3] + defer: disposeIfUnreleased(env) + + let + name = + if inMemory: ":memory:" + else: basepath / name & ".sqlite3" + flags = + if readOnly: SQLITE_OPEN_READONLY + else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE + + if not inMemory: + try: + createDir(basePath) + except OSError, IOError: + return err("`sqlite: cannot create database directory") + + checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil) + + template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt = + var s: ptr sqlite3_stmt + checkErr sqlite3_prepare_v2(env.val, q, q.len.cint, addr s, nil): + cleanup + s + + template checkExec(s: ptr sqlite3_stmt) = + if (let x = sqlite3_step(s); x != SQLITE_DONE): + discard sqlite3_finalize(s) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_finalize(s); x != SQLITE_OK): + return err($sqlite3_errstr(x)) + + template checkExec(q: string) = + let s = prepare(q): discard + checkExec(s) + + template checkWalPragmaResult(journalModePragma: ptr sqlite3_stmt) = + if (let x = sqlite3_step(journalModePragma); x != SQLITE_ROW): + discard sqlite3_finalize(journalModePragma) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_column_type(journalModePragma, 0); x != SQLITE3_TEXT): + discard sqlite3_finalize(journalModePragma) + return err($sqlite3_errstr(x)) + + if (let x = sqlite3_column_text(journalModePragma, 0); + x != "memory" and x != "wal"): + discard sqlite3_finalize(journalModePragma) + return err("Invalid pragma result: " & $x) + + # TODO: check current version and implement schema versioning + checkExec "PRAGMA user_version = 1;" + + let journalModePragma = prepare("PRAGMA journal_mode = WAL;"): discard + checkWalPragmaResult(journalModePragma) + checkExec(journalModePragma) + + ok(SqliteDatabase( + env: env.release + )) + +template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt = + var s: ptr sqlite3_stmt + checkErr sqlite3_prepare_v2(env, q, q.len.cint, addr s, nil): + cleanup + s + +proc bindParam*(s: RawStmtPtr, n: int, val: auto): cint = + when val is openarray[byte]|seq[byte]: + if val.len > 0: + sqlite3_bind_blob(s, n.cint, unsafeAddr val[0], val.len.cint, nil) + else: + sqlite3_bind_blob(s, n.cint, nil, 0.cint, nil) + elif val is int32: + sqlite3_bind_int(s, n.cint, val) + elif val is uint32: + sqlite3_bind_int(s, int(n).cint, int(val).cint) + elif val is int64: + sqlite3_bind_int64(s, n.cint, val) + else: + {.fatal: "Please add support for the 'kek' type".} + +template bindParams(s: RawStmtPtr, params: auto) = + when params is tuple: + var i = 1 + for param in fields(params): + checkErr bindParam(s, i, param) + inc i + else: + checkErr bindParam(s, 1, params) + +proc exec*[P](s: SqliteStmt[P, void], params: P): DatabaseResult[void] = + let s = RawStmtPtr s + bindParams(s, params) + + let res = + if (let v = sqlite3_step(s); v != SQLITE_DONE): + err($sqlite3_errstr(v)) + else: + ok() + + # release implict transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + + res + +type + DataProc* = proc(s: ptr sqlite3_stmt) {.closure.} + +proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] = + var s = prepare(db.env, query): discard + + try: + var gotResults = false + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + onData(s) + gotResults = true + of SQLITE_DONE: + break + else: + return err($sqlite3_errstr(v)) + return ok gotResults + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + +proc prepareStmt*( + db: SqliteDatabase, + stmt: string, + Params: type, + Res: type +): DatabaseResult[SqliteStmt[Params, Res]] = + var s: RawStmtPtr + checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil) + ok SqliteStmt[Params, Res](s) + +proc close*(db: SqliteDatabase) = + discard sqlite3_close(db.env) + + db[] = SqliteDatabase()[] diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 08bf7c175..90faba3d5 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -13,7 +13,8 @@ import ../protocol/[waku_relay, waku_store, waku_filter, message_notifier], ../protocol/waku_swap/waku_swap, ../waku_types, - ./message_store + ./message_store, + ./sqlite export waku_types @@ -402,7 +403,11 @@ when isMainModule: var store: MessageStore if conf.dbpath != "": - let res = MessageStore.init(conf.dbpath) + let dbRes = SqliteDatabase.init(conf.dbpath) + if dbRes.isErr: + warn "failed to init database", err = dbRes.error + + let res = MessageStore.init(dbRes.value) if res.isErr: warn "failed to init MessageStore", err = res.error else: diff --git a/waku/v2/waku_types.nim b/waku/v2/waku_types.nim index 8ee4f3bc1..d2d908799 100644 --- a/waku/v2/waku_types.nim +++ b/waku/v2/waku_types.nim @@ -13,7 +13,7 @@ import libp2p/protocols/pubsub/[pubsub, gossipsub], protocol/waku_swap/waku_swap_types, nimcrypto/sha2, - sqlite3_abi + ./node/sqlite # Constants required for pagination ------------------------------------------- const MaxPageSize* = 100 # Maximum number of waku messages in each page @@ -80,10 +80,8 @@ type MessageStoreResult*[T] = Result[T, string] - Sqlite* = ptr sqlite3 - MessageStore* = ref object of RootObj - env*: Sqlite + database*: SqliteDatabase WakuStore* = ref object of LPProtocol switch*: Switch