NagyZoltanPeter dac072f843
feat: HTTP REST API: Filter support v2 (#1890)
Filter v2 rest api support implemented 
Filter rest api documentation updated with v1 and v2 interface support.
Separated legacy filter rest interface
Fix code and tests of v2 Filter rest api
Filter v2 message push test added
Applied autoshard to Filter V2
Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes
Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests
Changed Filter v2 push handler subscription to simple register
Separate node's filterUnsubscribe and filterUnsubscribeAll
2023-09-14 21:28:57 +02:00

695 lines
23 KiB
Nim

# Module-wide exception tracking: on compilers older than Nim 1.4 procs may
# only raise `Defect`; on newer compilers the allowed `raises` list is empty.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
import
std/[options, strutils, sequtils],
stew/results,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/nameresolving/dnsresolver,
libp2p/protocols/pubsub/gossipsub,
libp2p/peerid,
eth/keys,
json_rpc/rpcserver,
presto,
metrics,
metrics/chronos_httpserver
import
../../waku/common/utils/nat,
../../waku/common/databases/db_sqlite,
../../waku/waku_archive/driver/builder,
../../waku/waku_archive/retention_policy/builder,
../../waku/waku_core,
../../waku/waku_node,
../../waku/node/waku_metrics,
../../waku/node/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr,
../../waku/waku_discv5,
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed,
./internal_config,
./external_config
import
../../waku/node/message_cache,
../../waku/node/rest/server,
../../waku/node/rest/debug/handlers as rest_debug_api,
../../waku/node/rest/relay/handlers as rest_relay_api,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api,
../../waku/node/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/node/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/node/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/node/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/node/jsonrpc/store/handlers as rpc_store_api
# Log topic attached to the log statements emitted by this module.
logScope:
  topics = "wakunode app"

# Git version in git describe format (defined at compile time).
# `strdefine` lets the build override the value with `-d:git_version=...`;
# without it the placeholder "n/a" is used.
const git_version* {.strdefine.} = "n/a"
type
  App* = object
    ## State of the wakunode application: configuration, identity, the node
    ## itself and the optional servers started around it.
    version: string                          # set from `git_version` in `init`
    conf: WakuNodeConf                       # configuration the app was created with
    netConf: NetConfig                       # result of `networkConfiguration` in `init`
    rng: ref HmacDrbgContext                 # random generator handed to `init`
    key: crypto.PrivateKey                   # configured node key, or one generated in `init`
    record: Record                           # ENR built in `init`

    wakuDiscv5: Option[WakuDiscoveryV5]      # set by `setupWakuApp` when `conf.discv5Discovery` is on
    peerStore: Option[WakuPeerStorage]       # set by `setupPeerPersistence` when `conf.peerPersistence` is on
    dynamicBootstrapNodes: seq[RemotePeerInfo] # filled by `setupDyamicBootstrapNodes`

    node: WakuNode                           # nil after `init`; assigned by `setupWakuApp`

    # Servers started by `setupMonitoringAndExternalInterfaces`; stay `none`
    # when the server is disabled or failed to start.
    rpcServer: Option[RpcHttpServer]
    restServer: Option[RestServerRef]
    metricsServer: Option[MetricsHttpServerRef]

  # Result type used throughout this module; errors are plain strings.
  AppResult*[T] = Result[T, string]
func node*(app: App): WakuNode =
  ## Read-only access to the app's Waku node (nil until `setupWakuApp` has
  ## assigned it).
  return app.node
func version*(app: App): string =
  ## Version string recorded when the app was initialised.
  return app.version
## Initialisation
proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
  ## Creates the application state from `conf`: picks or generates the node
  ## key, derives the network configuration and builds the node's ENR.
  ##
  ## Every failure along the way is logged and terminates the process with
  ## `QuitFailure`. The returned `App` has no node attached yet (`node` is nil).

  # Use the configured node key, otherwise generate a fresh secp256k1 key.
  let key =
    if conf.nodeKey.isNone():
      let generated = crypto.PrivateKey.random(Secp256k1, rng[])
      if generated.isErr():
        error "failed to generate key", error=generated.error
        quit(QuitFailure)
      generated.get()
    else:
      conf.nodeKey.get()

  let netConfigRes = networkConfiguration(conf, clientId)
  if netConfigRes.isErr():
    error "failed to create internal config", error=netConfigRes.error
    quit(QuitFailure)
  let netConfig = netConfigRes.get()

  # Assemble the ENR: addresses, optional capabilities, multiaddrs, topics.
  var enrBuilder = EnrBuilder.init(key)
  enrBuilder.withIpAddressAndPorts(netConfig.enrIp, netConfig.enrPort, netConfig.discv5UdpPort)

  if netConfig.wakuFlags.isSome():
    enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())

  enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)

  # `conf.topics` is only consulted when neither pubsub nor content topics
  # are configured; otherwise content topics are mapped to shards.
  let topics =
    if conf.pubsubTopics.len == 0 and conf.contentTopics.len == 0:
      conf.topics
    else:
      let shardResults = conf.contentTopics.mapIt(getShard(it))
      for shardRes in shardResults:
        if shardRes.isErr():
          error "failed to shard content topic", error=shardRes.error
          quit(QuitFailure)
      conf.pubsubTopics & shardResults.mapIt(it.get())

  let shardedTopicsRes = enrBuilder.withShardedTopics(topics)
  if shardedTopicsRes.isErr():
    error "failed to add sharded topics to ENR", error=shardedTopicsRes.error
    quit(QuitFailure)

  let recordRes = enrBuilder.build()
  if recordRes.isErr():
    error "failed to create record", error=recordRes.error
    quit(QuitFailure)
  let record = recordRes.get()

  App(
    version: git_version,
    conf: conf,
    netConf: netConfig,
    rng: rng,
    key: key,
    record: record,
    node: nil
  )
