refactor(message_store): move message_store to node module

This commit is contained in:
Lorenzo Delgado 2022-11-04 19:48:22 +01:00 committed by GitHub
parent 12443427a1
commit 4509f4f361
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 86 additions and 319 deletions

View File

@ -28,12 +28,12 @@ import
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/v2/node/dnsdisc/waku_dnsdisc,
../../waku/v2/node/discv5/waku_discv5,
../../waku/v2/node/storage/migration,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/message_retention_policy,
../../waku/v2/node/storage/message/message_retention_policy_capacity,
../../waku/v2/node/storage/message/message_retention_policy_time,
../../waku/v2/node/message_store/sqlite_store/migrations as message_store_sqlite_migrations,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/message_store/message_retention_policy,
../../waku/v2/node/message_store/message_retention_policy_capacity,
../../waku/v2/node/message_store/message_retention_policy_time,
../../waku/v2/node/wakuswitch,
../../waku/v2/node/waku_node,
../../waku/v2/node/waku_metrics,
@ -104,15 +104,6 @@ proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] =
debug "finished sqlite database vacuuming"
proc performDbMigration(db: SqliteDatabase, migrationPath: string): SetupResult[void] =
## Run migration scripts on persistent storage
debug "starting sqlite database migration"
let migrationRes = db.migrate(migrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)
debug "finished sqlite database migration"
const PeerPersistenceDbUrl = "sqlite://peers.db"
@ -189,7 +180,7 @@ proc setupMessageStorage(dbUrl: string, vacuum: bool, migrate: bool): SetupResul
# Database migration
if migrate:
?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath)
?message_store_sqlite_migrations.migrate(db.get())
# TODO: Extract capacity from `messageRetentionPolicy`
return setupMessagesStore(db, storeCapacity=high(int))

View File

@ -14,7 +14,7 @@ import
libp2p/protocols/pubsub/rpc/message
import
../../waku/v1/node/rpc/hexstrings,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/node/waku_node,
../../waku/v2/node/jsonrpc/[store_api,
relay_api,

View File

@ -5,7 +5,7 @@ import
stew/results,
testutils/unittests
import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time

View File

@ -8,7 +8,7 @@ import
import
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
../../waku/v2/node/storage/message/queue_store/index,
../../waku/v2/node/message_store/queue_store/index,
./testlib/common

View File

@ -6,7 +6,7 @@ import
nimcrypto/sha2,
libp2p/protobuf/minprotobuf
import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,

View File

@ -6,9 +6,9 @@ import
chronos
import
../../waku/common/sqlite,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/message_retention_policy,
../../waku/v2/node/storage/message/message_retention_policy_capacity,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/message_store/message_retention_policy,
../../waku/v2/node/message_store/message_retention_policy_capacity,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/pagination,
../../waku/v2/utils/time,

View File

@ -6,7 +6,7 @@ import
chronos
import
../../waku/common/sqlite,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/pagination,
./utils,

View File

@ -7,8 +7,8 @@ import
libp2p/crypto/crypto
import
../../waku/common/sqlite,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,

View File

@ -7,7 +7,7 @@ import
libp2p/crypto/crypto
import
../../waku/common/sqlite,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,

View File

@ -8,7 +8,7 @@ import
libp2p/crypto/crypto
import
../../waku/common/sqlite,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,

View File

@ -15,7 +15,7 @@ import
import
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/node/waku_node,
../../waku/v2/utils/peers,
../test_helpers,

View File

@ -14,8 +14,8 @@ import
libp2p/protocols/pubsub/gossipsub
import
../../waku/common/sqlite,
../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/node/message_store/waku_store_queue,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,

View File

@ -6,7 +6,7 @@ else:
import
stew/results
import
../../../protocol/waku_store/message_store
../../protocol/waku_store/message_store
type RetentionPolicyResult*[T] = Result[T, string]

View File

@ -7,7 +7,7 @@ import
stew/results,
chronicles
import
../../../protocol/waku_store/message_store,
../../protocol/waku_store/message_store,
./message_retention_policy
logScope:

View File

@ -9,8 +9,8 @@ import
chronicles,
chronos
import
../../../protocol/waku_store/message_store,
../../../utils/time,
../../protocol/waku_store/message_store,
../../utils/time,
./message_retention_policy
logScope:

View File

@ -7,9 +7,9 @@ import
stew/byteutils,
nimcrypto/sha2
import
../../../../protocol/waku_message,
../../../../protocol/waku_store/pagination,
../../../../utils/time
../../../protocol/waku_message,
../../../protocol/waku_store/pagination,
../../../utils/time
type Index* = object

View File

@ -8,11 +8,11 @@ import
stew/[results, sorted_set],
chronicles
import
../../../../protocol/waku_message,
../../../../protocol/waku_store/rpc,
../../../../protocol/waku_store/pagination,
../../../../protocol/waku_store/message_store,
../../../../utils/time,
../../../protocol/waku_message,
../../../protocol/waku_store/rpc,
../../../protocol/waku_store/pagination,
../../../protocol/waku_store/message_store,
../../../utils/time,
./index

View File

@ -0,0 +1,38 @@
{.push raises: [].}
import
std/[tables, strutils, os],
stew/results,
chronicles
import
../../../../common/sqlite,
../../../../common/sqlite/migrations
logScope:
topics = "message_store.migration"
const SchemaVersion* = 7 # increase this when there is an update in the database schema
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." / ".."
const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store"
proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
## it runs migration scripts if the `user_version` is outdated. The `migrationScriptsDir` path
## points to the directory holding the migrations scripts once the db is updated, it sets the
## `user_version` to the `tragetVersion`.
##
## If not `targetVersion` is provided, it defaults to `SchemaVersion`.
##
## NOTE: Down migration it is not currently supported
debug "starting message store's sqlite database migration"
let migrationRes = migrations.migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)
debug "finished message store's sqlite database migration"
ok()

