deploy: d7d51dc4c1eed4c183cca8b2b356ecbb61510808

This commit is contained in:
LNSD 2022-09-27 10:12:52 +00:00
parent d28bf18bc2
commit 381c45d0d8
9 changed files with 188 additions and 111 deletions

View File

@ -19,13 +19,19 @@ const
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc ts(offset=0, origin=now()): Timestamp =
origin + getNanosecondTime(offset)
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet()
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = getNanosecondTime(epochTime())
ts = now()
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
@ -46,20 +52,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -94,20 +100,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -144,20 +150,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -193,24 +199,24 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
]
for msg in messages1:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let messages2 = @[
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages2:
require store.put(pubsubTopic, msg).isOk()
require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -246,20 +252,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
@ -297,20 +303,20 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
@ -349,22 +355,22 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage(ts=ts(0)),
fakeWakuMessage(ts=ts(1)),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=ts(4)),
]
for msg in messages1:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let messages2 = @[
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=ts(6)),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=ts(7)),
]
for msg in messages2:
require store.put(pubsubTopic, msg).isOk()
require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
@ -403,15 +409,15 @@ suite "message store - history query":
store = SqliteStore.init(database).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=ts(2)),
fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=ts(3)),
fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=ts(4)),
fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=ts(5)),
fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=ts(6)),
]
for msg in messages:
require store.put(DefaultPubsubTopic, msg).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -431,6 +437,68 @@ suite "message store - history query":
## Teardown
store.close()
test "content topic and page size":
## Given
let pageSize: uint64 = 50
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
for t in 0..<70:
let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[DefaultContentTopic]),
maxPageSize=pageSize,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 50
pagingInfo.isSome()
pagingInfo.get().pageSize == 50
## Teardown
store.close()
test "content topic and page size - not enough messages stored":
## Given
let pageSize: uint64 = 50
let
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
for t in 0..<40:
let msg = fakeWakuMessage("MSG-" & $t, DefaultContentTopic, ts=ts(t))
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[DefaultContentTopic]),
maxPageSize=pageSize,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 40
pagingInfo.isSome()
pagingInfo.get().pageSize == 40
## Teardown
store.close()
test "single content topic and valid time range":
## Given

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az296-660:
# Libtool was configured on host fv-az246-52:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -42,7 +42,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = MaxPageSize,
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] {.base.} = discard

View File

