deploy: adf1dab34fd3f354fab14ac516656b9548825535

This commit is contained in:
LNSD 2022-10-03 14:50:45 +00:00
parent 24b669b862
commit 2dc084204b
6 changed files with 57 additions and 44 deletions

View File

@ -28,8 +28,8 @@ proc newTestDatabase(): SqliteDatabase =
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc getTestTimestamp(offset=0): Timestamp =
Timestamp(getNanosecondTime(getTime().toUnixFloat() + offset.float))
proc ts(offset=0, origin=now()): Timestamp =
origin + getNanosecondTime(offset)
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
@ -113,19 +113,19 @@ suite "SQLite message store - insert messages":
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=storeCapacity)
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6)
fakeWakuMessage(contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage(contentTopic=contentTopic, ts=ts(6))
]
## When
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(store).isOk()
## Then
@ -150,8 +150,8 @@ suite "Message Store":
store = SqliteStore.init(database).get()
let
t1 = getTestTimestamp(0)
t2 = getTestTimestamp(1)
t1 = ts(0)
t2 = ts(1)
t3 = high(int64)
var msgs = @[
@ -265,21 +265,18 @@ suite "Message Store":
ver.isErr == false
ver.value == 10
# TODO: Move this test case to retention policy test suite
test "number of messages retrieved by getAll is bounded by storeCapacity":
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 10
let capacity = 10
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
for i in 1..capacity:
let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
require store.put(pubsubTopic, msg).isOk()
let msg = WakuMessage(payload: @[byte i], contentTopic: DefaultContentTopic, version: 0, timestamp: Timestamp(i))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require retentionPolicy.execute(store).isOk()
## Then
@ -297,6 +294,7 @@ suite "Message Store":
## Cleanup
store.close()
# TODO: Move this test case to retention policy test suite
test "DB store capacity":
let
contentTopic = ContentTopic("/waku/2/default-content/proto")

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az451-748:
# Libtool was configured on host fv-az278-782:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -7,7 +7,6 @@ import
import
../../sqlite,
../../../../protocol/waku_message,
../../../../protocol/waku_store/pagination,
../../../../utils/time
@ -15,6 +14,8 @@ const DbTable = "Message"
type SqlQueryStr = string
type DbCursor* = (Timestamp, seq[byte], string)
### SQLite column helper methods
@ -57,14 +58,14 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" id BLOB," &
" storedAt INTEGER NOT NULL," &
" contentTopic BLOB NOT NULL," &
" pubsubTopic BLOB NOT NULL," &
" contentTopic BLOB NOT NULL," &
" payload BLOB," &
" version INTEGER NOT NULL," &
" senderTimestamp INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" &
" timestamp INTEGER NOT NULL," &
" id BLOB," &
" storedAt INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
@ -97,7 +98,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
@ -187,7 +188,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" &
" FROM " & table &
" ORDER BY storedAt ASC"
@ -227,7 +228,7 @@ proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[st
contentTopicWhere &= ")"
some(contentTopicWhere)
proc cursorWhereClause(cursor: Option[PagingIndex], ascending=true): Option[string] =
proc cursorWhereClause(cursor: Option[DbCursor], ascending=true): Option[string] =
if cursor.isNone():
return none(string)
@ -273,13 +274,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
var query: string
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp"
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY storedAt " & order
query &= " ORDER BY storedAt " & order & ", id " & order & ", pubsubTopic " & order
query &= " LIMIT " & $limit & ";"
query
@ -292,7 +293,7 @@ proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): Datab
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[PagingIndex],
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
onRowCallback: DataProc): DatabaseResult[void] =
@ -302,21 +303,16 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
var paramIndex = 1
if contentTopic.isSome():
for topic in contentTopic.get():
let topicBlob = toBytes(topic)
checkErr bindParam(s, paramIndex, topicBlob)
checkErr bindParam(s, paramIndex, topic.toBytes())
paramIndex += 1
if cursor.isSome(): # cursor = senderTimestamp, id, pubsubTopic
let senderTimestamp = cursor.get().senderTime
checkErr bindParam(s, paramIndex, senderTimestamp)
if cursor.isSome(): # cursor = storedAt, id, pubsubTopic
let (storedAt, id, pubsubTopic) = cursor.get()
checkErr bindParam(s, paramIndex, storedAt)
paramIndex += 1
let id = @(cursor.get().digest.data)
checkErr bindParam(s, paramIndex, id)
paramIndex += 1
let pubsubTopic = toBytes(cursor.get().pubsubTopic)
checkErr bindParam(s, paramIndex, pubsubTopic)
checkErr bindParam(s, paramIndex, pubsubTopic.toBytes())
paramIndex += 1
if pubsubTopic.isSome():
@ -353,7 +349,7 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[PagingIndex],
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
limit: uint64,

View File

@ -98,6 +98,7 @@ method getMessagesByHistoryQuery*(
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic))
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,

View File

@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message"
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer"
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts"
const USER_VERSION* = 6 # increase this when there is an update in the database schema
const USER_VERSION* = 7 # increase this when there is an update in the database schema
type MigrationScriptsResult*[T] = Result[T, string]
type

View File

@ -0,0 +1,18 @@
ALTER TABLE message RENAME TO message_backup;
CREATE TABLE IF NOT EXISTS message(
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
id BLOB,
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)
) WITHOUT ROWID;
INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt)
SELECT pubsubTopic, contentTopic, payload, version, senderTimestamp, id, storedAt
FROM message_backup;
DROP TABLE message_backup;