mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-28 07:35:56 +00:00
fix(sqlite): memory leak due to undisposed prepared statements (#981)
This commit is contained in:
parent
1ac029025a
commit
37325c8f48
@ -37,6 +37,7 @@ type
|
|||||||
storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
|
storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
|
||||||
storeMaxLoad: int # = storeCapacity * MaxStoreOverflow
|
storeMaxLoad: int # = storeCapacity * MaxStoreOverflow
|
||||||
deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs
|
deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs
|
||||||
|
insertStmt: SqliteStmt[(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), void]
|
||||||
|
|
||||||
proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] =
|
proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] =
|
||||||
var numMessages: int64
|
var numMessages: int64
|
||||||
@ -72,12 +73,10 @@ proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] =
|
|||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] =
|
proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] =
|
||||||
## Table is the SQL query for creating the messages Table.
|
|
||||||
## It contains:
|
|
||||||
## - 4-Byte ContentTopic stored as an Integer
|
|
||||||
## - Payload stored as a blob
|
|
||||||
|
|
||||||
let prepare = db.prepareStmt("""
|
## Table Creation
|
||||||
|
let
|
||||||
|
createStmt = db.prepareStmt("""
|
||||||
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
|
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
|
||||||
id BLOB,
|
id BLOB,
|
||||||
receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
|
receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
|
||||||
@ -88,31 +87,45 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50
|
|||||||
senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
|
senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
|
||||||
CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)
|
CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)
|
||||||
) WITHOUT ROWID;
|
) WITHOUT ROWID;
|
||||||
""", NoParams, void)
|
""", NoParams, void).expect("this is a valid statement")
|
||||||
|
|
||||||
if prepare.isErr:
|
let prepareRes = createStmt.exec(())
|
||||||
return err("failed to prepare")
|
|
||||||
|
|
||||||
let prepareRes = prepare.value.exec(())
|
|
||||||
if prepareRes.isErr:
|
if prepareRes.isErr:
|
||||||
return err("failed to exec")
|
return err("failed to exec")
|
||||||
|
|
||||||
|
# We dispose of this prepared statement here, as we never use it again
|
||||||
|
createStmt.dispose()
|
||||||
|
|
||||||
|
## Reusable prepared statements
|
||||||
|
let
|
||||||
|
insertStmt = db.prepareStmt(
|
||||||
|
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
|
||||||
|
(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
|
||||||
|
void
|
||||||
|
).expect("this is a valid statement")
|
||||||
|
|
||||||
|
## General initialization
|
||||||
|
|
||||||
let numMessages = messageCount(db).get()
|
let numMessages = messageCount(db).get()
|
||||||
debug "number of messages in sqlite database", messageNum=numMessages
|
debug "number of messages in sqlite database", messageNum=numMessages
|
||||||
|
|
||||||
# add index on receiverTimestamp
|
# add index on receiverTimestamp
|
||||||
let addIndexStmt = "CREATE INDEX IF NOT EXISTS i_rt ON " & TABLE_TITLE & "(receiverTimestamp);"
|
let
|
||||||
let resIndex = db.query(addIndexStmt, proc(s: ptr sqlite3_stmt) = discard)
|
addIndexStmt = "CREATE INDEX IF NOT EXISTS i_rt ON " & TABLE_TITLE & "(receiverTimestamp);"
|
||||||
|
resIndex = db.query(addIndexStmt, proc(s: ptr sqlite3_stmt) = discard)
|
||||||
if resIndex.isErr:
|
if resIndex.isErr:
|
||||||
return err("Could not establish index on receiverTimestamp: " & resIndex.error)
|
return err("Could not establish index on receiverTimestamp: " & resIndex.error)
|
||||||
|
|
||||||
let storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow)
|
let
|
||||||
let deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2)
|
storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow)
|
||||||
|
deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2)
|
||||||
|
|
||||||
let wms = WakuMessageStore(database: db,
|
let wms = WakuMessageStore(database: db,
|
||||||
numMessages: int(numMessages),
|
numMessages: int(numMessages),
|
||||||
storeCapacity: storeCapacity,
|
storeCapacity: storeCapacity,
|
||||||
storeMaxLoad: storeMaxLoad,
|
storeMaxLoad: storeMaxLoad,
|
||||||
deleteWindow: deleteWindow)
|
deleteWindow: deleteWindow,
|
||||||
|
insertStmt: insertStmt)
|
||||||
|
|
||||||
# If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object
|
# If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object
|
||||||
if wms.numMessages >= wms.storeMaxLoad:
|
if wms.numMessages >= wms.storeMaxLoad:
|
||||||
@ -132,16 +145,8 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
|||||||
## if res.isErr:
|
## if res.isErr:
|
||||||
## echo "error"
|
## echo "error"
|
||||||
##
|
##
|
||||||
let prepare = db.database.prepareStmt(
|
|
||||||
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
|
|
||||||
(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
|
|
||||||
void
|
|
||||||
)
|
|
||||||
|
|
||||||
if prepare.isErr:
|
let res = db.insertStmt.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
|
||||||
return err("failed to prepare")
|
|
||||||
|
|
||||||
let res = prepare.value.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
|
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed")
|
return err("failed")
|
||||||
|
|
||||||
@ -152,8 +157,6 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
|||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
|
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
|
||||||
## Retrieves `storeCapacity` many messages from the storage.
|
## Retrieves `storeCapacity` many messages from the storage.
|
||||||
##
|
##
|
||||||
@ -210,4 +213,5 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||||||
|
|
||||||
proc close*(db: WakuMessageStore) =
|
proc close*(db: WakuMessageStore) =
|
||||||
## Closes the database.
|
## Closes the database.
|
||||||
|
db.insertStmt.dispose()
|
||||||
db.database.close()
|
db.database.close()
|
||||||
|
@ -14,6 +14,7 @@ export sqlite
|
|||||||
type
|
type
|
||||||
WakuPeerStorage* = ref object of PeerStorage
|
WakuPeerStorage* = ref object of PeerStorage
|
||||||
database*: SqliteDatabase
|
database*: SqliteDatabase
|
||||||
|
replaceStmt: SqliteStmt[(seq[byte], seq[byte], int32, int64), void]
|
||||||
|
|
||||||
##########################
|
##########################
|
||||||
# Protobuf Serialisation #
|
# Protobuf Serialisation #
|
||||||
@ -60,29 +61,42 @@ proc encode*(storedInfo: StoredInfo): PeerStorageResult[ProtoBuffer] =
|
|||||||
##########################
|
##########################
|
||||||
|
|
||||||
proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
|
proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
|
||||||
|
|
||||||
## Create the "Peer" table
|
## Create the "Peer" table
|
||||||
## It contains:
|
## It contains:
|
||||||
## - peer id as primary key, stored as a blob
|
## - peer id as primary key, stored as a blob
|
||||||
## - stored info (serialised protobuf), stored as a blob
|
## - stored info (serialised protobuf), stored as a blob
|
||||||
## - last known enumerated connectedness state, stored as an integer
|
## - last known enumerated connectedness state, stored as an integer
|
||||||
## - disconnect time in epoch seconds, if applicable
|
## - disconnect time in epoch seconds, if applicable
|
||||||
let prepare = db.prepareStmt("""
|
let
|
||||||
|
createStmt = db.prepareStmt("""
|
||||||
CREATE TABLE IF NOT EXISTS Peer (
|
CREATE TABLE IF NOT EXISTS Peer (
|
||||||
peerId BLOB PRIMARY KEY,
|
peerId BLOB PRIMARY KEY,
|
||||||
storedInfo BLOB,
|
storedInfo BLOB,
|
||||||
connectedness INTEGER,
|
connectedness INTEGER,
|
||||||
disconnectTime INTEGER
|
disconnectTime INTEGER
|
||||||
) WITHOUT ROWID;
|
) WITHOUT ROWID;
|
||||||
""", NoParams, void)
|
""", NoParams, void).expect("this is a valid statement")
|
||||||
|
|
||||||
if prepare.isErr:
|
let res = createStmt.exec(())
|
||||||
return err("failed to prepare")
|
|
||||||
|
|
||||||
let res = prepare.value.exec(())
|
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed to exec")
|
return err("failed to exec")
|
||||||
|
|
||||||
ok(WakuPeerStorage(database: db))
|
# We dispose of this prepared statement here, as we never use it again
|
||||||
|
createStmt.dispose()
|
||||||
|
|
||||||
|
## Reusable prepared statements
|
||||||
|
let
|
||||||
|
replaceStmt = db.prepareStmt(
|
||||||
|
"REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);",
|
||||||
|
(seq[byte], seq[byte], int32, int64),
|
||||||
|
void
|
||||||
|
).expect("this is a valid statement")
|
||||||
|
|
||||||
|
## General initialization
|
||||||
|
|
||||||
|
ok(WakuPeerStorage(database: db,
|
||||||
|
replaceStmt: replaceStmt))
|
||||||
|
|
||||||
|
|
||||||
method put*(db: WakuPeerStorage,
|
method put*(db: WakuPeerStorage,
|
||||||
@ -92,21 +106,12 @@ method put*(db: WakuPeerStorage,
|
|||||||
disconnectTime: int64): PeerStorageResult[void] =
|
disconnectTime: int64): PeerStorageResult[void] =
|
||||||
|
|
||||||
## Adds a peer to storage or replaces existing entry if it already exists
|
## Adds a peer to storage or replaces existing entry if it already exists
|
||||||
let prepare = db.database.prepareStmt(
|
|
||||||
"REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);",
|
|
||||||
(seq[byte], seq[byte], int32, int64),
|
|
||||||
void
|
|
||||||
)
|
|
||||||
|
|
||||||
if prepare.isErr:
|
|
||||||
return err("failed to prepare")
|
|
||||||
|
|
||||||
let encoded = storedInfo.encode()
|
let encoded = storedInfo.encode()
|
||||||
|
|
||||||
if encoded.isErr:
|
if encoded.isErr:
|
||||||
return err("failed to encode: " & encoded.error())
|
return err("failed to encode: " & encoded.error())
|
||||||
|
|
||||||
let res = prepare.value.exec((peerId.data, encoded.get().buffer, int32(ord(connectedness)), disconnectTime))
|
let res = db.replaceStmt.exec((peerId.data, encoded.get().buffer, int32(ord(connectedness)), disconnectTime))
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return err("failed")
|
return err("failed")
|
||||||
|
|
||||||
@ -147,4 +152,5 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||||||
|
|
||||||
proc close*(db: WakuPeerStorage) =
|
proc close*(db: WakuPeerStorage) =
|
||||||
## Closes the database.
|
## Closes the database.
|
||||||
|
db.replaceStmt.dispose()
|
||||||
db.database.close()
|
db.database.close()
|
@ -32,8 +32,11 @@ type
|
|||||||
template dispose(db: Sqlite) =
|
template dispose(db: Sqlite) =
|
||||||
discard sqlite3_close(db)
|
discard sqlite3_close(db)
|
||||||
|
|
||||||
template dispose(db: RawStmtPtr) =
|
template dispose(rawStmt: RawStmtPtr) =
|
||||||
discard sqlite3_finalize(db)
|
discard sqlite3_finalize(rawStmt)
|
||||||
|
|
||||||
|
template dispose*(sqliteStmt: SqliteStmt) =
|
||||||
|
discard sqlite3_finalize(RawStmtPtr sqliteStmt)
|
||||||
|
|
||||||
proc release[T](x: var AutoDisposed[T]): T =
|
proc release[T](x: var AutoDisposed[T]): T =
|
||||||
result = x.val
|
result = x.val
|
||||||
@ -192,6 +195,7 @@ proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult
|
|||||||
# release implicit transaction
|
# release implicit transaction
|
||||||
discard sqlite3_reset(s) # same return information as step
|
discard sqlite3_reset(s) # same return information as step
|
||||||
discard sqlite3_clear_bindings(s) # no errors possible
|
discard sqlite3_clear_bindings(s) # no errors possible
|
||||||
|
discard sqlite3_finalize(s) # NB: dispose of the prepared query statement and free associated memory
|
||||||
|
|
||||||
proc prepareStmt*(
|
proc prepareStmt*(
|
||||||
db: SqliteDatabase,
|
db: SqliteDatabase,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user