## Peer persistence
const PeerPersistenceDbUrl = "peers.db"

proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] =
  ## Opens the SQLite database at `PeerPersistenceDbUrl`, runs the peer-store
  ## migrations on it and wraps it in a `WakuPeerStorage`.
  ##
  ## Returns the storage wrapped in `some`, or an error string when opening
  ## the database, migrating it, or creating the storage fails.
  let db = ? SqliteDatabase.new(PeerPersistenceDbUrl)

  ? peer_store_sqlite_migrations.migrate(db)

  let res = WakuPeerStorage.new(db)
  if res.isErr():
    # Separate the prefix from the underlying cause so the message is readable.
    return err("failed to init peer store: " & res.error)

  ok(some(res.value))
proc setupPeerPersistence*(app: var App): AppResult[void] =
  ## Initialises the persistent peer store when `conf.peerPersistence` is
  ## enabled and stores it in `app.peerStore`. Does nothing (and succeeds)
  ## when peer persistence is disabled.
  ##
  ## Returns an error string when the peer storage cannot be set up.
  if not app.conf.peerPersistence:
    return ok()

  let peerStoreRes = setupPeerStorage()
  if peerStoreRes.isErr():
    # Separate the prefix from the underlying cause so the message is readable.
    return err("failed to setup peer store: " & peerStoreRes.error)

  app.peerStore = peerStoreRes.get()

  ok()
