mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 00:43:06 +00:00
refactor(wakunode2): split setup logic into app module
This commit is contained in:
parent
bcdb7e9cfb
commit
1c665742be
2
.github/workflows/ci-experimental.yml
vendored
2
.github/workflows/ci-experimental.yml
vendored
@ -122,4 +122,4 @@ jobs:
|
|||||||
key: ${{ runner.os }}-zerokit-${{ steps.submodules.outputs.zerokit-hash }}
|
key: ${{ runner.os }}-zerokit-${{ steps.submodules.outputs.zerokit-hash }}
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: make V=1 LOG_LEVEL=DEBUG test2
|
run: make V=1 LOG_LEVEL=DEBUG test2 testwakunode2
|
||||||
|
|||||||
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -117,7 +117,7 @@ jobs:
|
|||||||
key: ${{ runner.os }}-nim-${{ steps.submodules.outputs.nim-hash }}
|
key: ${{ runner.os }}-nim-${{ steps.submodules.outputs.nim-hash }}
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: make V=1 LOG_LEVEL=DEBUG test2
|
run: make V=1 LOG_LEVEL=DEBUG test2 testwakunode2
|
||||||
|
|
||||||
|
|
||||||
build-legacy:
|
build-legacy:
|
||||||
|
|||||||
10
Makefile
10
Makefile
@ -146,7 +146,7 @@ testcommon: | build deps
|
|||||||
#############
|
#############
|
||||||
## Waku v2 ##
|
## Waku v2 ##
|
||||||
#############
|
#############
|
||||||
.PHONY: testwaku2 wakunode2 example2 sim2 scripts2 wakubridge chat2 chat2bridge
|
.PHONY: testwaku2 wakunode2 testwakunode2 example2 sim2 scripts2 wakubridge testbridge chat2 chat2bridge
|
||||||
|
|
||||||
testwaku2: | build deps librln
|
testwaku2: | build deps librln
|
||||||
echo -e $(BUILD_MSG) "build/$@" && \
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
@ -156,6 +156,10 @@ wakunode2: | build deps librln
|
|||||||
echo -e $(BUILD_MSG) "build/$@" && \
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
$(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims
|
$(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims
|
||||||
|
|
||||||
|
testwakunode2: | build deps librln
|
||||||
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
|
$(ENV_SCRIPT) nim testwakunode2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims
|
||||||
|
|
||||||
example2: | build deps
|
example2: | build deps
|
||||||
echo -e $(BUILD_MSG) "build/$@" && \
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
$(ENV_SCRIPT) nim example2 $(NIM_PARAMS) waku.nims
|
$(ENV_SCRIPT) nim example2 $(NIM_PARAMS) waku.nims
|
||||||
@ -174,6 +178,10 @@ wakubridge: | build deps
|
|||||||
echo -e $(BUILD_MSG) "build/$@" && \
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
$(ENV_SCRIPT) nim bridge $(NIM_PARAMS) waku.nims
|
$(ENV_SCRIPT) nim bridge $(NIM_PARAMS) waku.nims
|
||||||
|
|
||||||
|
testbridge: | build deps librln
|
||||||
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
|
$(ENV_SCRIPT) nim testbridge $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims
|
||||||
|
|
||||||
chat2: | build deps librln
|
chat2: | build deps librln
|
||||||
echo -e $(BUILD_MSG) "build/$@" && \
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims
|
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims
|
||||||
|
|||||||
771
apps/wakunode2/app.nim
Normal file
771
apps/wakunode2/app.nim
Normal file
@ -0,0 +1,771 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options, strutils, sequtils],
|
||||||
|
stew/results,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/nameresolving/dnsresolver,
|
||||||
|
libp2p/protocols/pubsub/gossipsub,
|
||||||
|
libp2p/peerid,
|
||||||
|
eth/keys,
|
||||||
|
eth/net/nat,
|
||||||
|
json_rpc/rpcserver,
|
||||||
|
presto
|
||||||
|
import
|
||||||
|
../../waku/common/sqlite,
|
||||||
|
../../waku/v2/waku_core,
|
||||||
|
../../waku/v2/waku_node,
|
||||||
|
../../waku/v2/node/waku_metrics,
|
||||||
|
../../waku/v2/node/peer_manager,
|
||||||
|
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
|
||||||
|
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||||
|
../../waku/v2/waku_archive,
|
||||||
|
../../waku/v2/waku_archive/driver/queue_driver,
|
||||||
|
../../waku/v2/waku_archive/driver/sqlite_driver,
|
||||||
|
../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
|
||||||
|
../../waku/v2/waku_archive/retention_policy,
|
||||||
|
../../waku/v2/waku_archive/retention_policy/retention_policy_capacity,
|
||||||
|
../../waku/v2/waku_archive/retention_policy/retention_policy_time,
|
||||||
|
../../waku/v2/waku_dnsdisc,
|
||||||
|
../../waku/v2/waku_enr,
|
||||||
|
../../waku/v2/waku_discv5,
|
||||||
|
../../waku/v2/waku_peer_exchange,
|
||||||
|
../../waku/v2/waku_relay/validators,
|
||||||
|
../../waku/v2/waku_store,
|
||||||
|
../../waku/v2/waku_lightpush,
|
||||||
|
../../waku/v2/waku_filter,
|
||||||
|
./config
|
||||||
|
|
||||||
|
when defined(rln):
|
||||||
|
import ../../waku/v2/waku_rln_relay
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakunode app"
|
||||||
|
|
||||||
|
|
||||||
|
# Git version in git describe format (defined at compile time)
|
||||||
|
const git_version* {.strdefine.} = "n/a"
|
||||||
|
|
||||||
|
type
|
||||||
|
App* = object
|
||||||
|
version: string
|
||||||
|
conf: WakuNodeConf
|
||||||
|
|
||||||
|
rng: ref HmacDrbgContext
|
||||||
|
peerStore: Option[WakuPeerStorage]
|
||||||
|
archiveDriver: Option[ArchiveDriver]
|
||||||
|
archiveRetentionPolicy: Option[RetentionPolicy]
|
||||||
|
dynamicBootstrapNodes: seq[RemotePeerInfo]
|
||||||
|
|
||||||
|
node: WakuNode
|
||||||
|
|
||||||
|
rpcServer: Option[RpcHttpServer]
|
||||||
|
restServer: Option[RestServerRef]
|
||||||
|
|
||||||
|
AppResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
|
||||||
|
func node*(app: App): WakuNode =
|
||||||
|
app.node
|
||||||
|
|
||||||
|
func version*(app: App): string =
|
||||||
|
app.version
|
||||||
|
|
||||||
|
|
||||||
|
## Initialisation
|
||||||
|
|
||||||
|
proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
|
||||||
|
App(version: git_version, conf: conf, rng: rng, node: nil)
|
||||||
|
|
||||||
|
|
||||||
|
## SQLite database
|
||||||
|
|
||||||
|
proc setupDatabaseConnection(dbUrl: string): AppResult[Option[SqliteDatabase]] =
|
||||||
|
## dbUrl mimics SQLAlchemy Database URL schema
|
||||||
|
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
|
||||||
|
if dbUrl == "" or dbUrl == "none":
|
||||||
|
return ok(none(SqliteDatabase))
|
||||||
|
|
||||||
|
let dbUrlParts = dbUrl.split("://", 1)
|
||||||
|
let
|
||||||
|
engine = dbUrlParts[0]
|
||||||
|
path = dbUrlParts[1]
|
||||||
|
|
||||||
|
let connRes = case engine
|
||||||
|
of "sqlite":
|
||||||
|
# SQLite engine
|
||||||
|
# See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite
|
||||||
|
SqliteDatabase.new(path)
|
||||||
|
|
||||||
|
else:
|
||||||
|
return err("unknown database engine")
|
||||||
|
|
||||||
|
if connRes.isErr():
|
||||||
|
return err("failed to init database connection: " & connRes.error)
|
||||||
|
|
||||||
|
ok(some(connRes.value))
|
||||||
|
|
||||||
|
|
||||||
|
## Peer persistence
|
||||||
|
|
||||||
|
const PeerPersistenceDbUrl = "sqlite://peers.db"
|
||||||
|
|
||||||
|
proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] =
|
||||||
|
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
|
||||||
|
|
||||||
|
?peer_store_sqlite_migrations.migrate(db.get())
|
||||||
|
|
||||||
|
let res = WakuPeerStorage.new(db.get())
|
||||||
|
if res.isErr():
|
||||||
|
return err("failed to init peer store" & res.error)
|
||||||
|
|
||||||
|
ok(some(res.value))
|
||||||
|
|
||||||
|
proc setupPeerPersistence*(app: var App): AppResult[void] =
|
||||||
|
if not app.conf.peerPersistence:
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
let peerStoreRes = setupPeerStorage()
|
||||||
|
if peerStoreRes.isErr():
|
||||||
|
return err("failed to setup peer store" & peerStoreRes.error)
|
||||||
|
|
||||||
|
app.peerStore = peerStoreRes.get()
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
|
||||||
|
## Waku archive
|
||||||
|
|
||||||
|
proc gatherSqlitePageStats(db: SqliteDatabase): AppResult[(int64, int64, int64)] =
|
||||||
|
let
|
||||||
|
pageSize = ?db.getPageSize()
|
||||||
|
pageCount = ?db.getPageCount()
|
||||||
|
freelistCount = ?db.getFreelistCount()
|
||||||
|
|
||||||
|
ok((pageSize, pageCount, freelistCount))
|
||||||
|
|
||||||
|
proc performSqliteVacuum(db: SqliteDatabase): AppResult[void] =
|
||||||
|
## SQLite database vacuuming
|
||||||
|
# TODO: Run vacuuming conditionally based on database page stats
|
||||||
|
# if (pageCount > 0 and freelistCount > 0):
|
||||||
|
|
||||||
|
debug "starting sqlite database vacuuming"
|
||||||
|
|
||||||
|
let resVacuum = db.vacuum()
|
||||||
|
if resVacuum.isErr():
|
||||||
|
return err("failed to execute vacuum: " & resVacuum.error)
|
||||||
|
|
||||||
|
debug "finished sqlite database vacuuming"
|
||||||
|
|
||||||
|
proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): AppResult[Option[RetentionPolicy]] =
|
||||||
|
if retentionPolicy == "" or retentionPolicy == "none":
|
||||||
|
return ok(none(RetentionPolicy))
|
||||||
|
|
||||||
|
let rententionPolicyParts = retentionPolicy.split(":", 1)
|
||||||
|
let
|
||||||
|
policy = rententionPolicyParts[0]
|
||||||
|
policyArgs = rententionPolicyParts[1]
|
||||||
|
|
||||||
|
|
||||||
|
if policy == "time":
|
||||||
|
var retentionTimeSeconds: int64
|
||||||
|
try:
|
||||||
|
retentionTimeSeconds = parseInt(policyArgs)
|
||||||
|
except ValueError:
|
||||||
|
return err("invalid time retention policy argument")
|
||||||
|
|
||||||
|
let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
|
||||||
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
|
elif policy == "capacity":
|
||||||
|
var retentionCapacity: int
|
||||||
|
try:
|
||||||
|
retentionCapacity = parseInt(policyArgs)
|
||||||
|
except ValueError:
|
||||||
|
return err("invalid capacity retention policy argument")
|
||||||
|
|
||||||
|
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
|
||||||
|
return ok(some(retPolicy))
|
||||||
|
|
||||||
|
else:
|
||||||
|
return err("unknown retention policy")
|
||||||
|
|
||||||
|
proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): AppResult[ArchiveDriver] =
|
||||||
|
let db = ?setupDatabaseConnection(dbUrl)
|
||||||
|
|
||||||
|
if db.isSome():
|
||||||
|
# SQLite vacuum
|
||||||
|
# TODO: Run this only if the database engine is SQLite
|
||||||
|
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get())
|
||||||
|
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
|
||||||
|
|
||||||
|
if vacuum and (pageCount > 0 and freelistCount > 0):
|
||||||
|
?performSqliteVacuum(db.get())
|
||||||
|
|
||||||
|
# Database migration
|
||||||
|
if migrate:
|
||||||
|
?archive_driver_sqlite_migrations.migrate(db.get())
|
||||||
|
|
||||||
|
if db.isSome():
|
||||||
|
debug "setting up sqlite waku archive driver"
|
||||||
|
let res = SqliteDriver.new(db.get())
|
||||||
|
if res.isErr():
|
||||||
|
return err("failed to init sqlite archive driver: " & res.error)
|
||||||
|
|
||||||
|
ok(res.value)
|
||||||
|
|
||||||
|
else:
|
||||||
|
debug "setting up in-memory waku archive driver"
|
||||||
|
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
|
||||||
|
ok(driver)
|
||||||
|
|
||||||
|
proc setupWakuArchive*(app: var App): AppResult[void] =
|
||||||
|
if not app.conf.store:
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
# Message storage
|
||||||
|
let dbUrlValidationRes = validateDbUrl(app.conf.storeMessageDbUrl)
|
||||||
|
if dbUrlValidationRes.isErr():
|
||||||
|
return err("failed to configure the message store database connection: " & dbUrlValidationRes.error)
|
||||||
|
|
||||||
|
let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(),
|
||||||
|
vacuum = app.conf.storeMessageDbVacuum,
|
||||||
|
migrate = app.conf.storeMessageDbMigration)
|
||||||
|
if archiveDriverRes.isOk():
|
||||||
|
app.archiveDriver = some(archiveDriverRes.get())
|
||||||
|
else:
|
||||||
|
return err("failed to configure archive driver: " & archiveDriverRes.error)
|
||||||
|
|
||||||
|
# Message store retention policy
|
||||||
|
let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(app.conf.storeMessageRetentionPolicy)
|
||||||
|
if storeMessageRetentionPolicyRes.isErr():
|
||||||
|
return err("failed to configure the message retention policy: " & storeMessageRetentionPolicyRes.error)
|
||||||
|
|
||||||
|
let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get())
|
||||||
|
if archiveRetentionPolicyRes.isOk():
|
||||||
|
app.archiveRetentionPolicy = archiveRetentionPolicyRes.get()
|
||||||
|
else:
|
||||||
|
return err("failed to configure the message retention policy: " & archiveRetentionPolicyRes.error)
|
||||||
|
|
||||||
|
# TODO: Move retention policy execution here
|
||||||
|
# if archiveRetentionPolicy.isSome():
|
||||||
|
# executeMessageRetentionPolicy(node)
|
||||||
|
# startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
## Retrieve dynamic bootstrap nodes (DNS discovery)
|
||||||
|
|
||||||
|
proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): AppResult[seq[RemotePeerInfo]] =
|
||||||
|
|
||||||
|
if dnsDiscovery and dnsDiscoveryUrl != "":
|
||||||
|
# DNS discovery
|
||||||
|
debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl
|
||||||
|
|
||||||
|
var nameServers: seq[TransportAddress]
|
||||||
|
for ip in dnsDiscoveryNameServers:
|
||||||
|
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||||
|
|
||||||
|
let dnsResolver = DnsResolver.new(nameServers)
|
||||||
|
|
||||||
|
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
||||||
|
trace "resolving", domain=domain
|
||||||
|
let resolved = await dnsResolver.resolveTxt(domain)
|
||||||
|
return resolved[0] # Use only first answer
|
||||||
|
|
||||||
|
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
|
||||||
|
if wakuDnsDiscovery.isOk():
|
||||||
|
return wakuDnsDiscovery.get().findPeers()
|
||||||
|
.mapErr(proc (e: cstring): string = $e)
|
||||||
|
else:
|
||||||
|
warn "Failed to init Waku DNS discovery"
|
||||||
|
|
||||||
|
debug "No method for retrieving dynamic bootstrap nodes specified."
|
||||||
|
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
|
||||||
|
|
||||||
|
proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] =
|
||||||
|
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(app.conf.dnsDiscovery,
|
||||||
|
app.conf.dnsDiscoveryUrl,
|
||||||
|
app.conf.dnsDiscoveryNameServers)
|
||||||
|
if dynamicBootstrapNodesRes.isOk():
|
||||||
|
app.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
||||||
|
else:
|
||||||
|
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
|
||||||
|
## Init waku node instance
|
||||||
|
|
||||||
|
proc setupNat(natConf, clientId: string, tcpPort, udpPort: Port):
|
||||||
|
AppResult[tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]]] {.gcsafe.} =
|
||||||
|
|
||||||
|
let strategy = case natConf.toLowerAscii():
|
||||||
|
of "any": NatAny
|
||||||
|
of "none": NatNone
|
||||||
|
of "upnp": NatUpnp
|
||||||
|
of "pmp": NatPmp
|
||||||
|
else: NatNone
|
||||||
|
|
||||||
|
var endpoint: tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]]
|
||||||
|
|
||||||
|
if strategy != NatNone:
|
||||||
|
let extIp = getExternalIP(strategy)
|
||||||
|
if extIP.isSome():
|
||||||
|
endpoint.ip = some(ValidIpAddress.init(extIp.get()))
|
||||||
|
# RedirectPorts in considered a gcsafety violation
|
||||||
|
# because it obtains the address of a non-gcsafe proc?
|
||||||
|
var extPorts: Option[(Port, Port)]
|
||||||
|
try:
|
||||||
|
extPorts = ({.gcsafe.}: redirectPorts(tcpPort = tcpPort,
|
||||||
|
udpPort = udpPort,
|
||||||
|
description = clientId))
|
||||||
|
except CatchableError:
|
||||||
|
# TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now.
|
||||||
|
error "unable to determine external ports"
|
||||||
|
extPorts = none((Port, Port))
|
||||||
|
|
||||||
|
if extPorts.isSome():
|
||||||
|
let (extTcpPort, extUdpPort) = extPorts.get()
|
||||||
|
endpoint.tcpPort = some(extTcpPort)
|
||||||
|
endpoint.udpPort = some(extUdpPort)
|
||||||
|
|
||||||
|
else: # NatNone
|
||||||
|
if not natConf.startsWith("extip:"):
|
||||||
|
return err("not a valid NAT mechanism: " & $natConf)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# any required port redirection is assumed to be done by hand
|
||||||
|
endpoint.ip = some(ValidIpAddress.init(natConf[6..^1]))
|
||||||
|
except ValueError:
|
||||||
|
return err("not a valid IP address: " & $natConf[6..^1])
|
||||||
|
|
||||||
|
return ok(endpoint)
|
||||||
|
|
||||||
|
proc initNode(conf: WakuNodeConf,
|
||||||
|
rng: ref HmacDrbgContext,
|
||||||
|
peerStore: Option[WakuPeerStorage],
|
||||||
|
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] =
|
||||||
|
|
||||||
|
## Setup a basic Waku v2 node based on a supplied configuration
|
||||||
|
## file. Optionally include persistent peer storage.
|
||||||
|
## No protocols are mounted yet.
|
||||||
|
|
||||||
|
var dnsResolver: DnsResolver
|
||||||
|
if conf.dnsAddrs:
|
||||||
|
# Support for DNS multiaddrs
|
||||||
|
var nameServers: seq[TransportAddress]
|
||||||
|
for ip in conf.dnsAddrsNameServers:
|
||||||
|
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||||
|
|
||||||
|
dnsResolver = DnsResolver.new(nameServers)
|
||||||
|
|
||||||
|
let
|
||||||
|
nodekey = if conf.nodekey.isSome():
|
||||||
|
conf.nodekey.get()
|
||||||
|
else:
|
||||||
|
let nodekeyRes = crypto.PrivateKey.random(Secp256k1, rng[])
|
||||||
|
if nodekeyRes.isErr():
|
||||||
|
return err("failed to generate nodekey: " & $nodekeyRes.error)
|
||||||
|
nodekeyRes.get()
|
||||||
|
|
||||||
|
|
||||||
|
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
||||||
|
## actually a supported transport for libp2p traffic.
|
||||||
|
let udpPort = conf.tcpPort
|
||||||
|
let natRes = setupNat(conf.nat, clientId,
|
||||||
|
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
Port(uint16(udpPort) + conf.portsShift))
|
||||||
|
if natRes.isErr():
|
||||||
|
return err("failed to setup NAT: " & $natRes.error)
|
||||||
|
|
||||||
|
let (extIp, extTcpPort, _) = natRes.get()
|
||||||
|
|
||||||
|
|
||||||
|
let
|
||||||
|
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
||||||
|
else: none(string)
|
||||||
|
|
||||||
|
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
||||||
|
else: none(Port)
|
||||||
|
|
||||||
|
## TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
||||||
|
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
||||||
|
## config, the external port is the same as the bind port.
|
||||||
|
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
||||||
|
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
||||||
|
else:
|
||||||
|
extTcpPort
|
||||||
|
extMultiAddrs = if (conf.extMultiAddrs.len > 0):
|
||||||
|
let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs)
|
||||||
|
if extMultiAddrsValidationRes.isErr():
|
||||||
|
return err("invalid external multiaddress: " & extMultiAddrsValidationRes.error)
|
||||||
|
else:
|
||||||
|
extMultiAddrsValidationRes.get()
|
||||||
|
else:
|
||||||
|
@[]
|
||||||
|
|
||||||
|
wakuFlags = CapabilitiesBitfield.init(
|
||||||
|
lightpush = conf.lightpush,
|
||||||
|
filter = conf.filter,
|
||||||
|
store = conf.store,
|
||||||
|
relay = conf.relay
|
||||||
|
)
|
||||||
|
|
||||||
|
var node: WakuNode
|
||||||
|
|
||||||
|
let pStorage = if peerStore.isNone(): nil
|
||||||
|
else: peerStore.get()
|
||||||
|
|
||||||
|
let rng = crypto.newRng()
|
||||||
|
# Wrap in none because NetConfig does not have a default constructor
|
||||||
|
# TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress,
|
||||||
|
# which doesn't allow default construction
|
||||||
|
let netConfigRes = NetConfig.init(
|
||||||
|
bindIp = conf.listenAddress,
|
||||||
|
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
extIp = extIp,
|
||||||
|
extPort = extPort,
|
||||||
|
extMultiAddrs = extMultiAddrs,
|
||||||
|
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
||||||
|
wsEnabled = conf.websocketSupport,
|
||||||
|
wssEnabled = conf.websocketSecureSupport,
|
||||||
|
dns4DomainName = dns4DomainName,
|
||||||
|
discv5UdpPort = discv5UdpPort,
|
||||||
|
wakuFlags = some(wakuFlags),
|
||||||
|
)
|
||||||
|
if netConfigRes.isErr():
|
||||||
|
return err("failed to create net config instance: " & netConfigRes.error)
|
||||||
|
|
||||||
|
let netConfig = netConfigRes.get()
|
||||||
|
var wakuDiscv5 = none(WakuDiscoveryV5)
|
||||||
|
|
||||||
|
if conf.discv5Discovery:
|
||||||
|
let dynamicBootstrapEnrs = dynamicBootstrapNodes
|
||||||
|
.filterIt(it.hasUdpPort())
|
||||||
|
.mapIt(it.enr.get())
|
||||||
|
var discv5BootstrapEnrs: seq[enr.Record]
|
||||||
|
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
||||||
|
for enrUri in conf.discv5BootstrapNodes:
|
||||||
|
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
||||||
|
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
|
||||||
|
let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit,
|
||||||
|
conf.discv5BucketIpLimit,
|
||||||
|
conf.discv5BitsPerHop)
|
||||||
|
try:
|
||||||
|
wakuDiscv5 = some(WakuDiscoveryV5.new(
|
||||||
|
extIp = netConfig.extIp,
|
||||||
|
extTcpPort = netConfig.extPort,
|
||||||
|
extUdpPort = netConfig.discv5UdpPort,
|
||||||
|
bindIp = netConfig.bindIp,
|
||||||
|
discv5UdpPort = netConfig.discv5UdpPort.get(),
|
||||||
|
bootstrapEnrs = discv5BootstrapEnrs,
|
||||||
|
enrAutoUpdate = conf.discv5EnrAutoUpdate,
|
||||||
|
privateKey = keys.PrivateKey(nodekey.skkey),
|
||||||
|
flags = netConfig.wakuFlags.get(),
|
||||||
|
multiaddrs = netConfig.enrMultiaddrs,
|
||||||
|
rng = rng,
|
||||||
|
discv5Config = discv5Config,
|
||||||
|
))
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Build waku node instance
|
||||||
|
var builder = WakuNodeBuilder.init()
|
||||||
|
builder.withRng(rng)
|
||||||
|
builder.withNodeKey(nodekey)
|
||||||
|
builder.withNetworkConfiguration(netConfig)
|
||||||
|
builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity)
|
||||||
|
builder.withSwitchConfiguration(
|
||||||
|
maxConnections = some(conf.maxConnections.int),
|
||||||
|
secureKey = some(conf.websocketSecureKeyPath),
|
||||||
|
secureCert = some(conf.websocketSecureCertPath),
|
||||||
|
nameResolver = dnsResolver,
|
||||||
|
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||||
|
agentString = some(conf.agentString)
|
||||||
|
)
|
||||||
|
builder.withWakuDiscv5(wakuDiscv5.get(nil))
|
||||||
|
|
||||||
|
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)
|
||||||
|
|
||||||
|
ok(node)
|
||||||
|
|
||||||
|
proc setupWakuNode*(app: var App): AppResult[void] =
|
||||||
|
## Waku node
|
||||||
|
let initNodeRes = initNode(app.conf, app.rng, app.peerStore, app.dynamicBootstrapNodes)
|
||||||
|
if initNodeRes.isErr():
|
||||||
|
return err("failed to init node: " & initNodeRes.error)
|
||||||
|
|
||||||
|
app.node = initNodeRes.get()
|
||||||
|
ok()
|
||||||
|
|
||||||
|
|
||||||
|
## Mount protocols
|
||||||
|
|
||||||
|
proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
||||||
|
archiveDriver: Option[ArchiveDriver],
|
||||||
|
archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} =
|
||||||
|
## Setup configured protocols on an existing Waku v2 node.
|
||||||
|
## Optionally include persistent message storage.
|
||||||
|
## No protocols are started yet.
|
||||||
|
|
||||||
|
# Mount relay on all nodes
|
||||||
|
var peerExchangeHandler = none(RoutingRecordsHandler)
|
||||||
|
if conf.relayPeerExchange:
|
||||||
|
proc handlePeerExchange(peer: PeerId, topic: string,
|
||||||
|
peers: seq[RoutingRecordsPair]) {.gcsafe.} =
|
||||||
|
## Handle peers received via gossipsub peer exchange
|
||||||
|
# TODO: Only consider peers on pubsub topics we subscribe to
|
||||||
|
let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
|
||||||
|
.mapIt(toRemotePeerInfo(it.record.get()))
|
||||||
|
|
||||||
|
debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len
|
||||||
|
|
||||||
|
# asyncSpawn, as we don't want to block here
|
||||||
|
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
|
||||||
|
|
||||||
|
peerExchangeHandler = some(handlePeerExchange)
|
||||||
|
|
||||||
|
if conf.relay:
|
||||||
|
try:
|
||||||
|
let pubsubTopics = conf.topics.split(" ")
|
||||||
|
await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# TODO: Get this from cli
|
||||||
|
var topicsPublicKeys = initTable[string, SkPublicKey]()
|
||||||
|
# Add validation keys to protected topics
|
||||||
|
for topic, publicKey in topicsPublicKeys.pairs:
|
||||||
|
info "routing only signed traffic", topic=topic, publicKey=publicKey
|
||||||
|
node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topic), publicKey)
|
||||||
|
|
||||||
|
|
||||||
|
# Keepalive mounted on all nodes
|
||||||
|
try:
|
||||||
|
await mountLibp2pPing(node)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
when defined(rln):
|
||||||
|
if conf.rlnRelay:
|
||||||
|
|
||||||
|
let rlnConf = WakuRlnConfig(
|
||||||
|
rlnRelayDynamic: conf.rlnRelayDynamic,
|
||||||
|
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
|
||||||
|
rlnRelayContentTopic: conf.rlnRelayContentTopic,
|
||||||
|
rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex),
|
||||||
|
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
|
||||||
|
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
|
||||||
|
rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey,
|
||||||
|
rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress,
|
||||||
|
rlnRelayCredPath: conf.rlnRelayCredPath,
|
||||||
|
rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await node.mountRlnRelay(rlnConf)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.store:
|
||||||
|
# Archive setup
|
||||||
|
let messageValidator: MessageValidator = DefaultMessageValidator()
|
||||||
|
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
|
||||||
|
|
||||||
|
# Store setup
|
||||||
|
try:
|
||||||
|
await mountStore(node)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# TODO: Move this to storage setup phase
|
||||||
|
if archiveRetentionPolicy.isSome():
|
||||||
|
executeMessageRetentionPolicy(node)
|
||||||
|
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
|
||||||
|
|
||||||
|
mountStoreClient(node)
|
||||||
|
if conf.storenode != "":
|
||||||
|
let storeNode = parsePeerInfo(conf.storenode)
|
||||||
|
if storeNode.isOk():
|
||||||
|
node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
|
||||||
|
else:
|
||||||
|
return err("failed to set node waku store peer: " & storeNode.error)
|
||||||
|
|
||||||
|
# NOTE Must be mounted after relay
|
||||||
|
if conf.lightpush:
|
||||||
|
try:
|
||||||
|
await mountLightPush(node)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.lightpushnode != "":
|
||||||
|
let lightPushNode = parsePeerInfo(conf.lightpushnode)
|
||||||
|
if lightPushNode.isOk():
|
||||||
|
mountLightPushClient(node)
|
||||||
|
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
|
||||||
|
else:
|
||||||
|
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
|
||||||
|
|
||||||
|
# Filter setup. NOTE Must be mounted after relay
|
||||||
|
if conf.filter:
|
||||||
|
try:
|
||||||
|
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.filternode != "":
|
||||||
|
let filterNode = parsePeerInfo(conf.filternode)
|
||||||
|
if filterNode.isOk():
|
||||||
|
await mountFilterClient(node)
|
||||||
|
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
|
||||||
|
else:
|
||||||
|
return err("failed to set node waku filter peer: " & filterNode.error)
|
||||||
|
|
||||||
|
# waku peer exchange setup
|
||||||
|
if conf.peerExchangeNode != "" or conf.peerExchange:
|
||||||
|
try:
|
||||||
|
await mountPeerExchange(node)
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.peerExchangeNode != "":
|
||||||
|
let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode)
|
||||||
|
if peerExchangeNode.isOk():
|
||||||
|
node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
|
||||||
|
else:
|
||||||
|
return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
|
||||||
|
return await setupProtocols(
|
||||||
|
app.node,
|
||||||
|
app.conf,
|
||||||
|
app.archiveDriver,
|
||||||
|
app.archiveRetentionPolicy
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
## Start node
|
||||||
|
|
||||||
|
proc startNode(node: WakuNode, conf: WakuNodeConf,
|
||||||
|
dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[AppResult[void]] {.async.} =
|
||||||
|
## Start a configured node and all mounted protocols.
|
||||||
|
## Connect to static nodes and start
|
||||||
|
## keep-alive, if configured.
|
||||||
|
|
||||||
|
# Start Waku v2 node
|
||||||
|
try:
|
||||||
|
await node.start()
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to start waku node: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Start discv5 and connect to discovered nodes
|
||||||
|
if conf.discv5Discovery:
|
||||||
|
try:
|
||||||
|
if not await node.startDiscv5():
|
||||||
|
error "could not start Discovery v5"
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to start waku discovery v5: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Connect to configured static nodes
|
||||||
|
if conf.staticnodes.len > 0:
|
||||||
|
try:
|
||||||
|
await connectToNodes(node, conf.staticnodes, "static")
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to connect to static nodes: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if dynamicBootstrapNodes.len > 0:
|
||||||
|
info "Connecting to dynamic bootstrap peers"
|
||||||
|
try:
|
||||||
|
await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# retrieve px peers and add the to the peer store
|
||||||
|
if conf.peerExchangeNode != "":
|
||||||
|
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
|
||||||
|
await node.fetchPeerExchangePeers(desiredOutDegree)
|
||||||
|
|
||||||
|
# Start keepalive, if enabled
|
||||||
|
if conf.keepAlive:
|
||||||
|
node.startKeepalive()
|
||||||
|
|
||||||
|
# Maintain relay connections
|
||||||
|
if conf.relay:
|
||||||
|
node.peerManager.start()
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc startNode*(app: App): Future[AppResult[void]] {.async.} =
|
||||||
|
return await startNode(
|
||||||
|
app.node,
|
||||||
|
app.conf,
|
||||||
|
app.dynamicBootstrapNodes
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
## Monitoring and external interfaces
|
||||||
|
|
||||||
|
# TODO: Merge the `wakunode_setup_*.nim` files here. Once the encapsulating
|
||||||
|
# type (e.g., App) is implemented. Hold both servers instances to support
|
||||||
|
# a graceful shutdown.
|
||||||
|
import
|
||||||
|
./wakunode2_setup_rpc,
|
||||||
|
./wakunode2_setup_rest
|
||||||
|
|
||||||
|
|
||||||
|
proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): AppResult[void] =
|
||||||
|
startMetricsServer(address, Port(port + portsShift))
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc startMetricsLogging(): AppResult[void] =
|
||||||
|
startMetricsLog()
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
||||||
|
if app.conf.rpc:
|
||||||
|
let startRpcServerRes = startRpcServer(app.node, app.conf.rpcAddress, app.conf.rpcPort, app.conf.portsShift, app.conf)
|
||||||
|
if startRpcServerRes.isErr():
|
||||||
|
error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error
|
||||||
|
else:
|
||||||
|
app.rpcServer = some(startRpcServerRes.value)
|
||||||
|
|
||||||
|
if app.conf.rest:
|
||||||
|
let startRestServerRes = startRestServer(app.node, app.conf.restAddress, app.conf.restPort, app.conf.portsShift, app.conf)
|
||||||
|
if startRestServerRes.isErr():
|
||||||
|
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
||||||
|
else:
|
||||||
|
app.restServer = some(startRestServerRes.value)
|
||||||
|
|
||||||
|
|
||||||
|
if app.conf.metricsServer:
|
||||||
|
let startMetricsServerRes = startMetricsServer(app.node, app.conf.metricsServerAddress, app.conf.metricsServerPort, app.conf.portsShift)
|
||||||
|
if startMetricsServerRes.isErr():
|
||||||
|
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
||||||
|
|
||||||
|
if app.conf.metricsLogging:
|
||||||
|
let startMetricsLoggingRes = startMetricsLogging()
|
||||||
|
if startMetricsLoggingRes.isErr():
|
||||||
|
error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
|
||||||
|
# App shutdown
|
||||||
|
|
||||||
|
proc stop*(app: App): Future[void] {.async.} =
|
||||||
|
if app.restServer.isSome():
|
||||||
|
await app.restServer.get().stop()
|
||||||
|
|
||||||
|
if app.rpcServer.isSome():
|
||||||
|
await app.rpcServer.get().stop()
|
||||||
|
|
||||||
|
if not app.node.isNil():
|
||||||
|
await app.node.stop()
|
||||||
@ -4,629 +4,21 @@ else:
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, tables, strutils, sequtils, os],
|
std/[options, strutils, os],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
chronicles,
|
chronicles,
|
||||||
chronos,
|
chronos,
|
||||||
metrics,
|
metrics,
|
||||||
libbacktrace,
|
libbacktrace,
|
||||||
system/ansi_c,
|
system/ansi_c,
|
||||||
eth/keys,
|
libp2p/crypto/crypto
|
||||||
eth/net/nat,
|
|
||||||
eth/p2p/discoveryv5/enr,
|
|
||||||
libp2p/builders,
|
|
||||||
libp2p/multihash,
|
|
||||||
libp2p/crypto/crypto,
|
|
||||||
libp2p/protocols/ping,
|
|
||||||
libp2p/protocols/pubsub/gossipsub,
|
|
||||||
libp2p/protocols/pubsub/rpc/messages,
|
|
||||||
libp2p/transports/wstransport,
|
|
||||||
libp2p/nameresolving/dnsresolver
|
|
||||||
import
|
import
|
||||||
../../waku/common/sqlite,
|
|
||||||
../../waku/common/logging,
|
../../waku/common/logging,
|
||||||
../../waku/v2/node/peer_manager,
|
./config,
|
||||||
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
|
./app
|
||||||
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
|
||||||
../../waku/v2/waku_core,
|
|
||||||
../../waku/v2/waku_node,
|
|
||||||
../../waku/v2/node/waku_metrics,
|
|
||||||
../../waku/v2/waku_archive,
|
|
||||||
../../waku/v2/waku_archive/driver/queue_driver,
|
|
||||||
../../waku/v2/waku_archive/driver/sqlite_driver,
|
|
||||||
../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
|
|
||||||
../../waku/v2/waku_archive/retention_policy,
|
|
||||||
../../waku/v2/waku_archive/retention_policy/retention_policy_capacity,
|
|
||||||
../../waku/v2/waku_archive/retention_policy/retention_policy_time,
|
|
||||||
../../waku/v2/waku_store,
|
|
||||||
../../waku/v2/waku_filter,
|
|
||||||
../../waku/v2/waku_lightpush,
|
|
||||||
../../waku/v2/waku_enr,
|
|
||||||
../../waku/v2/waku_dnsdisc,
|
|
||||||
../../waku/v2/waku_discv5,
|
|
||||||
../../waku/v2/waku_peer_exchange,
|
|
||||||
../../waku/v2/waku_relay/validators,
|
|
||||||
./wakunode2_setup_rest,
|
|
||||||
./wakunode2_setup_rpc,
|
|
||||||
./config
|
|
||||||
|
|
||||||
when defined(rln):
|
|
||||||
import
|
|
||||||
../../waku/v2/waku_rln_relay
|
|
||||||
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode"
|
topics = "wakunode main"
|
||||||
|
|
||||||
|
|
||||||
type SetupResult[T] = Result[T, string]
|
|
||||||
|
|
||||||
|
|
||||||
proc setupDatabaseConnection(dbUrl: string): SetupResult[Option[SqliteDatabase]] =
|
|
||||||
## dbUrl mimics SQLAlchemy Database URL schema
|
|
||||||
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
|
|
||||||
if dbUrl == "" or dbUrl == "none":
|
|
||||||
return ok(none(SqliteDatabase))
|
|
||||||
|
|
||||||
let dbUrlParts = dbUrl.split("://", 1)
|
|
||||||
let
|
|
||||||
engine = dbUrlParts[0]
|
|
||||||
path = dbUrlParts[1]
|
|
||||||
|
|
||||||
let connRes = case engine
|
|
||||||
of "sqlite":
|
|
||||||
# SQLite engine
|
|
||||||
# See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite
|
|
||||||
SqliteDatabase.new(path)
|
|
||||||
|
|
||||||
else:
|
|
||||||
return err("unknown database engine")
|
|
||||||
|
|
||||||
if connRes.isErr():
|
|
||||||
return err("failed to init database connection: " & connRes.error)
|
|
||||||
|
|
||||||
ok(some(connRes.value))
|
|
||||||
|
|
||||||
proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] =
|
|
||||||
let
|
|
||||||
pageSize = ?db.getPageSize()
|
|
||||||
pageCount = ?db.getPageCount()
|
|
||||||
freelistCount = ?db.getFreelistCount()
|
|
||||||
|
|
||||||
ok((pageSize, pageCount, freelistCount))
|
|
||||||
|
|
||||||
proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] =
|
|
||||||
## SQLite database vacuuming
|
|
||||||
# TODO: Run vacuuming conditionally based on database page stats
|
|
||||||
# if (pageCount > 0 and freelistCount > 0):
|
|
||||||
|
|
||||||
debug "starting sqlite database vacuuming"
|
|
||||||
|
|
||||||
let resVacuum = db.vacuum()
|
|
||||||
if resVacuum.isErr():
|
|
||||||
return err("failed to execute vacuum: " & resVacuum.error)
|
|
||||||
|
|
||||||
debug "finished sqlite database vacuuming"
|
|
||||||
|
|
||||||
|
|
||||||
const PeerPersistenceDbUrl = "sqlite://peers.db"
|
|
||||||
|
|
||||||
proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] =
|
|
||||||
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
|
|
||||||
|
|
||||||
?peer_store_sqlite_migrations.migrate(db.get())
|
|
||||||
|
|
||||||
let res = WakuPeerStorage.new(db.get())
|
|
||||||
if res.isErr():
|
|
||||||
return err("failed to init peer store" & res.error)
|
|
||||||
|
|
||||||
ok(some(res.value))
|
|
||||||
|
|
||||||
|
|
||||||
proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): SetupResult[Option[RetentionPolicy]] =
|
|
||||||
if retentionPolicy == "" or retentionPolicy == "none":
|
|
||||||
return ok(none(RetentionPolicy))
|
|
||||||
|
|
||||||
let rententionPolicyParts = retentionPolicy.split(":", 1)
|
|
||||||
let
|
|
||||||
policy = rententionPolicyParts[0]
|
|
||||||
policyArgs = rententionPolicyParts[1]
|
|
||||||
|
|
||||||
|
|
||||||
if policy == "time":
|
|
||||||
var retentionTimeSeconds: int64
|
|
||||||
try:
|
|
||||||
retentionTimeSeconds = parseInt(policyArgs)
|
|
||||||
except ValueError:
|
|
||||||
return err("invalid time retention policy argument")
|
|
||||||
|
|
||||||
let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
|
|
||||||
return ok(some(retPolicy))
|
|
||||||
|
|
||||||
elif policy == "capacity":
|
|
||||||
var retentionCapacity: int
|
|
||||||
try:
|
|
||||||
retentionCapacity = parseInt(policyArgs)
|
|
||||||
except ValueError:
|
|
||||||
return err("invalid capacity retention policy argument")
|
|
||||||
|
|
||||||
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
|
|
||||||
return ok(some(retPolicy))
|
|
||||||
|
|
||||||
else:
|
|
||||||
return err("unknown retention policy")
|
|
||||||
|
|
||||||
proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[ArchiveDriver] =
|
|
||||||
let db = ?setupDatabaseConnection(dbUrl)
|
|
||||||
|
|
||||||
if db.isSome():
|
|
||||||
# SQLite vacuum
|
|
||||||
# TODO: Run this only if the database engine is SQLite
|
|
||||||
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get())
|
|
||||||
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
|
|
||||||
|
|
||||||
if vacuum and (pageCount > 0 and freelistCount > 0):
|
|
||||||
?performSqliteVacuum(db.get())
|
|
||||||
|
|
||||||
# Database migration
|
|
||||||
if migrate:
|
|
||||||
?archive_driver_sqlite_migrations.migrate(db.get())
|
|
||||||
|
|
||||||
if db.isSome():
|
|
||||||
debug "setting up sqlite waku archive driver"
|
|
||||||
let res = SqliteDriver.new(db.get())
|
|
||||||
if res.isErr():
|
|
||||||
return err("failed to init sqlite archive driver: " & res.error)
|
|
||||||
|
|
||||||
ok(res.value)
|
|
||||||
|
|
||||||
else:
|
|
||||||
debug "setting up in-memory waku archive driver"
|
|
||||||
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
|
|
||||||
ok(driver)
|
|
||||||
|
|
||||||
|
|
||||||
proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): SetupResult[seq[RemotePeerInfo]] =
|
|
||||||
|
|
||||||
if dnsDiscovery and dnsDiscoveryUrl != "":
|
|
||||||
# DNS discovery
|
|
||||||
debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl
|
|
||||||
|
|
||||||
var nameServers: seq[TransportAddress]
|
|
||||||
for ip in dnsDiscoveryNameServers:
|
|
||||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
|
||||||
|
|
||||||
let dnsResolver = DnsResolver.new(nameServers)
|
|
||||||
|
|
||||||
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
|
||||||
trace "resolving", domain=domain
|
|
||||||
let resolved = await dnsResolver.resolveTxt(domain)
|
|
||||||
return resolved[0] # Use only first answer
|
|
||||||
|
|
||||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
|
|
||||||
if wakuDnsDiscovery.isOk():
|
|
||||||
return wakuDnsDiscovery.get().findPeers()
|
|
||||||
.mapErr(proc (e: cstring): string = $e)
|
|
||||||
else:
|
|
||||||
warn "Failed to init Waku DNS discovery"
|
|
||||||
|
|
||||||
debug "No method for retrieving dynamic bootstrap nodes specified."
|
|
||||||
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
|
|
||||||
|
|
||||||
proc setupNat(natConf, clientId: string, tcpPort, udpPort: Port):
|
|
||||||
SetupResult[tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]]] {.gcsafe.} =
|
|
||||||
|
|
||||||
let strategy = case natConf.toLowerAscii():
|
|
||||||
of "any": NatAny
|
|
||||||
of "none": NatNone
|
|
||||||
of "upnp": NatUpnp
|
|
||||||
of "pmp": NatPmp
|
|
||||||
else: NatNone
|
|
||||||
|
|
||||||
var endpoint: tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]]
|
|
||||||
|
|
||||||
if strategy != NatNone:
|
|
||||||
let extIp = getExternalIP(strategy)
|
|
||||||
if extIP.isSome():
|
|
||||||
endpoint.ip = some(ValidIpAddress.init(extIp.get()))
|
|
||||||
# RedirectPorts in considered a gcsafety violation
|
|
||||||
# because it obtains the address of a non-gcsafe proc?
|
|
||||||
var extPorts: Option[(Port, Port)]
|
|
||||||
try:
|
|
||||||
extPorts = ({.gcsafe.}: redirectPorts(tcpPort = tcpPort,
|
|
||||||
udpPort = udpPort,
|
|
||||||
description = clientId))
|
|
||||||
except CatchableError:
|
|
||||||
# TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now.
|
|
||||||
error "unable to determine external ports"
|
|
||||||
extPorts = none((Port, Port))
|
|
||||||
|
|
||||||
if extPorts.isSome():
|
|
||||||
let (extTcpPort, extUdpPort) = extPorts.get()
|
|
||||||
endpoint.tcpPort = some(extTcpPort)
|
|
||||||
endpoint.udpPort = some(extUdpPort)
|
|
||||||
|
|
||||||
else: # NatNone
|
|
||||||
if not natConf.startsWith("extip:"):
|
|
||||||
return err("not a valid NAT mechanism: " & $natConf)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# any required port redirection is assumed to be done by hand
|
|
||||||
endpoint.ip = some(ValidIpAddress.init(natConf[6..^1]))
|
|
||||||
except ValueError:
|
|
||||||
return err("not a valid IP address: " & $natConf[6..^1])
|
|
||||||
|
|
||||||
return ok(endpoint)
|
|
||||||
|
|
||||||
proc initNode(conf: WakuNodeConf,
|
|
||||||
rng: ref HmacDrbgContext,
|
|
||||||
peerStore: Option[WakuPeerStorage],
|
|
||||||
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] =
|
|
||||||
|
|
||||||
## Setup a basic Waku v2 node based on a supplied configuration
|
|
||||||
## file. Optionally include persistent peer storage.
|
|
||||||
## No protocols are mounted yet.
|
|
||||||
|
|
||||||
var dnsResolver: DnsResolver
|
|
||||||
if conf.dnsAddrs:
|
|
||||||
# Support for DNS multiaddrs
|
|
||||||
var nameServers: seq[TransportAddress]
|
|
||||||
for ip in conf.dnsAddrsNameServers:
|
|
||||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
|
||||||
|
|
||||||
dnsResolver = DnsResolver.new(nameServers)
|
|
||||||
|
|
||||||
let
|
|
||||||
nodekey = if conf.nodekey.isSome():
|
|
||||||
conf.nodekey.get()
|
|
||||||
else:
|
|
||||||
let nodekeyRes = crypto.PrivateKey.random(Secp256k1, rng[])
|
|
||||||
if nodekeyRes.isErr():
|
|
||||||
return err("failed to generate nodekey: " & $nodekeyRes.error)
|
|
||||||
nodekeyRes.get()
|
|
||||||
|
|
||||||
|
|
||||||
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
|
||||||
## actually a supported transport for libp2p traffic.
|
|
||||||
let udpPort = conf.tcpPort
|
|
||||||
let natRes = setupNat(conf.nat, clientId,
|
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
Port(uint16(udpPort) + conf.portsShift))
|
|
||||||
if natRes.isErr():
|
|
||||||
return err("failed to setup NAT: " & $natRes.error)
|
|
||||||
|
|
||||||
let (extIp, extTcpPort, _) = natRes.get()
|
|
||||||
|
|
||||||
|
|
||||||
let
|
|
||||||
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
|
||||||
else: none(string)
|
|
||||||
|
|
||||||
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
|
||||||
else: none(Port)
|
|
||||||
|
|
||||||
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
|
||||||
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
|
||||||
## config, the external port is the same as the bind port.
|
|
||||||
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
|
||||||
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
|
||||||
else:
|
|
||||||
extTcpPort
|
|
||||||
extMultiAddrs = if (conf.extMultiAddrs.len > 0):
|
|
||||||
let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs)
|
|
||||||
if extMultiAddrsValidationRes.isErr():
|
|
||||||
return err("invalid external multiaddress: " & extMultiAddrsValidationRes.error)
|
|
||||||
else:
|
|
||||||
extMultiAddrsValidationRes.get()
|
|
||||||
else:
|
|
||||||
@[]
|
|
||||||
|
|
||||||
wakuFlags = CapabilitiesBitfield.init(
|
|
||||||
lightpush = conf.lightpush,
|
|
||||||
filter = conf.filter,
|
|
||||||
store = conf.store,
|
|
||||||
relay = conf.relay
|
|
||||||
)
|
|
||||||
|
|
||||||
var node: WakuNode
|
|
||||||
|
|
||||||
let pStorage = if peerStore.isNone(): nil
|
|
||||||
else: peerStore.get()
|
|
||||||
|
|
||||||
let rng = crypto.newRng()
|
|
||||||
# Wrap in none because NetConfig does not have a default constructor
|
|
||||||
# TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress,
|
|
||||||
# which doesn't allow default construction
|
|
||||||
let netConfigRes = NetConfig.init(
|
|
||||||
bindIp = conf.listenAddress,
|
|
||||||
bindPort = Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
extIp = extIp,
|
|
||||||
extPort = extPort,
|
|
||||||
extMultiAddrs = extMultiAddrs,
|
|
||||||
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
|
||||||
wsEnabled = conf.websocketSupport,
|
|
||||||
wssEnabled = conf.websocketSecureSupport,
|
|
||||||
dns4DomainName = dns4DomainName,
|
|
||||||
discv5UdpPort = discv5UdpPort,
|
|
||||||
wakuFlags = some(wakuFlags),
|
|
||||||
)
|
|
||||||
if netConfigRes.isErr():
|
|
||||||
return err("failed to create net config instance: " & netConfigRes.error)
|
|
||||||
|
|
||||||
let netConfig = netConfigRes.get()
|
|
||||||
var wakuDiscv5 = none(WakuDiscoveryV5)
|
|
||||||
|
|
||||||
if conf.discv5Discovery:
|
|
||||||
let dynamicBootstrapEnrs = dynamicBootstrapNodes
|
|
||||||
.filterIt(it.hasUdpPort())
|
|
||||||
.mapIt(it.enr.get())
|
|
||||||
var discv5BootstrapEnrs: seq[enr.Record]
|
|
||||||
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
|
||||||
for enrUri in conf.discv5BootstrapNodes:
|
|
||||||
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
|
||||||
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
|
|
||||||
let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit,
|
|
||||||
conf.discv5BucketIpLimit,
|
|
||||||
conf.discv5BitsPerHop)
|
|
||||||
try:
|
|
||||||
wakuDiscv5 = some(WakuDiscoveryV5.new(
|
|
||||||
extIp = netConfig.extIp,
|
|
||||||
extTcpPort = netConfig.extPort,
|
|
||||||
extUdpPort = netConfig.discv5UdpPort,
|
|
||||||
bindIp = netConfig.bindIp,
|
|
||||||
discv5UdpPort = netConfig.discv5UdpPort.get(),
|
|
||||||
bootstrapEnrs = discv5BootstrapEnrs,
|
|
||||||
enrAutoUpdate = conf.discv5EnrAutoUpdate,
|
|
||||||
privateKey = keys.PrivateKey(nodekey.skkey),
|
|
||||||
flags = netConfig.wakuFlags.get(),
|
|
||||||
multiaddrs = netConfig.enrMultiaddrs,
|
|
||||||
rng = rng,
|
|
||||||
discv5Config = discv5Config,
|
|
||||||
))
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# Build waku node instance
|
|
||||||
var builder = WakuNodeBuilder.init()
|
|
||||||
builder.withRng(rng)
|
|
||||||
builder.withNodeKey(nodekey)
|
|
||||||
builder.withNetworkConfiguration(netConfig)
|
|
||||||
builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity)
|
|
||||||
builder.withSwitchConfiguration(
|
|
||||||
maxConnections = some(conf.maxConnections.int),
|
|
||||||
secureKey = some(conf.websocketSecureKeyPath),
|
|
||||||
secureCert = some(conf.websocketSecureCertPath),
|
|
||||||
nameResolver = dnsResolver,
|
|
||||||
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
|
||||||
agentString = some(conf.agentString)
|
|
||||||
)
|
|
||||||
builder.withWakuDiscv5(wakuDiscv5.get(nil))
|
|
||||||
|
|
||||||
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)
|
|
||||||
|
|
||||||
ok(node)
|
|
||||||
|
|
||||||
proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
|
||||||
archiveDriver: Option[ArchiveDriver],
|
|
||||||
archiveRetentionPolicy: Option[RetentionPolicy]): Future[SetupResult[void]] {.async.} =
|
|
||||||
## Setup configured protocols on an existing Waku v2 node.
|
|
||||||
## Optionally include persistent message storage.
|
|
||||||
## No protocols are started yet.
|
|
||||||
|
|
||||||
# Mount relay on all nodes
|
|
||||||
var peerExchangeHandler = none(RoutingRecordsHandler)
|
|
||||||
if conf.relayPeerExchange:
|
|
||||||
proc handlePeerExchange(peer: PeerId, topic: string,
|
|
||||||
peers: seq[RoutingRecordsPair]) {.gcsafe.} =
|
|
||||||
## Handle peers received via gossipsub peer exchange
|
|
||||||
# TODO: Only consider peers on pubsub topics we subscribe to
|
|
||||||
let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
|
|
||||||
.mapIt(toRemotePeerInfo(it.record.get()))
|
|
||||||
|
|
||||||
debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len
|
|
||||||
|
|
||||||
# asyncSpawn, as we don't want to block here
|
|
||||||
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
|
|
||||||
|
|
||||||
peerExchangeHandler = some(handlePeerExchange)
|
|
||||||
|
|
||||||
if conf.relay:
|
|
||||||
try:
|
|
||||||
let pubsubTopics = conf.topics.split(" ")
|
|
||||||
await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# TODO: Get this from cli
|
|
||||||
var topicsPublicKeys = initTable[string, SkPublicKey]()
|
|
||||||
# Add validation keys to protected topics
|
|
||||||
for topic, publicKey in topicsPublicKeys.pairs:
|
|
||||||
info "routing only signed traffic", topic=topic, publicKey=publicKey
|
|
||||||
node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topic), publicKey)
|
|
||||||
|
|
||||||
|
|
||||||
# Keepalive mounted on all nodes
|
|
||||||
try:
|
|
||||||
await mountLibp2pPing(node)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
when defined(rln):
|
|
||||||
if conf.rlnRelay:
|
|
||||||
|
|
||||||
let rlnConf = WakuRlnConfig(
|
|
||||||
rlnRelayDynamic: conf.rlnRelayDynamic,
|
|
||||||
rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic,
|
|
||||||
rlnRelayContentTopic: conf.rlnRelayContentTopic,
|
|
||||||
rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex),
|
|
||||||
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
|
|
||||||
rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
|
|
||||||
rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey,
|
|
||||||
rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress,
|
|
||||||
rlnRelayCredPath: conf.rlnRelayCredPath,
|
|
||||||
rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await node.mountRlnRelay(rlnConf)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
if conf.store:
|
|
||||||
# Archive setup
|
|
||||||
let messageValidator: MessageValidator = DefaultMessageValidator()
|
|
||||||
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
|
|
||||||
|
|
||||||
# Store setup
|
|
||||||
try:
|
|
||||||
await mountStore(node)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# TODO: Move this to storage setup phase
|
|
||||||
if archiveRetentionPolicy.isSome():
|
|
||||||
executeMessageRetentionPolicy(node)
|
|
||||||
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
|
|
||||||
|
|
||||||
mountStoreClient(node)
|
|
||||||
if conf.storenode != "":
|
|
||||||
let storeNode = parsePeerInfo(conf.storenode)
|
|
||||||
if storeNode.isOk():
|
|
||||||
node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
|
|
||||||
else:
|
|
||||||
return err("failed to set node waku store peer: " & storeNode.error)
|
|
||||||
|
|
||||||
# NOTE Must be mounted after relay
|
|
||||||
if conf.lightpush:
|
|
||||||
try:
|
|
||||||
await mountLightPush(node)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
if conf.lightpushnode != "":
|
|
||||||
let lightPushNode = parsePeerInfo(conf.lightpushnode)
|
|
||||||
if lightPushNode.isOk():
|
|
||||||
mountLightPushClient(node)
|
|
||||||
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
|
|
||||||
else:
|
|
||||||
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
|
|
||||||
|
|
||||||
# Filter setup. NOTE Must be mounted after relay
|
|
||||||
if conf.filter:
|
|
||||||
try:
|
|
||||||
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
if conf.filternode != "":
|
|
||||||
let filterNode = parsePeerInfo(conf.filternode)
|
|
||||||
if filterNode.isOk():
|
|
||||||
await mountFilterClient(node)
|
|
||||||
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
|
|
||||||
else:
|
|
||||||
return err("failed to set node waku filter peer: " & filterNode.error)
|
|
||||||
|
|
||||||
# waku peer exchange setup
|
|
||||||
if (conf.peerExchangeNode != "") or (conf.peerExchange):
|
|
||||||
try:
|
|
||||||
await mountPeerExchange(node)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
if conf.peerExchangeNode != "":
|
|
||||||
let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode)
|
|
||||||
if peerExchangeNode.isOk():
|
|
||||||
node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
|
|
||||||
else:
|
|
||||||
return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
|
|
||||||
|
|
||||||
return ok()
|
|
||||||
|
|
||||||
proc startNode(node: WakuNode, conf: WakuNodeConf,
|
|
||||||
dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} =
|
|
||||||
## Start a configured node and all mounted protocols.
|
|
||||||
## Connect to static nodes and start
|
|
||||||
## keep-alive, if configured.
|
|
||||||
|
|
||||||
# Start Waku v2 node
|
|
||||||
try:
|
|
||||||
await node.start()
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to start waku node: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# Start discv5 and connect to discovered nodes
|
|
||||||
if conf.discv5Discovery:
|
|
||||||
try:
|
|
||||||
if not await node.startDiscv5():
|
|
||||||
error "could not start Discovery v5"
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to start waku discovery v5: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# Connect to configured static nodes
|
|
||||||
if conf.staticnodes.len > 0:
|
|
||||||
try:
|
|
||||||
await connectToNodes(node, conf.staticnodes, "static")
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to connect to static nodes: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
if dynamicBootstrapNodes.len > 0:
|
|
||||||
info "Connecting to dynamic bootstrap peers"
|
|
||||||
try:
|
|
||||||
await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
# retrieve px peers and add the to the peer store
|
|
||||||
if conf.peerExchangeNode != "":
|
|
||||||
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
|
|
||||||
await node.fetchPeerExchangePeers(desiredOutDegree)
|
|
||||||
|
|
||||||
# Start keepalive, if enabled
|
|
||||||
if conf.keepAlive:
|
|
||||||
node.startKeepalive()
|
|
||||||
|
|
||||||
# Maintain relay connections
|
|
||||||
if conf.relay:
|
|
||||||
node.peerManager.start()
|
|
||||||
|
|
||||||
return ok()
|
|
||||||
|
|
||||||
when defined(waku_exp_store_resume):
|
|
||||||
proc resumeMessageStore(node: WakuNode, address: string): Future[SetupResult[void]] {.async.} =
|
|
||||||
# Resume historical messages, this has to be called after the node has been started
|
|
||||||
if address != "":
|
|
||||||
return err("empty peer multiaddres")
|
|
||||||
|
|
||||||
let remotePeer = parsePeerInfo(address)
|
|
||||||
if remotePeer.isErr():
|
|
||||||
return err("invalid peer multiaddress: " & remotePeer.error)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await node.resume(some(@[remotePeer.value]))
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to resume messages history: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
|
|
||||||
proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] =
|
|
||||||
try:
|
|
||||||
startRpcServer(node, address, Port(port + portsShift), conf)
|
|
||||||
except CatchableError:
|
|
||||||
return err("failed to start the json-rpc server: " & getCurrentExceptionMsg())
|
|
||||||
|
|
||||||
ok()
|
|
||||||
|
|
||||||
proc startRestServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] =
|
|
||||||
startRestServer(node, address, Port(port + portsShift), conf)
|
|
||||||
ok()
|
|
||||||
|
|
||||||
proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): SetupResult[void] =
|
|
||||||
startMetricsServer(address, Port(port + portsShift))
|
|
||||||
ok()
|
|
||||||
|
|
||||||
|
|
||||||
proc startMetricsLogging(): SetupResult[void] =
|
|
||||||
startMetricsLog()
|
|
||||||
ok()
|
|
||||||
|
|
||||||
|
|
||||||
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||||
@ -639,7 +31,7 @@ when isMainModule:
|
|||||||
## 5. Start monitoring tools and external interfaces
|
## 5. Start monitoring tools and external interfaces
|
||||||
## 6. Setup graceful shutdown hooks
|
## 6. Setup graceful shutdown hooks
|
||||||
|
|
||||||
const versionString = "version / git commit hash: " & git_version
|
const versionString = "version / git commit hash: " & app.git_version
|
||||||
let rng = crypto.newRng()
|
let rng = crypto.newRng()
|
||||||
|
|
||||||
let confRes = WakuNodeConf.load(version=versionString)
|
let confRes = WakuNodeConf.load(version=versionString)
|
||||||
@ -659,6 +51,8 @@ when isMainModule:
|
|||||||
logging.setupLogFormat(conf.logFormat, color)
|
logging.setupLogFormat(conf.logFormat, color)
|
||||||
|
|
||||||
|
|
||||||
|
var wakunode2 = App.init(rng, conf)
|
||||||
|
|
||||||
##############
|
##############
|
||||||
# Node setup #
|
# Node setup #
|
||||||
##############
|
##############
|
||||||
@ -666,122 +60,57 @@ when isMainModule:
|
|||||||
debug "1/7 Setting up storage"
|
debug "1/7 Setting up storage"
|
||||||
|
|
||||||
## Peer persistence
|
## Peer persistence
|
||||||
var peerStore = none(WakuPeerStorage)
|
let res1 = wakunode2.setupPeerPersistence()
|
||||||
|
if res1.isErr():
|
||||||
if conf.peerPersistence:
|
error "1/7 Setting up storage failed", error=res1.error
|
||||||
let peerStoreRes = setupPeerStorage();
|
quit(QuitFailure)
|
||||||
if peerStoreRes.isOk():
|
|
||||||
peerStore = peerStoreRes.get()
|
|
||||||
else:
|
|
||||||
error "failed to setup peer store", error=peerStoreRes.error
|
|
||||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
|
||||||
|
|
||||||
## Waku archive
|
## Waku archive
|
||||||
var archiveDriver = none(ArchiveDriver)
|
let res2 = wakunode2.setupWakuArchive()
|
||||||
var archiveRetentionPolicy = none(RetentionPolicy)
|
if res2.isErr():
|
||||||
|
error "1/7 Setting up storage failed (waku archive)", error=res2.error
|
||||||
if conf.store:
|
|
||||||
# Message storage
|
|
||||||
let dbUrlValidationRes = validateDbUrl(conf.storeMessageDbUrl)
|
|
||||||
if dbUrlValidationRes.isErr():
|
|
||||||
error "failed to configure the message store database connection", error=dbUrlValidationRes.error
|
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration)
|
|
||||||
if archiveDriverRes.isOk():
|
|
||||||
archiveDriver = some(archiveDriverRes.get())
|
|
||||||
else:
|
|
||||||
error "failed to configure archive driver", error=archiveDriverRes.error
|
|
||||||
quit(QuitFailure)
|
|
||||||
|
|
||||||
# Message store retention policy
|
|
||||||
let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(conf.storeMessageRetentionPolicy)
|
|
||||||
if storeMessageRetentionPolicyRes.isErr():
|
|
||||||
error "invalid store message retention policy configuration", error=storeMessageRetentionPolicyRes.error
|
|
||||||
quit(QuitFailure)
|
|
||||||
|
|
||||||
let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get())
|
|
||||||
if archiveRetentionPolicyRes.isOk():
|
|
||||||
archiveRetentionPolicy = archiveRetentionPolicyRes.get()
|
|
||||||
else:
|
|
||||||
error "failed to configure the message retention policy", error=archiveRetentionPolicyRes.error
|
|
||||||
quit(QuitFailure)
|
|
||||||
|
|
||||||
# TODO: Move retention policy execution here
|
|
||||||
# if archiveRetentionPolicy.isSome():
|
|
||||||
# executeMessageRetentionPolicy(node)
|
|
||||||
# startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
|
|
||||||
|
|
||||||
|
|
||||||
debug "2/7 Retrieve dynamic bootstrap nodes"
|
debug "2/7 Retrieve dynamic bootstrap nodes"
|
||||||
|
|
||||||
var dynamicBootstrapNodes: seq[RemotePeerInfo]
|
let res3 = wakunode2.setupDyamicBootstrapNodes()
|
||||||
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf.dnsDiscovery, conf.dnsDiscoveryUrl, conf.dnsDiscoveryNameServers)
|
if res3.isErr():
|
||||||
if dynamicBootstrapNodesRes.isOk():
|
error "2/7 Retrieving dynamic bootstrap nodes failed", error=res3.error
|
||||||
dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
quit(QuitFailure)
|
||||||
else:
|
|
||||||
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error
|
|
||||||
|
|
||||||
debug "3/7 Initializing node"
|
debug "3/7 Initializing node"
|
||||||
|
|
||||||
var node: WakuNode # This is the node we're going to setup using the conf
|
let res4 = wakunode2.setupWakuNode()
|
||||||
|
if res4.isErr():
|
||||||
let initNodeRes = initNode(conf, rng, peerStore, dynamicBootstrapNodes)
|
error "3/7 Initializing node failed", error=res4.error
|
||||||
if initNodeRes.isok():
|
|
||||||
node = initNodeRes.get()
|
|
||||||
else:
|
|
||||||
error "3/7 Initializing node failed. Quitting.", error=initNodeRes.error
|
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
debug "4/7 Mounting protocols"
|
debug "4/7 Mounting protocols"
|
||||||
|
|
||||||
let setupProtocolsRes = waitFor setupProtocols(node, conf, archiveDriver, archiveRetentionPolicy)
|
let res5 = waitFor wakunode2.setupAndMountProtocols()
|
||||||
if setupProtocolsRes.isErr():
|
if res5.isErr():
|
||||||
error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error
|
error "4/7 Mounting protocols failed", error=res5.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
debug "5/7 Starting node and mounted protocols"
|
debug "5/7 Starting node and mounted protocols"
|
||||||
|
|
||||||
let startNodeRes = waitFor startNode(node, conf, dynamicBootstrapNodes)
|
let res6 = waitFor wakunode2.startNode()
|
||||||
if startNodeRes.isErr():
|
if res6.isErr():
|
||||||
error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error
|
error "5/7 Starting node and protocols failed", error=res6.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
when defined(waku_exp_store_resume):
|
|
||||||
# Resume message store on boot
|
|
||||||
if conf.storeResumePeer != "":
|
|
||||||
let resumeMessageStoreRes = waitFor resumeMessageStore(node, conf.storeResumePeer)
|
|
||||||
if resumeMessageStoreRes.isErr():
|
|
||||||
error "failed to resume message store from peer node. Continuing in current state", error=resumeMessageStoreRes.error
|
|
||||||
|
|
||||||
|
|
||||||
debug "6/7 Starting monitoring and external interfaces"
|
debug "6/7 Starting monitoring and external interfaces"
|
||||||
|
|
||||||
if conf.rpc:
|
let res7 = wakunode2.setupMonitoringAndExternalInterfaces()
|
||||||
let startRpcServerRes = startRpcServer(node, conf.rpcAddress, conf.rpcPort, conf.portsShift, conf)
|
if res7.isErr():
|
||||||
if startRpcServerRes.isErr():
|
error "6/7 Starting monitoring and external interfaces failed", error=res7.error
|
||||||
error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error
|
quit(QuitFailure)
|
||||||
|
|
||||||
if conf.rest:
|
|
||||||
let startRestServerRes = startRestServer(node, conf.restAddress, conf.restPort, conf.portsShift, conf)
|
|
||||||
if startRestServerRes.isErr():
|
|
||||||
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
|
||||||
|
|
||||||
if conf.metricsServer:
|
|
||||||
let startMetricsServerRes = startMetricsServer(node, conf.metricsServerAddress, conf.metricsServerPort, conf.portsShift)
|
|
||||||
if startMetricsServerRes.isErr():
|
|
||||||
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
|
||||||
|
|
||||||
if conf.metricsLogging:
|
|
||||||
let startMetricsLoggingRes = startMetricsLogging()
|
|
||||||
if startMetricsLoggingRes.isErr():
|
|
||||||
error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error
|
|
||||||
|
|
||||||
|
|
||||||
debug "7/7 Setting up shutdown hooks"
|
debug "7/7 Setting up shutdown hooks"
|
||||||
## Setup shutdown hooks for this process.
|
## Setup shutdown hooks for this process.
|
||||||
## Stop node gracefully on shutdown.
|
## Stop node gracefully on shutdown.
|
||||||
|
|
||||||
proc asyncStopper(node: WakuNode) {.async.} =
|
proc asyncStopper(node: App) {.async.} =
|
||||||
await node.stop()
|
await node.stop()
|
||||||
quit(QuitSuccess)
|
quit(QuitSuccess)
|
||||||
|
|
||||||
@ -791,7 +120,7 @@ when isMainModule:
|
|||||||
# workaround for https://github.com/nim-lang/Nim/issues/4057
|
# workaround for https://github.com/nim-lang/Nim/issues/4057
|
||||||
setupForeignThreadGc()
|
setupForeignThreadGc()
|
||||||
notice "Shutting down after receiving SIGINT"
|
notice "Shutting down after receiving SIGINT"
|
||||||
asyncSpawn asyncStopper(node)
|
asyncSpawn asyncStopper(wakunode2)
|
||||||
|
|
||||||
setControlCHook(handleCtrlC)
|
setControlCHook(handleCtrlC)
|
||||||
|
|
||||||
@ -799,7 +128,7 @@ when isMainModule:
|
|||||||
when defined(posix):
|
when defined(posix):
|
||||||
proc handleSigterm(signal: cint) {.noconv.} =
|
proc handleSigterm(signal: cint) {.noconv.} =
|
||||||
notice "Shutting down after receiving SIGTERM"
|
notice "Shutting down after receiving SIGTERM"
|
||||||
asyncSpawn asyncStopper(node)
|
asyncSpawn asyncStopper(wakunode2)
|
||||||
|
|
||||||
c_signal(ansi_c.SIGTERM, handleSigterm)
|
c_signal(ansi_c.SIGTERM, handleSigterm)
|
||||||
|
|
||||||
@ -812,7 +141,7 @@ when isMainModule:
|
|||||||
# Not available in -d:release mode
|
# Not available in -d:release mode
|
||||||
writeStackTrace()
|
writeStackTrace()
|
||||||
|
|
||||||
waitFor node.stop()
|
waitFor wakunode2.stop()
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
c_signal(ansi_c.SIGSEGV, handleSigsegv)
|
c_signal(ansi_c.SIGSEGV, handleSigsegv)
|
||||||
|
|||||||
@ -4,6 +4,7 @@ else:
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
|
stew/results,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
chronicles,
|
chronicles,
|
||||||
presto
|
presto
|
||||||
@ -21,13 +22,8 @@ logScope:
|
|||||||
topics = "wakunode rest"
|
topics = "wakunode rest"
|
||||||
|
|
||||||
|
|
||||||
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) =
|
proc startRestServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): RestServerResult[RestServerRef] =
|
||||||
let serverResult = newRestHttpServer(address, port)
|
let server = ? newRestHttpServer(address, port)
|
||||||
if serverResult.isErr():
|
|
||||||
notice "REST HTTP server could not be started", address = $address&":" & $port, reason = serverResult.error()
|
|
||||||
return
|
|
||||||
|
|
||||||
let server = serverResult.get()
|
|
||||||
|
|
||||||
## Debug REST API
|
## Debug REST API
|
||||||
installDebugApiHandlers(server.router, node)
|
installDebugApiHandlers(server.router, node)
|
||||||
@ -42,3 +38,8 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:
|
|||||||
|
|
||||||
server.start()
|
server.start()
|
||||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||||
|
|
||||||
|
ok(server)
|
||||||
|
|
||||||
|
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): RestServerResult[RestServerRef] =
|
||||||
|
return startRestServer(node, address, Port(port + portsShift), conf)
|
||||||
|
|||||||
@ -4,6 +4,7 @@ else:
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
|
stew/results,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
chronicles,
|
chronicles,
|
||||||
json_rpc/rpcserver
|
json_rpc/rpcserver
|
||||||
@ -20,13 +21,14 @@ import
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode jsonrpc"
|
topics = "wakunode jsonrpc"
|
||||||
|
|
||||||
|
proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): Result[RpcHttpServer, string] =
|
||||||
|
let ta = initTAddress(address, port)
|
||||||
|
|
||||||
proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf)
|
var server: RpcHttpServer
|
||||||
{.raises: [CatchableError].} =
|
try:
|
||||||
|
|
||||||
let
|
|
||||||
ta = initTAddress(address, port)
|
|
||||||
server = newRpcHttpServer([ta])
|
server = newRpcHttpServer([ta])
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
installDebugApiHandlers(node, server)
|
installDebugApiHandlers(node, server)
|
||||||
|
|
||||||
@ -49,3 +51,8 @@ proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:
|
|||||||
|
|
||||||
server.start()
|
server.start()
|
||||||
info "RPC Server started", address=ta
|
info "RPC Server started", address=ta
|
||||||
|
|
||||||
|
ok(server)
|
||||||
|
|
||||||
|
proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): Result[RpcHttpServer, string] =
|
||||||
|
return startRpcServer(node, address, Port(port + portsShift), conf)
|
||||||
|
|||||||
4
tests/all_tests_wakunode2.nim
Normal file
4
tests/all_tests_wakunode2.nim
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
## Wakunode2
|
||||||
|
|
||||||
|
import
|
||||||
|
./wakunode2/test_app
|
||||||
82
tests/wakunode2/test_app.nim
Normal file
82
tests/wakunode2/test_app.nim
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
stew/shims/net,
|
||||||
|
testutils/unittests,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/crypto/secp,
|
||||||
|
libp2p/multiaddress,
|
||||||
|
libp2p/switch
|
||||||
|
import
|
||||||
|
../../apps/wakunode2/config,
|
||||||
|
../../apps/wakunode2/app,
|
||||||
|
../v2/testlib/common,
|
||||||
|
../v2/testlib/wakucore
|
||||||
|
|
||||||
|
|
||||||
|
proc defaultTestWakuNodeConf(): WakuNodeConf =
|
||||||
|
WakuNodeConf(
|
||||||
|
listenAddress: ValidIpAddress.init("127.0.0.1"),
|
||||||
|
rpcAddress: ValidIpAddress.init("127.0.0.1"),
|
||||||
|
restAddress: ValidIpAddress.init("127.0.0.1"),
|
||||||
|
metricsServerAddress: ValidIpAddress.init("127.0.0.1"),
|
||||||
|
nat: "any",
|
||||||
|
maxConnections: 50,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
suite "Wakunode2 - App":
|
||||||
|
test "compilation version should be reported":
|
||||||
|
## Given
|
||||||
|
let conf = defaultTestWakuNodeConf()
|
||||||
|
|
||||||
|
var wakunode2 = App.init(rng(), conf)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let version = wakunode2.version
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
version == app.git_version
|
||||||
|
|
||||||
|
|
||||||
|
suite "Wakunode2 - App initialization":
|
||||||
|
test "peer persistence setup should be successfully mounted":
|
||||||
|
## Given
|
||||||
|
var conf = defaultTestWakuNodeConf()
|
||||||
|
conf.peerPersistence = true
|
||||||
|
|
||||||
|
var wakunode2 = App.init(rng(), conf)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = wakunode2.setupPeerPersistence()
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check res.isOk()
|
||||||
|
|
||||||
|
test "node setup is successful with default configuration":
|
||||||
|
## Given
|
||||||
|
let conf = defaultTestWakuNodeConf()
|
||||||
|
|
||||||
|
## When
|
||||||
|
var wakunode2 = App.init(rng(), conf)
|
||||||
|
require wakunode2.setupPeerPersistence().isOk()
|
||||||
|
require wakunode2.setupWakuArchive().isOk()
|
||||||
|
require wakunode2.setupDyamicBootstrapNodes().isOk()
|
||||||
|
require wakunode2.setupWakuNode().isOk()
|
||||||
|
require isOk(waitFor wakunode2.setupAndMountProtocols())
|
||||||
|
require isOk(waitFor wakunode2.startNode())
|
||||||
|
require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
|
||||||
|
|
||||||
|
## Then
|
||||||
|
let node = wakunode2.node
|
||||||
|
check:
|
||||||
|
not node.isNil()
|
||||||
|
node.wakuArchive.isNil()
|
||||||
|
node.wakuStore.isNil()
|
||||||
|
not node.wakuStoreClient.isNil()
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
waitFor wakunode2.stop()
|
||||||
@ -70,7 +70,7 @@ task test1, "Build & run Waku v1 tests":
|
|||||||
|
|
||||||
|
|
||||||
### Waku v2 tasks
|
### Waku v2 tasks
|
||||||
task wakunode2, "Build Waku v2 (experimental) cli node":
|
task wakunode2, "Build Waku v2 cli node":
|
||||||
let name = "wakunode2"
|
let name = "wakunode2"
|
||||||
buildBinary name, "apps/wakunode2/", "-d:chronicles_log_level=TRACE"
|
buildBinary name, "apps/wakunode2/", "-d:chronicles_log_level=TRACE"
|
||||||
|
|
||||||
@ -78,6 +78,13 @@ task bridge, "Build Waku v1 - v2 bridge":
|
|||||||
let name = "wakubridge"
|
let name = "wakubridge"
|
||||||
buildBinary name, "apps/wakubridge/", "-d:chronicles_log_level=TRACE"
|
buildBinary name, "apps/wakubridge/", "-d:chronicles_log_level=TRACE"
|
||||||
|
|
||||||
|
|
||||||
|
task testwakunode2, "Build & run wakunode2 app tests":
|
||||||
|
test "all_tests_wakunode2"
|
||||||
|
|
||||||
|
task testbridge, "Build & run wakubridge tests":
|
||||||
|
test "all_tests_wakubridge"
|
||||||
|
|
||||||
task test2, "Build & run Waku v2 tests":
|
task test2, "Build & run Waku v2 tests":
|
||||||
test "all_tests_v2"
|
test "all_tests_v2"
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user