feat(wakunode2): simplify wakunode2 config and decouple peer persistence

This commit is contained in:
Lorenzo Delgado 2022-10-28 00:05:02 +02:00 committed by GitHub
parent 0725da0b66
commit 62328f0abd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 312 additions and 250 deletions

View File

@ -72,30 +72,6 @@ type
defaultValue: false, defaultValue: false,
name: "peer-persistence" }: bool name: "peer-persistence" }: bool
# TODO: Deprecated. Remove in next release
persistPeers* {.
desc: "DEPRECATED: Use '--peer-persistence' instead.",
defaultValue: false,
name: "persist-peers" }: bool
# TODO: Deprecated. Remove in next release
dbPath* {.
desc: "DEPRECATED: Use '--store-message-db-url' instead",
defaultValue: "",
name: "db-path" }: string
# TODO: Deprecated. Remove in next release
dbVacuum* {.
desc: "DEPRECATED: Use '--store-message-db-vacuum' instead",
defaultValue: false,
name: "db-vacuum" }: bool
# TODO: Deprecated. Remove in next release
persistMessages* {.
desc: "DEPRECATED: Use '--store' instead",
defaultValue: false
name: "persist-messages" }: bool
## DNS addrs config ## DNS addrs config
dnsAddrs* {. dnsAddrs* {.
@ -204,9 +180,14 @@ type
store* {. store* {.
desc: "Enable/disable waku store protocol", desc: "Enable/disable waku store protocol",
defaultValue: true defaultValue: true,
name: "store" }: bool name: "store" }: bool
storenode* {.
desc: "Peer multiaddress to query for storage",
defaultValue: "",
name: "storenode" }: string
storeMessageRetentionPolicy* {. storeMessageRetentionPolicy* {.
desc: "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'", desc: "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'",
defaultValue: "time:" & $2.days.seconds, defaultValue: "time:" & $2.days.seconds,
@ -231,30 +212,6 @@ type
desc: "Peer multiaddress to resume the message store at boot.", desc: "Peer multiaddress to resume the message store at boot.",
defaultValue: "", defaultValue: "",
name: "store-resume-peer" }: string name: "store-resume-peer" }: string
# TODO: Deprecated. Remove in next release
storenode* {.
desc: "DEPRECATED: Use '--store-resume-peer' instead.",
defaultValue: ""
name: "storenode" }: string
# TODO: Deprecated. Remove in next release
storeCapacity* {.
desc: "DEPRECATED: Use '--store-message-retention-policy=capacity:<count>' instead",
defaultValue: 50000
name: "store-capacity" }: int
# TODO: Deprecated. Remove in next release
sqliteStore* {.
desc: "DEPRECATED: SQLite is the default message store implementation.",
defaultValue: false
name: "sqlite-store" }: bool
# TODO: Deprecated. Remove in next release
sqliteRetentionTime* {.
desc: "DEPRECATED: Use '--store-message-retention-policy=time:<seconds>' instead",
defaultValue: 30.days.seconds
name: "sqlite-retention-time" }: int64
## Filter config ## Filter config

View File

@ -25,18 +25,21 @@ import
../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/dnsdisc/waku_dnsdisc,
../../waku/v2/node/discv5/waku_discv5, ../../waku/v2/node/discv5/waku_discv5,
../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/migration,
../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/storage/peer/waku_peer_storage,
../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/message/dual_message_store,
../../waku/v2/node/storage/message/sqlite_store, ../../waku/v2/node/storage/message/sqlite_store,
../../waku/v2/node/storage/message/message_retention_policy,
../../waku/v2/node/storage/message/message_retention_policy_capacity, ../../waku/v2/node/storage/message/message_retention_policy_capacity,
../../waku/v2/node/storage/message/message_retention_policy_time, ../../waku/v2/node/storage/message/message_retention_policy_time,
../../waku/v2/node/[wakuswitch, waku_node, waku_metrics], ../../waku/v2/node/wakuswitch,
../../waku/v2/utils/[peers, wakuenr], ../../waku/v2/node/waku_node,
../../waku/v2/node/waku_metrics,
../../waku/v2/utils/peers,
../../waku/v2/utils/wakuenr,
../../waku/common/utils/nat, ../../waku/common/utils/nat,
./wakunode2_setup_rest, ./wakunode2_setup_rest,
./wakunode2_setup_rpc, ./wakunode2_setup_rpc,
./wakunode2_setup_sql_migrations,
./config ./config
when defined(rln) or defined(rlnzerokit): when defined(rln) or defined(rlnzerokit):
@ -46,98 +49,157 @@ when defined(rln) or defined(rlnzerokit):
logScope: logScope:
topics = "wakunode.setup" topics = "wakunode"
type SetupResult[T] = Result[T, string] type SetupResult[T] = Result[T, string]
proc setupStorage(conf: WakuNodeConf): proc setupDatabaseConnection(dbUrl: string): SetupResult[Option[SqliteDatabase]] =
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] = ## dbUrl mimics SQLAlchemy Database URL schema
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
if dbUrl == "":
return ok(none(SqliteDatabase))
## Setup a SQLite Database for a wakunode based on a supplied let dbUrlParts = dbUrl.split("://", 1)
## configuration file and perform all necessary migration. let
## engine = dbUrlParts[0]
## If config allows, return peer storage and message store path = dbUrlParts[1]
## for use elsewhere.
let connRes = case engine
var of "sqlite":
sqliteDatabase: SqliteDatabase # SQLite engine
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore] # See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite
SqliteDatabase.new(path)
# Setup database connection
if conf.dbPath != "":
let dbRes = SqliteDatabase.init(conf.dbPath)
if dbRes.isErr():
warn "failed to init database connection", err = dbRes.error
waku_node_errors.inc(labelValues = ["init_db_failure"])
return err("failed to init database connection")
else: else:
sqliteDatabase = dbRes.value return err("unknown database engine")
if connRes.isErr():
return err("failed to init database connection: " & connRes.error)
ok(some(connRes.value))
proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()
ok((pageSize, pageCount, freelistCount))
proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):
debug "starting sqlite database vacuuming"
let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"
proc performDbMigration(db: SqliteDatabase, migrationPath: string): SetupResult[void] =
## Run migration scripts on persistent storage
debug "starting sqlite database migration"
let migrationRes = db.migrate(migrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)
debug "finished sqlite database migration"
if not sqliteDatabase.isNil(): const PeerPersistenceDbUrl = "sqlite://peers.db"
## Database vacuuming
# TODO: Wrap and move this logic to the appropriate module
let
pageSize = ?sqliteDatabase.getPageSize()
pageCount = ?sqliteDatabase.getPageCount()
freelistCount = ?sqliteDatabase.getFreelistCount()
proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] =
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath)
let res = WakuPeerStorage.new(db.get())
if res.isErr():
return err("failed to init peer store" & res.error)
ok(some(res.value))
proc setupMessagesStore(db: Option[SqliteDatabase], storeCapacity: int = high(int)): SetupResult[MessageStore] =
if db.isSome():
debug "setting up sqlite-only message store"
let res = SqliteStore.init(db.get())
if res.isErr():
return err("failed to init sqlite message store: " & res.error)
return ok(res.value)
else:
debug "setting up in-memory message store"
let store = StoreQueueRef.new(storeCapacity)
return ok(store)
proc setupMessageStoreRetentionPolicy(retentionPolicy: string): SetupResult[Option[MessageRetentionPolicy]] =
if retentionPolicy == "":
return ok(none(MessageRetentionPolicy))
let rententionPolicyParts = retentionPolicy.split(":", 1)
let
policy = rententionPolicyParts[0]
policyArgs = rententionPolicyParts[1]
if policy == "time":
var retentionTimeSeconds: int64
try:
retentionTimeSeconds = parseInt(policyArgs)
except ValueError:
return err("invalid time retention policy argument")
let retPolicy: MessageRetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
return ok(some(retPolicy))
elif policy == "capacity":
var retentionCapacity: int
try:
retentionCapacity = parseInt(policyArgs)
except ValueError:
return err("invalid capacity retention policy argument")
let retPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))
else:
return err("unknown retention policy")
proc setupMessageStorage(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[MessageStore] =
let db = ?setupDatabaseConnection(dbUrl)
if db.isSome():
# SQLite vacuum
# TODO: Run this only if the database engine is SQLite
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get())
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
# TODO: Run vacuuming conditionally based on database page stats if vacuum and (pageCount > 0 and freelistCount > 0):
if conf.dbVacuum and (pageCount > 0 and freelistCount > 0): ?performSqliteVacuum(db.get())
debug "starting sqlite database vacuuming"
let resVacuum = sqliteDatabase.vacuum() # Database migration
if resVacuum.isErr(): if migrate:
return err("failed to execute vacuum: " & resVacuum.error()) ?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath)
debug "finished sqlite database vacuuming" # TODO: Extract capacity from `messageRetentionPolicy`
return setupMessagesStore(db, storeCapacity=high(int))
sqliteDatabase.runMigrations(conf)
if conf.persistPeers: proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): SetupResult[seq[RemotePeerInfo]] =
let res = WakuPeerStorage.new(sqliteDatabase)
if res.isErr():
warn "failed to init peer store", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.pStorage = res.value
if conf.persistMessages:
if conf.sqliteStore:
debug "setting up sqlite-only store"
let res = SqliteStore.init(sqliteDatabase)
if res.isErr():
warn "failed to init message store", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.mStorage = res.value
elif not sqliteDatabase.isNil():
debug "setting up dual message store"
let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity)
if res.isErr():
warn "failed to init message store", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
storeTuple.mStorage = res.value
else:
debug "setting up in-memory store"
storeTuple.mStorage = StoreQueueRef.new(conf.storeCapacity)
ok(storeTuple)
proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePeerInfo]] =
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "": if dnsDiscovery and dnsDiscoveryUrl != "":
# DNS discovery # DNS discovery
debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl
var nameServers: seq[TransportAddress] var nameServers: seq[TransportAddress]
for ip in conf.dnsDiscoveryNameServers: for ip in dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers) let dnsResolver = DnsResolver.new(nameServers)
@ -147,9 +209,8 @@ proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePe
let resolved = await dnsResolver.resolveTxt(domain) let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl, var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
resolver) if wakuDnsDiscovery.isOk():
if wakuDnsDiscovery.isOk:
return wakuDnsDiscovery.get().findPeers() return wakuDnsDiscovery.get().findPeers()
.mapErr(proc (e: cstring): string = $e) .mapErr(proc (e: cstring): string = $e)
else: else:
@ -159,7 +220,7 @@ proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePe
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
proc initNode(conf: WakuNodeConf, proc initNode(conf: WakuNodeConf,
pStorage: WakuPeerStorage = nil, peerStore: Option[WakuPeerStorage],
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] = dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] =
## Setup a basic Waku v2 node based on a supplied configuration ## Setup a basic Waku v2 node based on a supplied configuration
@ -204,6 +265,9 @@ proc initNode(conf: WakuNodeConf,
conf.relay) conf.relay)
var node: WakuNode var node: WakuNode
let pStorage = if peerStore.isNone(): nil
else: peerStore.get()
try: try:
node = WakuNode.new(conf.nodekey, node = WakuNode.new(conf.nodekey,
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
@ -259,7 +323,9 @@ proc initNode(conf: WakuNodeConf,
ok(node) ok(node)
proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): Future[SetupResult[void]] {.async.} = proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
mStore: Option[MessageStore],
mStoreRetentionPolicy: Option[MessageRetentionPolicy]): Future[SetupResult[void]] {.async.} =
## Setup configured protocols on an existing Waku v2 node. ## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage. ## Optionally include persistent message storage.
## No protocols are started yet. ## No protocols are started yet.
@ -312,17 +378,19 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore):
return err("failed to mount waku swap protocol: " & getCurrentExceptionMsg()) return err("failed to mount waku swap protocol: " & getCurrentExceptionMsg())
# Store setup # Store setup
if (conf.storenode != "") or (conf.store): if conf.store:
let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime)
else: CapacityRetentionPolicy.init(conf.storeCapacity)
try: try:
await mountStore(node, mStorage, retentionPolicy=some(retentionPolicy)) # TODO: Decouple message store and message retention policy from waku store protocol object
let mStorage = if mStore.isNone(): nil
else: mStore.get()
await mountStore(node, mStorage, retentionPolicy=mStoreRetentionPolicy)
except: except:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
executeMessageRetentionPolicy(node) # TODO: Move this to storage setup phase
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval) if mStoreRetentionPolicy.isSome():
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
if conf.storenode != "": if conf.storenode != "":
try: try:
@ -373,10 +441,10 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore):
return ok() return ok()
proc startNode(node: WakuNode, conf: WakuNodeConf, proc startNode(node: WakuNode, conf: WakuNodeConf,
dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} = dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} =
## Start a configured node and all mounted protocols. ## Start a configured node and all mounted protocols.
## Resume history, connect to static nodes and start ## Connect to static nodes and start
## keep-alive, if configured. ## keep-alive, if configured.
# Start Waku v2 node # Start Waku v2 node
@ -393,14 +461,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
except: except:
return err("failed to start waku discovery v5: " & getCurrentExceptionMsg()) return err("failed to start waku discovery v5: " & getCurrentExceptionMsg())
# Resume historical messages, this has to be called after the node has been started
if conf.store and conf.persistMessages:
try:
await node.resume()
except:
return err("failed to resume messages history: " & getCurrentExceptionMsg())
# Connect to configured static nodes # Connect to configured static nodes
if conf.staticnodes.len > 0: if conf.staticnodes.len > 0:
try: try:
@ -415,7 +475,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
except: except:
return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
# retrieve and connect to peer exchange peers # retrieve and connect to peer exchange peers
if conf.peerExchangeNode != "": if conf.peerExchangeNode != "":
info "Retrieving peer info via peer exchange protocol" info "Retrieving peer info via peer exchange protocol"
@ -431,27 +490,42 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
return ok() return ok()
proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[void] = proc resumeMessageStore(node: WakuNode, address: string): Future[SetupResult[void]] {.async.} =
## Start configured external interfaces and monitoring tools # Resume historical messages, this has to be called after the node has been started
## on a Waku v2 node, including the RPC API, REST API and metrics if address != "":
## monitoring ports. return err("empty peer multiaddres")
if conf.rpc: var remotePeer: RemotePeerInfo
try: try:
startRpcServer(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf) remotePeer = parseRemotePeerInfo(address)
except: except:
return err("failed to start the json-rpc server: " & getCurrentExceptionMsg()) return err("invalid peer multiaddress: " & getCurrentExceptionMsg())
if conf.rest: try:
startRestServer(node, conf.restAddress, Port(conf.restPort + conf.portsShift), conf) await node.resume(some(@[remotePeer]))
except:
return err("failed to resume messages history: " & getCurrentExceptionMsg())
if conf.metricsLogging:
startMetricsLog()
if conf.metricsServer: proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] =
startMetricsServer(conf.metricsServerAddress, try:
Port(conf.metricsServerPort + conf.portsShift)) startRpcServer(node, address, Port(port + portsShift), conf)
except:
return err("failed to start the json-rpc server: " & getCurrentExceptionMsg())
ok()
proc startRestServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] =
startRestServer(node, address, Port(port + portsShift), conf)
ok()
proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): SetupResult[void] =
startMetricsServer(address, Port(port + portsShift))
ok()
proc startMetricsLogging(): SetupResult[void] =
startMetricsLog()
ok() ok()
@ -474,7 +548,7 @@ when isMainModule:
) )
except CatchableError: except CatchableError:
error "Failure while loading the configuration: \n", error=getCurrentExceptionMsg() error "Failure while loading the configuration: \n", error=getCurrentExceptionMsg()
quit 1 # if we don't leave here, the initialization of conf does not work in the success case quit(QuitFailure) # if we don't leave here, the initialization of conf does not work in the success case
{.pop.} {.pop.}
# if called with --version, print the version and quit # if called with --version, print the version and quit
@ -486,46 +560,84 @@ when isMainModule:
if conf.logLevel != LogLevel.NONE: if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel) setLogLevel(conf.logLevel)
var
node: WakuNode # This is the node we're going to setup using the conf
############## ##############
# Node setup # # Node setup #
############## ##############
debug "1/7 Setting up storage" debug "1/7 Setting up storage"
## Peer persistence
var peerStore = none(WakuPeerStorage)
var if conf.peerPersistence:
pStorage: WakuPeerStorage let peerStoreRes = setupPeerStorage();
mStorage: MessageStore if peerStoreRes.isOk():
peerStore = peerStoreRes.get()
else:
error "failed to setup peer store", error=peerStoreRes.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
let setupStorageRes = setupStorage(conf) ## Message store
if setupStorageRes.isErr(): var messageStore = none(MessageStore)
error "1/7 Setting up storage failed. Continuing without storage.", error=setupStorageRes.error var messageStoreRetentionPolicy = none(MessageRetentionPolicy)
else:
(pStorage, mStorage) = setupStorageRes.get() if conf.store:
# Message storage
let dbUrlValidationRes = validateDbUrl(conf.storeMessageDbUrl)
if dbUrlValidationRes.isErr():
error "failed to configure the message store database connection", error=dbUrlValidationRes.error
quit(QuitFailure)
let messageStoreRes = setupMessageStorage(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration)
if messageStoreRes.isOk():
messageStore = some(messageStoreRes.get())
else:
error "failed to configure message store", error=messageStoreRes.error
quit(QuitFailure)
# Message store retention policy
let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(conf.storeMessageRetentionPolicy)
if storeMessageRetentionPolicyRes.isErr():
error "invalid store message retention policy configuration", error=storeMessageRetentionPolicyRes.error
quit(QuitFailure)
let messageStoreRetentionPolicyRes = setupMessageStoreRetentionPolicy(storeMessageRetentionPolicyRes.get())
if messageStoreRetentionPolicyRes.isOk():
messageStoreRetentionPolicy = messageStoreRetentionPolicyRes.get()
else:
error "failed to configure the message retention policy", error=messageStoreRetentionPolicyRes.error
quit(QuitFailure)
# TODO: Move retention policy execution here
# if messageStoreRetentionPolicy.isSome():
# executeMessageRetentionPolicy(node)
# startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
debug "2/7 Retrieve dynamic bootstrap nodes" debug "2/7 Retrieve dynamic bootstrap nodes"
var dynamicBootstrapNodes: seq[RemotePeerInfo] var dynamicBootstrapNodes: seq[RemotePeerInfo]
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf) let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf.dnsDiscovery, conf.dnsDiscoveryUrl, conf.dnsDiscoveryNameServers)
if dynamicBootstrapNodesRes.isErr(): if dynamicBootstrapNodesRes.isOk():
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error
else:
dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
else:
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error
debug "3/7 Initializing node" debug "3/7 Initializing node"
let initNodeRes = initNode(conf, pStorage, dynamicBootstrapNodes) var node: WakuNode # This is the node we're going to setup using the conf
if initNodeRes.isErr():
let initNodeRes = initNode(conf, peerStore, dynamicBootstrapNodes)
if initNodeRes.isok():
node = initNodeRes.get()
else:
error "3/7 Initializing node failed. Quitting.", error=initNodeRes.error error "3/7 Initializing node failed. Quitting.", error=initNodeRes.error
quit(QuitFailure) quit(QuitFailure)
else:
node = initNodeRes.get()
debug "4/7 Mounting protocols" debug "4/7 Mounting protocols"
let setupProtocolsRes = waitFor setupProtocols(node, conf, mStorage) let setupProtocolsRes = waitFor setupProtocols(node, conf, messageStore, messageStoreRetentionPolicy)
if setupProtocolsRes.isErr(): if setupProtocolsRes.isErr():
error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error
@ -535,15 +647,37 @@ when isMainModule:
if startNodeRes.isErr(): if startNodeRes.isErr():
error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error
# Resume message store on boot
if conf.storeResumePeer != "":
let resumeMessageStoreRes = waitFor resumeMessageStore(node, conf.storeResumePeer)
if resumeMessageStoreRes.isErr():
error "failed to resume message store from peer node. Continuing in current state", error=resumeMessageStoreRes.error
debug "6/7 Starting monitoring and external interfaces" debug "6/7 Starting monitoring and external interfaces"
let startExternalRes = startExternal(node, conf) if conf.rpc:
if startExternalRes.isErr(): let startRpcServerRes = startRpcServer(node, conf.rpcAddress, conf.rpcPort, conf.portsShift, conf)
error "6/7 Starting monitoring and external interfaces failed. Continuing in current state.", error=startExternalRes.error if startRpcServerRes.isErr():
error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error
if conf.rest:
let startRestServerRes = startRestServer(node, conf.restAddress, conf.restPort, conf.portsShift, conf)
if startRestServerRes.isErr():
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
if conf.metricsServer:
let startMetricsServerRes = startMetricsServer(node, conf.metricsServerAddress, conf.metricsServerPort, conf.portsShift)
if startMetricsServerRes.isErr():
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
if conf.metricsLogging:
let startMetricsLoggingRes = startMetricsLogging()
if startMetricsLoggingRes.isErr():
error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error
debug "7/7 Setting up shutdown hooks" debug "7/7 Setting up shutdown hooks"
# 7/7 Setup graceful shutdown hooks
## Setup shutdown hooks for this process. ## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown. ## Stop node gracefully on shutdown.

