Abhimanyu fc810eaf63 chore: add size retention policy (#2093)
* chore: add retention policy with GB or MB limitation #1885

* chore: add retention policy with GB or MB limitation

* chore: update retention policy code after review

* ci: extract discordNotify to separate file

Signed-off-by: Jakub Sokołowski <jakub@status.im>

* ci: push images to new wakuorg/nwaku repo

Signed-off-by: Jakub Sokołowski <jakub@status.im>

* ci: enforce default Docker image tags strictly

Signed-off-by: Jakub Sokołowski <jakub@status.im>

* ci: push GIT_REF if it looks like a version

Signed-off-by: Jakub Sokołowski <jakub@status.im>

* fix: update wakuv2 fleet DNS discovery enrtree

https://github.com/status-im/infra-misc/issues/171

* chore: resolving DNS IP and publishing it when no extIp is provided (#2030)

* feat(coverage): Add simple coverage (#2067)

* Add test aggregator to all directories.
* Implement coverage script.

* fix(ci): fix name of discord notify method

Also use absolute path to load Groovy script.

Signed-off-by: Jakub Sokołowski <jakub@status.im>

* chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version (#2080)

* chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version

* add more metrics, refactor how most metrics are calculated

* rework metrics table fillup

* reset connErr to make sure we honour successful reconnection

* chore(cbindings): Adding cpp example that integrates the 'libwaku' (#2079)

* Adding cpp example that integrates the `libwaku`

---------

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>

* fix(ci): update the dependency list in pre-release WF (#2088)

* chore: adding NetConfig test suite (#2091)

---------

Signed-off-by: Jakub Sokołowski <jakub@status.im>
Co-authored-by: Jakub Sokołowski <jakub@status.im>
Co-authored-by: Anton Iakimov <yakimant@gmail.com>
Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Co-authored-by: Álex Cabeza Romero <alex93cabeza@gmail.com>
Co-authored-by: Vaclav Pavlin <vaclav@status.im>
Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
2023-09-30 11:10:52 +05:30

150 lines
4.7 KiB
Nim

# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
# Enable strict exception tracking for the whole module. Compilers older
# than 1.4 get `Defect` listed explicitly; newer ones get an empty list.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
import
std/options,
stew/[byteutils, results],
chronicles,
chronos
import
../../../common/databases/db_sqlite,
../../../waku_core,
../../common,
../../driver,
./cursor,
./queries
# Log topics attached to every log statement emitted from this module.
logScope:
  topics = "waku archive sqlite"
proc init(db: SqliteDatabase): ArchiveDriverResult[void] =
  ## Prepares `db` for use by the driver: creates the table, then the
  ## `i_rt` index, then the `i_msg` index, stopping at the first failure.
  ##
  ## Returns an error when `db` is nil or when any creation step fails; in
  ## the latter case the failing step's error text is appended to a
  ## step-specific prefix.

  # Misconfiguration can lead to a nil DB handle.
  if db.isNil():
    return err("db not initialized")

  # Table is created only if it does not exist yet.
  let tableRes = db.createTable()
  if tableRes.isErr():
    return err("failed to create table: " & tableRes.error())

  # Indices are created only if they do not exist yet.
  let rtIndexRes = db.createOldestMessageTimestampIndex()
  if rtIndexRes.isErr():
    return err("failed to create i_rt index: " & rtIndexRes.error())

  let msgIndexRes = db.createHistoryQueryIndex()
  if msgIndexRes.isErr():
    return err("failed to create i_msg index: " & msgIndexRes.error())

  ok()
type
  SqliteDriver* = ref object of ArchiveDriver
    ## Archive driver that stores and queries messages through a
    ## `SqliteDatabase`.
    db: SqliteDatabase
      ## Database handle every query of this driver goes through.
    insertStmt: SqliteStmt[InsertMessageParams, void]
      ## Prepared insert statement, executed by `put` and disposed by `close`.
proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =
  ## Builds a `SqliteDriver` on top of `db`.
  ##
  ## The database is initialised first (see `init`); if that fails, its
  ## error is returned unchanged. Otherwise the insert statement is prepared
  ## and stored in the new driver together with `db`.

  # Database initialization
  let initOutcome = init(db)
  if initOutcome.isErr():
    return err(initOutcome.error())

  # General initialization
  ok(SqliteDriver(
    db: db,
    insertStmt: db.prepareInsertMessageStmt()
  ))
method put*(s: SqliteDriver,
            pubsubTopic: PubsubTopic,
            message: WakuMessage,
            digest: MessageDigest,
            receivedTime: Timestamp):
            Future[ArchiveDriverResult[void]] {.async.} =
  ## Inserts a message into the store by executing the driver's prepared
  ## insert statement; the statement's result is returned as is.
  ##
  ## The parameter tuple is, in order: id, storedAt, contentTopic, payload,
  ## pubsubTopic, version, senderTimestamp.
  return s.insertStmt.exec((
    @(digest.data),                 # id
    receivedTime,                   # storedAt
    toBytes(message.contentTopic),  # contentTopic
    message.payload,                # payload
    toBytes(pubsubTopic),           # pubsubTopic
    int64(message.version),         # version
    message.timestamp               # senderTimestamp
  ))
method getAllMessages*(s: SqliteDriver):
                       Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Retrieves all messages from the store, returning whatever
  ## `selectAllMessages` yields for the driver's database.
  return selectAllMessages(s.db)
method getMessages*(s: SqliteDriver,
                    contentTopic: seq[ContentTopic] = @[],
                    pubsubTopic = none(PubsubTopic),
                    cursor = none(ArchiveCursor),
                    startTime = none(Timestamp),
                    endTime = none(Timestamp),
                    maxPageSize = DefaultPageSize,
                    ascendingOrder = true):
                    Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Runs a history query against the driver's database.
  ##
  ## The optional `cursor` is converted with `toDbCursor` before being
  ## handed to the query. `maxPageSize` is forwarded as the query's `limit`
  ## and `ascendingOrder` as its `ascending` flag; the remaining filters are
  ## passed through unchanged. The query result is returned as is.
  let dbCursor = cursor.map(toDbCursor)

  return s.db.selectMessagesByHistoryQueryWithLimit(
    contentTopic,
    pubsubTopic,
    dbCursor,
    startTime,
    endTime,
    limit = maxPageSize,
    ascending = ascendingOrder
  )
method getMessagesCount*(s: SqliteDriver):
                         Future[ArchiveDriverResult[int64]] {.async.} =
  ## Returns the result of `getMessageCount` on the driver's database.
  return getMessageCount(s.db)
method getPagesCount*(s: SqliteDriver):
                      Future[ArchiveDriverResult[int64]] {.async.} =
  ## Returns the result of `getPageCount` on the driver's database.
  return getPageCount(s.db)
method getPagesSize*(s: SqliteDriver):
                     Future[ArchiveDriverResult[int64]] {.async.} =
  ## Returns the result of `getPageSize` on the driver's database.
  return getPageSize(s.db)
method performsVacuum*(s: SqliteDriver):
                       Future[ArchiveDriverResult[void]] {.async.} =
  ## Runs `performSqliteVacuum` on the driver's database and returns its
  ## result.
  return performSqliteVacuum(s.db)
method getOldestMessageTimestamp*(s: SqliteDriver):
                                  Future[ArchiveDriverResult[Timestamp]] {.async.} =
  ## Returns the result of `selectOldestReceiverTimestamp` on the driver's
  ## database.
  return selectOldestReceiverTimestamp(s.db)
method getNewestMessageTimestamp*(s: SqliteDriver):
                                  Future[ArchiveDriverResult[Timestamp]] {.async.} =
  ## Returns the result of `selectNewestReceiverTimestamp` on the driver's
  ## database.
  return selectNewestReceiverTimestamp(s.db)
method deleteMessagesOlderThanTimestamp*(s: SqliteDriver,
                                         ts: Timestamp):
                                         Future[ArchiveDriverResult[void]] {.async.} =
  ## Forwards `ts` to the database-level `deleteMessagesOlderThanTimestamp`
  ## and returns its result.
  return deleteMessagesOlderThanTimestamp(s.db, ts)
method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver,
                                           limit: int):
                                           Future[ArchiveDriverResult[void]] {.async.} =
  ## Forwards `limit` to the database-level
  ## `deleteOldestMessagesNotWithinLimit` and returns its result.
  return deleteOldestMessagesNotWithinLimit(s.db, limit)
method close*(s: SqliteDriver):
              Future[ArchiveDriverResult[void]] {.async.} =
  ## Closes the database connection.
  ##
  ## The prepared insert statement is disposed first, then the connection
  ## itself is closed. The method always returns `ok()`.

  # Dispose statements before the connection goes away.
  dispose(s.insertStmt)

  # Close connection
  close(s.db)

  return ok()