nwaku/waku/v2/node/waku_setup.nim
2021-12-07 13:06:59 +01:00

116 lines
3.4 KiB
Nim

{.push raises: [Defect].}
## Collection of utilities commonly used
## during the setup phase of a Waku v2 node
import
std/tables,
chronos,
chronicles,
json_rpc/rpcserver,
metrics,
metrics/chronos_httpserver,
stew/results,
stew/shims/net,
./storage/sqlite,
./storage/migration/migration_types,
./jsonrpc/[admin_api,
debug_api,
filter_api,
relay_api,
store_api,
private_api,
debug_api],
./config,
./wakunode2
# Attach the "wakunode.setup" topic to the log statements of this module.
logScope:
topics = "wakunode.setup"
# `SetupResult[T]` is an alias for `Result[T, string]`, i.e. a result whose
# error side is a plain string.
type
SetupResult*[T] = Result[T, string]
##########################
# Setup helper functions #
##########################
proc startRpc*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf)
  {.raises: [Defect, RpcBindError, CatchableError].} =
  ## Creates a JSON-RPC HTTP server bound to `rpcIp`:`rpcPort`, installs the
  ## API handlers selected by `conf` on it and starts it.
  ##
  ## The debug API is always installed. The relay, filter, store and admin
  ## APIs are installed when `conf.relay`, `conf.filter`, `conf.store` and
  ## `conf.rpcAdmin` are set, respectively. The private API is installed
  ## only when both `conf.relay` and `conf.rpcPrivate` are set.
  # @TODO: API handlers still raise CatchableError
  let listenAddress = initTAddress(rpcIp, rpcPort)
  let server = newRpcHttpServer([listenAddress])

  # The debug API does not depend on any configuration flag.
  node.installDebugApiHandlers(server)

  if conf.relay:
    # One cache is shared by the relay API and the private API.
    let relayCache = newTable[string, seq[WakuMessage]]()
    node.installRelayApiHandlers(server, relayCache)

    if conf.rpcPrivate:
      # Private API access allows WakuRelay functionality that
      # is backwards compatible with Waku v1.
      node.installPrivateApiHandlers(server, node.rng, relayCache)

  if conf.filter:
    let filterCache = newTable[ContentTopic, seq[WakuMessage]]()
    node.installFilterApiHandlers(server, filterCache)

  if conf.store:
    node.installStoreApiHandlers(server)

  if conf.rpcAdmin:
    node.installAdminApiHandlers(server)

  server.start()
  # Keep the original `ta` key in the structured log record.
  info "RPC Server started", ta = listenAddress
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) =
  ## Starts the metrics HTTP server on `serverIp`:`serverPort`, logging
  ## before the attempt and after it succeeded.
  ##
  ## Any exception raised while starting the server is converted into an
  ## assertion failure carrying the original exception message.
  info "Starting metrics HTTP server", serverIp, serverPort

  try:
    startMetricsHttpServer($serverIp, serverPort)
  except Exception as exc:
    let failure = "Exception while starting metrics HTTP server: " & exc.msg
    raiseAssert(failure)

  info "Metrics HTTP server started", serverIp, serverPort
proc startMetricsLog*() =
  ## Schedules a timer callback that logs the sum of all
  ## `waku_node_messages` values.
  ##
  ## The first run is scheduled 2 seconds from now; every run schedules
  ## the next one 2 seconds later.
  # Callback variable with an explicit proc type, assigned below; see
  # https://github.com/nim-lang/Nim/issues/17369
  var logMetrics: proc(udata: pointer) {.gcsafe, raises: [Defect].}

  logMetrics = proc(udata: pointer) =
    var totalMessages = 0.float64

    {.gcsafe.}:
      # TODO: libp2p_pubsub_peers is not public, so we need to make this either
      # public in libp2p or do our own peer counting after all.
      for key in waku_node_messages.metrics.keys():
        try:
          totalMessages += waku_node_messages.value(key)
        except KeyError:
          # A key whose value cannot be read is left out of the total.
          discard

    info "Node metrics", totalMessages
    # Re-arm the timer so the callback keeps running.
    discard setTimer(Moment.fromNow(2.seconds), logMetrics)

  # Kick off the first run.
  discard setTimer(Moment.fromNow(2.seconds), logMetrics)
proc runMigrations*(sqliteDatabase: SqliteDatabase, conf: WakuNodeConf) =
  ## Runs the migration scripts on `sqliteDatabase` for the persistence
  ## options enabled in `conf`.
  ##
  ## The script path is chosen from `conf.persistPeers` and
  ## `conf.persistMessages`. When neither is set there is nothing to
  ## migrate and the proc returns without touching the database.
  ##
  ## A failed migration is logged as a warning; it is not raised or
  ## returned to the caller.
  if not (conf.persistPeers or conf.persistMessages):
    # No path can be selected in this case, and handing an empty path to
    # `migrate` would be meaningless.
    debug "no persistent storage enabled, skipping migration"
    return

  # At least one flag is set here, so the final `else` branch is exactly
  # the "messages only" case.
  let migrationPath =
    if conf.persistPeers and conf.persistMessages:
      migration_types.ALL_STORE_MIGRATION_PATH
    elif conf.persistPeers:
      migration_types.PEER_STORE_MIGRATION_PATH
    else:
      migration_types.MESSAGE_STORE_MIGRATION_PATH

  # run migration
  info "running migration ...", migrationPath=migrationPath
  let migrationResult = sqliteDatabase.migrate(migrationPath)
  if migrationResult.isErr:
    warn "migration failed", error=migrationResult.error
  else:
    info "migration is done"