deploy: a0fd36393c1d37cae99e3b1045e123a094c25741

This commit is contained in:
LNSD 2022-09-27 10:12:52 +00:00
parent 8f7f26b960
commit b2756ba6f4
8 changed files with 187 additions and 110 deletions

View File

@ -19,13 +19,19 @@ const
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc ts(offset=0, origin=now()): Timestamp =
origin + getNanosecondTime(offset)
proc newTestDatabase(): SqliteDatabase = proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet() SqliteDatabase.init("", inMemory = true).tryGet()
proc fakeWakuMessage( proc fakeWakuMessage(
payload = "TEST-PAYLOAD", payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic, contentTopic = DefaultContentTopic,
ts = getNanosecondTime(epochTime()) ts = now()
): WakuMessage = ): WakuMessage =
WakuMessage( WakuMessage(
payload: toBytes(payload), payload: toBytes(payload),
@ -46,20 +52,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
] ]
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -94,20 +100,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
] ]
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -144,20 +150,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=ts(7)),
] ]
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -193,24 +199,24 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages1 = @[ let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
] ]
for msg in messages1: for msg in messages1:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let messages2 = @[ let messages2 = @[
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
] ]
for msg in messages2: for msg in messages2:
require store.put(pubsubTopic, msg).isOk() require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -246,20 +252,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
] ]
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
@ -297,20 +303,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
] ]
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
@ -349,22 +355,22 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages1 = @[ let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0), fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1), fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
] ]
for msg in messages1: for msg in messages1:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let messages2 = @[ let messages2 = @[
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
] ]
for msg in messages2: for msg in messages2:
require store.put(pubsubTopic, msg).isOk() require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
@ -403,15 +409,15 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
let messages = @[ let messages = @[
fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2), fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 3), fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 4), fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 5), fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 6), fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=ts(6)),
] ]
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -431,6 +437,68 @@ suite "message store - history query":
## Teardown ## Teardown
store.close() store.close()
test "content topic and page size":
## Given
let pageSize: uint64 = 50
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
for t in 0..<70:
let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[DefaultContentTopic]),
maxPageSize=pageSize,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 50
pagingInfo.isSome()
pagingInfo.get().pageSize == 50
## Teardown
store.close()
test "content topic and page size - not enough messages stored":
## Given
let pageSize: uint64 = 50
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
for t in 0..<40:
let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[DefaultContentTopic]),
maxPageSize=pageSize,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 40
pagingInfo.isSome()
pagingInfo.get().pageSize == 40
## Teardown
store.close()
test "single content topic and valid time range": test "single content topic and valid time range":
## Given ## Given

View File

@ -42,7 +42,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = MaxPageSize, maxPageSize = DefaultPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] {.base.} = discard ): MessageStoreResult[MessageStorePage] {.base.} = discard

View File