View File

@ -1,29 +0,0 @@
{.push raises: [Defect].}
import
stew/results,
chronicles
import
../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/migration,
./config
logScope:
topics = "wakunode.setup.migrations"
proc runMigrations*(sqliteDatabase: SqliteDatabase, conf: WakuNodeConf) =
# Run migration scripts on persistent storage
var migrationPath: string
if conf.persistPeers and conf.persistMessages:
migrationPath = ALL_STORE_MIGRATION_PATH
elif conf.persistPeers:
migrationPath = PEER_STORE_MIGRATION_PATH
elif conf.persistMessages:
migrationPath = MESSAGE_STORE_MIGRATION_PATH
let migrationResult = sqliteDatabase.migrate(migrationPath)
if migrationResult.isErr():
warn "migration failed", error=migrationResult.error
else:
info "migration is done"

View File

@ -18,7 +18,7 @@ import
proc newTestDatabase(): SqliteDatabase = proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet() SqliteDatabase.new(":memory:").tryGet()
suite "SQLite message store - init store": suite "SQLite message store - init store":
@ -262,7 +262,7 @@ suite "Message Store":
overload = 65 overload = 65
let let
database = SqliteDatabase.init("", inMemory = true)[] database = newTestDatabase()
store = SqliteStore.init(database).tryGet() store = SqliteStore.init(database).tryGet()
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)

