spiffworkflow-connector/app.py

311 lines
10 KiB
Python
Raw Normal View History

import importlib
import inspect
2022-09-09 17:39:56 +00:00
import json
2022-09-26 19:42:45 +00:00
import os
import pkgutil
import re
import types
import typing
2022-09-09 17:39:56 +00:00
from flask import Flask
from flask import redirect
from flask import request
from flask import Response
from flask import session
from flask import url_for
from flask_oauthlib.contrib.client import OAuth
2022-09-09 17:39:56 +00:00
2022-09-09 17:39:56 +00:00
app = Flask(__name__)
app.config.from_pyfile("config.py", silent=True)
if app.config["ENV"] != "production":
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
2022-09-09 17:39:56 +00:00
@app.before_first_request
def load_plugins():
print("load the plugins once here?")
2022-09-09 17:39:56 +00:00
@app.route("/liveness")
def status():
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def list_targets(targets):
2022-09-09 17:39:56 +00:00
descriptions = []
for plugin_name, plugin_targets in targets.items():
for target_name, target in plugin_targets.items():
description = PluginService.describe_target(
plugin_name, target_name, target
)
2022-09-09 17:39:56 +00:00
descriptions.append(description)
return Response(json.dumps(descriptions), status=200, mimetype="application/json")
2022-09-09 17:39:56 +00:00
@app.route("/v1/auths")
def list_auths():
return list_targets(PluginService.available_auths_by_plugin())
@app.route("/v1/commands")
def list_commands():
return list_targets(PluginService.available_commands_by_plugin())
def auth_handler(plugin_display_name, auth_name, params):
auth = PluginService.auth_named(plugin_display_name, auth_name)
if auth is not None:
app_description = auth().app_description(app.config)
# TODO right now this assumes Oauth.
# would need to expand if other auth providers are used
handler = OAuth(app).remote_app(**app_description)
@handler.tokengetter
def tokengetter():
pass
@handler.tokensaver
def tokensaver(token):
pass
return handler
@app.route("/v1/auth/<plugin_display_name>/<auth_name>")
def do_auth(plugin_display_name, auth_name):
params = request.args.to_dict()
our_redirect_url = params["redirect_url"]
session["redirect_url"] = our_redirect_url
handler = auth_handler(plugin_display_name, auth_name, params)
if handler is None:
return Response("Auth not found", status=404)
# TODO factor into handler
# TODO namespace the keys
session["client_id"] = app.config["XERO_CLIENT_ID"]
session["client_secret"] = app.config["XERO_CLIENT_SECRET"]
oauth_redirect_url = url_for(
"auth_callback",
plugin_display_name=plugin_display_name,
auth_name=auth_name,
_external=True,
)
return handler.authorize(callback_uri=oauth_redirect_url)
@app.route("/v1/auth/<plugin_display_name>/<auth_name>/callback")
def auth_callback(plugin_display_name, auth_name):
handler = auth_handler(plugin_display_name, auth_name, session)
if handler is None:
return Response("Auth not found", status=404)
response = json.dumps(handler.authorized_response())
redirect_url = session["redirect_url"]
# TODO compare redirect_url to whitelist
redirect_url_params_symbol = "?"
if re.match(r".*\?.*", redirect_url):
redirect_url_params_symbol = "&"
return redirect(f"{redirect_url}{redirect_url_params_symbol}response={response}")
@app.route("/v1/do/<plugin_display_name>/<command_name>", methods=["GET", "POST"])
2022-09-09 19:08:21 +00:00
def do_command(plugin_display_name, command_name):
command = PluginService.command_named(plugin_display_name, command_name)
if command is None:
return json_error_response(
f"Command not found: {plugin_display_name}:{command_name}", status=404
)
2022-09-09 19:08:21 +00:00
params = request.json
task_data = params.pop("spiff__task_data", "{}")
try:
result = command(**params).execute(app.config, task_data)
except Exception as e:
return json_error_response(
f"Error encountered when executing {plugin_display_name}:{command_name} {str(e)}",
status=404,
)
return Response(result["response"], mimetype=result["mimetype"], status=200)
def json_error_response(message, status):
resp = {"error": message, "status": status}
return Response(json.dumps(resp), status=status)
2022-09-09 19:08:21 +00:00
2022-09-09 17:39:56 +00:00
class PluginService:
PLUGIN_PREFIX = "connector_"
2022-09-09 17:39:56 +00:00
@staticmethod
def plugin_display_name(plugin_name):
return plugin_name.removeprefix(PluginService.PLUGIN_PREFIX)
2022-09-09 19:08:21 +00:00
@staticmethod
def plugin_name_from_display_name(plugin_display_name):
return PluginService.PLUGIN_PREFIX + plugin_display_name
2022-09-09 17:39:56 +00:00
@staticmethod
def available_plugins():
return {
name: importlib.import_module(name)
for finder, name, ispkg in pkgutil.iter_modules()
2022-09-09 17:39:56 +00:00
if name.startswith(PluginService.PLUGIN_PREFIX)
}
@staticmethod
def available_auths_by_plugin():
return {
plugin_name: {
auth_name: auth
for auth_name, auth in PluginService.auths_for_plugin(
plugin_name, plugin
)
}
for plugin_name, plugin in PluginService.available_plugins().items()
}
2022-09-09 17:39:56 +00:00
@staticmethod
def available_commands_by_plugin():
return {
plugin_name: {
command_name: command
for command_name, command in PluginService.commands_for_plugin(
plugin_name, plugin
)
}
for plugin_name, plugin in PluginService.available_plugins().items()
2022-09-09 17:39:56 +00:00
}
2022-09-09 19:08:21 +00:00
@staticmethod
def target_id(plugin_name, target_name):
2022-09-09 19:08:21 +00:00
plugin_display_name = PluginService.plugin_display_name(plugin_name)
return f"{plugin_display_name}/{target_name}"
@staticmethod
def auth_named(plugin_display_name, auth_name):
plugin_name = PluginService.plugin_name_from_display_name(plugin_display_name)
available_auths_by_plugin = PluginService.available_auths_by_plugin()
try:
return available_auths_by_plugin[plugin_name][auth_name]
except Exception:
return None
2022-09-09 19:08:21 +00:00
@staticmethod
def command_named(plugin_display_name, command_name):
plugin_name = PluginService.plugin_name_from_display_name(plugin_display_name)
available_commands_by_plugin = PluginService.available_commands_by_plugin()
try:
return available_commands_by_plugin[plugin_name][command_name]
except Exception:
2022-09-09 19:08:21 +00:00
return None
2022-09-09 17:39:56 +00:00
@staticmethod
def modules_for_plugin_in_package(plugin, package_name):
2022-09-09 17:39:56 +00:00
for finder, name, ispkg in pkgutil.iter_modules(plugin.__path__):
if ispkg and name == package_name:
2022-09-09 17:39:56 +00:00
sub_pkg = finder.find_module(name).load_module(name)
yield from PluginService.modules_for_plugin_in_package(sub_pkg, None)
2022-09-09 17:39:56 +00:00
else:
spec = finder.find_spec(name)
if spec is not None and spec.loader is not None:
module = types.ModuleType(spec.name)
spec.loader.exec_module(module)
yield name, module
@staticmethod
def targets_for_plugin(plugin_name, plugin, target_package_name):
for module_name, module in PluginService.modules_for_plugin_in_package(
plugin, target_package_name
):
2022-09-13 15:11:34 +00:00
for member_name, member in inspect.getmembers(module, inspect.isclass):
2022-09-09 17:39:56 +00:00
if member.__module__ == module_name:
yield member_name, member
@staticmethod
def auths_for_plugin(plugin_name, plugin):
yield from PluginService.targets_for_plugin(plugin_name, plugin, "auths")
@staticmethod
def commands_for_plugin(plugin_name, plugin):
# TODO check if class has an execute method before yielding
yield from PluginService.targets_for_plugin(plugin_name, plugin, "commands")
2022-09-09 19:08:21 +00:00
@staticmethod
def param_annotation_desc(param):
"""Parses a callable parameter's type annotation, if any, to form a ParameterDescription."""
param_id = param.name
param_type_desc = "any"
none_type = type(None)
supported_types = {str, int, bool, none_type}
unsupported_type_marker = object
annotation = param.annotation
if annotation in supported_types:
annotation_types = {annotation}
else:
# an annotation can have more than one type in the case of a union
# get_args normalizes Union[str, dict] to (str, dict)
# get_args normalizes Optional[str] to (str, none)
# all unsupported types are marked so (str, dict) -> (str, unsupported)
# the absense of a type annotation results in an empty set
annotation_types = set(
map(
lambda t: t if t in supported_types else unsupported_type_marker,
2022-09-13 00:45:44 +00:00
typing.get_args(annotation),
2022-09-09 19:08:21 +00:00
)
)
# a parameter is required if it has no default value and none is not in its type set
param_req = param.default is param.empty and none_type not in annotation_types
# the none type from a union is used for requiredness, but needs to be discarded
# to single out the optional type
annotation_types.discard(none_type)
# if we have a single supported type use that, else any is the default
if len(annotation_types) == 1:
annotation_type = annotation_types.pop()
if annotation_type in supported_types:
param_type_desc = annotation_type.__name__
return {"id": param_id, "type": param_type_desc, "required": param_req}
@staticmethod
def callable_params_desc(kallable):
sig = inspect.signature(kallable)
params_to_skip = ["self", "kwargs"]
2022-09-09 19:08:21 +00:00
sig_params = filter(
lambda param: param.name not in params_to_skip, sig.parameters.values()
)
params = [PluginService.param_annotation_desc(param) for param in sig_params]
2022-09-09 19:08:21 +00:00
return params
@staticmethod
def describe_target(plugin_name, target_name, target):
parameters = PluginService.callable_params_desc(target.__init__)
target_id = PluginService.target_id(plugin_name, target_name)
return {"id": target_id, "parameters": parameters}
if __name__ == "__main__":
app.run(host="localhost", port=5000)