## Retrieve dynamic bootstrap nodes (DNS discovery)
proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool,
                                    dnsDiscoveryUrl: string,
                                    dnsDiscoveryNameServers: seq[ValidIpAddress]):
                                    AppResult[seq[RemotePeerInfo]] =
  ## Retrieves bootstrap peers via Waku DNS discovery.
  ##
  ## When `dnsDiscovery` is enabled and `dnsDiscoveryUrl` is non-empty, the
  ## given name servers are queried and the peers found are returned (or the
  ## discovery error, converted to a string). In every other case, including
  ## a failed initialisation of the discovery client, an empty seq is returned.

  if dnsDiscovery and dnsDiscoveryUrl != "":
    # DNS discovery
    debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl

    var nameServers: seq[TransportAddress]
    for ip in dnsDiscoveryNameServers:
      nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53

    let dnsResolver = DnsResolver.new(nameServers)

    proc resolver(domain: string): Future[string] {.async, gcsafe.} =
      trace "resolving", domain=domain
      let resolved = await dnsResolver.resolveTxt(domain)
      # Indexing an empty answer would raise an IndexDefect, so hand back an
      # empty record instead.
      # NOTE(review): assumes the DNS discovery client treats an empty record
      # as a failed lookup - confirm against waku_dnsdisc.
      if resolved.len == 0:
        return ""
      return resolved[0] # Use only first answer

    var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
    if wakuDnsDiscovery.isOk():
      return wakuDnsDiscovery.get().findPeers()
        .mapErr(proc (e: cstring): string = $e)
    else:
      warn "Failed to init Waku DNS discovery"

  debug "No method for retrieving dynamic bootstrap nodes specified."
  ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] =
  ## Looks up dynamic bootstrap nodes from the DNS discovery settings in the
  ## configuration and stores them in the app.
  ##
  ## A failed lookup is only logged as a warning; the proc always returns `ok`.
  let lookupRes = retrieveDynamicBootstrapNodes(
    app.conf.dnsDiscovery,
    app.conf.dnsDiscoveryUrl,
    app.conf.dnsDiscoveryNameServers
  )

  if lookupRes.isErr():
    warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=lookupRes.error
    return ok()

  app.dynamicBootstrapNodes = lookupRes.get()
  return ok()
## Setup DiscoveryV5
proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
  ## Creates the discv5 instance for this app. The bootstrap records are the
  ## ENRs parsed from `conf.discv5BootstrapNodes`, followed by the ENRs of the
  ## dynamic bootstrap nodes that report a UDP port.

  # ENRs of the dynamic bootstrap nodes that have a UDP port
  var dynamicEnrs: seq[enr.Record]
  for peer in app.dynamicBootstrapNodes:
    if peer.hasUdpPort():
      dynamicEnrs.add(peer.enr.get())

  # ENR URIs from the configuration come first, dynamic ones are appended
  var bootstrapEnrs: seq[enr.Record]
  for uri in app.conf.discv5BootstrapNodes:
    addBootstrapNode(uri, bootstrapEnrs)
  bootstrapEnrs.add(dynamicEnrs)

  let tableConfig = DiscoveryConfig.init(
    app.conf.discv5TableIpLimit,
    app.conf.discv5BucketIpLimit,
    app.conf.discv5BitsPerHop
  )

  # The configured UDP port is offset by the global ports shift
  let udpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift)

  let discv5Conf = WakuDiscoveryV5Config(
    discv5Config: some(tableConfig),
    address: app.conf.listenAddress,
    port: udpPort,
    privateKey: keys.PrivateKey(app.key.skkey),
    bootstrapRecords: bootstrapEnrs,
    autoupdateRecord: app.conf.discv5EnrAutoUpdate,
  )

  return WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))
