Adding a camunda parser, so we can use this free and beautiful editor to build diagrams.
Adding a command line implemenation so I can get a grip on what is happening in real time more quickly. Started working on some custom tasks, but need to put a little more effort into this area. Minor updates to the readme Adding a joke BPMN that I can test with.
This commit is contained in:
@ -39,6 +39,25 @@ run configuration and set up a run configuration that looks like the following (
run configuration so it doesn't go away.) :
![Run Configuration Screenshot](readme_images/run_config.png)
### Running the project
### Running the Web API
Just click the "Play" button next to RUN in the top right corner of the screen.
The Swagger based view of the API will be avialable at
### Testing from the Shell
This app includes a command line interface that will read in BPMN files and let you
play with it at the command line. To run it right click on app/command_line/ and
click run. Type "?" to get a list of commands.
### Additional Reading
1. [BPMN]( Is the tool we are using to create diagrams
of our business processes. It's is a beautiful concise diagramming tool. We strongly recommend you
read this complete tutorial, as this notation is the foundation on which this project as well as many
other software systems for businesses are built. Know it well.
### Notes on Creating Good BPMN Diagrams in Comunda
1. Be sure to give each task a thoughtful (but unique!) id. This will
make the command line and debugging far far easier. I've tended to pre-fix
these, so task_ask_riddle if a task is asking a riddle for instance.
@ -0,0 +1,28 @@
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException
from SpiffWorkflow.bpmn.parser.task_parsers import ExclusiveGatewayParser, first, xpath_eval
class CamundaExclusiveGatewayParser(ExclusiveGatewayParser):
def connect_outgoing(self, outgoing_task, outgoing_task_node, sequence_flow_node, is_default):
super(CamundaExclusiveGatewayParser, self).connect_outgoing(outgoing_task, outgoing_task_node, sequence_flow_node, is_default)
except ValidationException as ex:
if 'Non-default exclusive outgoing sequence flow without condition' not in str(ex):
xpath = xpath_eval(sequence_flow_node)
condition_expression_node = conditionExpression = first(
if not condition_expression_node:
for attrib in condition_expression_node.attrib:
if attrib.endswith('resource') and condition_expression_node.attrib[attrib]:
conditionExpression = condition_expression_node.attrib[attrib]
cond = self.parser.parse_condition(conditionExpression, outgoing_task, outgoing_task_node, sequence_flow_node, condition_expression_node, self)
self.task.connect_outgoing_if(cond, outgoing_task, sequence_flow_node.get('id'), sequence_flow_node.get(
'name', None), self.parser._parse_documentation(sequence_flow_node, task_parser=self))
@ -0,0 +1,104 @@
from io import BytesIO
from xml.etree import ElementTree
from SpiffWorkflow import Task
from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnParser, full_tag
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer
from SpiffWorkflow.bpmn.serializer.Packager import Packager
from SpiffWorkflow.bpmn.specs import ExclusiveGateway
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from app.camunda.CamundaParser import CamundaExclusiveGatewayParser
class InMemoryPackager(Packager):
Creates spiff's wf packages on the fly.
def package_in_memory(cls, workflow_name, workflow_files, editor):
Generates wf packages from workflow diagrams.
s = BytesIO()
p = cls(s, workflow_name, meta_data=[], editor=editor)
return s.getvalue()
class BPMNXMLWorkflowRunner:
def __init__(self, path, workflowProcessID=None, debug=False, **kwargs):
self.path = path
self.debug = debug
self.kwargs = kwargs
ETRroot = ElementTree.parse(self.path).getroot() # definitions
self.workflowProcessID = workflowProcessID or BPMNXMLWorkflowRunner.__getWorkflowProcessID(ETRroot)
self.workflowEditor = BPMNXMLWorkflowRunner.__getWorkflowEditor(ETRroot)
self.packager = InMemoryPackager
if self.workflowEditor == 'Camunda Modeler':
self.addParserSupport('exclusiveGateway', CamundaExclusiveGatewayParser, ExclusiveGateway.ExclusiveGateway)
self.workflow = None
def addParserSupport(self, full_tag_name, parserClass, taskClass):
self.packager.PARSER_CLASS.OVERRIDE_PARSER_CLASSES[full_tag(full_tag_name)] = (parserClass, taskClass)
def __getWorkflowProcessID(ETRroot):
processElements = []
for child in ETRroot:
if child.tag.endswith('process') and child.attrib.get('isExecutable', False):
if len(processElements) == 0:
raise Exception('No executable process tag found')
if len(processElements) > 1:
raise Exception('Multiple executable processes tags found')
return processElements[0].attrib['id']
def __getWorkflowEditor(ETRroot):
return ETRroot.attrib['exporter']
def __do_engine_steps(self):
assert not self.workflow.read_only
engine_steps = list(
[t for t in self.workflow.get_tasks(Task.READY) if self.workflow._is_engine_task(t.task_spec)])
while engine_steps:
for task in engine_steps:
engine_steps = list([t for t in self.workflow.get_tasks(Task.READY) if self.workflow._is_engine_task(t.task_spec)])
def start(self, **data):
package = self.packager.package_in_memory(self.workflowProcessID, self.path, self.workflowEditor)
workflowSpec = BpmnSerializer().deserialize_workflow_spec(package)
self.workflow = BpmnWorkflow(workflowSpec, **self.kwargs)
self.workflow.debug = self.debug
# Set input data to first ready task
# self.workflow.do_engine_steps()
def getEndEventName(self):
endTask = self.workflow.get_tasks()[-1]
parent = endTask.parent
while parent and'EndJoin', 'End')):
parent = parent.parent
if parent:
def getData(self):
@ -0,0 +1,87 @@
import json
from cmd import Cmd
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from SpiffWorkflow.serializer.json import JSONSerializer
from flask import jsonify
from app.model.WorkflowRunner import WorkflowRunner
class MyPrompt(Cmd):
def __init__(self):
runner = WorkflowRunner('../static/bpmn/joke.bpmn', debug=False)
spec = runner.get_spec()
self.workflow = BpmnWorkflow(spec)
self.workflow.debug = False
serializer = JSONSerializer()
data = self.workflow.serialize(serializer)
self.pretty = json.dumps(json.loads(data), indent=4, separators=(',', ': '))
def do_hello(self, args):
"""Says hello. If you provide a name, it will greet you with it."""
if len(args) == 0:
name = 'stranger'
name = args
print("Hello, %s" % name)
def do_quit(self, args):
"""Quits the program."""
raise SystemExit
def do_debug(self, args):
"""Prints the full task tree."""
def do_engine(self, args):
"""Completes any tasks that are engine specific, completes until there are
only READY user tasks, or WAITING tasks available. """
def do_complete_all(self, args):
"""Completes everything that is possible to complete"""
def do_ready(self, args):
"""Prints a list of user tasks that are ready for action."""
ready_tasks = self.workflow.get_ready_user_tasks()
print("The following task ids are ready for execution:")
for task in ready_tasks:
print("\t" + str( + " : " + str(task.get_name()))
def do_waiting(self, args):
"""Prints a list of tasks that are in the waiting state."""
tasks = self.workflow.get_waiting_tasks()
print("The following task ids are waiting for exectution:")
for task in tasks:
print("\t" + str( + " : " + str(task.get_name()))
def do_next(self, args):
"""Attempts to do the next task."""
print("Running the next task")
self.workflow.complete_next(pick_up=True, halt_on_manual=True)
tasks = self.workflow.get_waiting_tasks()
print("Next Tasks:")
for task in tasks:
print("\t" + str(task.get_name()))
def do_answer(self, args):
tasks = self.workflow.get_ready_user_tasks()
if len(tasks) == 1:
print("You answered " + args)
data = {}
data["answer"] = args
if __name__ == '__main__':
prompt = MyPrompt()
prompt.prompt = '> '
prompt.cmdloop('Starting prompt...')
@ -0,0 +1,63 @@
from io import BytesIO
from xml.etree import ElementTree
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer
from SpiffWorkflow.bpmn.serializer.Packager import Packager
from SpiffWorkflow.bpmn.specs import ExclusiveGateway
from app.camunda.CamundaParser import CamundaExclusiveGatewayParser
class InMemoryPackager(Packager):
Creates spiff's wf packages on the fly.
def package_in_memory(cls, workflow_name, workflow_files, editor):
Generates wf packages from workflow diagrams.
s = BytesIO()
p = cls(s, workflow_name, meta_data=[], editor=editor)
return s.getvalue()
class WorkflowRunner:
def __init__(self, path, workflow_process_id=None, debug=False, **kwargs):
self.path = path
self.debug = debug
self.kwargs = kwargs
root_element = ElementTree.parse(self.path).getroot() # definitions
self.workflowProcessID = workflow_process_id or self.__get_workflow_process_id(root_element)
self.workflowEditor = root_element.attrib['exporter']
self.packager = InMemoryPackager
#if self.workflowEditor == 'Camunda Modeler':
# self.addParserSupport('exclusiveGateway', CamundaExclusiveGatewayParser, ExclusiveGateway.ExclusiveGateway)
self.workflow = None
def get_spec(self):
package = self.packager.package_in_memory(self.workflowProcessID, self.path, self.workflowEditor)
return BpmnSerializer().deserialize_workflow_spec(package)
def __get_workflow_process_id(root_element):
process_elements = []
for child in root_element:
if child.tag.endswith('process') and child.attrib.get('isExecutable', False):
if len(process_elements) == 0:
raise Exception('No executable process tag found')
if len(process_elements) > 1:
raise Exception('Multiple executable processes tags found')
return process_elements[0].attrib['id']
@ -0,0 +1,31 @@
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer
from SpiffWorkflow.serializer.json import JSONSerializer
from SpiffWorkflow.specs import Simple
class TellJoke(Simple):
def _on_trigger(self, my_task):
print("What has a face and two hands but no arms or legs?")
def _on_complete_hook(self, my_task):
def serialize(self, serializer):
return serializer.serialize_joke(self)
def deserialize(cls, serializer, wf_spec, s_state):
return serializer.deserialize_joke(wf_spec, s_state)
class TellJokeSerializer(BpmnSerializer):
def serialize_joke(self, task_spec):
return self.serialize_task_spec(task_spec)
def deserialize_joke(self, wf_spec, s_state):
spec = TellJoke(wf_spec, s_state['name'])
self.deserialize_task_spec(wf_spec, s_state, spec=spec)
return spec
@ -0,0 +1,30 @@
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer
from SpiffWorkflow.serializer.json import JSONSerializer
from SpiffWorkflow.specs import Simple
class Joke(Simple):
def _on_trigger(self, my_task):
def _on_complete_hook(self, my_task):
print("This is a Joke Task!")
def serialize(self, serializer):
return serializer.serialize_nuclear_strike(self)
def deserialize(cls, serializer, wf_spec, s_state):
return serializer.deserialize_nuclear_strike(wf_spec, s_state)
class JokeSerializer(BpmnSerializer):
def serialize_nuclear_strike(self, task_spec):
return self.serialize_task_spec(task_spec)
def deserialize_nuclear_strike(self, wf_spec, s_state):
spec = Joke(wf_spec, s_state['name'])
self.deserialize_task_spec(wf_spec, s_state, spec=spec)
return spec
@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="" xmlns:bpmndi="" xmlns:dc="" xmlns:xsi="" xmlns:camunda="" xmlns:di="" id="Definitions_0dagfzd" targetNamespace="" exporter="Camunda Modeler" exporterVersion="3.4.1">
<bpmn:process id="Process_182v69k" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:sequenceFlow id="SequenceFlow_1q16qc5" sourceRef="StartEvent_1" targetRef="task_show_riddle" />
<bpmn:sequenceFlow id="SequenceFlow_08iqlh1" sourceRef="task_show_riddle" targetRef="tast_await_answer" />
<bpmn:exclusiveGateway id="ExclusiveGateway_evaluate_answer">
<bpmn:sequenceFlow id="SequenceFlow_correct_answer" name="correct" sourceRef="ExclusiveGateway_evaluate_answer" targetRef="task_say_good_job">
<bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">task.get_data("answer") == "clock"</bpmn:conditionExpression>
<bpmn:sequenceFlow id="SequenceFlow_incorrect_answer" name="incorrect" sourceRef="ExclusiveGateway_evaluate_answer" targetRef="task_say_nope">
<bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">task.get_data("answer") != "clock"</bpmn:conditionExpression>
<bpmn:endEvent id="EndEvent_1yj6trx">
<bpmn:sequenceFlow id="SequenceFlow_0vujjbq" sourceRef="task_say_nope" targetRef="EndEvent_1yj6trx" />
<bpmn:endEvent id="EndEvent_0mjci4q">
<bpmn:sequenceFlow id="SequenceFlow_0vemdak" sourceRef="task_say_good_job" targetRef="EndEvent_0mjci4q" />
<bpmn:scriptTask id="task_show_riddle" name="Show Riddle">
<camunda:property />
<bpmn:script>print("What has a face and two hands but no arms or legs?")</bpmn:script>
<bpmn:scriptTask id="task_say_good_job" name="display "good job"">
<bpmn:scriptTask id="task_say_nope" name="display "nope"">
<bpmn:sequenceFlow id="SequenceFlow_06f0smj" sourceRef="tast_await_answer" targetRef="ExclusiveGateway_evaluate_answer" />
<bpmn:userTask id="tast_await_answer" name="Await Answer">
<camunda:outputParameter name="correct">True</camunda:outputParameter>
<bpmn:message id="Message_0wcbdkb" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_182v69k">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
<bpmndi:BPMNEdge id="SequenceFlow_1q16qc5_di" bpmnElement="SequenceFlow_1q16qc5">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
<bpmndi:BPMNEdge id="SequenceFlow_08iqlh1_di" bpmnElement="SequenceFlow_08iqlh1">
<di:waypoint x="370" y="177" />
<di:waypoint x="460" y="177" />
<bpmndi:BPMNShape id="ExclusiveGateway_0he1234_di" bpmnElement="ExclusiveGateway_evaluate_answer" isMarkerVisible="true">
<dc:Bounds x="655" y="152" width="50" height="50" />
<bpmndi:BPMNEdge id="SequenceFlow_02hgkgp_di" bpmnElement="SequenceFlow_correct_answer">
<di:waypoint x="680" y="152" />
<di:waypoint x="680" y="90" />
<di:waypoint x="800" y="90" />
<dc:Bounds x="678" y="118" width="34" height="14" />
<bpmndi:BPMNEdge id="SequenceFlow_1kegyis_di" bpmnElement="SequenceFlow_incorrect_answer">
<di:waypoint x="680" y="202" />
<di:waypoint x="680" y="290" />
<di:waypoint x="800" y="290" />
<dc:Bounds x="674" y="243" width="43" height="14" />
<bpmndi:BPMNShape id="EndEvent_1yj6trx_di" bpmnElement="EndEvent_1yj6trx">
<dc:Bounds x="1002" y="272" width="36" height="36" />
<bpmndi:BPMNEdge id="SequenceFlow_0vujjbq_di" bpmnElement="SequenceFlow_0vujjbq">
<di:waypoint x="900" y="290" />
<di:waypoint x="1002" y="290" />
<bpmndi:BPMNShape id="EndEvent_0mjci4q_di" bpmnElement="EndEvent_0mjci4q">
<dc:Bounds x="1002" y="72" width="36" height="36" />
<bpmndi:BPMNEdge id="SequenceFlow_0vemdak_di" bpmnElement="SequenceFlow_0vemdak">
<di:waypoint x="900" y="90" />
<di:waypoint x="1002" y="90" />
<bpmndi:BPMNShape id="ScriptTask_0lxpwi3_di" bpmnElement="task_show_riddle">
<dc:Bounds x="270" y="137" width="100" height="80" />
<bpmndi:BPMNShape id="ScriptTask_0qc6vev_di" bpmnElement="task_say_good_job">
<dc:Bounds x="800" y="50" width="100" height="80" />
<bpmndi:BPMNShape id="ScriptTask_0dy5sp8_di" bpmnElement="task_say_nope">
<dc:Bounds x="800" y="250" width="100" height="80" />
<bpmndi:BPMNEdge id="SequenceFlow_06f0smj_di" bpmnElement="SequenceFlow_06f0smj">
<di:waypoint x="560" y="177" />
<di:waypoint x="655" y="177" />
<bpmndi:BPMNShape id="UserTask_16xaap5_di" bpmnElement="tast_await_answer">
<dc:Bounds x="460" y="137" width="100" height="80" />
@ -1,9 +1,14 @@
import json
import unittest
from SpiffWorkflow import Workflow
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from SpiffWorkflow.serializer.json import JSONSerializer
from SpiffWorkflow.specs import WorkflowSpec
from app.camunda.WorkflowRunner import BPMNXMLWorkflowRunner
from app.model.WorkflowRunner import WorkflowRunner
from app.model.task.strike import NuclearSerializer
from tests.base_test import BaseTest
@ -22,19 +27,26 @@ class TestWorkflow(BaseTest, unittest.TestCase):
response_data = response.json
self.assertEqual('Full IRB Board Review',response_data[0]['name'])
def load_spec_from_file(self, path):
def load_spec_from_json(self, path):
with open(path) as fp:
workflow_json =
serializer = NuclearSerializer()
spec = WorkflowSpec.deserialize(serializer, workflow_json)
return spec
def load_spec_from_bpmn(self, path):
with open(path) as fp:
serializer = BpmnSerializer()
spec = WorkflowSpec.deserialize(serializer, fp)
return spec
def test_workflow_from_file(self):
spec = self.load_spec_from_file('../app/static/json/nuclear.json')
spec = self.load_spec_from_json('../app/static/json/nuclear.json')
def test_workflow_from_spec(self):
spec = self.load_spec_from_file('../app/static/json/nuclear.json')
spec = self.load_spec_from_json('../app/static/json/nuclear.json')
workflow = Workflow(spec)
@ -47,5 +59,22 @@ class TestWorkflow(BaseTest, unittest.TestCase):
def test_open_bpmn_diagram(self):
self.assertTrue(False, "Test loading a simple bpmn diagram")
runner = BPMNXMLWorkflowRunner('../app/static/bpmn/joke.bpmn', debug=True)
res = runner.getEndEventName()
self.assertEqual(res, 'Task_1u241z0')
def test_loading_joke(self):
runner = WorkflowRunner('../app/static/bpmn/joke.bpmn', debug=True)
spec = runner.get_spec()
workflow = BpmnWorkflow(spec)
workflow.debug = False
serializer = JSONSerializer()
data = workflow.serialize(serializer)
pretty = json.dumps(json.loads(data), indent=4, separators=(',', ': '))
Reference in New Issue