Mirror of https://github.com/logos-messaging/logos-messaging-nim.git, synced 2026-02-17 20:33:11 +00:00
* Fix peer selection for cases where the ENR is not yet advertised but metadata exchange has already adjusted the supported shards. Fix initialization of the rendezvous protocol with configured and autoshards so nodes can connect to relay nodes without already having a valid subscribed shard. This solves the issue of autoshard nodes connecting ahead of subscribing. * Extend peer selection, rendezvous and metadata tests * Fix rendezvous test, fix metadata test failing due to wrong setup, added it to all_tests
528 lines
19 KiB
Nim
import
  std/[options, sequtils],
  chronicles,
  chronos,
  libp2p/peerid,
  libp2p/protocols/pubsub/gossipsub,
  libp2p/protocols/connectivity/relay/relay,
  libp2p/nameresolving/dnsresolver,
  libp2p/crypto/crypto

import
  ./internal_config,
  ./networks_config,
  ./waku_conf,
  ./builder,
  ./validator_signed,
  ../waku_enr/sharding,
  ../waku_node,
  ../waku_core,
  ../waku_core/codecs,
  ../waku_rln_relay,
  ../discovery/waku_dnsdisc,
  ../waku_archive/retention_policy as policy,
  ../waku_archive/retention_policy/builder as policy_builder,
  ../waku_archive/driver as driver,
  ../waku_archive/driver/builder as driver_builder,
  ../waku_archive_legacy/driver as legacy_driver,
  ../waku_archive_legacy/driver/builder as legacy_driver_builder,
  ../waku_store,
  ../waku_store/common as store_common,
  ../waku_store_legacy,
  ../waku_store_legacy/common as legacy_common,
  ../waku_filter_v2,
  ../waku_peer_exchange,
  ../node/peer_manager,
  ../node/peer_manager/peer_store/waku_peer_storage,
  ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
  ../waku_lightpush_legacy/common,
  ../common/rate_limit/setting,
  ../common/databases/dburl

## Peer persistence

const PeerPersistenceDbUrl = "peers.db"
proc setupPeerStorage(): Result[Option[WakuPeerStorage], string] =
  let db = ?SqliteDatabase.new(PeerPersistenceDbUrl)

  ?peer_store_sqlite_migrations.migrate(db)

  let res = WakuPeerStorage.new(db).valueOr:
    return err("failed to init peer store: " & error)

  return ok(some(res))

## Init waku node instance

proc initNode(
    conf: WakuConf,
    netConfig: NetConfig,
    rng: ref HmacDrbgContext,
    record: enr.Record,
    peerStore: Option[WakuPeerStorage],
    relay: Relay,
    dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[],
): Result[WakuNode, string] =
  ## Setup a basic Waku v2 node based on a supplied configuration
  ## file. Optionally include persistent peer storage.
  ## No protocols are mounted yet.

  let pStorage =
    if peerStore.isNone():
      nil
    else:
      peerStore.get()

  let (secureKey, secureCert) =
    if conf.webSocketConf.isSome() and conf.webSocketConf.get().secureConf.isSome():
      let wssConf = conf.webSocketConf.get().secureConf.get()
      (some(wssConf.keyPath), some(wssConf.certPath))
    else:
      (none(string), none(string))

  let nameResolver =
    DnsResolver.new(conf.dnsAddrsNameServers.mapIt(initTAddress(it, Port(53))))

  # Build waku node instance
  var builder = WakuNodeBuilder.init()
  builder.withRng(rng)
  builder.withNodeKey(conf.nodeKey)
  builder.withRecord(record)
  builder.withNetworkConfiguration(netConfig)
  builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity)
  builder.withSwitchConfiguration(
    maxConnections = some(conf.maxConnections.int),
    secureKey = secureKey,
    secureCert = secureCert,
    nameResolver = nameResolver,
    sendSignedPeerRecord = conf.relayPeerExchange,
      # We send our own signed peer record when peer exchange is enabled
    agentString = some(conf.agentString),
  )
  builder.withColocationLimit(conf.colocationLimit)

  if conf.maxRelayPeers.isSome():
    let
      maxRelayPeers = conf.maxRelayPeers.get()
      maxConnections = conf.maxConnections
      # Calculate the ratio as percentages
      relayRatio = (maxRelayPeers.float / maxConnections.float) * 100
      serviceRatio = 100 - relayRatio
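      # Example: maxRelayPeers = 30 with maxConnections = 50 yields
      # relayServiceRatio "60.0:40.0" (relay:service, as percentages).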

    builder.withPeerManagerConfig(
      maxConnections = conf.maxConnections,
      relayServiceRatio = $relayRatio & ":" & $serviceRatio,
      shardAware = conf.relayShardedPeerManagement,
    )
    error "maxRelayPeers is deprecated. It is recommended to use relayServiceRatio instead. If relayServiceRatio is not set, it will be automatically calculated based on maxConnections and maxRelayPeers."
  else:
    builder.withPeerManagerConfig(
      maxConnections = conf.maxConnections,
      relayServiceRatio = conf.relayServiceRatio,
      shardAware = conf.relayShardedPeerManagement,
    )
  builder.withRateLimit(conf.rateLimit)
  builder.withCircuitRelay(relay)

  let node =
    ?builder.build().mapErr(
      proc(err: string): string =
        "failed to create waku node instance: " & err
    )

  ok(node)

## Mount protocols

proc getAutoshards*(
    node: WakuNode, contentTopics: seq[string]
): Result[seq[RelayShard], string] =
  if node.wakuAutoSharding.isNone():
    return err("Static sharding used, cannot get shards from content topics")
  var autoShards: seq[RelayShard]
  for contentTopic in contentTopics:
    let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
      return err("Could not parse content topic: " & error)
    autoShards.add(shard)
  return ok(autoShards)
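
# Usage sketch (hypothetical content topic), assuming autosharding is mounted on `node`:
#   let shardsRes = node.getAutoshards(@["/my-app/1/chat/proto"])
#   # shardsRes is a Result[seq[RelayShard], string]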

proc setupProtocols(
    node: WakuNode, conf: WakuConf
): Future[Result[void, string]] {.async.} =
  ## Setup configured protocols on an existing Waku v2 node.
  ## Optionally include persistent message storage.
  ## No protocols are started yet.

  var allShards = conf.subscribeShards
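  # Metadata is mounted first with the statically configured shards; metadata
  # exchange may later adjust the supported shards (see the commit message above).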
  node.mountMetadata(conf.clusterId, allShards).isOkOr:
    return err("failed to mount waku metadata protocol: " & error)

  var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
    ## Action to be taken when an internal error occurs during the node run.
    ## e.g. the connection with the database is lost and not recovered.
    error "Unrecoverable error occurred", error = msg
    quit(QuitFailure)

  # mount mix
  if conf.mixConf.isSome():
    (
      await node.mountMix(
        conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes
      )
    ).isOkOr:
      return err("failed to mount waku mix protocol: " & $error)

  if conf.storeServiceConf.isSome():
    let storeServiceConf = conf.storeServiceConf.get()
    if storeServiceConf.supportV2:
      let archiveDriver = (
        await legacy_driver.ArchiveDriver.new(
          storeServiceConf.dbUrl, storeServiceConf.dbVacuum,
          storeServiceConf.dbMigration, storeServiceConf.maxNumDbConnections,
          onFatalErrorAction,
        )
      ).valueOr:
        return err("failed to setup legacy archive driver: " & error)

      node.mountLegacyArchive(archiveDriver).isOkOr:
        return err("failed to mount waku legacy archive protocol: " & error)

    ## For now we always mount the future archive driver but if the legacy one is mounted,
    ## then the legacy will be in charge of performing the archiving.
    ## Regarding storage, the only diff between the current/future archive driver and the legacy
    ## one, is that the legacy stores an extra field: the id (message digest.)

    ## TODO: remove this "migrate" variable once legacy store is removed
    ## It is now necessary because sqlite's legacy store has an extra field: storedAt
    ## This breaks compatibility between store's and legacy store's schemas in sqlite
    ## So for now, we need to make sure that when legacy store is enabled and we use sqlite
    ## that we migrate our db according to legacy store's schema to have the extra field

    let engine = dburl.getDbEngine(storeServiceConf.dbUrl).valueOr:
      return err("error getting db engine in setupProtocols: " & error)

    let migrate =
      if engine == "sqlite" and storeServiceConf.supportV2:
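        # the legacy driver above has already migrated sqlite using the legacy schema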
        false
      else:
        storeServiceConf.dbMigration

    let archiveDriver = (
      await driver.ArchiveDriver.new(
        storeServiceConf.dbUrl, storeServiceConf.dbVacuum, migrate,
        storeServiceConf.maxNumDbConnections, onFatalErrorAction,
      )
    ).valueOr:
      return err("failed to setup archive driver: " & error)

    let retPolicy = policy.RetentionPolicy.new(storeServiceConf.retentionPolicy).valueOr:
      return err("failed to create retention policy: " & error)

    node.mountArchive(archiveDriver, retPolicy).isOkOr:
      return err("failed to mount waku archive protocol: " & error)

    if storeServiceConf.supportV2:
      # Store legacy setup
      try:
        await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
      except CatchableError:
        return
          err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

    # Store setup
    try:
      await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
    except CatchableError:
      return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

    if storeServiceConf.storeSyncConf.isSome():
      let confStoreSync = storeServiceConf.storeSyncConf.get()

      (
        await node.mountStoreSync(
          conf.clusterId, conf.subscribeShards, conf.contentTopics,
          confStoreSync.rangeSec, confStoreSync.intervalSec,
          confStoreSync.relayJitterSec,
        )
      ).isOkOr:
        return err("failed to mount waku store sync protocol: " & $error)

      if conf.remoteStoreNode.isSome():
        let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr:
          return err("failed to set node waku store-sync peer: " & error)

        node.peerManager.addServicePeer(storeNode, WakuReconciliationCodec)
        node.peerManager.addServicePeer(storeNode, WakuTransferCodec)

  mountStoreClient(node)
  if conf.remoteStoreNode.isSome():
    let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr:
      return err("failed to set node waku store peer: " & error)
    node.peerManager.addServicePeer(storeNode, WakuStoreCodec)

  mountLegacyStoreClient(node)
  if conf.remoteStoreNode.isSome():
    let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr:
      return err("failed to set node waku legacy store peer: " & error)
    node.peerManager.addServicePeer(storeNode, WakuLegacyStoreCodec)

  if conf.storeServiceConf.isSome and conf.storeServiceConf.get().resume:
    node.setupStoreResume()

  if conf.shardingConf.kind == AutoSharding:
    node.mountAutoSharding(conf.clusterId, conf.shardingConf.numShardsInCluster).isOkOr:
      return err("failed to mount waku auto sharding: " & error)
  else:
    warn("Auto sharding is disabled")

  # Mount relay on all nodes
  var peerExchangeHandler = none(RoutingRecordsHandler)
  if conf.relayPeerExchange:
    proc handlePeerExchange(
        peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
    ) {.gcsafe.} =
      ## Handle peers received via gossipsub peer exchange
      # TODO: Only consider peers on pubsub topics we subscribe to
      let exchangedPeers = peers.filterIt(it.record.isSome())
        # only peers with populated records
        .mapIt(toRemotePeerInfo(it.record.get()))

      info "adding exchanged peers",
        src = peer, topic = topic, numPeers = exchangedPeers.len

      for peer in exchangedPeers:
        # Peers added are filtered by the peer manager
        node.peerManager.addPeer(peer, PeerOrigin.PeerExchange)

    peerExchangeHandler = some(handlePeerExchange)

  # TODO: when using autosharding, the user should not be expected to pass any shards, but only content topics
  # Hence, this joint logic should be removed in favour of an either logic:
  # use passed shards (static) or deduce shards from content topics (auto)
  let autoShards =
    if node.wakuAutoSharding.isSome():
      node.getAutoshards(conf.contentTopics).valueOr:
        return err("Could not get autoshards: " & error)
    else:
      @[]

  info "Shards created from content topics",
    contentTopics = conf.contentTopics, shards = autoShards

  let confShards = conf.subscribeShards.mapIt(
    RelayShard(clusterId: conf.clusterId, shardId: uint16(it))
  )
  let shards = confShards & autoShards
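  # `shards` is the union of statically configured shards and autosharding-derived
  # shards; it is registered with rendezvous below.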

  if conf.relay:
    info "Setting max message size", num_bytes = conf.maxMessageSizeBytes

    (
      await mountRelay(
        node, peerExchangeHandler = peerExchangeHandler, int(conf.maxMessageSizeBytes)
      )
    ).isOkOr:
      return err("failed to mount waku relay protocol: " & $error)

    # Add validation keys to protected topics
    var subscribedProtectedShards: seq[ProtectedShard]
    for shardKey in conf.protectedShards:
      if shardKey.shard notin conf.subscribeShards:
        warn "protected shard not in subscribed shards, skipping adding validator",
          protectedShard = shardKey.shard, subscribedShards = shards
        continue
      subscribedProtectedShards.add(shardKey)
      notice "routing only signed traffic",
        protectedShard = shardKey.shard, publicKey = shardKey.key
    node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
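
  # Rendezvous registers both configured and autosharding-derived shards so nodes
  # can discover relay peers before having a valid subscribed shard (see commit message).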
  if conf.rendezvous:
    await node.mountRendezvous(conf.clusterId, shards)
    await node.mountRendezvousClient(conf.clusterId)

  # Keepalive mounted on all nodes
  try:
    await mountLibp2pPing(node)
  except CatchableError:
    return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

  if conf.rlnRelayConf.isSome():
    let rlnRelayConf = conf.rlnRelayConf.get()
    let rlnConf = WakuRlnConfig(
      dynamic: rlnRelayConf.dynamic,
      credIndex: rlnRelayConf.credIndex,
      ethContractAddress: rlnRelayConf.ethContractAddress,
      chainId: rlnRelayConf.chainId,
      ethClientUrls: rlnRelayConf.ethClientUrls,
      creds: rlnRelayConf.creds,
      userMessageLimit: rlnRelayConf.userMessageLimit,
      epochSizeSec: rlnRelayConf.epochSizeSec,
      onFatalErrorAction: onFatalErrorAction,
    )

    try:
      await node.mountRlnRelay(rlnConf)
    except CatchableError:
      return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

  # NOTE Must be mounted after relay
  if conf.lightPush:
    try:
      (await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))).isOkOr:
        return err("failed to mount waku lightpush protocol: " & $error)

      (await mountLegacyLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))).isOkOr:
        return err("failed to mount waku legacy lightpush protocol: " & $error)
    except CatchableError:
      return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())

  mountLightPushClient(node)
  mountLegacyLightPushClient(node)
  if conf.remoteLightPushNode.isSome():
    let lightPushNode = parsePeerInfo(conf.remoteLightPushNode.get()).valueOr:
      return err("failed to set node waku lightpush peer: " & error)
    node.peerManager.addServicePeer(lightPushNode, WakuLightPushCodec)
    node.peerManager.addServicePeer(lightPushNode, WakuLegacyLightPushCodec)

  # Filter setup. NOTE Must be mounted after relay
  if conf.filterServiceConf.isSome():
    let confFilter = conf.filterServiceConf.get()
    try:
      await mountFilter(
        node,
        subscriptionTimeout = chronos.seconds(confFilter.subscriptionTimeout),
        maxFilterPeers = confFilter.maxPeersToServe,
        maxFilterCriteriaPerPeer = confFilter.maxCriteria,
        rateLimitSetting = node.rateLimitSettings.getSetting(FILTER),
      )
    except CatchableError:
      return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())

  await node.mountFilterClient()
  if conf.remoteFilterNode.isSome():
    let filterNode = parsePeerInfo(conf.remoteFilterNode.get()).valueOr:
      return err("failed to set node waku filter peer: " & error)
    try:
      node.peerManager.addServicePeer(filterNode, WakuFilterSubscribeCodec)
    except CatchableError:
      return
        err("failed to mount waku filter client protocol: " & getCurrentExceptionMsg())

  # waku peer exchange setup
  if conf.peerExchangeService:
    try:
      await mountPeerExchange(
        node, some(conf.clusterId), node.rateLimitSettings.getSetting(PEEREXCHG)
      )
    except CatchableError:
      return
        err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())

  if conf.remotePeerExchangeNode.isSome():
    let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get()).valueOr:
      return err("failed to set node waku peer-exchange peer: " & error)
    node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec)

  if conf.peerExchangeDiscovery:
    await node.mountPeerExchangeClient()

  return ok()

## Start node

proc startNode*(
    node: WakuNode, conf: WakuConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]
): Future[Result[void, string]] {.async: (raises: []).} =
  ## Start a configured node and all mounted protocols.
  ## Connect to static nodes and start
  ## keep-alive, if configured.

  info "Running nwaku node", version = git_version
  try:
    await node.start()
  except CatchableError:
    return err("failed to start waku node: " & getCurrentExceptionMsg())

  # Connect to configured static nodes
  if conf.staticNodes.len > 0:
    try:
      await connectToNodes(node, conf.staticNodes, "static")
    except CatchableError:
      return err("failed to connect to static nodes: " & getCurrentExceptionMsg())

  if dynamicBootstrapNodes.len > 0:
    info "Connecting to dynamic bootstrap peers"
    try:
      await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
    except CatchableError:
      return
        err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())

  # retrieve px peers and add them to the peer store
  if conf.remotePeerExchangeNode.isSome():
    var desiredOutDegree = DefaultPXNumPeersReq
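    # Use the gossipsub mesh degree (D) as the number of peers to request when
    # relay is mounted; otherwise fall back to the default request size.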
    if not node.wakuRelay.isNil() and node.wakuRelay.parameters.d.uint64() > 0:
      desiredOutDegree = node.wakuRelay.parameters.d.uint64()
    (await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr:
      error "error while fetching peers from peer exchange", error = error

  # TODO: behavior described by comment is undesired. PX as client should be used in tandem with discv5.
  #
  # Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own
  # periodic loop to find peers and px returned peers actually come from discv5
  if conf.peerExchangeDiscovery and not conf.discv5Conf.isSome():
    node.startPeerExchangeLoop()

  # Maintain relay connections
  if conf.relay:
    node.peerManager.start()

  return ok()

proc setupNode*(
    wakuConf: WakuConf, rng: ref HmacDrbgContext = crypto.newRng(), relay: Relay
): Future[Result[WakuNode, string]] {.async.} =
  let netConfig = (
    await networkConfiguration(
      wakuConf.clusterId, wakuConf.endpointConf, wakuConf.discv5Conf,
      wakuConf.webSocketConf, wakuConf.wakuFlags, wakuConf.dnsAddrsNameServers,
      wakuConf.portsShift, clientId,
    )
  ).valueOr:
    error "failed to create internal config", error = error
    return err("failed to create internal config: " & error)

  let record = enrConfiguration(wakuConf, netConfig).valueOr:
    error "failed to create record", error = error
    return err("failed to create record: " & error)

  if isClusterMismatched(record, wakuConf.clusterId):
    error "cluster id mismatch configured shards"
    return err("cluster id mismatch configured shards")

  info "Setting up storage"

  ## Peer persistence
  var peerStore: Option[WakuPeerStorage]
  if wakuConf.peerPersistence:
    peerStore = setupPeerStorage().valueOr:
      error "Setting up storage failed", error = "failed to setup peer store " & error
      return err("Setting up storage failed: " & error)

  info "Initializing node"

  let node = initNode(wakuConf, netConfig, rng, record, peerStore, relay).valueOr:
    error "Initializing node failed", error = error
    return err("Initializing node failed: " & error)

  info "Mounting protocols"

  try:
    (await node.setupProtocols(wakuConf)).isOkOr:
      error "Mounting protocols failed", error = error
      return err("Mounting protocols failed: " & error)
  except CatchableError:
    return err("Exception setting up protocols: " & getCurrentExceptionMsg())

  return ok(node)
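
# Typical usage (sketch; `wakuConf: WakuConf` and `relay: Relay` are assumed to be
# created by the caller):
#   let node = (await setupNode(wakuConf, relay = relay)).valueOr:
#     quit("failed to set up node: " & error)
#   (await startNode(node, wakuConf)).isOkOr:
#     quit("failed to start node: " & error)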