This commit is contained in:
Jon Herron 2022-09-09 15:08:21 -04:00
parent 5402b882d2
commit 7eb4c5c29c
1 changed files with 97 additions and 7 deletions

104
app.py
View File

@ -1,6 +1,6 @@
import json import json
from flask import Flask, Response from flask import Flask, Response, request
app = Flask(__name__) app = Flask(__name__)
@ -11,21 +11,33 @@ def load_plugins():
@app.route('/v1/commands') @app.route('/v1/commands')
def list_commands(): def list_commands():
def describe_command(plugin_name, command_name, command): def describe_command(plugin_name, command_name, command):
parameters = [] parameters = PluginService.command_params_desc(command)
plugin_display_name = PluginService.plugin_display_name(plugin_name) plugin_display_name = PluginService.plugin_display_name(plugin_name)
return { 'id': f'{plugin_display_name}/{command_name}', 'parameters': parameters } command_id = PluginService.command_id(plugin_name, command_name)
return { 'id': command_id, 'parameters': parameters }
commands_by_plugin = PluginService.available_commands_by_plugin() commands_by_plugin = PluginService.available_commands_by_plugin()
descriptions = [] descriptions = []
for plugin_name, commands in commands_by_plugin.items(): for plugin_name, commands in commands_by_plugin.items():
for command_name, command in commands: for command_name, command in commands.items():
description = describe_command(plugin_name, command_name, command) description = describe_command(plugin_name, command_name, command)
descriptions.append(description) descriptions.append(description)
return Response(json.dumps(descriptions), status=200, mimetype='application/json') return Response(json.dumps(descriptions), status=200, mimetype='application/json')
# TODO move out to own file @app.route('/v1/do/<plugin_display_name>/<command_name>')
def do_command(plugin_display_name, command_name):
command = PluginService.command_named(plugin_display_name, command_name)
if command is None:
return Response('Command not found', status=404)
params = request.args.to_dict()
result = command(**params)
return Response(result['response'], status=result['status'], mimetype=result['mimetype'])
# TODO move out to own home
import importlib import importlib
import inspect import inspect
import pkgutil import pkgutil
@ -37,6 +49,10 @@ class PluginService:
@staticmethod @staticmethod
def plugin_display_name(plugin_name): def plugin_display_name(plugin_name):
return plugin_name.removeprefix(PluginService.PLUGIN_PREFIX) return plugin_name.removeprefix(PluginService.PLUGIN_PREFIX)
@staticmethod
def plugin_name_from_display_name(plugin_display_name):
return PluginService.PLUGIN_PREFIX + plugin_display_name
@staticmethod @staticmethod
def available_plugins(): def available_plugins():
@ -50,11 +66,30 @@ class PluginService:
@staticmethod @staticmethod
def available_commands_by_plugin(): def available_commands_by_plugin():
return { return {
name: list(PluginService.commands_for_plugin(name, plugin)) plugin_name: {
for name, plugin command_name: command
for command_name, command
in PluginService.commands_for_plugin(plugin_name, plugin)
}
for plugin_name, plugin
in PluginService.available_plugins().items() in PluginService.available_plugins().items()
} }
@staticmethod
def command_id(plugin_name, command_name):
plugin_display_name = PluginService.plugin_display_name(plugin_name)
return f'{plugin_display_name}/{command_name}'
@staticmethod
def command_named(plugin_display_name, command_name):
plugin_name = PluginService.plugin_name_from_display_name(plugin_display_name)
available_commands_by_plugin = PluginService.available_commands_by_plugin()
try:
return available_commands_by_plugin[plugin_name][command_name]
except:
return None
@staticmethod @staticmethod
def modules_for_plugin(plugin): def modules_for_plugin(plugin):
for finder, name, ispkg in pkgutil.iter_modules(plugin.__path__): for finder, name, ispkg in pkgutil.iter_modules(plugin.__path__):
@ -75,5 +110,60 @@ class PluginService:
if member.__module__ == module_name: if member.__module__ == module_name:
yield member_name, member yield member_name, member
@staticmethod
def param_annotation_desc(param):
"""Parses a callable parameter's type annotation, if any, to form a ParameterDescription."""
param_id = param.name
param_type_desc = "any"
none_type = type(None)
supported_types = {str, int, bool, none_type}
unsupported_type_marker = object
annotation = param.annotation
if annotation in supported_types:
annotation_types = {annotation}
else:
# an annotation can have more than one type in the case of a union
# get_args normalizes Union[str, dict] to (str, dict)
# get_args normalizes Optional[str] to (str, none)
# all unsupported types are marked so (str, dict) -> (str, unsupported)
# the absense of a type annotation results in an empty set
annotation_types = set(
map(
lambda t: t if t in supported_types else unsupported_type_marker,
get_args(annotation),
)
)
# a parameter is required if it has no default value and none is not in its type set
param_req = param.default is param.empty and none_type not in annotation_types
# the none type from a union is used for requiredness, but needs to be discarded
# to single out the optional type
annotation_types.discard(none_type)
# if we have a single supported type use that, else any is the default
if len(annotation_types) == 1:
annotation_type = annotation_types.pop()
if annotation_type in supported_types:
param_type_desc = annotation_type.__name__
return {"id": param_id, "type": param_type_desc, "required": param_req}
@staticmethod
def command_params_desc(command):
sig = inspect.signature(command)
params_to_skip = ['self', 'kwargs'] # TODO no self if staying with function
sig_params = filter(
lambda param: param.name not in params_to_skip, sig.parameters.values()
)
params = [
PluginService.param_annotation_desc(param) for param in sig_params
]
return params
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000) app.run(host='0.0.0.0', port=5000)