View File

@ -8,9 +8,9 @@ import
stew/[results, byteutils],
sqlite3_abi
import
../../../../../common/sqlite,
../../../../protocol/waku_message,
../../../../utils/time
../../../../common/sqlite,
../../../protocol/waku_message,
../../../utils/time
const DbTable = "Message"

View File

@ -10,11 +10,11 @@ import
stew/[byteutils, results],
chronicles
import
../../../../../common/sqlite,
../../../../protocol/waku_message,
../../../../protocol/waku_store/pagination,
../../../../protocol/waku_store/message_store,
../../../../utils/time,
../../../../common/sqlite,
../../../protocol/waku_message,
../../../protocol/waku_store/pagination,
../../../protocol/waku_store/message_store,
../../../utils/time,
./queries
logScope:

View File

@ -1,91 +0,0 @@
{.push raises: [Defect].}
import
std/options,
stew/results,
chronicles
import
../../../protocol/waku_message,
../../../protocol/waku_store/pagination,
../../../protocol/waku_store/message_store,
../../../utils/time,
../sqlite,
./waku_store_queue,
./sqlite_store
logScope:
topics = "message_store.dual"
type DualMessageStore* = ref object of MessageStore
inmemory: StoreQueueRef
persistent: SqliteStore
proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): MessageStoreResult[T] =
let
inmemory = StoreQueueRef.new(capacity)
persistent = ?SqliteStore.init(db)
info "loading messages from persistent storage to in-memory store"
let res = persistent.getAllMessages()
if res.isErr():
warn "failed to load messages from the persistent store", err = res.error
else:
for (pubsubTopic, msg, _, storeTimestamp) in res.value:
discard inmemory.put(pubsubTopic, msg, computeDigest(msg), storeTimestamp)
info "successfully loaded messages from the persistent store"
return ok(DualMessageStore(inmemory: inmemory, persistent: persistent))
method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
?s.inmemory.put(pubsubTopic, message, digest, receivedTime)
?s.persistent.put(pubsubTopic, message, digest, receivedTime)
ok()
method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
procCall MessageStore(s).put(pubsubTopic, message)
method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
s.inmemory.getAllMessages()
method getMessagesByHistoryQuery*(
s: DualMessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(PagingIndex),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = MaxPageSize,
ascendingOrder = true
): MessageStoreResult[seq[MessageStoreRow]] =
s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder)
method getMessagesCount*(s: DualMessageStore): MessageStoreResult[int64] =
s.inmemory.getMessagesCount()
method getOldestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] =
s.inmemory.getOldestMessageTimestamp()
method getNewestMessageTimestamp*(s: DualMessageStore): MessageStoreResult[Timestamp] =
s.inmemory.getNewestMessageTimestamp()
method deleteMessagesOlderThanTimestamp*(s: DualMessageStore, ts: Timestamp): MessageStoreResult[void] =
# NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error
# ?s.inmemory.deleteMessagesOlderThanTimestamp(ts)
?s.persistent.deleteMessagesOlderThanTimestamp(ts)
ok()
method deleteOldestMessagesNotWithinLimit*(s: DualMessageStore, limit: int): MessageStoreResult[void] =
# NOTE: Current in-memory store deletes messages as they are inserted. This method fails with a "not implemented" error
# ?s.inmemory.deleteOldestMessagesNotWithinLimit(limit)
?s.persistent.deleteOldestMessagesNotWithinLimit(limit)
ok()

View File

