spiffworkflow-connector/app.py

271 lines
9.5 KiB
Python
Raw Normal View History

2022-09-09 17:39:56 +00:00
import json
import os
import urllib.parse

from flask import Flask, Response, redirect, request, session, url_for
from flask_oauthlib.contrib.client import OAuth
2022-09-09 17:39:56 +00:00
# Create the Flask application and layer optional settings from config.py on
# top of the defaults (silent=True: the config file is allowed to be absent).
app = Flask(__name__)
app.config.from_pyfile('config.py', silent=True)

# Anywhere but production, set the oauthlib switch that permits running the
# OAuth flow over plain HTTP.
if not app.config['ENV'] == 'production':
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
2022-09-09 17:39:56 +00:00
@app.before_first_request
def load_plugins():
    """Placeholder hook registered to run once before the first request.

    Currently it only prints a reminder; no plugins are loaded here.
    """
    # NOTE(review): `before_first_request` is deprecated in newer Flask
    # releases -- confirm the pinned Flask version before upgrading.
    print('load the plugins once here?')
@app.route('/liveness')
def status():
    """Liveness probe: always answers 200 with the JSON body ``{"ok": true}``."""
    body = json.dumps({"ok": True})
    return Response(body, status=200, mimetype='application/json')
def list_targets(targets):
    """Serialize the descriptions of every target as a JSON response.

    Args:
        targets: mapping of plugin name -> mapping of target name -> target,
            as returned by the ``PluginService.available_*_by_plugin`` helpers.

    Returns:
        A 200 ``Response`` whose body is a JSON list with one description
        (see ``PluginService.describe_target``) per target, in iteration order.
    """
    descriptions = [
        PluginService.describe_target(plugin_name, target_name, target)
        for plugin_name, plugin_targets in targets.items()
        for target_name, target in plugin_targets.items()
    ]
    payload = json.dumps(descriptions)
    return Response(payload, status=200, mimetype='application/json')
@app.route('/v1/auths')
def list_auths():
    """List the descriptions of every auth exposed by the available plugins."""
    auths_by_plugin = PluginService.available_auths_by_plugin()
    return list_targets(auths_by_plugin)
@app.route('/v1/commands')
def list_commands():
    """List the descriptions of every command exposed by the available plugins."""
    commands_by_plugin = PluginService.available_commands_by_plugin()
    return list_targets(commands_by_plugin)
def auth_handler(plugin_display_name, auth_name, params):
    """Build the OAuth remote app for a plugin's auth.

    Args:
        plugin_display_name: plugin name without the connector prefix.
        auth_name: name of the auth class inside that plugin.
        params: mapping handed to the auth class's ``filtered_params`` to pick
            the constructor arguments (callers pass the query parameters or
            the session).

    Returns:
        The configured remote app, or ``None`` when no such auth exists.
    """
    auth_class = PluginService.auth_named(plugin_display_name, auth_name)
    if auth_class is None:
        return None

    init_kwargs = auth_class.filtered_params(params)
    description = auth_class(**init_kwargs).app_description()

    # TODO right now this assumes Oauth.
    # would need to expand if other auth providers are used
    handler = OAuth(app).remote_app(**description)

    # The token getter and saver are registered as no-ops: this service does
    # not read or persist tokens through these hooks.
    @handler.tokengetter
    def tokengetter():
        pass

    @handler.tokensaver
    def tokensaver(token):
        pass

    return handler
@app.route('/v1/auth/<plugin_display_name>/<auth_name>')
def do_auth(plugin_display_name, auth_name):
    """Start the authorization flow for a plugin's auth.

    Expects ``redirect_url``, ``client_id`` and ``client_secret`` as query
    parameters. They are stored in the session so that ``auth_callback`` can
    rebuild the handler and send the result back to the caller.

    Returns:
        400 when a required query parameter is missing, 404 when the auth is
        unknown, otherwise whatever the handler's ``authorize`` call returns.
    """
    params = request.args.to_dict()

    # `params` is a plain dict, so indexing a missing key would surface as an
    # unhandled KeyError (HTTP 500); report the client error explicitly.
    our_redirect_url = params.get('redirect_url')
    if our_redirect_url is None:
        return Response('Missing required parameter: redirect_url', status=400)
    session['redirect_url'] = our_redirect_url

    handler = auth_handler(plugin_display_name, auth_name, params)
    if handler is None:
        return Response('Auth not found', status=404)

    missing = [key for key in ('client_id', 'client_secret') if key not in params]
    if missing:
        missing_names = ', '.join(missing)
        return Response(f'Missing required parameter(s): {missing_names}', status=400)

    # TODO factor into handler
    # TODO namespace the keys
    # NOTE(review): if the session is Flask's default cookie-backed one, the
    # client secret ends up in a client-readable cookie -- confirm the session
    # backend in use.
    session['client_id'] = params['client_id']
    session['client_secret'] = params['client_secret']

    oauth_redirect_url = url_for(
        'auth_callback',
        plugin_display_name=plugin_display_name,
        auth_name=auth_name,
        _external=True,
    )
    return handler.authorize(callback_uri=oauth_redirect_url)
@app.route('/v1/auth/<plugin_display_name>/<auth_name>/callback')
def auth_callback(plugin_display_name, auth_name):
    """Finish the authorization flow and hand the result back to the caller.

    The handler is rebuilt from the values ``do_auth`` stored in the session.
    The handler's authorized response is JSON-encoded and appended to the
    stored ``redirect_url`` as the ``response`` query parameter.

    Returns:
        404 when the auth is unknown, 400 when the session holds no
        ``redirect_url`` (the flow was not started through ``do_auth``),
        otherwise a redirect to the caller's URL.
    """
    handler = auth_handler(plugin_display_name, auth_name, session)
    if handler is None:
        return Response('Auth not found', status=404)

    redirect_url = session.get('redirect_url')
    if redirect_url is None:
        return Response('No redirect_url in session', status=400)

    response = json.dumps(handler.authorized_response())
    # Percent-encode the JSON so characters such as '&', '#' or '+' inside it
    # cannot be misread as URL syntax by the receiving side.
    encoded_response = urllib.parse.quote(response, safe='')
    # Extend an existing query string instead of starting a second one.
    separator = '&' if '?' in redirect_url else '?'

    # TODO compare redirect_url to whitelist
    return redirect(f'{redirect_url}{separator}response={encoded_response}')
2022-09-09 19:08:21 +00:00
@app.route('/v1/do/<plugin_display_name>/<command_name>')
def do_command(plugin_display_name, command_name):
    """Run a plugin command, passing the query parameters as constructor kwargs.

    Returns:
        404 when the command is unknown; otherwise a response built from the
        ``response``, ``status`` and ``mimetype`` entries of the mapping
        returned by the command's ``execute()``.
    """
    command_class = PluginService.command_named(plugin_display_name, command_name)
    if command_class is None:
        return Response('Command not found', status=404)

    query_params = request.args.to_dict()
    outcome = command_class(**query_params).execute()

    return Response(
        outcome['response'],
        status=outcome['status'],
        mimetype=outcome['mimetype'],
    )