@ -37,9 +37,9 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo
timestamp: Timestamp(senderTimestamp)
)
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp =
let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol)
Timestamp(receiverTimestamp)
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp =
let storedAt = sqlite3_column_int64(s, storedAtCol)
Timestamp(storedAt)
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
let
@ -58,7 +58,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" id BLOB," &
" receiverTimestamp INTEGER NOT NULL," &
" storedAt INTEGER NOT NULL," &
" contentTopic BLOB NOT NULL," &
" pubsubTopic BLOB NOT NULL," &
" payload BLOB," &
@ -76,7 +76,7 @@ proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
## Create indices
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);"
"CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);"
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable)
@ -85,7 +85,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_msg ON " & table & " (contentTopic, pubsubTopic, senderTimestamp, id);"
"CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);"
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createHistoryQueryIndexQuery(DbTable)
@ -97,7 +97,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
@ -126,7 +126,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(receiverTimestamp) FROM " & table
"SELECT MIN(storedAt) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp
@ -143,7 +143,7 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
## Get newest message receiver timestamp
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MAX(receiverTimestamp) FROM " & table
"SELECT MAX(storedAt) FROM " & table
proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp
@ -160,7 +160,7 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
## Delete messages older than timestamp
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts
"DELETE FROM " & table & " WHERE storedAt < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
@ -173,7 +173,7 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE id NOT IN (" &
" SELECT id FROM " & table &
" ORDER BY receiverTimestamp DESC" &
" ORDER BY storedAt DESC" &
" LIMIT " & $limit &
");"
@ -187,20 +187,20 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
" FROM " & table &
" ORDER BY receiverTimestamp ASC"
" ORDER BY storedAt ASC"
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] =
## Retrieve all messages from the store.
var rows: seq[(Timestamp, WakuMessage, string)]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
rows.add((receiverTimestamp, wakuMessage, pubsubTopic))
rows.add((storedAt, wakuMessage, pubsubTopic))
let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
@ -232,7 +232,7 @@ proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] =
return none(string)
let comp = if ascending: ">" else: "<"
let whereClause = "(senderTimestamp, id, pubsubTopic) " & comp & " (?, ?, ?)"
let whereClause = "(storedAt, id, pubsubTopic) " & comp & " (?, ?, ?)"
some(whereClause)
proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] =
@ -247,11 +247,11 @@ proc timeRangeWhereClause(startTime: Option[Timestamp], endTime: Option[Timestam
var where = "("
if startTime.isSome():
where &= "senderTimestamp >= (?)"
where &= "storedAt >= (?)"
if startTime.isSome() and endTime.isSome():
where &= " AND "
if endTime.isSome():
where &= "senderTimestamp <= (?)"
where &= "storedAt <= (?)"
where &= ")"
some(where)
@ -273,13 +273,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
var query: string
query = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp"
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, senderTimestamp"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY senderTimestamp " & order & ", id " & order & ", pubsubTopic " & order & ", receiverTimestamp " & order
query &= " ORDER BY storedAt " & order
query &= " LIMIT " & $limit & ";"
query
@ -363,11 +363,11 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
var messages: seq[(WakuMessage, Timestamp, string)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
messages.add((message, receiverTimestamp, pubsubTopic))
messages.add((message, storedAt, pubsubTopic))
let query = block:
let

View File

@ -66,12 +66,12 @@ method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: M
let res = s.insertStmt.exec((
@(digest.data), # id
receivedTime, # receiverTimestamp
toBytes(message.contentTopic), # contentTopic
receivedTime, # storedAt
toBytes(message.contentTopic), # contentTopic
message.payload, # payload
toBytes(pubsubTopic), # pubsubTopic
toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version
message.timestamp # senderTimestamp
message.timestamp # senderTimestamp
))
if res.isErr():
return err("message insert failed: " & res.error())
@ -95,11 +95,9 @@ method getMessagesByHistoryQuery*(
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = MaxPageSize,
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
let pageSizeLimit = if maxPageSize <= 0: MaxPageSize
else: min(maxPageSize, MaxPageSize)
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,
@ -107,7 +105,7 @@ method getMessagesByHistoryQuery*(
cursor,
startTime,
endTime,
limit=pageSizeLimit,
limit=maxPageSize,
ascending=ascendingOrder
)
@ -118,8 +116,8 @@ method getMessagesByHistoryQuery*(
# TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
# Compute last message index
let (message, receivedTimestamp, pubsubTopic) = rows[^1]
let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic)
let (message, storedAt, pubsubTopic) = rows[^1]
let lastIndex = Index.compute(message, storedAt, pubsubTopic)
let pagingInfo = PagingInfo(
pageSize: uint64(messages.len),

View File

@ -357,8 +357,7 @@ proc getPage*(storeQueue: StoreQueueRef,
let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor)
maxPageSize = if pagingInfo.pageSize <= 0: MaxPageSize
else: min(pagingInfo.pageSize, MaxPageSize)
maxPageSize = pagingInfo.pageSize
case pagingInfo.direction
of PagingDirection.FORWARD:
@ -384,7 +383,7 @@ method getMessagesByHistoryQuery*(
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = MaxPageSize,
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =

View File

@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message"
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer"
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts"
const USER_VERSION* = 5 # increase this when there is an update in the database schema
const USER_VERSION* = 6 # increase this when there is an update in the database schema
type MigrationScriptsResult*[T] = Result[T, string]
type

View File

@ -0,0 +1,11 @@
ALTER TABLE message RENAME COLUMN receiverTimestamp TO storedAt;
DROP INDEX IF EXISTS i_msg;
CREATE INDEX IF NOT EXISTS i_query ON message (contentTopic, pubsubTopic, storedAt, id);
DROP INDEX IF EXISTS i_rt;
CREATE INDEX IF NOT EXISTS i_ts ON message (storedAt);

View File

@ -109,7 +109,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
else: none(Timestamp)
qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
else: none(Timestamp)
qMaxPageSize = query.pagingInfo.pageSize
qMaxPageSize = if query.pagingInfo.pageSize <= 0: DefaultPageSize
else: min(query.pagingInfo.pageSize, MaxPageSize)
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD