mirror of https://github.com/waku-org/nwaku.git
Add db migration support for peer store (#648)
* moves migration paths to the migration_types module * moves USER_VERSION to the migration_types module * updates migration execution condition to cover the peer store * adds a message to the store if it is inserted to the db * adds debug messages and checks migration path recursively * deletes unused debug messages * deletes commented codes * clean up * fixes a bug * renames the first migration file of the message store
This commit is contained in:
parent
05cb073a0f
commit
81b207624b
Binary file not shown.
|
@ -16,8 +16,6 @@ import
|
|||
export sqlite
|
||||
|
||||
const TABLE_TITLE = "Message"
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "../migration/migrations_scripts/message"
|
||||
|
||||
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
|
||||
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
|
||||
|
|
|
@ -1,4 +1,11 @@
|
|||
import tables, stew/results
|
||||
import tables, stew/results, strutils, os
|
||||
|
||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message"
|
||||
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer"
|
||||
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts"
|
||||
|
||||
const USER_VERSION* = 2 # increase this when there is an update in the database schema
|
||||
|
||||
type MigrationScriptsResult*[T] = Result[T, string]
|
||||
type
|
||||
|
|
|
@ -8,7 +8,7 @@ proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts
|
|||
## the code in this procedure is an adaptation of https://github.com/status-im/nim-status/blob/21aebe41be03cb6450ea261793b800ed7d3e6cda/nim_status/migrations/sql_generate.nim#L4
|
||||
var migrationScripts = MigrationScripts(migrationUp:initOrderedTable[string, string](), migrationDown:initOrderedTable[string, string]())
|
||||
try:
|
||||
for kind, path in walkDir(migrationPath):
|
||||
for path in walkDirRec(migrationPath):
|
||||
let (_, name, ext) = splitFile(path)
|
||||
if ext != ".sql": continue
|
||||
|
||||
|
@ -35,6 +35,7 @@ proc getScripts*(migrationPath: string): MigrationScriptsResult[MigrationScripts
|
|||
ok(migrationScripts)
|
||||
|
||||
except OSError, IOError:
|
||||
debug "failed to load the migration scripts"
|
||||
return err("failed to load the migration scripts")
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
CREATE TABLE IF NOT EXISTS Peer (
|
||||
peerId BLOB PRIMARY KEY,
|
||||
storedInfo BLOB,
|
||||
connectedness INTEGER,
|
||||
disconnectTime INTEGER
|
||||
) WITHOUT ROWID;
|
|
@ -30,8 +30,6 @@ type
|
|||
SqliteDatabase* = ref object of RootObj
|
||||
env*: Sqlite
|
||||
|
||||
const USER_VERSION = 2 # increase this when there is a breaking change in the table schema
|
||||
|
||||
template dispose(db: Sqlite) =
|
||||
discard sqlite3_close(db)
|
||||
|
||||
|
@ -235,7 +233,7 @@ proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[b
|
|||
ok(true)
|
||||
|
||||
|
||||
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = USER_VERSION): DatabaseResult[bool] =
|
||||
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration_types.USER_VERSION): DatabaseResult[bool] =
|
||||
## compares the user_version of the db with the targetVersion
|
||||
## runs migration scripts if the user_version is outdated (does not support down migration)
|
||||
## path points to the directory holding the migrations scripts
|
||||
|
|
|
@ -24,6 +24,7 @@ import
|
|||
../utils/peers,
|
||||
./storage/message/message_store,
|
||||
./storage/peer/peer_storage,
|
||||
./storage/migration/migration_types,
|
||||
../utils/requests,
|
||||
./peer_manager/peer_manager
|
||||
|
||||
|
@ -711,6 +712,20 @@ when isMainModule:
|
|||
else:
|
||||
sqliteDatabase = dbRes.value
|
||||
|
||||
if not sqliteDatabase.isNil:
|
||||
var migrationPath = ""
|
||||
if conf.persistPeers and conf.persistMessages: migrationPath = migration_types.ALL_STORE_MIGRATION_PATH
|
||||
elif conf.persistPeers: migrationPath = migration_types.PEER_STORE_MIGRATION_PATH
|
||||
elif conf.persistMessages: migrationPath = migration_types.MESSAGE_STORE_MIGRATION_PATH
|
||||
|
||||
# run migration
|
||||
info "running migration ... "
|
||||
let migrationResult = sqliteDatabase.migrate(migrationPath)
|
||||
if migrationResult.isErr:
|
||||
warn "migration failed"
|
||||
else:
|
||||
info "migration is done"
|
||||
|
||||
var pStorage: WakuPeerStorage
|
||||
|
||||
if conf.persistPeers and not sqliteDatabase.isNil:
|
||||
|
@ -747,14 +762,6 @@ when isMainModule:
|
|||
var store: WakuMessageStore
|
||||
if (not sqliteDatabase.isNil) and conf.persistMessages:
|
||||
|
||||
# run migration
|
||||
info "running migration ... "
|
||||
let migrationResult = sqliteDatabase.migrate(MESSAGE_STORE_MIGRATION_PATH)
|
||||
if migrationResult.isErr:
|
||||
warn "migration failed"
|
||||
else:
|
||||
info "migration is done"
|
||||
|
||||
let res = WakuMessageStore.init(sqliteDatabase)
|
||||
if res.isErr:
|
||||
warn "failed to init WakuMessageStore", err = res.error
|
||||
|
|
|
@ -553,19 +553,20 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
|
|||
# store the new message
|
||||
let index = msg.computeIndex()
|
||||
let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
|
||||
|
||||
# store in db if exists
|
||||
if not ws.store.isNil:
|
||||
let res = ws.store.put(index, msg, DefaultTopic)
|
||||
if res.isErr:
|
||||
warn "failed to store messages", err = res.error
|
||||
waku_store_errors.inc(labelValues = ["store_failure"])
|
||||
continue
|
||||
|
||||
ws.messages.add(indexedWakuMsg)
|
||||
waku_store_messages.inc(labelValues = ["stored"])
|
||||
|
||||
added = added + 1
|
||||
|
||||
# store in db if exists
|
||||
if ws.store.isNil: continue
|
||||
let res = ws.store.put(index, msg, DefaultTopic)
|
||||
if res.isErr:
|
||||
warn "failed to store messages", err = res.error
|
||||
waku_store_errors.inc(labelValues = ["store_failure"])
|
||||
|
||||
|
||||
debug "number of duplicate messages found in resume", dismissed=dismissed
|
||||
debug "number of messages added via resume", added=added
|
||||
|
||||
|
|
Loading…
Reference in New Issue