## Init waku node instance
proc initNode(conf: WakuNodeConf,
              netConfig: NetConfig,
              rng: ref HmacDrbgContext,
              nodeKey: crypto.PrivateKey,
              record: enr.Record,
              peerStore: Option[WakuPeerStorage],
              dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] =
  ## Builds a basic Waku v2 node from the supplied configuration, optionally
  ## backed by persistent peer storage. No protocols are mounted here.
  ##
  ## Returns the node, or an error string when the builder fails.

  # DNS multiaddr support: the resolver stays nil unless `conf.dnsAddrs` is set
  var nameResolver: DnsResolver
  if conf.dnsAddrs:
    var servers: seq[TransportAddress]
    for serverIp in conf.dnsAddrsNameServers:
      servers.add(initTAddress(serverIp, Port(53))) # Assume all servers use port 53
    nameResolver = DnsResolver.new(servers)

  # The builder takes a nil storage when no peer store is configured
  let storage =
    if peerStore.isSome(): peerStore.get()
    else: nil

  # Build waku node instance
  var nodeBuilder = WakuNodeBuilder.init()
  nodeBuilder.withRng(rng)
  nodeBuilder.withNodeKey(nodeKey)
  nodeBuilder.withRecord(record)
  nodeBuilder.withNetworkConfiguration(netConfig)
  nodeBuilder.withPeerStorage(storage, capacity = conf.peerStoreCapacity)
  nodeBuilder.withSwitchConfiguration(
    maxConnections = some(conf.maxConnections.int),
    secureKey = some(conf.websocketSecureKeyPath),
    secureCert = some(conf.websocketSecureCertPath),
    nameResolver = nameResolver,
    sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
    agentString = some(conf.agentString)
  )
  nodeBuilder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers)

  let built = ? nodeBuilder.build().mapErr(
    proc (err: string): string = "failed to create waku node instance: " & err
  )

  ok(built)
proc setupWakuApp*(app: var App): AppResult[void] =
  ## Creates the discv5 instance (when `conf.discv5Discovery` is enabled) and
  ## the Waku node, storing both in `app`.
  ##
  ## Returns an error string when the node cannot be initialised.

  # Discv5
  if app.conf.discv5Discovery:
    app.wakuDiscv5 = some(app.setupDiscoveryV5())

  # Waku node
  let nodeRes = initNode(
    app.conf,
    app.netConf,
    app.rng,
    app.key,
    app.record,
    app.peerStore,
    app.dynamicBootstrapNodes
  )
  if nodeRes.isOk():
    app.node = nodeRes.get()
    return ok()

  return err("failed to init node: " & nodeRes.error)
