fix(store): add a table index to improve message insertion time

This commit is contained in:
Lorenzo Delgado 2022-09-09 15:04:48 +02:00 committed by GitHub
parent 5fd2eb08ed
commit 30f0e19d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 30 deletions

View File

@ -88,15 +88,19 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase,
## Database initialization
# Create table (if not exists)
# Create table, if doesn't exist
let resCreate = createTable(db)
if resCreate.isErr():
return err("an error occurred while creating the table: " & resCreate.error())
return err("failed to create table: " & resCreate.error())
# Create index on receiverTimestamp (if not exists)
let resIndex = createIndex(db)
if resIndex.isErr():
return err("Could not establish index on receiverTimestamp: " & resIndex.error())
# Create indices, if don't exist
let resRtIndex = createOldestMessageTimestampIndex(db)
if resRtIndex.isErr():
return err("failed to create i_rt index: " & resRtIndex.error())
let resMsgIndex = createHistoryQueryIndex(db)
if resMsgIndex.isErr():
return err("failed to create i_msg index: " & resMsgIndex.error())
## General initialization
@ -150,7 +154,6 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase,
# Update message count after deleting messages
wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work"))
ok(wms)

View File

@ -18,7 +18,7 @@ type SqlQueryStr = string
### SQLite column helper methods
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage {.inline.} =
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, 1)
@ -37,11 +37,11 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo
timestamp: Timestamp(senderTimestamp)
)
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp {.inline.} =
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp =
let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol)
Timestamp(receiverTimestamp)
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string {.inline.} =
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
let
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
pubsubTopicLength = sqlite3_column_bytes(s, 3)
@ -55,7 +55,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
## Create table
template createTableQuery(table: string): SqlQueryStr =
proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" id BLOB," &
" receiverTimestamp INTEGER NOT NULL," &
@ -67,19 +67,28 @@ template createTableQuery(table: string): SqlQueryStr =
" CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" &
") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
let query = createTableQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Create index
## Create indices
template createIndexQuery(table: string): SqlQueryStr =
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);"
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_msg ON " & table & " (contentTopic, pubsubTopic, senderTimestamp, id);"
proc createIndex*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
let query = createIndexQuery(DbTable)
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createHistoryQueryIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
@ -87,7 +96,7 @@ proc createIndex*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
## Insert message
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
template insertMessageQuery(table: string): SqlQueryStr =
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
@ -98,10 +107,10 @@ proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessagePara
## Count table messages
template countMessagesQuery(table: string): SqlQueryStr =
proc countMessagesQuery(table: string): SqlQueryStr =
"SELECT COUNT(*) FROM " & table
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] {.inline.} =
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
var count: int64
proc queryRowCallback(s: ptr sqlite3_stmt) =
count = sqlite3_column_int64(s, 0)
@ -116,7 +125,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] {.inline.} =
## Get oldest receiver timestamp
template selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(receiverTimestamp) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
@ -134,10 +143,10 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
## Delete messages older than timestamp
template deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] {.inline.} =
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
@ -145,14 +154,14 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR
## Delete oldest messages not within limit
template deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
proc deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE id NOT IN (" &
" SELECT id FROM " & table &
" ORDER BY receiverTimestamp DESC" &
" LIMIT " & $limit &
");"
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] {.inline.} =
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] =
# NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
@ -161,7 +170,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
## Select all messages
template selectAllMessagesQuery(table: string): SqlQueryStr =
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
" FROM " & table &
" ORDER BY receiverTimestamp ASC"

View File

@ -1,3 +1 @@
DROP INDEX IF EXISTS i_rt;
CREATE INDEX IF NOT EXISTS i_msg ON Message (contentTopic, pubsubTopic, senderTimestamp, id);

View File

@ -212,23 +212,23 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
# Add message to in-memory store
if not w.isSqliteOnly:
# Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr():
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
debug "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])
return
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
# Add messages to persistent store, if present
if w.store.isNil:
return
let res = w.store.put(index, msg, topic)
if res.isErr():
trace "failed to store messages", err=res.error()
debug "failed to store messages", err=res.error()
waku_store_errors.inc(labelValues = [storeFailure])