fix(store): update sqlite db table primary key and decouple queries module from pagination types

This commit is contained in:
Lorenzo Delgado 2022-10-03 15:23:39 +02:00 committed by GitHub
parent 8af3438814
commit adf1dab34f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 56 additions and 43 deletions

View File

@ -28,8 +28,8 @@ proc newTestDatabase(): SqliteDatabase =
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc getTestTimestamp(offset=0): Timestamp =
Timestamp(getNanosecondTime(getTime().toUnixFloat() + offset.float))
proc ts(offset=0, origin=now()): Timestamp =
origin + getNanosecondTime(offset)
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
@ -113,19 +113,19 @@ suite "SQLite message store - insert messages":
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6)
fakeWakuMessage(contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(6))
]
## When
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(store).isOk()
## Then
@ -150,8 +150,8 @@ suite "Message Store":
store = SqliteStore.init(database).get()
let
t1 = getTestTimestamp(0)
t2 = getTestTimestamp(1)
t1 = ts(0)
t2 = ts(1)
t3 = high(int64)
var msgs = @[
@ -265,21 +265,18 @@ suite "Message Store":
ver.isErr == false
ver.value == 10
# TODO: Move this test case to retention policy test suite
test "number of messages retrieved by getAll is bounded by storeCapacity":
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 10
let capacity = 10
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
for i in 1..capacity:
let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
require store.put(pubsubTopic, msg).isOk()
let msg = WakuMessage(payload: @[byte i], contentTopic: DefaultContentTopic, version: 0, timestamp: Timestamp(i))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(store).isOk()
## Then
@ -297,6 +294,7 @@ suite "Message Store":
## Cleanup
store.close()
# TODO: Move this test case to retention policy test suite
test "DB store capacity":
let
contentTopic = ContentTopic("/waku/2/default-content/proto")

View File

@ -7,7 +7,6 @@ import
import
../../sqlite,
../../../../protocol/waku_message,
../../../../protocol/waku_store/pagination,
../../../../utils/time
@ -15,6 +14,8 @@ const DbTable = "Message"
type SqlQueryStr = string
type DbCursor* = (Timestamp, seq[byte], string)
### SQLite column helper methods
@ -57,14 +58,14 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" id BLOB," &
" storedAt INTEGER NOT NULL," &
" contentTopic BLOB NOT NULL," &
" pubsubTopic BLOB NOT NULL," &
" contentTopic BLOB NOT NULL," &
" payload BLOB," &
" version INTEGER NOT NULL," &
" senderTimestamp INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" &
" timestamp INTEGER NOT NULL," &
" id BLOB," &
" storedAt INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
@ -97,7 +98,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
@ -187,7 +188,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" &
" FROM " & table &
" ORDER BY storedAt ASC"
@ -227,7 +228,7 @@ proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[st
contentTopicWhere &= ")"
some(contentTopicWhere)
proc cursorWhereClause(cursor: Option[PagingIndex], ascending=true): Option[string] =
proc cursorWhereClause(cursor: Option[DbCursor], ascending=true): Option[string] =
if cursor.isNone():
return none(string)
@ -273,13 +274,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
var query: string
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp"
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY storedAt " & order
query &= " ORDER BY storedAt " & order & ", id " & order & ", pubsubTopic " & order
query &= " LIMIT " & $limit & ";"
query
@ -292,7 +293,7 @@ proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): Datab
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[PagingIndex],
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
onRowCallback: DataProc): DatabaseResult[void] =
@ -302,21 +303,16 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
var paramIndex = 1
if contentTopic.isSome():
for topic in contentTopic.get():
let topicBlob = toBytes(topic)
checkErr bindParam(s, paramIndex, topicBlob)
checkErr bindParam(s, paramIndex, topic.toBytes())
paramIndex += 1
if cursor.isSome(): # cursor = senderTimestamp, id, pubsubTopic
let senderTimestamp = cursor.get().senderTime
checkErr bindParam(s, paramIndex, senderTimestamp)
if cursor.isSome(): # cursor = storedAt, id, pubsubTopic
let (storedAt, id, pubsubTopic) = cursor.get()
checkErr bindParam(s, paramIndex, storedAt)
paramIndex += 1
let id = @(cursor.get().digest.data)
checkErr bindParam(s, paramIndex, id)
paramIndex += 1
let pubsubTopic = toBytes(cursor.get().pubsubTopic)
checkErr bindParam(s, paramIndex, pubsubTopic)
checkErr bindParam(s, paramIndex, pubsubTopic.toBytes())
paramIndex += 1
if pubsubTopic.isSome():
@ -353,7 +349,7 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[PagingIndex],
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
limit: uint64,

View File

@ -98,6 +98,7 @@ method getMessagesByHistoryQuery*(
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic))
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,

View File

@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message"
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer"
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts"
const USER_VERSION* = 6 # increase this when there is an update in the database schema
const USER_VERSION* = 7 # increase this when there is an update in the database schema
type MigrationScriptsResult*[T] = Result[T, string]
type

View File

@ -0,0 +1,18 @@
ALTER TABLE message RENAME TO message_backup;
CREATE TABLE IF NOT EXISTS message(
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
id BLOB,
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)
) WITHOUT ROWID;
INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt)
SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, storedAt
FROM message_backup;
DROP TABLE message_backup;