# TODO move out to own home
2022-09-09 17:39:56 +00:00
import importlib
import inspect
import pkgutil
import types
2022-09-13 00:45:44 +00:00
import typing
2022-09-09 17:39:56 +00:00
class PluginService:
    """Discovers connector plugins and describes the auths/commands they expose.

    A plugin is any top-level importable module whose name starts with
    ``PLUGIN_PREFIX``. Its targets (auths or commands) are the classes
    collected by ``targets_for_plugin``.
    """

    PLUGIN_PREFIX = 'connector_'

    # Parameter kinds that cannot be supplied as one named value (*args, **kw).
    _VARIADIC_KINDS = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )

    @staticmethod
    def plugin_display_name(plugin_name):
        """Return ``plugin_name`` without the leading ``PLUGIN_PREFIX``."""
        return plugin_name.removeprefix(PluginService.PLUGIN_PREFIX)

    @staticmethod
    def plugin_name_from_display_name(plugin_display_name):
        """Return the module name for a display name (prefix re-added)."""
        return PluginService.PLUGIN_PREFIX + plugin_display_name

    @staticmethod
    def available_plugins():
        """Import and return every discoverable plugin, keyed by module name."""
        return {
            name: importlib.import_module(name)
            for finder, name, ispkg in pkgutil.iter_modules()
            if name.startswith(PluginService.PLUGIN_PREFIX)
        }

    @staticmethod
    def available_auths_by_plugin():
        """Return ``{plugin_name: {auth_name: auth_class}}`` for all plugins."""
        return {
            plugin_name: dict(PluginService.auths_for_plugin(plugin_name, plugin))
            for plugin_name, plugin in PluginService.available_plugins().items()
        }

    @staticmethod
    def available_commands_by_plugin():
        """Return ``{plugin_name: {command_name: command_class}}`` for all plugins."""
        return {
            plugin_name: dict(PluginService.commands_for_plugin(plugin_name, plugin))
            for plugin_name, plugin in PluginService.available_plugins().items()
        }

    @staticmethod
    def target_id(plugin_name, target_name):
        """Return the public id of a target: ``<plugin display name>/<target name>``."""
        plugin_display_name = PluginService.plugin_display_name(plugin_name)
        return f'{plugin_display_name}/{target_name}'

    @staticmethod
    def auth_named(plugin_display_name, auth_name):
        """Return the auth class with that name, or ``None`` if it does not exist."""
        plugin_name = PluginService.plugin_name_from_display_name(plugin_display_name)
        available_auths_by_plugin = PluginService.available_auths_by_plugin()
        # Only a missing plugin/auth means "not found"; any other error raised
        # during discovery is a real failure and must not be swallowed.
        try:
            return available_auths_by_plugin[plugin_name][auth_name]
        except KeyError:
            return None

    @staticmethod
    def command_named(plugin_display_name, command_name):
        """Return the command class with that name, or ``None`` if it does not exist."""
        plugin_name = PluginService.plugin_name_from_display_name(plugin_display_name)
        available_commands_by_plugin = PluginService.available_commands_by_plugin()
        # See auth_named: only a failed lookup maps to None.
        try:
            return available_commands_by_plugin[plugin_name][command_name]
        except KeyError:
            return None

    @staticmethod
    def modules_for_plugin_in_package(plugin, package_name):
        """Yield ``(name, module)`` pairs found under ``plugin.__path__``.

        A sub-package named ``package_name`` is loaded and searched
        recursively with ``package_name=None``. Every other entry at the
        current level is executed into a fresh module object and yielded.
        """
        for finder, name, ispkg in pkgutil.iter_modules(plugin.__path__):
            if ispkg and name == package_name:
                # NOTE(review): find_module()/load_module() are deprecated and
                # find_module() was removed from the stdlib finders in Python
                # 3.12 -- migrate to find_spec()/exec_module() before upgrading.
                sub_pkg = finder.find_module(name).load_module(name)
                yield from PluginService.modules_for_plugin_in_package(sub_pkg, None)
            else:
                # NOTE(review): this branch also runs for siblings of the
                # target package (e.g. other sub-packages of the plugin), so
                # their classes are yielded too -- confirm this is intended.
                spec = finder.find_spec(name)
                if spec is not None and spec.loader is not None:
                    module = types.ModuleType(spec.name)
                    spec.loader.exec_module(module)
                    yield name, module

    @staticmethod
    def targets_for_plugin(plugin_name, plugin, target_package_name):
        """Yield ``(class_name, class)`` for classes defined in the plugin's modules."""
        for module_name, module in PluginService.modules_for_plugin_in_package(
            plugin, target_package_name
        ):
            for member_name, member in inspect.getmembers(module, inspect.isclass):
                # Skip classes whose __module__ differs from the module being
                # scanned (i.e. classes that were merely imported into it).
                if member.__module__ == module_name:
                    yield member_name, member

    @staticmethod
    def auths_for_plugin(plugin_name, plugin):
        """Yield the ``(name, class)`` pairs from the plugin's ``auths`` package."""
        yield from PluginService.targets_for_plugin(plugin_name, plugin, 'auths')

    @staticmethod
    def commands_for_plugin(plugin_name, plugin):
        """Yield the ``(name, class)`` pairs from the plugin's ``commands`` package."""
        # TODO check if class has an execute method before yielding
        yield from PluginService.targets_for_plugin(plugin_name, plugin, 'commands')

    @staticmethod
    def param_annotation_desc(param):
        """Parses a callable parameter's type annotation, if any, to form a ParameterDescription.

        Args:
            param: an ``inspect.Parameter``.

        Returns:
            ``{"id": <name>, "type": <"str" | "int" | "bool" | "any">,
            "required": <bool>}``.
        """
        param_id = param.name
        param_type_desc = "any"

        none_type = type(None)
        supported_types = {str, int, bool, none_type}
        unsupported_type_marker = object

        annotation = param.annotation

        if annotation in supported_types:
            annotation_types = {annotation}
        else:
            # an annotation can have more than one type in the case of a union
            # get_args normalizes Union[str, dict] to (str, dict)
            # get_args normalizes Optional[str] to (str, none)
            # all unsupported types are marked so (str, dict) -> (str, unsupported)
            # the absence of a type annotation results in an empty set
            annotation_types = {
                t if t in supported_types else unsupported_type_marker
                for t in typing.get_args(annotation)
            }

        # a parameter is required if it has no default value and none is not in its type set
        param_req = param.default is param.empty and none_type not in annotation_types

        # the none type from a union is used for requiredness, but needs to be discarded
        # to single out the optional type
        annotation_types.discard(none_type)

        # if we have a single supported type use that, else any is the default
        if len(annotation_types) == 1:
            annotation_type = annotation_types.pop()
            if annotation_type in supported_types:
                param_type_desc = annotation_type.__name__

        return {"id": param_id, "type": param_type_desc, "required": param_req}

    @staticmethod
    def callable_params_desc(kallable):
        """Describe the parameters a caller can pass to ``kallable`` by name.

        Parameters named ``self`` or ``kwargs`` are skipped, as are all
        variadic parameters (``*args`` / ``**anything``): they are not single
        named values, so listing them would advertise parameters (often as
        required) that cannot be supplied. This matters for classes that
        inherit ``object.__init__``, whose signature is
        ``(self, /, *args, **kwargs)``.

        Returns:
            A list of descriptions as produced by ``param_annotation_desc``.
        """
        sig = inspect.signature(kallable)
        params_to_skip = ('self', 'kwargs')
        return [
            PluginService.param_annotation_desc(param)
            for param in sig.parameters.values()
            if param.name not in params_to_skip
            and param.kind not in PluginService._VARIADIC_KINDS
        ]

    @staticmethod
    def describe_target(plugin_name, target_name, target):
        """Return ``{'id': ..., 'parameters': [...]}`` for a target class.

        The parameters are taken from the signature of ``target.__init__``.
        """
        parameters = PluginService.callable_params_desc(target.__init__)
        target_id = PluginService.target_id(plugin_name, target_name)
        return {'id': target_id, 'parameters': parameters}
2022-09-09 17:39:56 +00:00
if __name__ == '__main__':
    # Script entry point: serve the app on localhost, port 5000.
    app.run(port=5000, host='localhost')