@ -1,81 +0,0 @@
import
stew/results,
chronicles
import
../../../common/sqlite,
./migration/migration_types,
./migration/migration_utils
export
migration_types,
migration_utils
logScope:
topics = "storage.migration"
const USER_VERSION* = 7 # increase this when there is an update in the database schema
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = USER_VERSION): DatabaseResult[void] =
## Compares the user_version of the db with the targetVersion
## runs migration scripts if the user_version is outdated (does not support down migration)
## path points to the directory holding the migrations scripts
## once the db is updated, it sets the user_version to the tragetVersion
# read database version
let userVersionRes = db.getUserVersion()
if userVersionRes.isErr():
debug "failed to get user_version", error=userVersionRes.error
let userVersion = userVersionRes.value
debug "current database user_version", userVersion=userVersion, targetVersion=targetVersion
if userVersion == targetVersion:
info "database is up to date"
return ok()
info "database user_version outdated. migrating.", userVersion=userVersion, targetVersion=targetVersion
# fetch migration scripts
let migrationScriptsRes = getScripts(path)
if migrationScriptsRes.isErr():
return err("failed to load migration scripts")
let migrationScripts = migrationScriptsRes.value
# filter scripts based on their versions
let scriptsRes = migrationScripts.filterScripts(userVersion, targetVersion)
if scriptsRes.isErr():
return err("failed to filter migration scripts")
let scripts = scriptsRes.value
if scripts.len == 0:
return err("no suitable migration scripts")
trace "migration scripts", scripts=scripts
# Run the migration scripts
for script in scripts:
for query in script.splitScript():
debug "executing migration statement", statement=query
let execRes = db.query(query, NoopRowHandler)
if execRes.isErr():
error "failed to execute migration statement", statement=query, error=execRes.error
return err("failed to execute migration statement")
debug "migration statement executed succesfully", statement=query
# Update user_version
let res = db.setUserVersion(targetVersion)
if res.isErr():
return err("failed to set the new user_version")
debug "database user_version updated", userVersion=targetVersion
ok()

View File

@ -1,15 +0,0 @@
{.push raises: [].}
import
std/[tables, os, strutils],
stew/results
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." / ".."
const MessageStoreMigrationPath* = projectRoot / "migrations" / "message_store"
const PeerStoreMigrationPath* = projectRoot / "migrations" / "peer_store"
type MigrationScriptsResult*[T] = Result[T, string]
type
MigrationScripts* = ref object of RootObj
migrationUp*:OrderedTable[string, string]
migrationDown*:OrderedTable[string, string]

View File

@ -1,75 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[os, algorithm, tables, strutils],
chronicles,
stew/results,
migration_types
export migration_types
logScope:
topics = "storage.migration"
proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts] =
## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4
var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]())
try:
for path in walkDirRec(migrationPath):
let (_, name, ext) = splitFile(path)
if ext != ".sql": continue
let parts = name.split(".")
if parts.len < 2:
continue
let script = parts[0]
let direction = parts[1]
debug "name", script=script
case direction:
of "up":
migrationScripts.migrationUp[script] = readFile(path)
debug "up script", readScript=migrationScripts.migrationUp[script]
of "down":
migrationScripts.migrationDown[script] = readFile(path)
debug "down script", readScript=migrationScripts.migrationDown[script]
else:
debug "Invalid script: ", name
migrationScripts.migrationUp.sort(system.cmp)
migrationScripts.migrationDown.sort(system.cmp)
ok(migrationScripts)
except OSError, IOError:
debug "failed to load the migration scripts"
return err("failed to load the migration scripts")
proc filterScripts*(migrationScripts: MigrationScripts, s: int64, e: int64 ): Result[seq[string], string] =
## returns migration scripts whose version fall between s and e (e is inclusive)
var scripts: seq[string]
try:
for name, script in migrationScripts.migrationUp:
let parts = name.split("_")
#TODO this should be int64
let ver = parseInt(parts[0])
# filter scripts based on their version
if s < ver and ver <= e:
scripts.add(script)
ok(scripts)
except ValueError:
return err("failed to filter scripts")
proc splitScript*(script: string): seq[string] =
## parses the script into its individual sql commands and returns them
var queries: seq[string] = @[]
for q in script.split(';'):
if isEmptyOrWhitespace(q): continue
let query = q.strip() & ";"
queries.add(query)
return queries

View File

@ -36,10 +36,10 @@ import
../utils/peers,
../utils/wakuenr,
./peer_manager/peer_manager,
./storage/message/waku_store_queue,
./storage/message/message_retention_policy,
./storage/message/message_retention_policy_capacity,
./storage/message/message_retention_policy_time,
./message_store/waku_store_queue,
./message_store/message_retention_policy,
./message_store/message_retention_policy_capacity,
./message_store/message_retention_policy_time,
./dnsdisc/waku_dnsdisc,
./discv5/waku_discv5,
./wakuswitch

View File

@ -18,8 +18,8 @@ import
libp2p/stream/connection,
metrics
import
../../node/storage/message/message_retention_policy,
../../node/storage/message/waku_store_queue,
../../node/message_store/message_retention_policy,
../../node/message_store/waku_store_queue,
../../node/peer_manager/peer_manager,
../../utils/time,
../waku_message,