## Mount protocols
proc setupProtocols(node: WakuNode,
                    conf: WakuNodeConf,
                    nodeKey: crypto.PrivateKey):
                    Future[AppResult[void]] {.async.} =
  ## Setup configured protocols on an existing Waku v2 node.
  ## Optionally include persistent message storage.
  ## No protocols are started yet.
  ##
  ## Returns an error string naming the first protocol that failed to mount
  ## or the first service peer address that failed to parse.

  # Mount relay on all nodes
  var peerExchangeHandler = none(RoutingRecordsHandler)
  if conf.relayPeerExchange:
    proc handlePeerExchange(peer: PeerId, topic: string,
                            peers: seq[RoutingRecordsPair]) {.gcsafe.} =
      ## Handle peers received via gossipsub peer exchange
      # TODO: Only consider peers on pubsub topics we subscribe to
      let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
                                .mapIt(toRemotePeerInfo(it.record.get()))

      debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len

      # asyncSpawn, as we don't want to block here
      asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")

    peerExchangeHandler = some(handlePeerExchange)

  if conf.relay:
    let pubsubTopics =
      if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0:
        # TODO autoshard content topics only once.
        # Already checked for errors in app.init
        let shards = conf.contentTopics.mapIt(getShard(it).expect("Valid Shard"))
        conf.pubsubTopics & shards
      else:
        conf.topics

    try:
      await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler)
    except CatchableError:
      return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())

    # Add validation keys to protected topics
    for topicKey in conf.protectedTopics:
      if topicKey.topic notin pubsubTopics:
        warn "protected topic not in subscribed pubsub topics, skipping adding validator",
            protectedTopic=topicKey.topic, subscribedTopics=pubsubTopics
        continue
      notice "routing only signed traffic", protectedTopic=topicKey.topic, publicKey=topicKey.key
      node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topicKey.topic), topicKey.key)

    # Enable Rendezvous Discovery protocol when Relay is enabled
    try:
      await mountRendezvous(node)
    except CatchableError:
      return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())

  # Keepalive mounted on all nodes
  try:
    await mountLibp2pPing(node)
  except CatchableError:
    return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

  if conf.rlnRelay:
    let rlnConf = WakuRlnConfig(
      rlnRelayDynamic: conf.rlnRelayDynamic,
      rlnRelayCredIndex: conf.rlnRelayCredIndex,
      rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
      rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress,
      rlnRelayCredPath: conf.rlnRelayCredPath,
      rlnRelayCredPassword: conf.rlnRelayCredPassword,
      rlnRelayTreePath: conf.rlnRelayTreePath,
    )

    try:
      # Await rather than waitFor: this proc is itself async, and blocking on
      # the future from inside it would re-enter the event loop.
      await node.mountRlnRelay(rlnConf)
    except CatchableError:
      return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

  if conf.store:
    var onErrAction = proc(msg: string) {.gcsafe, closure.} =
      ## Action to be taken when an internal error occurs during the node run.
      ## e.g. the connection with the database is lost and not recovered.
      error "Unrecoverable error occurred", error = msg
      quit(QuitFailure)

    # Archive setup
    let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
                                             conf.storeMessageDbVacuum,
                                             conf.storeMessageDbMigration,
                                             onErrAction)
    if archiveDriverRes.isErr():
      return err("failed to setup archive driver: " & archiveDriverRes.error)

    let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy)
    if retPolicyRes.isErr():
      return err("failed to create retention policy: " & retPolicyRes.error)

    let mountArcRes = node.mountArchive(archiveDriverRes.get(),
                                        retPolicyRes.get())
    if mountArcRes.isErr():
      return err("failed to mount waku archive protocol: " & mountArcRes.error)

    # Store setup
    try:
      await mountStore(node)
    except CatchableError:
      return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())

  # The store client is mounted regardless of `conf.store`
  mountStoreClient(node)
  if conf.storenode != "":
    let storeNode = parsePeerInfo(conf.storenode)
    if storeNode.isOk():
      node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
    else:
      return err("failed to set node waku store peer: " & storeNode.error)

  # NOTE Must be mounted after relay
  if conf.lightpush:
    try:
      await mountLightPush(node)
    except CatchableError:
      return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())

  if conf.lightpushnode != "":
    let lightPushNode = parsePeerInfo(conf.lightpushnode)
    if lightPushNode.isOk():
      mountLightPushClient(node)
      node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
    else:
      return err("failed to set node waku lightpush peer: " & lightPushNode.error)

  # Filter setup. NOTE Must be mounted after relay
  if conf.filter:
    try:
      await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
    except CatchableError:
      return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())

  if conf.filternode != "":
    let filterNode = parsePeerInfo(conf.filternode)
    if filterNode.isOk():
      await node.mountFilterClient()
      # The same peer is registered for both the legacy and the v2 filter codec
      node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
      node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
    else:
      return err("failed to set node waku filter peer: " & filterNode.error)

  # waku peer exchange setup
  if conf.peerExchangeNode != "" or conf.peerExchange:
    try:
      await mountPeerExchange(node)
    except CatchableError:
      return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())

    if conf.peerExchangeNode != "":
      let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode)
      if peerExchangeNode.isOk():
        node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
      else:
        return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)

  return ok()
proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
  ## Mounts the configured protocols on the app's node; thin public wrapper
  ## around `setupProtocols`.
  let mountRes = await setupProtocols(app.node, app.conf, app.key)
  return mountRes
