NagyZoltanPeter dd86da3247 feat: HTTP REST API: Filter support v2 (#1890)
Filter v2 rest api support implemented 
Filter rest api documentation updated with v1 and v2 interface support.
Separated legacy filter rest interface
Fix code and tests of v2 Filter rest api
Filter v2 message push test added
Applied autoshard to Filter V2
Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes
Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests
Changed Filter v2 push handler subscription to simple register
Separate node's filterUnsubscribe and filterUnsubscribeAll
2023-09-14 21:28:57 +02:00

323 lines
10 KiB
Nim

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables, times, strutils, hashes, sequtils],
chronos, confutils, chronicles, chronicles/topics_registry, chronos/streams/tlsstream,
metrics, metrics/chronos_httpserver,
stew/byteutils,
stew/shims/net as stewNet, json_rpc/rpcserver,
# Matterbridge client imports
../../waku/common/utils/matterbridge_client,
# Waku v2 imports
libp2p/crypto/crypto,
libp2p/errors,
../../../waku/waku_core,
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store,
# Chat 2 imports
../chat2/chat2,
# Common cli config
./config_chat2bridge
declarePublicCounter chat2_mb_transfers, "Number of messages transferred between chat2 and Matterbridge", ["type"]
declarePublicCounter chat2_mb_dropped, "Number of messages dropped", ["reason"]
logScope:
topics = "chat2bridge"
##################
# Default values #
##################
const
DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
#########
# Types #
#########
type
Chat2MatterBridge* = ref object of RootObj
mbClient*: MatterbridgeClient
nodev2*: WakuNode
running: bool
pollPeriod: chronos.Duration
seen: seq[Hash] #FIFO queue
contentTopic: string
MbMessageHandler* = proc (jsonNode: JsonNode) {.gcsafe.}
###################
# Helper funtions #
###################S
proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
if sequence.contains(hash):
return true
if sequence.len >= DeduplQSize:
trace "Deduplication queue full. Removing oldest item."
sequence.delete 0, 0 # Remove first item in queue
sequence.add(hash)
return false
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage {.raises: [Defect, KeyError]} =
# Translates a Matterbridge API JSON response to a Waku v2 message
let msgFields = jsonNode.getFields()
# @TODO error handling here - verify expected fields
let chat2pb = Chat2Message(timestamp: getTime().toUnix(), # @TODO use provided timestamp
nick: msgFields["username"].getStr(),
payload: msgFields["text"].getStr().toBytes()).encode()
WakuMessage(payload: chat2pb.buffer,
contentTopic: cmb.contentTopic,
version: 0)
proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
let msg = cmb.toWakuMessage(jsonNode)
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
return
trace "Post Matterbridge message to chat2"
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
await cmb.nodev2.publish(DefaultPubsubTopic, msg)
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
return
if msg.contentTopic != cmb.contentTopic:
# Only bridge messages on the configured content topic
chat2_mb_dropped.inc(labelValues = ["filtered"])
return
trace "Post chat2 message to Matterbridge"
chat2_mb_transfers.inc(labelValues = ["chat2_to_mb"])
let chat2Msg = Chat2Message.init(msg.payload)
assert chat2Msg.isOk
let postRes = cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload),
username = chat2Msg[].nick)
if postRes.isErr() or (postRes[] == false):
chat2_mb_dropped.inc(labelValues = ["duplicate"])
error "Matterbridge host unreachable. Dropping message."
proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} =
while cmb.running:
let getRes = cmb.mbClient.getMessages()
if getRes.isOk():
for jsonNode in getRes[]:
handler(jsonNode)
else:
error "Matterbridge host unreachable. Sleeping before retrying."
await sleepAsync(chronos.seconds(10))
await sleepAsync(cmb.pollPeriod)
##############
# Public API #
##############
proc new*(T: type Chat2MatterBridge,
# Matterbridge initialisation
mbHostUri: string,
mbGateway: string,
# NodeV2 initialisation
nodev2Key: crypto.PrivateKey,
nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
contentTopic: string): T
{.raises: [Defect, ValueError, KeyError, TLSStreamProtocolError, IOError, LPError].} =
# Setup Matterbridge
let
mbClient = MatterbridgeClient.new(mbHostUri, mbGateway)
# Let's verify the Matterbridge configuration before continuing
let clientHealth = mbClient.isHealthy()
if clientHealth.isOk() and clientHealth[]:
info "Reached Matterbridge host", host=mbClient.host
else:
raise newException(ValueError, "Matterbridge client not reachable/healthy")
# Setup Waku v2 node
let nodev2 = block:
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodev2Key)
builder.withNetworkConfigurationDetails(nodev2BindIp, nodev2BindPort, nodev2ExtIp, nodev2ExtPort).tryGet()
builder.build().tryGet()
return Chat2MatterBridge(mbClient: mbClient,
nodev2: nodev2,
running: false,
pollPeriod: chronos.seconds(1),
contentTopic: contentTopic)
proc start*(cmb: Chat2MatterBridge) {.async.} =
info "Starting Chat2MatterBridge"
cmb.running = true
debug "Start polling Matterbridge"
# Start Matterbridge polling (@TODO: use streaming interface)
proc mbHandler(jsonNode: JsonNode) {.gcsafe, raises: [Exception].} =
trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode
waitFor cmb.toChat2(jsonNode)
asyncSpawn cmb.pollMatterbridge(mbHandler)
# Start Waku v2 node
debug "Start listening on Waku v2"
await cmb.nodev2.start()
# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
await cmb.nodev2.mountRelay()
cmb.nodev2.wakuRelay.triggerSelf = false
# Bridging
# Handle messages on Waku v2 and bridge to Matterbridge
proc relayHandler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)
cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge"
cmb.running = false
await cmb.nodev2.stop()
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
import
../../../waku/common/utils/nat,
../../waku/node/message_cache,
../../waku/node/jsonrpc/debug/handlers as debug_api,
../../waku/node/jsonrpc/filter/handlers as filter_api,
../../waku/node/jsonrpc/relay/handlers as relay_api,
../../waku/node/jsonrpc/store/handlers as store_api
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} =
installDebugApiHandlers(node, rpcServer)
# Install enabled API handlers:
if conf.relay:
let topicCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache)
if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30)
installFilterApiHandlers(node, rpcServer, messageCache)
if conf.store:
installStoreApiHandlers(node, rpcServer)
rpcServer.start()
let
rng = newRng()
conf = Chat2MatterbridgeConf.load()
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
let natRes = setupNat(conf.nat, clientId,
Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
if natRes.isErr():
error "Error in setupNat", error = natRes.error
# Load address configuration
let
(nodev2ExtIp, nodev2ExtPort, _) = natRes.get()
## The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
extPort = if nodev2ExtIp.isSome() and nodev2ExtPort.isNone():
some(Port(uint16(conf.libp2pTcpPort) + conf.portsShift))
else:
nodev2ExtPort
let
bridge = Chat2Matterbridge.new(
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
mbGateway = conf.mbGateway,
nodev2Key = conf.nodekey,
nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = extPort,
contentTopic = conf.contentTopic)
waitFor bridge.start()
# Now load rest of config
# Mount configured Waku v2 protocols
waitFor mountLibp2pPing(bridge.nodev2)
if conf.store:
waitFor mountStore(bridge.nodev2)
if conf.filter:
waitFor mountFilter(bridge.nodev2)
if conf.staticnodes.len > 0:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
if conf.storenode != "":
let storePeer = parsePeerInfo(conf.storenode)
if storePeer.isOk():
bridge.nodev2.peerManager.addServicePeer(storePeer.value, WakuStoreCodec)
else:
error "Error parsing conf.storenode", error = storePeer.error
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
else:
error "Error parsing conf.filternode", error = filterPeer.error
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
Port(conf.rpcPort + conf.portsShift))
var rpcServer = newRpcHttpServer([ta])
# Waku v2 rpc
startV2Rpc(bridge.nodev2, rpcServer, conf)
rpcServer.start()
if conf.metricsServer:
let
address = conf.metricsServerAddress
port = conf.metricsServerPort + conf.portsShift
info "Starting metrics HTTP server", address, port
startMetricsHttpServer($address, Port(port))
runForever()