diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index a9560a6dc..072c781c3 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -37,6 +37,7 @@ type storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. storeMaxLoad: int # = storeCapacity * MaxStoreOverflow deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs + insertStmt: SqliteStmt[(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), void] proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] = var numMessages: int64 @@ -72,47 +73,59 @@ proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] = ok() proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] = - ## Table is the SQL query for creating the messages Table. - ## It contains: - ## - 4-Byte ContentTopic stored as an Integer - ## - Payload stored as a blob - - let prepare = db.prepareStmt(""" - CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( - id BLOB, - receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, - contentTopic BLOB NOT NULL, - pubsubTopic BLOB NOT NULL, - payload BLOB, - version INTEGER NOT NULL, - senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, - CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) - ) WITHOUT ROWID; - """, NoParams, void) - - if prepare.isErr: - return err("failed to prepare") - - let prepareRes = prepare.value.exec(()) + + ## Table Creation + let + createStmt = db.prepareStmt(""" + CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( + id BLOB, + receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic) + ) WITHOUT ROWID; + """, NoParams, void).expect("this is a valid statement") + + let prepareRes = createStmt.exec(()) if prepareRes.isErr: return err("failed to exec") + # We dispose of this prepared statement here, as we never use it again + createStmt.dispose() + + ## Reusable prepared statements + let + insertStmt = db.prepareStmt( + "INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);", + (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), + void + ).expect("this is a valid statement") + + ## General initialization + let numMessages = messageCount(db).get() debug "number of messages in sqlite database", messageNum=numMessages # add index on receiverTimestamp - let addIndexStmt = "CREATE INDEX IF NOT EXISTS i_rt ON " & TABLE_TITLE & "(receiverTimestamp);" - let resIndex = db.query(addIndexStmt, proc(s: ptr sqlite3_stmt) = discard) + let + addIndexStmt = "CREATE INDEX IF NOT EXISTS i_rt ON " & TABLE_TITLE & "(receiverTimestamp);" + resIndex = db.query(addIndexStmt, proc(s: ptr sqlite3_stmt) = discard) if resIndex.isErr: return err("Could not establish index on receiverTimestamp: " & resIndex.error) - let storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow) - let deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2) + let + storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow) + deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2) + let wms = WakuMessageStore(database: db, numMessages: int(numMessages), storeCapacity: storeCapacity, storeMaxLoad: storeMaxLoad, - deleteWindow: deleteWindow) + deleteWindow: deleteWindow, + insertStmt: insertStmt) # If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object if wms.numMessages >= wms.storeMaxLoad: @@ -131,17 +144,9 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop ## let res = db.put(message) ## if res.isErr: ## echo "error" - ## - let prepare = db.database.prepareStmt( - "INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);", - (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), - void - ) + ## - if prepare.isErr: - return err("failed to prepare") - - let res = prepare.value.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp)) + let res = db.insertStmt.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp)) if res.isErr: return err("failed") @@ -152,8 +157,6 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop ok() - - method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] = ## Retrieves `storeCapacity` many messages from the storage. ## @@ -210,4 +213,5 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto proc close*(db: WakuMessageStore) = ## Closes the database. + db.insertStmt.dispose() db.database.close() diff --git a/waku/v2/node/storage/peer/waku_peer_storage.nim b/waku/v2/node/storage/peer/waku_peer_storage.nim index c02b4e6a0..f096e17da 100644 --- a/waku/v2/node/storage/peer/waku_peer_storage.nim +++ b/waku/v2/node/storage/peer/waku_peer_storage.nim @@ -14,6 +14,7 @@ export sqlite type WakuPeerStorage* = ref object of PeerStorage database*: SqliteDatabase + replaceStmt: SqliteStmt[(seq[byte], seq[byte], int32, int64), void] ########################## # Protobuf Serialisation # @@ -60,29 +61,42 @@ proc encode*(storedInfo: StoredInfo): PeerStorageResult[ProtoBuffer] = ########################## proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] = + ## Create the "Peer" table ## It contains: ## - peer id as primary key, stored as a blob ## - stored info (serialised protobuf), stored as a blob ## - last known enumerated connectedness state, stored as an integer ## - disconnect time in epoch seconds, if applicable - let prepare = db.prepareStmt(""" - CREATE TABLE IF NOT EXISTS Peer ( - peerId BLOB PRIMARY KEY, - storedInfo BLOB, - connectedness INTEGER, - disconnectTime INTEGER - ) WITHOUT ROWID; - """, NoParams, void) + let + createStmt = db.prepareStmt(""" + CREATE TABLE IF NOT EXISTS Peer ( + peerId BLOB PRIMARY KEY, + storedInfo BLOB, + connectedness INTEGER, + disconnectTime INTEGER + ) WITHOUT ROWID; + """, NoParams, void).expect("this is a valid statement") - if prepare.isErr: - return err("failed to prepare") - - let res = prepare.value.exec(()) + let res = createStmt.exec(()) if res.isErr: return err("failed to exec") - ok(WakuPeerStorage(database: db)) + # We dispose of this prepared statement here, as we never use it again + createStmt.dispose() + + ## Reusable prepared statements + let + replaceStmt = db.prepareStmt( + "REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);", + (seq[byte], seq[byte], int32, int64), + void + ).expect("this is a valid statement") + + ## General initialization + + ok(WakuPeerStorage(database: db, + replaceStmt: replaceStmt)) method put*(db: WakuPeerStorage, @@ -92,21 +106,12 @@ method put*(db: WakuPeerStorage, disconnectTime: int64): PeerStorageResult[void] = ## Adds a peer to storage or replaces existing entry if it already exists - let prepare = db.database.prepareStmt( - "REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);", - (seq[byte], seq[byte], int32, int64), - void - ) - - if prepare.isErr: - return err("failed to prepare") - let encoded = storedInfo.encode() if encoded.isErr: return err("failed to encode: " & encoded.error()) - let res = prepare.value.exec((peerId.data, encoded.get().buffer, int32(ord(connectedness)), disconnectTime)) + let res = db.replaceStmt.exec((peerId.data, encoded.get().buffer, int32(ord(connectedness)), disconnectTime)) if res.isErr: return err("failed") @@ -147,4 +152,5 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR proc close*(db: WakuPeerStorage) = ## Closes the database. + db.replaceStmt.dispose() db.database.close() \ No newline at end of file diff --git a/waku/v2/node/storage/sqlite.nim b/waku/v2/node/storage/sqlite.nim index fbb1b52a6..48d924aec 100644 --- a/waku/v2/node/storage/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -32,8 +32,11 @@ type template dispose(db: Sqlite) = discard sqlite3_close(db) -template dispose(db: RawStmtPtr) = - discard sqlite3_finalize(db) +template dispose(rawStmt: RawStmtPtr) = + discard sqlite3_finalize(rawStmt) + +template dispose*(sqliteStmt: SqliteStmt) = + discard sqlite3_finalize(RawStmtPtr sqliteStmt) proc release[T](x: var AutoDisposed[T]): T = result = x.val @@ -192,6 +195,7 @@ proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult # release implicit transaction discard sqlite3_reset(s) # same return information as step discard sqlite3_clear_bindings(s) # no errors possible + discard sqlite3_finalize(s) # NB: dispose of the prepared query statement and free associated memory proc prepareStmt*( db: SqliteDatabase,