2022-11-04 10:52:27 +01:00
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
{.push raises: [].}
2021-07-16 17:13:36 +02:00
2021-05-06 15:43:43 +02:00
2021-05-07 10:05:11 +02:00
std/[tables, times, strutils, hashes, sequtils],
2021-11-10 12:05:36 +00:00
chronos, confutils, chronicles, chronicles/topics_registry, chronos/streams/tlsstream,
2021-06-10 16:18:41 +02:00
metrics, metrics/chronos_httpserver,
2022-11-04 10:52:08 +01:00
2023-12-14 07:16:39 +01:00
2021-05-06 15:43:43 +02:00
# Matterbridge client imports
2022-10-12 14:41:25 -05:00
2021-05-06 15:43:43 +02:00
# Waku v2 imports
2022-09-06 23:54:48 +02:00
2023-08-09 18:11:50 +01:00
2023-09-14 21:28:57 +02:00
2023-08-09 18:11:50 +01:00
2021-05-06 15:43:43 +02:00
# Chat 2 imports
2022-10-12 14:41:25 -05:00
2021-05-06 15:43:43 +02:00
# Common cli config
declarePublicCounter chat2_mb_transfers, "Number of messages transferred between chat2 and Matterbridge", ["type"]
2021-05-07 10:05:11 +02:00
declarePublicCounter chat2_mb_dropped, "Number of messages dropped", ["reason"]
2021-05-06 15:43:43 +02:00
topics = "chat2bridge"
# Default values #
2021-05-07 10:05:11 +02:00
DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
2021-05-06 15:43:43 +02:00
# Types #
Chat2MatterBridge* = ref object of RootObj
mbClient*: MatterbridgeClient
nodev2*: WakuNode
running: bool
pollPeriod: chronos.Duration
2021-05-07 10:05:11 +02:00
seen: seq[Hash] #FIFO queue
2021-05-26 15:48:09 +02:00
contentTopic: string
2023-02-10 10:43:16 +01:00
2023-12-14 07:16:39 +01:00
MbMessageHandler = proc (jsonNode: JsonNode) {.async.}
2021-05-06 15:43:43 +02:00
# Helper funtions #
2021-05-07 10:05:11 +02:00
proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
if sequence.contains(hash):
2023-02-10 10:43:16 +01:00
return true
2021-05-07 10:05:11 +02:00
if sequence.len >= DeduplQSize:
trace "Deduplication queue full. Removing oldest item."
sequence.delete 0, 0 # Remove first item in queue
2023-02-10 10:43:16 +01:00
2021-05-07 10:05:11 +02:00
return false
2021-06-09 16:37:08 +02:00
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage {.raises: [Defect, KeyError]} =
2021-05-06 15:43:43 +02:00
# Translates a Matterbridge API JSON response to a Waku v2 message
let msgFields = jsonNode.getFields()
# @TODO error handling here - verify expected fields
let chat2pb = Chat2Message(timestamp: getTime().toUnix(), # @TODO use provided timestamp
nick: msgFields["username"].getStr(),
payload: msgFields["text"].getStr().toBytes()).encode()
WakuMessage(payload: chat2pb.buffer,
2021-05-26 15:48:09 +02:00
contentTopic: cmb.contentTopic,
2021-05-06 15:43:43 +02:00
version: 0)
proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
2021-05-26 15:48:09 +02:00
let msg = cmb.toWakuMessage(jsonNode)
2021-05-07 10:05:11 +02:00
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
2021-05-06 15:43:43 +02:00
trace "Post Matterbridge message to chat2"
2023-02-10 10:43:16 +01:00
2021-05-07 10:05:11 +02:00
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
2021-05-06 15:43:43 +02:00
2023-09-26 07:33:52 -04:00
await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)
2021-05-06 15:43:43 +02:00
2021-06-09 16:37:08 +02:00
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
2021-05-07 10:05:11 +02:00
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
chat2_mb_dropped.inc(labelValues = ["duplicate"])
2021-05-06 15:43:43 +02:00
2021-06-02 16:54:38 +02:00
if msg.contentTopic != cmb.contentTopic:
# Only bridge messages on the configured content topic
chat2_mb_dropped.inc(labelValues = ["filtered"])
2021-05-06 15:43:43 +02:00
trace "Post chat2 message to Matterbridge"
2021-05-07 10:05:11 +02:00
chat2_mb_transfers.inc(labelValues = ["chat2_to_mb"])
2021-05-06 15:43:43 +02:00
let chat2Msg = Chat2Message.init(msg.payload)
assert chat2Msg.isOk
2021-07-16 17:13:36 +02:00
let postRes = cmb.mbClient.postMessage(text = string.fromBytes(chat2Msg[].payload),
username = chat2Msg[].nick)
2023-02-10 10:43:16 +01:00
2021-07-16 17:13:36 +02:00
if postRes.isErr() or (postRes[] == false):
2021-05-07 10:05:11 +02:00
chat2_mb_dropped.inc(labelValues = ["duplicate"])
error "Matterbridge host unreachable. Dropping message."
2021-05-06 15:43:43 +02:00
proc pollMatterbridge(cmb: Chat2MatterBridge, handler: MbMessageHandler) {.async.} =
while cmb.running:
2021-07-16 17:13:36 +02:00
let getRes = cmb.mbClient.getMessages()
if getRes.isOk():
for jsonNode in getRes[]:
2023-12-14 07:16:39 +01:00
await handler(jsonNode)
2021-07-16 17:13:36 +02:00
2021-05-07 10:05:11 +02:00
error "Matterbridge host unreachable. Sleeping before retrying."
await sleepAsync(chronos.seconds(10))
2021-05-06 15:43:43 +02:00
await sleepAsync(cmb.pollPeriod)
# Public API #
proc new*(T: type Chat2MatterBridge,
# Matterbridge initialisation
mbHostUri: string,
mbGateway: string,
# NodeV2 initialisation
nodev2Key: crypto.PrivateKey,
2023-12-14 07:16:39 +01:00
nodev2BindIp: IpAddress, nodev2BindPort: Port,
nodev2ExtIp = none[IpAddress](), nodev2ExtPort = none[Port](),
2021-07-16 17:13:36 +02:00
contentTopic: string): T
2021-11-10 12:05:36 +00:00
{.raises: [Defect, ValueError, KeyError, TLSStreamProtocolError, IOError, LPError].} =
2021-05-06 15:43:43 +02:00
2023-02-10 10:43:16 +01:00
# Setup Matterbridge
2021-05-06 15:43:43 +02:00
mbClient = MatterbridgeClient.new(mbHostUri, mbGateway)
2023-02-10 10:43:16 +01:00
2021-05-07 10:05:11 +02:00
# Let's verify the Matterbridge configuration before continuing
2021-07-16 17:13:36 +02:00
let clientHealth = mbClient.isHealthy()
if clientHealth.isOk() and clientHealth[]:
info "Reached Matterbridge host", host=mbClient.host
raise newException(ValueError, "Matterbridge client not reachable/healthy")
2021-05-06 15:43:43 +02:00
# Setup Waku v2 node
2023-04-05 14:27:11 +02:00
let nodev2 = block:
var builder = WakuNodeBuilder.init()
builder.withNetworkConfigurationDetails(nodev2BindIp, nodev2BindPort, nodev2ExtIp, nodev2ExtPort).tryGet()
2023-02-10 10:43:16 +01:00
2021-05-26 15:48:09 +02:00
return Chat2MatterBridge(mbClient: mbClient,
nodev2: nodev2,
running: false,
pollPeriod: chronos.seconds(1),
contentTopic: contentTopic)
2021-05-06 15:43:43 +02:00
proc start*(cmb: Chat2MatterBridge) {.async.} =
info "Starting Chat2MatterBridge"
cmb.running = true
debug "Start polling Matterbridge"
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
# Start Matterbridge polling (@TODO: use streaming interface)
2023-12-14 07:16:39 +01:00
proc mbHandler(jsonNode: JsonNode) {.async.} =
2021-05-06 15:43:43 +02:00
trace "Bridging message from Matterbridge to chat2", jsonNode=jsonNode
waitFor cmb.toChat2(jsonNode)
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
asyncSpawn cmb.pollMatterbridge(mbHandler)
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
# Start Waku v2 node
debug "Start listening on Waku v2"
await cmb.nodev2.start()
2023-02-10 10:43:16 +01:00
2021-05-24 13:19:33 +02:00
# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
2023-06-06 19:28:47 +02:00
await cmb.nodev2.mountRelay()
cmb.nodev2.wakuRelay.triggerSelf = false
2021-05-06 15:43:43 +02:00
# Bridging
# Handle messages on Waku v2 and bridge to Matterbridge
2023-12-14 07:16:39 +01:00
proc relayHandler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} =
2023-06-06 19:28:47 +02:00
trace "Bridging message from Chat2 to Matterbridge", msg=msg
2023-12-14 07:16:39 +01:00
error "exception in relayHandler: " & getCurrentExceptionMsg()
2023-02-10 10:43:16 +01:00
2023-09-26 07:33:52 -04:00
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
2021-05-06 15:43:43 +02:00
2023-12-14 07:16:39 +01:00
proc stop*(cmb: Chat2MatterBridge) {.async: (raises: [Exception]).} =
2021-05-06 15:43:43 +02:00
info "Stopping Chat2MatterBridge"
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
cmb.running = false
await cmb.nodev2.stop()
2021-07-16 17:13:36 +02:00
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
2021-05-06 15:43:43 +02:00
when isMainModule:
2023-09-22 09:36:46 -04:00
../../waku/waku_api/jsonrpc/debug/handlers as debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as relay_api,
../../waku/waku_api/jsonrpc/store/handlers as store_api
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
2021-06-09 16:37:08 +02:00
proc startV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: Chat2MatterbridgeConf) {.raises: [Exception].} =
2021-05-06 15:43:43 +02:00
installDebugApiHandlers(node, rpcServer)
# Install enabled API handlers:
if conf.relay:
2023-11-28 07:21:41 -05:00
let cache = MessageCache.init(capacity=30)
2023-09-26 07:33:52 -04:00
installRelayApiHandlers(node, rpcServer, cache)
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
if conf.filter:
2023-11-28 07:21:41 -05:00
let messageCache = MessageCache.init(capacity=30)
2021-05-06 15:43:43 +02:00
installFilterApiHandlers(node, rpcServer, messageCache)
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
if conf.store:
installStoreApiHandlers(node, rpcServer)
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
rng = newRng()
conf = Chat2MatterbridgeConf.load()
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
if conf.logLevel != LogLevel.NONE:
2023-05-17 18:32:53 +02:00
let natRes = setupNat(conf.nat, clientId,
Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
if natRes.isErr():
error "Error in setupNat", error = natRes.error
2021-05-06 15:43:43 +02:00
# Load address configuration
2023-05-17 18:32:53 +02:00
(nodev2ExtIp, nodev2ExtPort, _) = natRes.get()
2021-08-30 11:01:28 +02:00
## The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
extPort = if nodev2ExtIp.isSome() and nodev2ExtPort.isNone():
some(Port(uint16(conf.libp2pTcpPort) + conf.portsShift))
2021-05-06 15:43:43 +02:00
bridge = Chat2Matterbridge.new(
2021-05-07 10:05:11 +02:00
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
2021-05-06 15:43:43 +02:00
mbGateway = conf.mbGateway,
2021-05-10 09:34:43 +02:00
nodev2Key = conf.nodekey,
2021-05-06 15:43:43 +02:00
nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
2021-08-30 11:01:28 +02:00
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = extPort,
2021-05-26 15:48:09 +02:00
contentTopic = conf.contentTopic)
2023-02-10 10:43:16 +01:00
2021-05-06 15:43:43 +02:00
waitFor bridge.start()
# Now load rest of config
# Mount configured Waku v2 protocols
2022-09-07 16:31:27 +01:00
waitFor mountLibp2pPing(bridge.nodev2)
2021-06-02 16:54:38 +02:00
2021-05-06 15:43:43 +02:00
if conf.store:
2022-09-07 16:31:27 +01:00
waitFor mountStore(bridge.nodev2)
2021-05-06 15:43:43 +02:00
if conf.filter:
2022-09-07 16:31:27 +01:00
waitFor mountFilter(bridge.nodev2)
2021-05-06 15:43:43 +02:00
2021-05-10 09:34:43 +02:00
if conf.staticnodes.len > 0:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
2021-05-06 15:43:43 +02:00
if conf.storenode != "":
2023-04-12 11:29:11 +02:00
let storePeer = parsePeerInfo(conf.storenode)
if storePeer.isOk():
bridge.nodev2.peerManager.addServicePeer(storePeer.value, WakuStoreCodec)
error "Error parsing conf.storenode", error = storePeer.error
2021-05-06 15:43:43 +02:00
if conf.filternode != "":
2023-04-12 11:29:11 +02:00
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
2023-09-14 21:28:57 +02:00
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
2023-04-12 11:29:11 +02:00
error "Error parsing conf.filternode", error = filterPeer.error
2021-05-06 15:43:43 +02:00
if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
Port(conf.rpcPort + conf.portsShift))
var rpcServer = newRpcHttpServer([ta])
# Waku v2 rpc
startV2Rpc(bridge.nodev2, rpcServer, conf)
2021-06-10 16:18:41 +02:00
if conf.metricsServer:
address = conf.metricsServerAddress
port = conf.metricsServerPort + conf.portsShift
info "Starting metrics HTTP server", address, port
startMetricsHttpServer($address, Port(port))
2021-05-06 15:43:43 +02:00