Creating an "app" base directory - this allows us to organize the source files and keep them separate from tests, configuration, and pipenv stuffs. Adding basic config and test directories and getting that set up and ready to roll.

This commit is contained in:
Dan Funk 2019-11-21 09:22:42 -05:00
parent 358860918f
commit 92ce0d048a
15 changed files with 218 additions and 16 deletions

View File

@ -1,2 +0,0 @@
import api.workflows
import api.workflows_tasks

8
app/__init__.py Normal file
View File

@ -0,0 +1,8 @@
import logging
import connexion
from app.api import workflows
logging.basicConfig(level=logging.DEBUG)
app = connexion.FlaskApp(__name__)
app.add_api('api.yml')

View File

@ -10,7 +10,7 @@ paths:
# /v1.0/workflows
/workflows:
get:
operationId: api.workflows.list_all
operationId: app.api.workflows.list_all
summary: List all workflows
tags:
- workflows
@ -44,7 +44,7 @@ paths:
# /v1.0/workflows/0
/workflows/{workflow_id}:
get:
operationId: api.workflows_tasks.start
operationId: app.api.workflows_tasks.start
summary: Status info for a specific workflow instance
tags:
- workflows
@ -73,7 +73,7 @@ paths:
# /v1.0/workflows/0/tasks/0
/workflows/{workflow_id}/tasks/{task_id}:
get:
operationId: api.workflows_tasks.get
operationId: app.api.workflows_tasks.get
summary: Get status of specific task in specific workflow instance
tags:
- workflows
@ -107,7 +107,7 @@ paths:
$ref: "#/components/schemas/Error"
post:
operationId: api.workflows_tasks.post
operationId: app.api.workflows_tasks.post
summary: Update data for a workflow task
tags:
- workflows

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@

View File

@ -1,8 +1,6 @@
import datetime
from connexion import NoContent
from SpiffWorkflow import Workflow
from workflows.workflows import TrainingWorkflowSpec
workflow_tasks = [
{

View File

@ -0,0 +1,63 @@
from __future__ import print_function
from SpiffWorkflow.specs import WorkflowSpec, ExclusiveChoice, Simple, Cancel
from SpiffWorkflow.operators import Equal, Attrib
import json
from SpiffWorkflow import Workflow
from SpiffWorkflow.specs import WorkflowSpec
from SpiffWorkflow.serializer.json import JSONSerializer
# Load from JSON
with open('nuclear.json') as fp:
workflow_json = fp.read()
serializer = JSONSerializer()
spec = WorkflowSpec.deserialize(serializer, workflow_json)
# Create the workflow.
workflow = Workflow(spec)
# Execute until all tasks are done or require manual intervention.
# For the sake of this tutorial, we ignore the "manual" flag on the
# tasks. In practice, you probably don't want to do that.
workflow.complete_all(halt_on_manual=False)
# Alternatively, this is what a UI would do for a manual task.
#workflow.complete_task_from_id(...)
def my_nuclear_strike(msg):
print("Launched:", msg)
class NuclearStrikeWorkflowSpec(WorkflowSpec):
def __init__(self):
WorkflowSpec.__init__(self)
# The first step of our workflow is to let the general confirm
# the nuclear strike.
general_choice = ExclusiveChoice(self, 'general')
self.start.connect(general_choice)
# The default choice of the general is to abort.
cancel = Cancel(self, 'workflow_aborted')
general_choice.connect(cancel)
# Otherwise, we will ask the president to confirm.
president_choice = ExclusiveChoice(self, 'president')
cond = Equal(Attrib('confirmation'), 'yes')
general_choice.connect_if(cond, president_choice)
# The default choice of the president is to abort.
president_choice.connect(cancel)
# Otherwise, we will perform the nuclear strike.
strike = Simple(self, 'nuclear_strike')
president_choice.connect_if(cond, strike)
# Now we connect our Python function to the Task named 'nuclear_strike'
strike.completed_event.connect(my_nuclear_strike)
# As soon as all tasks are either "completed" or "aborted", the
# workflow implicitely ends.

View File

@ -0,0 +1,93 @@
{
"task_specs": {
"Start": {
"class": "SpiffWorkflow.specs.StartTask.StartTask",
"manual": false,
"outputs": [
"general"
]
},
"general": {
"class": "SpiffWorkflow.specs.ExclusiveChoice.ExclusiveChoice",
"name": "general",
"manual": true,
"inputs": [
"Start"
],
"outputs": [
"workflow_aborted",
"president"
],
"choice": null,
"default_task_spec": "workflow_aborted",
"cond_task_specs": [
[
[
"SpiffWorkflow.operators.Equal",
[
[
"Attrib",
"confirmation"
],
[
"value",
"yes"
]
]
],
"president"
]
]
},
"president": {
"class": "SpiffWorkflow.specs.ExclusiveChoice.ExclusiveChoice",
"name": "president",
"manual": true,
"inputs": [
"general"
],
"outputs": [
"workflow_aborted",
"nuclear_strike"
],
"choice": null,
"default_task_spec": "workflow_aborted",
"cond_task_specs": [
[
[
"SpiffWorkflow.operators.Equal",
[
[
"Attrib",
"confirmation"
],
[
"value",
"yes"
]
]
],
"nuclear_strike"
]
]
},
"nuclear_strike": {
"class": "SpiffWorkflow.specs.Simple.Simple",
"name": "nuclear_strike",
"inputs": [
"president"
]
},
"workflow_aborted": {
"class": "SpiffWorkflow.specs.Cancel.Cancel",
"name": "workflow_aborted",
"inputs": [
"general",
"president"
]
}
},
"description": "",
"file": null,
"name": ""
}

9
run.py
View File

@ -1,10 +1,3 @@
import logging
import connexion
logging.basicConfig(level=logging.DEBUG)
app = connexion.FlaskApp(__name__)
app.add_api('api.yml')
from app import app
if __name__ == "__main__":
app.run()

37
tests/BaseTest.py Normal file
View File

@ -0,0 +1,37 @@
from app import app
# Great class to inherit from, as it sets up and tears down
# classes efficiently when we have a database in place.
class BaseTest:
auths = {}
@classmethod
def setUpClass(cls):
app.config.from_object('config.testing')
cls.ctx = app.test_request_context()
cls.app = app.test_client()
# Great place to do a db.create_all()
@classmethod
def tearDownClass(cls):
# Create place to clear everything out ...
# db.drop_all()
# db.session.remove()
# elastic_index.clear()
pass
def setUp(self):
self.ctx.push()
def tearDown(self):
# db.session.rollback()
# Most efficient thing here is to delete all rows from
# the database with a clear db method like this one:
# def clean_db(db):
# for table in reversed(db.metadata.sorted_tables):
# db.session.execute(table.delete())
# clean_db(db)
self.ctx.pop()
self.auths = {}

11
tests/TestWorkflow.py Normal file
View File

@ -0,0 +1,11 @@
import unittest
from tests.BaseTest import BaseTest
class TestWorkflow(BaseTest, unittest.TestCase):
def test_truthyness(self):
self.assertTrue(True)
def test_