@ -37,9 +37,9 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo
timestamp: Timestamp(senderTimestamp) timestamp: Timestamp(senderTimestamp)
) )
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp = proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp =
let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol) let storedAt = sqlite3_column_int64(s, storedAtCol)
Timestamp(receiverTimestamp) Timestamp(storedAt)
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string = proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
let let
@ -58,7 +58,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
proc createTableQuery(table: string): SqlQueryStr = proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" & "CREATE TABLE IF NOT EXISTS " & table & " (" &
" id BLOB," & " id BLOB," &
" receiverTimestamp INTEGER NOT NULL," & " storedAt INTEGER NOT NULL," &
" contentTopic BLOB NOT NULL," & " contentTopic BLOB NOT NULL," &
" pubsubTopic BLOB NOT NULL," & " pubsubTopic BLOB NOT NULL," &
" payload BLOB," & " payload BLOB," &
@ -76,7 +76,7 @@ proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
## Create indices ## Create indices
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr = proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);" "CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);"
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] = proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable) let query = createOldestMessageTimestampIndexQuery(DbTable)
@ -85,7 +85,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_msg ON " & table & " (contentTopic, pubsubTopic, senderTimestamp, id);" "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);"
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createHistoryQueryIndexQuery(DbTable) let query = createHistoryQueryIndexQuery(DbTable)
@ -97,7 +97,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr = proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" & "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);" " VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
@ -126,7 +126,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
## Get oldest message receiver timestamp ## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr = proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(receiverTimestamp) FROM " & table "SELECT MIN(storedAt) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp var timestamp: Timestamp
@ -143,7 +143,7 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
## Get newest message receiver timestamp ## Get newest message receiver timestamp
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr = proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MAX(receiverTimestamp) FROM " & table "SELECT MAX(storedAt) FROM " & table
proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp var timestamp: Timestamp
@ -160,7 +160,7 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
## Delete messages older than timestamp ## Delete messages older than timestamp
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr = proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts "DELETE FROM " & table & " WHERE storedAt < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] = proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts) let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
@ -173,7 +173,7 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE id NOT IN (" & "DELETE FROM " & table & " WHERE id NOT IN (" &
" SELECT id FROM " & table & " SELECT id FROM " & table &
" ORDER BY receiverTimestamp DESC" & " ORDER BY storedAt DESC" &
" LIMIT " & $limit & " LIMIT " & $limit &
");" ");"
@ -187,20 +187,20 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
## Select all messages ## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr = proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" & "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
" FROM " & table & " FROM " & table &
" ORDER BY receiverTimestamp ASC" " ORDER BY storedAt ASC"
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] = proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] =
## Retrieve all messages from the store. ## Retrieve all messages from the store.
var rows: seq[(Timestamp, WakuMessage, string)] var rows: seq[(Timestamp, WakuMessage, string)]
proc queryRowCallback(s: ptr sqlite3_stmt) = proc queryRowCallback(s: ptr sqlite3_stmt) =
let let
receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
rows.add((receiverTimestamp, wakuMessage, pubsubTopic)) rows.add((storedAt, wakuMessage, pubsubTopic))
let query = selectAllMessagesQuery(DbTable) let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback) let res = db.query(query, queryRowCallback)
@ -232,7 +232,7 @@ proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] =
return none(string) return none(string)
let comp = if ascending: ">" else: "<" let comp = if ascending: ">" else: "<"
let whereClause = "(senderTimestamp, id, pubsubTopic) " & comp & " (?, ?, ?)" let whereClause = "(storedAt, id, pubsubTopic) " & comp & " (?, ?, ?)"
some(whereClause) some(whereClause)
proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] = proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] =
@ -247,11 +247,11 @@ proc timeRangeWhereClause(startTime: Option[Timestamp], endTime: Option[Timestam
var where = "(" var where = "("
if startTime.isSome(): if startTime.isSome():
where &= "senderTimestamp >= (?)" where &= "storedAt >= (?)"
if startTime.isSome() and endTime.isSome(): if startTime.isSome() and endTime.isSome():
where &= " AND " where &= " AND "
if endTime.isSome(): if endTime.isSome():
where &= "senderTimestamp <= (?)" where &= "storedAt <= (?)"
where &= ")" where &= ")"
some(where) some(where)
@ -273,13 +273,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
var query: string var query: string
query = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp"
query &= " FROM " & table query &= " FROM " & table
if where.isSome(): if where.isSome():
query &= " WHERE " & where.get() query &= " WHERE " & where.get()
query &= " ORDER BY senderTimestamp " & order & ", id " & order & ", pubsubTopic " & order & ", receiverTimestamp " & order query &= " ORDER BY storedAt " & order
query &= " LIMIT " & $limit & ";" query &= " LIMIT " & $limit & ";"
query query
@ -363,11 +363,11 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
var messages: seq[(WakuMessage, Timestamp, string)] = @[] var messages: seq[(WakuMessage, Timestamp, string)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) = proc queryRowCallback(s: ptr sqlite3_stmt) =
let let
receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
messages.add((message, receiverTimestamp, pubsubTopic)) messages.add((message, storedAt, pubsubTopic))
let query = block: let query = block:
let let

View File

@ -66,12 +66,12 @@ method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: M
let res = s.insertStmt.exec(( let res = s.insertStmt.exec((
@(digest.data), # id @(digest.data), # id
receivedTime, # receiverTimestamp receivedTime, # storedAt
toBytes(message.contentTopic), # contentTopic toBytes(message.contentTopic), # contentTopic
message.payload, # payload message.payload, # payload
toBytes(pubsubTopic), # pubsubTopic toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version int64(message.version), # version
message.timestamp # senderTimestamp message.timestamp # senderTimestamp
)) ))
if res.isErr(): if res.isErr():
return err("message insert failed: " & res.error()) return err("message insert failed: " & res.error())
@ -95,11 +95,9 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = MaxPageSize, maxPageSize = DefaultPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] = ): MessageStoreResult[MessageStorePage] =
let pageSizeLimit = if maxPageSize <= 0: MaxPageSize
else: min(maxPageSize, MaxPageSize)
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic, contentTopic,
@ -107,7 +105,7 @@ method getMessagesByHistoryQuery*(
cursor, cursor,
startTime, startTime,
endTime, endTime,
limit=pageSizeLimit, limit=maxPageSize,
ascending=ascendingOrder ascending=ascendingOrder
) )
@ -118,8 +116,8 @@ method getMessagesByHistoryQuery*(
# TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
# Compute last message index # Compute last message index
let (message, receivedTimestamp, pubsubTopic) = rows[^1] let (message, storedAt, pubsubTopic) = rows[^1]
let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic) let lastIndex = Index.compute(message, storedAt, pubsubTopic)
let pagingInfo = PagingInfo( let pagingInfo = PagingInfo(
pageSize: uint64(messages.len), pageSize: uint64(messages.len),

View File

@ -357,8 +357,7 @@ proc getPage*(storeQueue: StoreQueueRef,
let let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor) else: some(pagingInfo.cursor)
maxPageSize = if pagingInfo.pageSize <= 0: MaxPageSize maxPageSize = pagingInfo.pageSize
else: min(pagingInfo.pageSize, MaxPageSize)
case pagingInfo.direction case pagingInfo.direction
of PagingDirection.FORWARD: of PagingDirection.FORWARD:
@ -384,7 +383,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index), cursor = none(Index),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = MaxPageSize, maxPageSize = DefaultPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[MessageStorePage] = ): MessageStoreResult[MessageStorePage] =

View File

@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message"
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer"
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts"
const USER_VERSION* = 5 # increase this when there is an update in the database schema const USER_VERSION* = 6 # increase this when there is an update in the database schema
type MigrationScriptsResult*[T] = Result[T, string] type MigrationScriptsResult*[T] = Result[T, string]
type type

View File

@ -0,0 +1,11 @@
ALTER TABLE message RENAME COLUMN receiverTimestamp TO storedAt;
DROP INDEX IF EXISTS i_msg;
CREATE INDEX IF NOT EXISTS i_query ON message (contentTopic, pubsubTopic, storedAt, id);
DROP INDEX IF EXISTS i_rt;
CREATE INDEX IF NOT EXISTS i_ts ON message (storedAt);

View File

@ -109,7 +109,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
else: none(Timestamp) else: none(Timestamp)
qEndTime = if query.endTime != Timestamp(0): some(query.endTime) qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
else: none(Timestamp) else: none(Timestamp)
qMaxPageSize = query.pagingInfo.pageSize qMaxPageSize = if query.pagingInfo.pageSize <= 0: DefaultPageSize
else: min(query.pagingInfo.pageSize, MaxPageSize)
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD