mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-28 15:46:33 +00:00
deploy: 30f0e19d79de4b0693ba94de2f98c84aeb321328
This commit is contained in:
parent
daf4db8a5b
commit
686b78a844
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# libtool - Provide generalized library-building support services.
|
# libtool - Provide generalized library-building support services.
|
||||||
# Generated automatically by config.status (libbacktrace) version-unused
|
# Generated automatically by config.status (libbacktrace) version-unused
|
||||||
# Libtool was configured on host fv-az343-585:
|
# Libtool was configured on host fv-az208-350:
|
||||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||||
#
|
#
|
||||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||||
|
@ -88,15 +88,19 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase,
|
|||||||
|
|
||||||
## Database initialization
|
## Database initialization
|
||||||
|
|
||||||
# Create table (if not exists)
|
# Create table, if doesn't exist
|
||||||
let resCreate = createTable(db)
|
let resCreate = createTable(db)
|
||||||
if resCreate.isErr():
|
if resCreate.isErr():
|
||||||
return err("an error occurred while creating the table: " & resCreate.error())
|
return err("failed to create table: " & resCreate.error())
|
||||||
|
|
||||||
# Create index on receiverTimestamp (if not exists)
|
# Create indices, if don't exist
|
||||||
let resIndex = createIndex(db)
|
let resRtIndex = createOldestMessageTimestampIndex(db)
|
||||||
if resIndex.isErr():
|
if resRtIndex.isErr():
|
||||||
return err("Could not establish index on receiverTimestamp: " & resIndex.error())
|
return err("failed to create i_rt index: " & resRtIndex.error())
|
||||||
|
|
||||||
|
let resMsgIndex = createHistoryQueryIndex(db)
|
||||||
|
if resMsgIndex.isErr():
|
||||||
|
return err("failed to create i_msg index: " & resMsgIndex.error())
|
||||||
|
|
||||||
## General initialization
|
## General initialization
|
||||||
|
|
||||||
@ -150,7 +154,6 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase,
|
|||||||
# Update message count after deleting messages
|
# Update message count after deleting messages
|
||||||
wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work"))
|
wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work"))
|
||||||
|
|
||||||
|
|
||||||
ok(wms)
|
ok(wms)
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ type SqlQueryStr = string
|
|||||||
|
|
||||||
### SQLite column helper methods
|
### SQLite column helper methods
|
||||||
|
|
||||||
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage {.inline.} =
|
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
|
||||||
let
|
let
|
||||||
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
|
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
|
||||||
topicLength = sqlite3_column_bytes(s, 1)
|
topicLength = sqlite3_column_bytes(s, 1)
|
||||||
@ -37,11 +37,11 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo
|
|||||||
timestamp: Timestamp(senderTimestamp)
|
timestamp: Timestamp(senderTimestamp)
|
||||||
)
|
)
|
||||||
|
|
||||||
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp {.inline.} =
|
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp =
|
||||||
let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol)
|
let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol)
|
||||||
Timestamp(receiverTimestamp)
|
Timestamp(receiverTimestamp)
|
||||||
|
|
||||||
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string {.inline.} =
|
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
|
||||||
let
|
let
|
||||||
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
|
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
|
||||||
pubsubTopicLength = sqlite3_column_bytes(s, 3)
|
pubsubTopicLength = sqlite3_column_bytes(s, 3)
|
||||||
@ -55,7 +55,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str
|
|||||||
|
|
||||||
## Create table
|
## Create table
|
||||||
|
|
||||||
template createTableQuery(table: string): SqlQueryStr =
|
proc createTableQuery(table: string): SqlQueryStr =
|
||||||
"CREATE TABLE IF NOT EXISTS " & table & " (" &
|
"CREATE TABLE IF NOT EXISTS " & table & " (" &
|
||||||
" id BLOB," &
|
" id BLOB," &
|
||||||
" receiverTimestamp INTEGER NOT NULL," &
|
" receiverTimestamp INTEGER NOT NULL," &
|
||||||
@ -67,19 +67,28 @@ template createTableQuery(table: string): SqlQueryStr =
|
|||||||
" CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" &
|
" CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" &
|
||||||
") WITHOUT ROWID;"
|
") WITHOUT ROWID;"
|
||||||
|
|
||||||
proc createTable*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
|
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
let query = createTableQuery(DbTable)
|
let query = createTableQuery(DbTable)
|
||||||
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
|
|
||||||
## Create index
|
## Create indices
|
||||||
|
|
||||||
template createIndexQuery(table: string): SqlQueryStr =
|
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
|
||||||
|
"CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);"
|
||||||
|
|
||||||
|
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
|
let query = createOldestMessageTimestampIndexQuery(DbTable)
|
||||||
|
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
||||||
|
ok()
|
||||||
|
|
||||||
|
|
||||||
|
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
|
||||||
"CREATE INDEX IF NOT EXISTS i_msg ON " & table & " (contentTopic, pubsubTopic, senderTimestamp, id);"
|
"CREATE INDEX IF NOT EXISTS i_msg ON " & table & " (contentTopic, pubsubTopic, senderTimestamp, id);"
|
||||||
|
|
||||||
proc createIndex*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
|
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
let query = createIndexQuery(DbTable)
|
let query = createHistoryQueryIndexQuery(DbTable)
|
||||||
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
@ -87,7 +96,7 @@ proc createIndex*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
|
|||||||
## Insert message
|
## Insert message
|
||||||
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
||||||
|
|
||||||
template insertMessageQuery(table: string): SqlQueryStr =
|
proc insertMessageQuery(table: string): SqlQueryStr =
|
||||||
"INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
|
"INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
|
||||||
" VALUES (?, ?, ?, ?, ?, ?, ?);"
|
" VALUES (?, ?, ?, ?, ?, ?, ?);"
|
||||||
|
|
||||||
@ -98,10 +107,10 @@ proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessagePara
|
|||||||
|
|
||||||
## Count table messages
|
## Count table messages
|
||||||
|
|
||||||
template countMessagesQuery(table: string): SqlQueryStr =
|
proc countMessagesQuery(table: string): SqlQueryStr =
|
||||||
"SELECT COUNT(*) FROM " & table
|
"SELECT COUNT(*) FROM " & table
|
||||||
|
|
||||||
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] {.inline.} =
|
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
|
||||||
var count: int64
|
var count: int64
|
||||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||||
count = sqlite3_column_int64(s, 0)
|
count = sqlite3_column_int64(s, 0)
|
||||||
@ -116,7 +125,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] {.inline.} =
|
|||||||
|
|
||||||
## Get oldest receiver timestamp
|
## Get oldest receiver timestamp
|
||||||
|
|
||||||
template selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
|
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
|
||||||
"SELECT MIN(receiverTimestamp) FROM " & table
|
"SELECT MIN(receiverTimestamp) FROM " & table
|
||||||
|
|
||||||
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
|
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
|
||||||
@ -134,10 +143,10 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
|
|||||||
|
|
||||||
## Delete messages older than timestamp
|
## Delete messages older than timestamp
|
||||||
|
|
||||||
template deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
|
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
|
||||||
"DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts
|
"DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts
|
||||||
|
|
||||||
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] {.inline.} =
|
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] =
|
||||||
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
|
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
|
||||||
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
||||||
ok()
|
ok()
|
||||||
@ -145,14 +154,14 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR
|
|||||||
|
|
||||||
## Delete oldest messages not within limit
|
## Delete oldest messages not within limit
|
||||||
|
|
||||||
template deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
|
proc deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
|
||||||
"DELETE FROM " & table & " WHERE id NOT IN (" &
|
"DELETE FROM " & table & " WHERE id NOT IN (" &
|
||||||
" SELECT id FROM " & table &
|
" SELECT id FROM " & table &
|
||||||
" ORDER BY receiverTimestamp DESC" &
|
" ORDER BY receiverTimestamp DESC" &
|
||||||
" LIMIT " & $limit &
|
" LIMIT " & $limit &
|
||||||
");"
|
");"
|
||||||
|
|
||||||
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] {.inline.} =
|
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] =
|
||||||
# NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit
|
# NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit
|
||||||
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
|
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
|
||||||
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
|
||||||
@ -161,7 +170,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
|
|||||||
|
|
||||||
## Select all messages
|
## Select all messages
|
||||||
|
|
||||||
template selectAllMessagesQuery(table: string): SqlQueryStr =
|
proc selectAllMessagesQuery(table: string): SqlQueryStr =
|
||||||
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
|
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
|
||||||
" FROM " & table &
|
" FROM " & table &
|
||||||
" ORDER BY receiverTimestamp ASC"
|
" ORDER BY receiverTimestamp ASC"
|
||||||
|
@ -1,3 +1 @@
|
|||||||
DROP INDEX IF EXISTS i_rt;
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS i_msg ON Message (contentTopic, pubsubTopic, senderTimestamp, id);
|
CREATE INDEX IF NOT EXISTS i_msg ON Message (contentTopic, pubsubTopic, senderTimestamp, id);
|
||||||
|
@ -212,23 +212,23 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
|||||||
|
|
||||||
# Add message to in-memory store
|
# Add message to in-memory store
|
||||||
if not w.isSqliteOnly:
|
if not w.isSqliteOnly:
|
||||||
# Handle WakuMessage according to store protocol
|
|
||||||
trace "handle message in WakuStore", topic=topic, msg=msg
|
trace "handle message in WakuStore", topic=topic, msg=msg
|
||||||
|
|
||||||
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
||||||
if addRes.isErr():
|
if addRes.isErr():
|
||||||
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
|
debug "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
|
||||||
waku_store_errors.inc(labelValues = [$(addRes.error())])
|
waku_store_errors.inc(labelValues = [$(addRes.error())])
|
||||||
return
|
return
|
||||||
|
|
||||||
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
||||||
|
|
||||||
|
# Add messages to persistent store, if present
|
||||||
if w.store.isNil:
|
if w.store.isNil:
|
||||||
return
|
return
|
||||||
|
|
||||||
let res = w.store.put(index, msg, topic)
|
let res = w.store.put(index, msg, topic)
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
trace "failed to store messages", err=res.error()
|
debug "failed to store messages", err=res.error()
|
||||||
waku_store_errors.inc(labelValues = [storeFailure])
|
waku_store_errors.inc(labelValues = [storeFailure])
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user