Merge pull request #7 from sartography/feature/add_linting

Feature/add linting
This commit is contained in:
jasquat 2022-10-10 12:31:16 -04:00 committed by GitHub
commit c94230ce27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1728 additions and 232 deletions

2
.darglint Normal file
View File

@ -0,0 +1,2 @@
[darglint]
strictness = long

12
.flake8 Normal file
View File

@ -0,0 +1,12 @@
[flake8]
select = B,B9,C,D,DAR,E,F,N,RST,S,W
ignore = E203,E501,RST201,RST203,RST301,W503,S410,S320
max-line-length = 120
max-complexity = 30
docstring-convention = google
rst-roles = class,const,func,meth,mod,ref
rst-directives = deprecated
per-file-ignores =
# prefer naming tests descriptively rather than forcing comments
tests/*:S101,D103

2
.gitignore vendored
View File

@ -130,4 +130,4 @@ dmypy.json
.pyre/
# IDEs
.idea
.idea

63
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,63 @@
repos:
- repo: local
hooks:
- id: black
name: black
entry: black
language: system
types: [python]
require_serial: true
exclude: ^migrations/
- id: check-added-large-files
name: Check for added large files
entry: check-added-large-files
language: system
- id: check-toml
name: Check Toml
entry: check-toml
language: system
types: [toml]
- id: check-yaml
name: Check Yaml
entry: check-yaml
language: system
types: [yaml]
- id: end-of-file-fixer
name: Fix End of Files
entry: end-of-file-fixer
language: system
types: [text]
stages: [commit, push, manual]
- id: flake8
name: flake8
entry: flake8
language: system
types: [python]
require_serial: true
exclude: ^migrations/
- id: pyupgrade
name: pyupgrade
description: Automatically upgrade syntax for newer versions.
entry: pyupgrade
language: system
types: [python]
args: [--py37-plus]
- id: reorder-python-imports
name: Reorder python imports
entry: reorder-python-imports
language: system
types: [python]
args: [--application-directories=src]
exclude: ^migrations/
- id: trailing-whitespace
name: Trim Trailing Whitespace
entry: trailing-whitespace-fixer
language: system
types: [text]
stages: [commit, push, manual]
exclude: ^migrations/
- repo: https://github.com/pre-commit/mirrors-prettier
rev: v2.4.1
hooks:
- id: prettier
exclude_types: [html]

2
README
View File

@ -1,5 +1,7 @@
# Run the service
poetry run flask --debug run --port=7004
# You can check to see if it is running by loading
http://localhost:7004/v1/commands

159
app.py
View File

@ -1,42 +1,61 @@
import importlib
import inspect
import json
import os
import pkgutil
import types
import typing
from flask import Flask, Response, redirect, request, session, url_for
from flask import Flask
from flask import redirect
from flask import request
from flask import Response
from flask import session
from flask import url_for
from flask_oauthlib.contrib.client import OAuth
app = Flask(__name__)
app.config.from_pyfile('config.py', silent=True)
app.config.from_pyfile("config.py", silent=True)
if app.config["ENV"] != "production":
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
if app.config['ENV'] != 'production':
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
@app.before_first_request
def load_plugins():
print('load the plugins once here?')
print("load the plugins once here?")
@app.route('/liveness')
@app.route("/liveness")
def status():
return Response(json.dumps({"ok": True}), status=200, mimetype='application/json')
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def list_targets(targets):
descriptions = []
for plugin_name, plugin_targets in targets.items():
for target_name, target in plugin_targets.items():
description = PluginService.describe_target(plugin_name, target_name, target)
description = PluginService.describe_target(
plugin_name, target_name, target
)
descriptions.append(description)
return Response(json.dumps(descriptions), status=200, mimetype='application/json')
return Response(json.dumps(descriptions), status=200, mimetype="application/json")
@app.route('/v1/auths')
@app.route("/v1/auths")
def list_auths():
return list_targets(PluginService.available_auths_by_plugin())
@app.route('/v1/commands')
@app.route("/v1/commands")
def list_commands():
return list_targets(PluginService.available_commands_by_plugin())
def auth_handler(plugin_display_name, auth_name, params):
auth = PluginService.auth_named(plugin_display_name, auth_name)
if auth is not None:
@ -57,70 +76,73 @@ def auth_handler(plugin_display_name, auth_name, params):
return handler
@app.route('/v1/auth/<plugin_display_name>/<auth_name>')
@app.route("/v1/auth/<plugin_display_name>/<auth_name>")
def do_auth(plugin_display_name, auth_name):
params = request.args.to_dict()
our_redirect_url = params['redirect_url']
session['redirect_url'] = our_redirect_url
our_redirect_url = params["redirect_url"]
session["redirect_url"] = our_redirect_url
handler = auth_handler(plugin_display_name, auth_name, params)
if handler is None:
return Response('Auth not found', status=404)
return Response("Auth not found", status=404)
# TODO factor into handler
# TODO namespace the keys
session['client_id'] = params['client_id']
session['client_secret'] = params['client_secret']
session["client_id"] = params["client_id"]
session["client_secret"] = params["client_secret"]
oauth_redirect_url = url_for('auth_callback', plugin_display_name=plugin_display_name, auth_name=auth_name, _external=True)
oauth_redirect_url = url_for(
"auth_callback",
plugin_display_name=plugin_display_name,
auth_name=auth_name,
_external=True,
)
return handler.authorize(callback_uri=oauth_redirect_url)
@app.route('/v1/auth/<plugin_display_name>/<auth_name>/callback')
@app.route("/v1/auth/<plugin_display_name>/<auth_name>/callback")
def auth_callback(plugin_display_name, auth_name):
handler = auth_handler(plugin_display_name, auth_name, session)
if handler is None:
return Response('Auth not found', status=404)
return Response("Auth not found", status=404)
response = json.dumps(handler.authorized_response())
redirect_url = session['redirect_url']
redirect_url = session["redirect_url"]
# TODO compare redirect_url to whitelist
return redirect(f'{redirect_url}?response={response}')
return redirect(f"{redirect_url}?response={response}")
@app.route('/v1/do/<plugin_display_name>/<command_name>')
@app.route("/v1/do/<plugin_display_name>/<command_name>")
def do_command(plugin_display_name, command_name):
command = PluginService.command_named(plugin_display_name, command_name)
if command is None:
return json_error_response(f'Command not found: {plugin_display_name}:{command_name}', status=404)
return json_error_response(
f"Command not found: {plugin_display_name}:{command_name}", status=404
)
params = request.args.to_dict()
try:
result = command(**params).execute(app.config)
except Exception as e:
return json_error_response(f'Error encountered when executing {plugin_display_name}:{command_name} {str(e)}',
status=404)
return json_error_response(
f"Error encountered when executing {plugin_display_name}:{command_name} {str(e)}",
status=404,
)
return Response(result["response"], mimetype=result["mimetype"], status=200)
return Response(result['response'], mimetype=result['mimetype'], status=200)
def json_error_response(message, status):
resp = {
'error':message,
'status':status
}
resp = {"error": message, "status": status}
return Response(json.dumps(resp), status=status)
# TODO move out to own home
import importlib
import inspect
import pkgutil
import types
import typing
class PluginService:
PLUGIN_PREFIX = 'connector_'
PLUGIN_PREFIX = "connector_"
@staticmethod
def plugin_display_name(plugin_name):
@ -129,44 +151,43 @@ class PluginService:
@staticmethod
def plugin_name_from_display_name(plugin_display_name):
return PluginService.PLUGIN_PREFIX + plugin_display_name
@staticmethod
def available_plugins():
return {
name: importlib.import_module(name)
for finder, name, ispkg
in pkgutil.iter_modules()
for finder, name, ispkg in pkgutil.iter_modules()
if name.startswith(PluginService.PLUGIN_PREFIX)
}
@staticmethod
def available_auths_by_plugin():
return {
plugin_name: {
auth_name: auth
for auth_name, auth
in PluginService.auths_for_plugin(plugin_name, plugin)
}
for plugin_name, plugin
in PluginService.available_plugins().items()
plugin_name: {
auth_name: auth
for auth_name, auth in PluginService.auths_for_plugin(
plugin_name, plugin
)
}
for plugin_name, plugin in PluginService.available_plugins().items()
}
@staticmethod
def available_commands_by_plugin():
return {
plugin_name: {
command_name: command
for command_name, command
in PluginService.commands_for_plugin(plugin_name, plugin)
}
for plugin_name, plugin
in PluginService.available_plugins().items()
plugin_name: {
command_name: command
for command_name, command in PluginService.commands_for_plugin(
plugin_name, plugin
)
}
for plugin_name, plugin in PluginService.available_plugins().items()
}
@staticmethod
def target_id(plugin_name, target_name):
plugin_display_name = PluginService.plugin_display_name(plugin_name)
return f'{plugin_display_name}/{target_name}'
return f"{plugin_display_name}/{target_name}"
@staticmethod
def auth_named(plugin_display_name, auth_name):
@ -175,7 +196,7 @@ class PluginService:
try:
return available_auths_by_plugin[plugin_name][auth_name]
except:
except Exception:
return None
@staticmethod
@ -185,7 +206,7 @@ class PluginService:
try:
return available_commands_by_plugin[plugin_name][command_name]
except:
except Exception:
return None
@staticmethod
@ -203,19 +224,21 @@ class PluginService:
@staticmethod
def targets_for_plugin(plugin_name, plugin, target_package_name):
for module_name, module in PluginService.modules_for_plugin_in_package(plugin, target_package_name):
for module_name, module in PluginService.modules_for_plugin_in_package(
plugin, target_package_name
):
for member_name, member in inspect.getmembers(module, inspect.isclass):
if member.__module__ == module_name:
yield member_name, member
@staticmethod
def auths_for_plugin(plugin_name, plugin):
yield from PluginService.targets_for_plugin(plugin_name, plugin, 'auths')
yield from PluginService.targets_for_plugin(plugin_name, plugin, "auths")
@staticmethod
def commands_for_plugin(plugin_name, plugin):
# TODO check if class has an execute method before yielding
yield from PluginService.targets_for_plugin(plugin_name, plugin, 'commands')
yield from PluginService.targets_for_plugin(plugin_name, plugin, "commands")
@staticmethod
def param_annotation_desc(param):
@ -262,13 +285,11 @@ class PluginService:
@staticmethod
def callable_params_desc(kallable):
sig = inspect.signature(kallable)
params_to_skip = ['self', 'kwargs']
params_to_skip = ["self", "kwargs"]
sig_params = filter(
lambda param: param.name not in params_to_skip, sig.parameters.values()
)
params = [
PluginService.param_annotation_desc(param) for param in sig_params
]
params = [PluginService.param_annotation_desc(param) for param in sig_params]
return params
@ -276,8 +297,8 @@ class PluginService:
def describe_target(plugin_name, target_name, target):
parameters = PluginService.callable_params_desc(target.__init__)
target_id = PluginService.target_id(plugin_name, target_name)
return {'id': target_id, 'parameters': parameters}
return {"id": target_id, "parameters": parameters}
if __name__ == '__main__':
app.run(host='localhost', port=5000)
if __name__ == "__main__":
app.run(host="localhost", port=5000)

15
bin/run_server_locally Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
function error_handler() {
>&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
exit "$2"
}
trap 'error_handler ${LINENO} $?' ERR
set -o errtrace -o errexit -o nounset -o pipefail
if [[ -z "${FLASK_ENV:-}" ]]; then
export FLASK_ENV=development
fi
export FLASK_SESSION_SECRET_KEY=super_secret_key
poetry run flask run -p 7004

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,29 +1,28 @@
import boto3
from botocore.config import Config
"""SimpleAuth."""
import boto3 # type: ignore
from botocore.config import Config # type: ignore
class SimpleAuth:
"""Established a simple Boto 3 Client based on an access key and a secret key"""
"""Established a simple Boto 3 Client based on an access key and a secret key."""
def __init__(self, resource_type: str, access_key: str, secret_key: str):
def __init__(self, resource_type: str, access_key: str, secret_key: str):
"""
:param access_key: AWS Access Key
:param secret_key: AWS Secret Key
"""
my_config = Config(
region_name='us-east-1',
retries={
'max_attempts': 10,
'mode': 'standard'
}
region_name="us-east-1", retries={"max_attempts": 10, "mode": "standard"}
)
# Get the service resource.
self.resource = boto3.resource(resource_type,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
config=my_config)
self.resource = boto3.resource(
resource_type,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
config=my_config,
)
def get_resource(self):
"""Get_resource."""
return self.resource

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,16 +1,15 @@
"""AddDynamoItem."""
import json
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from connector_aws.auths.simpleAuth import SimpleAuth
from connector_aws.auths.simpleAuth import SimpleAuth # type: ignore
class AddDynamoItem:
"""Add a new record to a dynamo db table."""
def __init__(self, access_key: str, secret_key: str, table_name: str, item_data: str):
def __init__(
self, access_key: str, secret_key: str, table_name: str, item_data: str
):
"""
:param access_key: AWS Access Key
:param secret_key: AWS Secret Key
@ -20,8 +19,7 @@ class AddDynamoItem:
and a response string.
"""
# Get the service resource.
self.dynamodb = SimpleAuth('dynamodb', access_key, secret_key).get_resource()
self.dynamodb = SimpleAuth("dynamodb", access_key, secret_key).get_resource()
# Instantiate a table resource object without actually
# creating a DynamoDB table. Note that the attributes of this table
@ -32,8 +30,9 @@ class AddDynamoItem:
self.item_data = json.loads(item_data)
def execute(self, config):
"""Execute."""
result = self.table.put_item(Item=self.item_data)
if 'ResponseMetadata' in result:
del result['ResponseMetadata']
if "ResponseMetadata" in result:
del result["ResponseMetadata"]
result_str = json.dumps(result)
return dict(response=result_str, mimetype='application/json')
return dict(response=result_str, mimetype="application/json")

View File

@ -1,14 +1,11 @@
"""QueryDynamoTable."""
import json
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from connector_aws.auths.simpleAuth import SimpleAuth
from connector_aws.auths.simpleAuth import SimpleAuth # type: ignore
class QueryDynamoTable:
"""Return all records for a given partition key"""
"""Return all records for a given partition key."""
def __init__(self, access_key: str, secret_key: str, table_name: str, key: str):
"""
@ -18,14 +15,14 @@ class QueryDynamoTable:
:param key: The partition key for what to return.
:return: Json Data structure containing the requested data.
"""
self.dynamodb = SimpleAuth('dynamodb', access_key, secret_key).get_resource()
self.dynamodb = SimpleAuth("dynamodb", access_key, secret_key).get_resource()
self.table = self.dynamodb.Table(table_name)
self.key = key
def execute(self, config):
"""Execute."""
result = self.table.get_item(Key={"primaryKeyName": self.key})
if 'ResponseMetadata' in result:
del result['ResponseMetadata']
if "ResponseMetadata" in result:
del result["ResponseMetadata"]
result_str = json.dumps(result)
return dict(response=result_str, mimetype='application/json')
return dict(response=result_str, mimetype="application/json")

View File

@ -1,10 +1,7 @@
"""ScanDynamoTable."""
import json
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from connector_aws.auths.simpleAuth import SimpleAuth
from connector_aws.auths.simpleAuth import SimpleAuth # type: ignore
class ScanDynamoTable:
@ -17,12 +14,13 @@ class ScanDynamoTable:
:param table_name: The name of hte Dynamo DB table to scan
:return: Json Data structure containing the requested data.
"""
self.dynamodb = SimpleAuth('dynamodb', access_key, secret_key).get_resource()
self.dynamodb = SimpleAuth("dynamodb", access_key, secret_key).get_resource()
self.table = self.dynamodb.Table(table_name)
def execute(self, config):
"""Execute."""
result = self.table.scan()
if 'ResponseMetadata' in result:
del result['ResponseMetadata']
if "ResponseMetadata" in result:
del result["ResponseMetadata"]
result_str = json.dumps(result)
return dict(response=result_str, mimetype='application/json')
return dict(response=result_str, mimetype="application/json")

View File

@ -1,14 +1,19 @@
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from connector_aws.auths.simpleAuth import SimpleAuth
"""UploadFile."""
from botocore.exceptions import ClientError # type: ignore
from connector_aws.auths.simpleAuth import SimpleAuth # type: ignore
class UploadFileData:
"""UploadFileData."""
def __init__(self, access_key: str, secret_key: str,
file_data:bytes, bucket: str, object_name: str):
def __init__(
self,
access_key: str,
secret_key: str,
file_data: bytes,
bucket: str,
object_name: str,
):
"""
:param access_key: AWS Access Key
:param secret_key: AWS Secret Key
@ -18,29 +23,27 @@ class UploadFileData:
:return: Json Data structure containing a http status code (hopefully '200' for success..)
and a response string.
"""
self.client = SimpleAuth('s3', access_key, secret_key).get_resource()
self.client = SimpleAuth("s3", access_key, secret_key).get_resource()
self.file_data = file_data
self.bucket = bucket
self.object_name = object_name
def execute(self, config):
"""Execute."""
# Upload the file
try:
result = self.client.Object(self.bucket, self.object_name).put(Body=self.file_data)
status = str(result['ResponseMetadata']['HTTPStatusCode'])
result = self.client.Object(self.bucket, self.object_name).put(
Body=self.file_data
)
status = str(result["ResponseMetadata"]["HTTPStatusCode"])
# TODO these can be improved
if status == '200':
if status == "200":
response = '{ "result": "success" }'
else:
response = '{ "result": "error" }'
except ClientError as e:
response = f'{ "error": "AWS Excetion {e}" }'
status = '500'
status = "500"
return {
'response': response,
'status': status,
'mimetype': 'application/json'
}
return {"response": response, "status": status, "mimetype": "application/json"}

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,4 +1,6 @@
"""GetPayRate."""
import json
import requests
#
@ -12,32 +14,37 @@ import requests
# "payRate": "65000.00 USD"
# }
class GetPayRate:
"""GetPayRate."""
def __init__(self, employee_id: str):
"""__init__."""
self.employee_id = employee_id
def execute(self, config):
api_key = config['BAMBOOHR_API_KEY']
subdomain = config['BAMBOOHR_SUBDOMAIN']
"""Execute."""
api_key = config["BAMBOOHR_API_KEY"]
subdomain = config["BAMBOOHR_SUBDOMAIN"]
url = f'https://api.bamboohr.com/api/gateway.php/{subdomain}/v1/employees/{self.employee_id}'
headers = {'Accept': 'application/json'}
params = {'fields': 'payRate', 'onlyCurrent': 'true'}
auth = (api_key, 'x')
url = f"https://api.bamboohr.com/api/gateway.php/{subdomain}/v1/employees/{self.employee_id}"
headers = {"Accept": "application/json"}
params = {"fields": "payRate", "onlyCurrent": "true"}
auth = (api_key, "x")
try:
raw_response = requests.get(url, params, headers=headers, auth=auth)
parsed_response = json.loads(raw_response.text)
pay_rate = parsed_response['payRate']
pay_rate_parts = pay_rate.split(' ')
parsed_response['amount'] = pay_rate_parts[0]
parsed_response['currency'] = pay_rate_parts[1]
pay_rate = parsed_response["payRate"]
pay_rate_parts = pay_rate.split(" ")
parsed_response["amount"] = pay_rate_parts[0]
parsed_response["currency"] = pay_rate_parts[1]
response = json.dumps(parsed_response)
except:
except Exception:
response = '{ "error": "Invalid Employee ID" }'
return {
'response': response,
'status': raw_response.status_code,
'mimetype': 'application/json'
"response": response,
"status": raw_response.status_code,
"mimetype": "application/json",
}

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,75 +1,88 @@
"""Create."""
from io import BytesIO
from jinja2 import BaseLoader, Environment
from markdown2 import markdown
from xhtml2pdf import pisa
from connector_aws.commands.uploadFile import UploadFileData
from jinja2 import BaseLoader
from jinja2 import Environment
from markdown2 import markdown # type: ignore
from xhtml2pdf import pisa # type: ignore
class CreatePDF:
"""CreatePDF."""
def __init__(self, template: str):
"""__init__."""
self.template = template
def execute(self, config):
"""Execute."""
buf = BytesIO()
# TODO this will be provided in an upcoming pr
task_data = {
'name': 'Bob',
'amount': '123',
"name": "Bob",
"amount": "123",
}
html_string = markdown(self.template)
html_template = Environment(loader=BaseLoader).from_string(html_string)
html_template = Environment(loader=BaseLoader, autoescape=True).from_string(
html_string
)
html_content = html_template.render(**task_data)
pisa_status = pisa.CreatePDF(html_content, dest=buf)
if pisa_status.err:
return {
'response': 'ERR',
'status': '500',
'mimetype': 'text',
"response": "ERR",
"status": "500",
"mimetype": "text",
}
return {
'response': buf.getvalue(),
'status': '200',
'mimetype': 'application/pdf',
"response": buf.getvalue(),
"status": "200",
"mimetype": "application/pdf",
}
class CreatePDFAndUploadToS3:
"""CreatePDFAndUploadToS3."""
def __init__(self, template: str, aws_object_name: str):
"""__init__."""
self.template = template
self.aws_object_name = aws_object_name
def execute(self, config):
aws_access_key_id = config['AWS_ACCESS_KEY_ID']
aws_secret_access_key = config['AWS_SECRET_ACCESS_KEY']
aws_bucket = config['AWS_INVOICE_S3_BUCKET']
"""Execute."""
aws_access_key_id = config["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = config["AWS_SECRET_ACCESS_KEY"]
aws_bucket = config["AWS_INVOICE_S3_BUCKET"]
pdf_result = CreatePDF(self.template).execute(config)
if pdf_result['status'] != '200':
if pdf_result["status"] != "200":
return {
'response': '{ "error": "failed to create pdf" }',
'status': '500',
'mimetype': 'application/json',
"response": '{ "error": "failed to create pdf" }',
"status": "500",
"mimetype": "application/json",
}
aws_result = UploadFileData(aws_access_key_id,
aws_secret_access_key,
pdf_result['response'],
aws_result = UploadFileData(
aws_access_key_id,
aws_secret_access_key,
pdf_result["response"],
aws_bucket,
self.aws_object_name).execute(config)
self.aws_object_name,
).execute(config)
if aws_result['status'] != '200':
if aws_result["status"] != "200":
return aws_result
return {
'response': '{ "created": "' + self.aws_object_name + '"}',
'status': '200',
'mimetype': 'application/json',
"response": '{ "created": "' + self.aws_object_name + '"}',
"status": "200",
"mimetype": "application/json",
}

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,19 +1,28 @@
"""SendMessage."""
import json
import requests
from dataclasses import dataclass
import requests
from flask import current_app
@dataclass
class SendMessage:
"""SendMessage."""
message: str
message_type: str
recipient: str
def execute(self, config):
"""Execute."""
url = f'{current_app.config["WAKU_PROXY_BASE_URL"]}/sendMessage'
headers = {'Accept': 'application/json', 'Content-type': 'application/json'}
request_body = {"message": self.message, "recipient": self.recipient, "message_type": self.message_type}
headers = {"Accept": "application/json", "Content-type": "application/json"}
request_body = {
"message": self.message,
"recipient": self.recipient,
"message_type": self.message_type,
}
status_code = None
try:
@ -25,8 +34,8 @@ class SendMessage:
response = json.dumps({"error": str(ex)})
return {
'response': response,
'node_returned_200': True,
'status': status_code,
'mimetype': 'application/json'
"response": response,
"node_returned_200": True,
"status": status_code,
"mimetype": "application/json",
}

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,9 +1,16 @@
"""Oauth."""
class OAuth:
"""OAuth."""
def __init__(self, client_id: str, client_secret: str):
"""__init__."""
self.client_id = client_id
self.client_secret = client_secret
def app_description(self):
"""App_description."""
return {
"name": "xero",
"version": "2",
@ -14,12 +21,13 @@ class OAuth:
"access_token_url": "https://identity.xero.com/connect/token",
"refresh_token_url": "https://identity.xero.com/connect/token",
"scope": "offline_access openid profile email accounting.transactions "
"accounting.reports.read accounting.journals.read accounting.settings "
"accounting.contacts accounting.attachments assets projects",
"accounting.reports.read accounting.journals.read accounting.settings "
"accounting.contacts accounting.attachments assets projects",
}
@staticmethod
def filtered_params(params):
"""Filtered_params."""
return {
"client_id": params["client_id"],
"client_secret": params["client_secret"],

View File

@ -0,0 +1 @@
"""__init__."""

View File

@ -1,13 +1,18 @@
"""CreateInvoice."""
import json
from datetime import datetime
from datetime import timedelta
from datetime import datetime, timedelta
from xero_python.accounting import AccountingApi, Contact, LineItem, Invoice, Invoices
from xero_python.api_client import ApiClient, serialize
from xero_python.api_client.oauth2 import OAuth2Token
from xero_python.api_client.configuration import Configuration
from xero_python.identity import IdentityApi
from xero_python.api_client.serializer import serialize
from xero_python.accounting import AccountingApi # type: ignore
from xero_python.accounting import Contact
from xero_python.accounting import Invoice
from xero_python.accounting import Invoices
from xero_python.accounting import LineItem
from xero_python.api_client import ApiClient # type: ignore
from xero_python.api_client.configuration import Configuration # type: ignore
from xero_python.api_client.oauth2 import OAuth2Token # type: ignore
from xero_python.api_client.serializer import serialize # type: ignore
from xero_python.identity import IdentityApi # type: ignore
#
# Sample response
@ -131,20 +136,23 @@ from xero_python.api_client.serializer import serialize
# ]
# }
class CreateInvoice:
def __init__(self,
access_token,
class CreateInvoice:
"""CreateInvoice."""
def __init__(
self,
access_token,
description: str,
contact_name: str,
contact_email: str,
amount: str,
#reference: str,
#created_date: str,
#due_date: str,
#account_code: str,
# reference: str,
# created_date: str,
# due_date: str,
# account_code: str,
):
"""__init__."""
self.access_token = access_token
self.description = description
self.contact_name = contact_name
@ -153,8 +161,8 @@ class CreateInvoice:
def execute(self, config):
"""Creates an invoice in xero."""
client_id = config['XERO_CLIENT_ID']
client_secret = config['XERO_CLIENT_SECRET']
client_id = config["XERO_CLIENT_ID"]
client_secret = config["XERO_CLIENT_SECRET"]
access_token = json.loads(self.access_token)
@ -162,7 +170,7 @@ class CreateInvoice:
Configuration(
debug=True,
oauth2_token=OAuth2Token(
client_id=sclient_id, client_secret=client_secret
client_id=client_id, client_secret=client_secret
),
),
pool_threads=1,
@ -170,46 +178,50 @@ class CreateInvoice:
@api_client.oauth2_token_getter
def obtain_xero_oauth2_token():
"""Obtain_xero_oauth2_token."""
return access_token
@api_client.oauth2_token_saver
def store_xero_oauth2_token(token):
access_token = token
"""Store_xero_oauth2_token."""
access_token = token # noqa
api_instance = AccountingApi(api_client)
summarize_errors = 'True'
summarize_errors = "True"
unitdp = 2
date_value = datetime.now()
due_date_value = date_value + timedelta(days=7)
contact = Contact(name = self.contact_name,
email_address = self.contact_email)
contact = Contact(name=self.contact_name, email_address=self.contact_email)
line_item = LineItem(
description = self.description,
quantity = 1.0,
unit_amount = self.amount,
account_code = "400",
tracking = [])
line_items = []
description=self.description,
quantity=1.0,
unit_amount=self.amount,
account_code="400",
tracking=[],
)
line_items = []
line_items.append(line_item)
invoice = Invoice(
type = "ACCREC",
contact = contact,
date = date_value,
due_date = due_date_value,
line_items = line_items,
reference = "Created by SpiffWorkflow",
status = "AUTHORISED")
type="ACCREC",
contact=contact,
date=date_value,
due_date=due_date_value,
line_items=line_items,
reference="Created by SpiffWorkflow",
status="AUTHORISED",
)
invoices = Invoices(invoices = [invoice])
invoices = Invoices(invoices=[invoice])
try:
xero_tenant_id = self._get_xero_tenant_id(api_client, access_token)
created_invoices = api_instance.create_invoices(xero_tenant_id,
invoices, summarize_errors, unitdp)
created_invoices = api_instance.create_invoices(
xero_tenant_id, invoices, summarize_errors, unitdp
)
response = json.dumps(serialize(created_invoices))
status = 200
except Exception as e:
@ -217,13 +229,10 @@ class CreateInvoice:
response = f'{{ "error": "{e.reason}" }}'
status = 500
return {
'response': response,
'status': status,
'mimetype': 'application/json'
}
return {"response": response, "status": status, "mimetype": "application/json"}
def _get_xero_tenant_id(self, api_client, token):
"""_get_xero_tenant_id."""
if not token:
return None

220
noxfile.py Normal file
View File

@ -0,0 +1,220 @@
"""Nox sessions."""
import os
import shutil
import sys
from pathlib import Path
from textwrap import dedent
import nox
try:
from nox_poetry import Session
from nox_poetry import session
except ImportError:
message = f"""\
Nox failed to import the 'nox-poetry' package.
Please install it using the following command:
{sys.executable} -m pip install nox-poetry"""
raise SystemExit(dedent(message)) from None
package = "connector_proxy_status_im"
python_versions = ["3.10", "3.9"]
nox.needs_version = ">= 2021.6.6"
nox.options.sessions = (
"pre-commit",
"safety",
"mypy",
"tests",
"typeguard",
"xdoctest",
"docs-build",
)
def setup_database(session: Session) -> None:
"""Run database migrations against the database."""
session.env["FLASK_INSTANCE_PATH"] = os.path.join(
os.getcwd(), "instance", "testing"
)
flask_env_key = "FLASK_SESSION_SECRET_KEY"
session.env[flask_env_key] = "super_secret_key"
session.env["FLASK_APP"] = "src/connector_proxy_status_im"
session.env["FLASK_ENV"] = "testing"
session.run("flask", "db", "upgrade")
def activate_virtualenv_in_precommit_hooks(session: Session) -> None:
"""Activate virtualenv in hooks installed by pre-commit.
This function patches git hooks installed by pre-commit to activate the
session's virtual environment. This allows pre-commit to locate hooks in
that environment when invoked from git.
Args:
session: The Session object.
"""
assert session.bin is not None # noqa: S101
virtualenv = session.env.get("VIRTUAL_ENV")
if virtualenv is None:
return
hookdir = Path(".git") / "hooks"
if not hookdir.is_dir():
return
for hook in hookdir.iterdir():
if hook.name.endswith(".sample") or not hook.is_file():
continue
text = hook.read_text()
bindir = repr(session.bin)[1:-1] # strip quotes
if not (
Path("A") == Path("a") and bindir.lower() in text.lower() or bindir in text
):
continue
lines = text.splitlines()
if not (lines[0].startswith("#!") and "python" in lines[0].lower()):
continue
header = dedent(
f"""\
import os
os.environ["VIRTUAL_ENV"] = {virtualenv!r}
os.environ["PATH"] = os.pathsep.join((
{session.bin!r},
os.environ.get("PATH", ""),
))
"""
)
lines.insert(1, header)
hook.write_text("\n".join(lines))
@session(name="pre-commit", python="3.10")
def precommit(session: Session) -> None:
"""Lint using pre-commit."""
args = session.posargs or ["run", "--all-files", "--show-diff-on-failure"]
session.install(
"black",
"darglint",
"flake8",
"flake8-bandit",
"flake8-bugbear",
"flake8-docstrings",
"flake8-rst-docstrings",
"pep8-naming",
"pre-commit",
"pre-commit-hooks",
"pyupgrade",
"reorder-python-imports",
)
session.run("pre-commit", *args)
if args and args[0] == "install":
activate_virtualenv_in_precommit_hooks(session)
@session(python="3.10")
def safety(session: Session) -> None:
"""Scan dependencies for insecure packages."""
requirements = session.poetry.export_requirements()
session.install("safety")
session.run("safety", "check", "--full-report", f"--file={requirements}")
@session(python=python_versions)
def mypy(session: Session) -> None:
"""Type-check using mypy."""
args = session.posargs or ["src", "tests", "docs/conf.py"]
session.install(".")
session.install("mypy", "pytest", "sqlalchemy-stubs")
session.run("mypy", *args)
if not session.posargs:
session.run("mypy", f"--python-executable={sys.executable}", "noxfile.py")
@session(python=python_versions)
def tests(session: Session) -> None:
"""Run the test suite."""
session.install(".")
session.install("coverage[toml]", "pytest", "pygments")
try:
setup_database(session)
session.run("coverage", "run", "--parallel", "-m", "pytest", *session.posargs)
finally:
if session.interactive:
session.notify("coverage", posargs=[])
@session
def coverage(session: Session) -> None:
"""Produce the coverage report."""
args = session.posargs or ["report"]
session.install("coverage[toml]")
if not session.posargs and any(Path().glob(".coverage.*")):
session.run("coverage", "combine")
session.run("coverage", *args)
@session(python=python_versions)
def typeguard(session: Session) -> None:
"""Runtime type checking using Typeguard."""
session.install(".")
session.install("pytest", "typeguard", "pygments")
setup_database(session)
session.env["RUN_TYPEGUARD"] = "true"
session.run("pytest", *session.posargs)
@session(python=python_versions)
def xdoctest(session: Session) -> None:
"""Run examples with xdoctest."""
if session.posargs:
args = [package, *session.posargs]
else:
args = [f"--modname={package}", "--command=all"]
if "FORCE_COLOR" in os.environ:
args.append("--colored=1")
session.install(".")
session.install("xdoctest[colors]")
session.run("python", "-m", "xdoctest", *args)
@session(name="docs-build", python="3.10")
def docs_build(session: Session) -> None:
"""Build the documentation."""
args = session.posargs or ["docs", "docs/_build"]
if not session.posargs and "FORCE_COLOR" in os.environ:
args.insert(0, "--color")
session.install(".")
session.install("sphinx", "sphinx-click", "furo")
build_dir = Path("docs", "_build")
if build_dir.exists():
shutil.rmtree(build_dir)
session.run("sphinx-build", *args)
@session(python="3.10")
def docs(session: Session) -> None:
"""Build and serve the documentation with live reloading on file changes."""
args = session.posargs or ["--open-browser", "docs", "docs/_build"]
session.install(".")
session.install("sphinx", "sphinx-autobuild", "sphinx-click", "furo")
build_dir = Path("docs", "_build")
if build_dir.exists():
shutil.rmtree(build_dir)
session.run("sphinx-autobuild", *args)

1092
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -17,9 +17,26 @@ connector-pdf = {develop=true, path="connectors/connector-pdf"}
gunicorn = "^20.1.0"
Flask-OAuthlib = "^0.9.6"
Flask-Session = "^0.4.0"
types-requests = "^2.28.11.2"
[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
pytest = "^7.1.3"
coverage = "^6.5.0"
safety = "^2.3.1"
mypy = "^0.982"
typeguard = "^2.13.3"
xdoctest = "^1.1.0"
Sphinx = "^5.2.3"
sphinx-autobuild = "^2021.3.14"
pre-commit = "^2.20.0"
flake8 = "^5.0.4"
black = "^22.10.0"
flake8-bandit = "^4.1.1"
pyupgrade = "^3.0.0"
pre-commit-hooks = "^4.3.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"