View File

@ -16,7 +16,7 @@ import
proc newTestDatabase(): SqliteDatabase = proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet() SqliteDatabase.new(":memory:").tryGet()
suite "message store - history query": suite "message store - history query":

View File

@ -2,16 +2,17 @@
import import
std/[unittest, sets], std/[unittest, sets],
libp2p/crypto/crypto, libp2p/crypto/crypto
../test_helpers, import
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/storage/peer/waku_peer_storage ../../waku/v2/node/storage/peer/waku_peer_storage,
../test_helpers
suite "Peer Storage": suite "Peer Storage":
test "Store, replace and retrieve from persistent peer storage": test "Store, replace and retrieve from persistent peer storage":
let let
database = SqliteDatabase.init("", inMemory = true)[] database = SqliteDatabase.new(":memory:").tryGet()
storage = WakuPeerStorage.new(database)[] storage = WakuPeerStorage.new(database)[]
# Test Peer # Test Peer

View File

@ -28,7 +28,7 @@ import
proc newTestMessageStore(): MessageStore = proc newTestMessageStore(): MessageStore =
let database = SqliteDatabase.init("", inMemory = true)[] let database = SqliteDatabase.new(":memory:").tryGet()
SqliteStore.init(database).tryGet() SqliteStore.init(database).tryGet()

