nwaku/apps/chat2bridge/chat2bridge.nim

324 lines
8.9 KiB
Nim

{.push raises: [].}
import
std/[tables, times, strutils, hashes, sequtils],
chronos,
confutils,
chronicles,
chronicles/topics_registry,
chronos/streams/tlsstream,
metrics,
metrics/chronos_httpserver,
stew/byteutils,
eth/net/nat,
json_rpc/rpcserver,
# Matterbridge client imports
# Waku v2 imports
libp2p/crypto/crypto,
libp2p/errors,
waku/[
waku_core,
waku_node,
node/peer_manager,
waku_filter_v2,
waku_store,
factory/builder,
common/utils/matterbridge_client,
],
# Chat 2 imports
../chat2/chat2,
# Common cli config
./config_chat2bridge
declarePublicCounter chat2_mb_transfers,
"Number of messages transferred between chat2 and Matterbridge", ["type"]
declarePublicCounter chat2_mb_dropped, "Number of messages dropped", ["reason"]
logScope:
topics = "chat2bridge"
##################
# Default values #
##################
const DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
#########
# Types #
#########
type
Chat2MatterBridge* = ref object of RootObj
mbClient*: MatterbridgeClient
nodev2*: WakuNode
running: bool
pollPeriod: chronos.Duration
seen: seq[Hash] #FIFO queue
contentTopic: string
MbMessageHandler = proc(jsonNode: JsonNode) {.async.}
###################
# Helper functions #
###################S
proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
if sequence.contains(hash):
return true
if sequence.len >= DeduplQSize:
trace "Deduplication queue full. Removing oldest item."
sequence.delete 0, 0 # Remove first item in queue
sequence.add(hash)
return false
proc toWakuMessage(
cmb: Chat2MatterBridge, jsonNode: JsonNode
): WakuMessage {.raises: [Defect, KeyError].} =
# Translates a Matterbridge API JSON response to a Waku v2 message
let msgFields = jsonNode.getFields()
# @TODO error handling here - verify expected fields
let chat2pb = Chat2Message(
timestamp: getTime().toUnix(), # @TODO use provided timestamp
nick: msgFields["username"].getStr(),
payload: msgFields["text"].getStr().toBytes(),
).encode()
WakuMessage(payload: chat2pb.buffer, contentTopic: cmb.contentTopic, version: 0)
proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
let msg = cmb.toWakuMessage(jsonNode)
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
return
trace "Post Matterbridge message to chat2"
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
(await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)).isOkOr:
error "failed to publish message", error = error
proc toMatterbridge(
cmb: Chat2MatterBridge, msg: WakuMessage
) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
return
if msg.contentTopic != cmb.contentTopic:
# Only bridge messages on the configured content topic
chat2_mb_dropped.inc(labelValues = ["filtered"])
return
trace "Post chat2 message to Matterbridge"
chat2_mb_transfers.inc(labelValues = ["chat2_to_mb"])
let chat2Msg = Chat2Message.init(msg.payload)
assert chat2Msg.isOk
let postRes = cmb.mbClient.postMessage(
text = string.fromBytes(chat2Msg[].payload), username = chat2Msg[].nick
)
if postRes.isErr() or (postRes[] == false):
chat2_mb_dropped.inc(labelValues = ["duplicate"])
error "Matterbridge host unreachable. Dropping message."
proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} =
while cmb.running:
let getRes = cmb.mbClient.getMessages()
if getRes.isOk():
for jsonNode in getRes[]:
await handler(jsonNode)
else:
error "Matterbridge host unreachable. Sleeping before retrying."
await sleepAsync(chronos.seconds(10))
await sleepAsync(cmb.pollPeriod)
##############
# Public API #
##############
proc new*(
T: type Chat2MatterBridge,
# Matterbridge initialisation
mbHostUri: string,
mbGateway: string,
# NodeV2 initialisation
nodev2Key: crypto.PrivateKey,
nodev2BindIp: IpAddress,
nodev2BindPort: Port,
nodev2ExtIp = none[IpAddress](),
nodev2ExtPort = none[Port](),
contentTopic: string,
): T {.
raises: [Defect, ValueError, KeyError, TLSStreamProtocolError, IOError, LPError]
.} =
# Setup Matterbridge
let mbClient = MatterbridgeClient.new(mbHostUri, mbGateway)
# Let's verify the Matterbridge configuration before continuing
let clientHealth = mbClient.isHealthy()
if clientHealth.isOk() and clientHealth[]:
info "Reached Matterbridge host", host = mbClient.host
else:
raise newException(ValueError, "Matterbridge client not reachable/healthy")
# Setup Waku v2 node
let nodev2 = block:
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodev2Key)
builder
.withNetworkConfigurationDetails(
nodev2BindIp, nodev2BindPort, nodev2ExtIp, nodev2ExtPort
)
.tryGet()
builder.build().tryGet()
return Chat2MatterBridge(
mbClient: mbClient,
nodev2: nodev2,
running: false,
pollPeriod: chronos.seconds(1),
contentTopic: contentTopic,
)
proc start*(cmb: Chat2MatterBridge) {.async.} =
info "Starting Chat2MatterBridge"
cmb.running = true
debug "Start polling Matterbridge"
# Start Matterbridge polling (@TODO: use streaming interface)
proc mbHandler(jsonNode: JsonNode) {.async.} =
trace "Bridging message from Matterbridge to chat2", jsonNode = jsonNode
waitFor cmb.toChat2(jsonNode)
asyncSpawn cmb.pollMatterbridge(mbHandler)
# Start Waku v2 node
debug "Start listening on Waku v2"
await cmb.nodev2.start()
# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
await cmb.nodev2.mountRelay()
cmb.nodev2.wakuRelay.triggerSelf = false
# Bridging
# Handle messages on Waku v2 and bridge to Matterbridge
proc relayHandler(
pubsubTopic: PubsubTopic, msg: WakuMessage
): Future[void] {.async.} =
trace "Bridging message from Chat2 to Matterbridge", msg = msg
try:
cmb.toMatterbridge(msg)
except:
error "exception in relayHandler: " & getCurrentExceptionMsg()
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
proc stop*(cmb: Chat2MatterBridge) {.async: (raises: [Exception]).} =
info "Stopping Chat2MatterBridge"
cmb.running = false
await cmb.nodev2.stop()
{.pop.}
# @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
import waku/common/utils/nat, waku/waku_api/message_cache
let
rng = newRng()
conf = Chat2MatterbridgeConf.load()
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
let natRes = setupNat(
conf.nat,
clientId,
Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift),
)
if natRes.isErr():
error "Error in setupNat", error = natRes.error
# Load address configuration
let
(nodev2ExtIp, nodev2ExtPort, _) = natRes.get()
## The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
extPort =
if nodev2ExtIp.isSome() and nodev2ExtPort.isNone():
some(Port(uint16(conf.libp2pTcpPort) + conf.portsShift))
else:
nodev2ExtPort
let bridge = Chat2Matterbridge.new(
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
mbGateway = conf.mbGateway,
nodev2Key = conf.nodekey,
nodev2BindIp = conf.listenAddress,
nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp,
nodev2ExtPort = extPort,
contentTopic = conf.contentTopic,
)
waitFor bridge.start()
# Now load rest of config
# Mount configured Waku v2 protocols
waitFor mountLibp2pPing(bridge.nodev2)
if conf.store:
waitFor mountStore(bridge.nodev2)
if conf.filter:
waitFor mountFilter(bridge.nodev2)
if conf.staticnodes.len > 0:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
if conf.storenode != "":
let storePeer = parsePeerInfo(conf.storenode)
if storePeer.isOk():
bridge.nodev2.peerManager.addServicePeer(storePeer.value, WakuStoreCodec)
else:
error "Error parsing conf.storenode", error = storePeer.error
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(
filterPeer.value, WakuFilterSubscribeCodec
)
else:
error "Error parsing conf.filternode", error = filterPeer.error
if conf.metricsServer:
let
address = conf.metricsServerAddress
port = conf.metricsServerPort + conf.portsShift
info "Starting metrics HTTP server", address, port
startMetricsHttpServer($address, Port(port))
runForever()