## Start node
proc startNode(node: WakuNode, conf: WakuNodeConf,
               dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[AppResult[void]] {.async.} =
  ## Start a configured node and all mounted protocols.
  ## Connect to static nodes and start
  ## keep-alive, if configured.
  ##
  ## Returns an error string for the first step that raised a
  ## `CatchableError`.

  # Start Waku v2 node
  try:
    await node.start()
  except CatchableError:
    return err("failed to start waku node: " & getCurrentExceptionMsg())

  # Connect to configured static nodes
  if conf.staticnodes.len > 0:
    try:
      await connectToNodes(node, conf.staticnodes, "static")
    except CatchableError:
      return err("failed to connect to static nodes: " & getCurrentExceptionMsg())

  if dynamicBootstrapNodes.len > 0:
    info "Connecting to dynamic bootstrap peers"
    try:
      await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
    except CatchableError:
      return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())

  # retrieve px peers and add them to the peer store
  if conf.peerExchangeNode != "":
    # NOTE(review): reads `node.wakuRelay` without a nil check - presumably
    # relay is always mounted when a peer-exchange node is configured; confirm.
    let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
    # Report a failure through the result, like the other steps of this proc.
    try:
      await node.fetchPeerExchangePeers(desiredOutDegree)
    except CatchableError:
      return err("failed to fetch peer exchange peers: " & getCurrentExceptionMsg())

  # Start keepalive, if enabled
  if conf.keepAlive:
    node.startKeepalive()

  # Maintain relay connections
  if conf.relay:
    node.peerManager.start()

  return ok()
proc startApp*(app: App): Future[AppResult[void]] {.async.} =
  ## Starts discv5 (when present) together with its background loops, then
  ## starts the node itself.
  ##
  ## Returns an error string when discv5 or the node fails to start.
  if app.wakuDiscv5.isSome():
    let discv5 = app.wakuDiscv5.get()

    let discv5StartRes = discv5.start()
    if discv5StartRes.isErr():
      return err("failed to start waku discovery v5: " & $discv5StartRes.error)

    # Background tasks: peer search and topic subscription tracking
    asyncSpawn discv5.searchLoop(app.node.peerManager)
    asyncSpawn discv5.subscriptionsListener(app.node.topicSubscriptionQueue)

  let nodeStartRes = await startNode(app.node, app.conf, app.dynamicBootstrapNodes)
  return nodeStartRes
## Monitoring and external interfaces
proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] =
  ## Creates the REST HTTP server on `address:port`, installs the API
  ## handlers selected by `conf` and starts it.
  ##
  ## Returns the running server, or the error from creating it.
  let restServer = ? newRestHttpServer(address, port)

  # Debug and health endpoints are always installed
  installDebugApiHandlers(restServer.router, app.node)
  installHealthApiHandler(restServer.router, app.node)

  # Relay endpoints, only with relay enabled
  if conf.relay:
    let topicCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
    installRelayApiHandlers(restServer.router, app.node, topicCache)

  # Filter endpoints: legacy and v2 handlers each get their own message cache
  if conf.filter:
    let legacyCache = rest_legacy_filter_api.MessageCache.init()
    rest_legacy_filter_api.installLegacyFilterRestApiHandlers(restServer.router, app.node, legacyCache)

    let v2Cache = rest_filter_api.MessageCache.init()
    rest_filter_api.installFilterRestApiHandlers(restServer.router, app.node, v2Cache)

  # Store endpoints are always installed
  installStoreApiHandlers(restServer.router, app.node)

  restServer.start()
  info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"

  return ok(restServer)
proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNodeConf): AppResult[RpcHttpServer] =
  ## Creates the JSON-RPC HTTP server on `address:port`, installs the API
  ## handlers selected by `conf` and starts it.
  ##
  ## Returns the running server, or an error string when it cannot be created.
  let listenAddress = initTAddress(address, port)

  var rpcServer: RpcHttpServer
  try:
    rpcServer = newRpcHttpServer([listenAddress])
  except CatchableError as exc:
    return err("failed to init JSON-RPC server: " & exc.msg)

  # Debug API is always available
  installDebugApiHandlers(app.node, rpcServer)

  if conf.relay:
    let relayCache = rpc_relay_api.MessageCache.init(capacity=30)
    installRelayApiHandlers(app.node, rpcServer, relayCache)

  # Filter API is only installed when a filter service node is configured
  if conf.filternode != "":
    let filterCache = rpc_filter_api.MessageCache.init(capacity=30)
    installFilterApiHandlers(app.node, rpcServer, filterCache)

  # Store API is always available
  installStoreApiHandlers(app.node, rpcServer)

  if conf.rpcAdmin:
    installAdminApiHandlers(app.node, rpcServer)

  rpcServer.start()
  info "RPC Server started", address=listenAddress

  return ok(rpcServer)
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port): AppResult[MetricsHttpServerRef] =
  ## Creates the metrics HTTP server and blocks until it has started.
  ##
  ## Returns the running server, or an error string when creating or
  ## starting it fails.
  info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort

  let createRes = MetricsHttpServerRef.new($serverIp, serverPort)
  if createRes.isErr():
    return err("metrics HTTP server start failed: " & $createRes.error)

  let metricsServer = createRes.value
  try:
    waitFor metricsServer.start()
  except CatchableError as exc:
    return err("metrics HTTP server start failed: " & exc.msg)

  info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
  return ok(metricsServer)
proc startMetricsLogging(): AppResult[void] =
  ## Starts the metrics log via `startMetricsLog`; always succeeds.
  startMetricsLog()
  return ok()
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
  ## Starts the JSON-RPC, REST and metrics servers and the metrics logging,
  ## each only when enabled in the configuration. Ports are offset by
  ## `conf.portsShift`.
  ##
  ## A server that fails to start is logged and skipped; the proc always
  ## returns `ok`.

  # JSON-RPC server
  if app.conf.rpc:
    let rpcPort = Port(app.conf.rpcPort + app.conf.portsShift)
    let rpcRes = startRpcServer(app, app.conf.rpcAddress, rpcPort, app.conf)
    if rpcRes.isOk():
      app.rpcServer = some(rpcRes.value)
    else:
      error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=rpcRes.error

  # REST server
  if app.conf.rest:
    let restPort = Port(app.conf.restPort + app.conf.portsShift)
    let restRes = startRestServer(app, app.conf.restAddress, restPort, app.conf)
    if restRes.isOk():
      app.restServer = some(restRes.value)
    else:
      error "6/7 Starting REST server failed. Continuing in current state.", error=restRes.error

  # Metrics server
  if app.conf.metricsServer:
    let metricsPort = Port(app.conf.metricsServerPort + app.conf.portsShift)
    let metricsRes = startMetricsServer(app.conf.metricsServerAddress, metricsPort)
    if metricsRes.isOk():
      app.metricsServer = some(metricsRes.value)
    else:
      error "6/7 Starting metrics server failed. Continuing in current state.", error=metricsRes.error

  # Metrics console logging
  if app.conf.metricsLogging:
    let loggingRes = startMetricsLogging()
    if loggingRes.isErr():
      error "6/7 Starting metrics console logging failed. Continuing in current state.", error=loggingRes.error

  return ok()
# App shutdown
proc stop*(app: App): Future[void] {.async.} =
  ## Stops whatever was started, in this order: REST server, JSON-RPC server,
  ## metrics server, discv5 and finally the node. Parts that were never set
  ## up are skipped.
  if app.restServer.isSome():
    let restServer = app.restServer.get()
    await restServer.stop()

  if app.rpcServer.isSome():
    let rpcServer = app.rpcServer.get()
    await rpcServer.stop()

  if app.metricsServer.isSome():
    let metricsServer = app.metricsServer.get()
    await metricsServer.stop()

  if app.wakuDiscv5.isSome():
    let discv5 = app.wakuDiscv5.get()
    await discv5.stop()

  # The node is nil when the app was stopped before `setupWakuApp` succeeded
  if not app.node.isNil():
    await app.node.stop()