View File

@ -3,9 +3,9 @@
import tables, stew/results, strutils, os import tables, stew/results, strutils, os
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message" const MessageStoreMigrationPath* = sourceDir / "migrations_scripts/message"
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" const PeerStoreMigrationPath* = sourceDir / "migrations_scripts/peer"
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" const AllStoreMigrationPath* = sourceDir / "migrations_scripts"
type MigrationScriptsResult*[T] = Result[T, string] type MigrationScriptsResult*[T] = Result[T, string]
type type

View File

@ -64,30 +64,20 @@ type DataProc* = proc(s: RawStmtPtr) {.closure.} # the nim-eth definition is dif
const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard
proc init*( proc new*(T: type SqliteDatabase, path: string, readOnly=false): DatabaseResult[T] =
T: type SqliteDatabase,
basePath: string,
name: string = "store",
readOnly = false,
inMemory = false): DatabaseResult[T] =
var env: AutoDisposed[ptr sqlite3] var env: AutoDisposed[ptr sqlite3]
defer: disposeIfUnreleased(env) defer: disposeIfUnreleased(env)
let let flags = if readOnly: SQLITE_OPEN_READONLY
name = else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
if inMemory: ":memory:"
else: basepath / name & ".sqlite3"
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
if not inMemory: if path != ":memory:":
try: try:
createDir(basePath) createDir(parentDir(path))
except OSError, IOError: except OSError, IOError:
return err("`sqlite: cannot create database directory") return err("sqlite: cannot create database directory")
checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil) checkErr sqlite3_open_v2(path, addr env.val, flags.cint, nil)
template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt = template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt var s: ptr sqlite3_stmt
@ -126,9 +116,18 @@ proc init*(
checkWalPragmaResult(journalModePragma) checkWalPragmaResult(journalModePragma)
checkExec(journalModePragma) checkExec(journalModePragma)
ok(SqliteDatabase( ok(SqliteDatabase(env: env.release))
env: env.release
)) proc init*(
T: type SqliteDatabase,
basePath: string,
name: string = "store",
readOnly = false,
inMemory = false): DatabaseResult[T] {.deprecated: "use `SqliteDatabase.new()` instead".} =
let path = if inMemory: ":memory:"
else: basePath / name & ".sqlite3"
SqliteDatabase.new(path, readOnly)
template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt = template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt =
var s: ptr sqlite3_stmt var s: ptr sqlite3_stmt