Workflow Processor will deserialize workflows using the version of the BPMN files used during creation, but allows for both a soft and hard reset - soft resets will use the new workflow without a restart and will retain full history. A hard-reset will restart the workflow from scratch, but will retain the data from the last completed task. Workflows have a complex version number associated with them that is used during the deserialization to find the correct files.

This commit is contained in:
Dan Funk 2020-03-05 15:35:55 -05:00
parent 697d930eab
commit 7b21b78987
5 changed files with 124 additions and 36 deletions

View File

@ -33,4 +33,5 @@ class WorkflowModel(db.Model):
status = db.Column(db.Enum(WorkflowStatus)) status = db.Column(db.Enum(WorkflowStatus))
study_id = db.Column(db.Integer, db.ForeignKey('study.id')) study_id = db.Column(db.Integer, db.ForeignKey('study.id'))
workflow_spec_id = db.Column(db.String, db.ForeignKey('workflow_spec.id')) workflow_spec_id = db.Column(db.String, db.ForeignKey('workflow_spec.id'))
spec_version = db.Column(db.String)

View File

@ -11,7 +11,7 @@ from SpiffWorkflow.camunda.parser.CamundaParser import CamundaParser
from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser
from SpiffWorkflow.operators import Operator from SpiffWorkflow.operators import Operator
from crc import session from crc import session, db
from crc.api.common import ApiError from crc.api.common import ApiError
from crc.models.file import FileDataModel, FileModel, FileType from crc.models.file import FileDataModel, FileModel, FileType
from crc.models.workflow import WorkflowStatus, WorkflowModel from crc.models.workflow import WorkflowStatus, WorkflowModel
@ -90,34 +90,77 @@ class WorkflowProcessor(object):
_serializer = BpmnSerializer() _serializer = BpmnSerializer()
WORKFLOW_ID_KEY = "workflow_id" WORKFLOW_ID_KEY = "workflow_id"
STUDY_ID_KEY = "study_id" STUDY_ID_KEY = "study_id"
SPEC_VERSION_KEY = "spec_version"
def __init__(self, workflow_model: WorkflowModel): def __init__(self, workflow_model: WorkflowModel, soft_reset=False, hard_reset=False):
latest_spec = self.get_spec(workflow_model.workflow_spec_id) """Create a Worflow Processor based on the serialized information available in the workflow model.
If soft_reset is set to true, it will try to use the latest version of the workflow specification.
If hard_reset is set to true, it will create a new Workflow, but embed the data from the last
completed task in the previous workflow.
If neither flag is set, it will use the same version of the specification that was used to originally
create the workflow model. """
if soft_reset:
spec = self.get_spec(workflow_model.workflow_spec_id)
workflow_model.spec_version = spec.description
else:
spec = self.get_spec(workflow_model.workflow_spec_id, workflow_model.spec_version)
self.workflow_spec_id = workflow_model.workflow_spec_id self.workflow_spec_id = workflow_model.workflow_spec_id
self.bpmn_workflow = self._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json, self.bpmn_workflow = self._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json, workflow_spec=spec)
workflow_spec=latest_spec)
self.bpmn_workflow.script_engine = self._script_engine self.bpmn_workflow.script_engine = self._script_engine
if hard_reset:
# Now that the spec is loaded, get the data and rebuild the bpmn with the new details
workflow_model.spec_version = self.hard_reset()
@staticmethod @staticmethod
def get_parser(): def get_parser():
parser = MyCustomParser() parser = MyCustomParser()
return parser return parser
@staticmethod @staticmethod
def get_spec(workflow_spec_id): def __get_file_models_for_version(workflow_spec_id, version):
"""Returns the latest version of the specification.""" """Version is in the format v[VERSION] [FILE_ID_LIST]
parser = WorkflowProcessor.get_parser() For example, a single bpmn file with only one version would be
major_version = 0 # The version of the primary file. v1 [12] Where 12 is the id of the file that is used to create the
minor_version = [] # The versions of the minor files if any. specification. If multiple files exist, they are added on in
file_ids = [] dot notation to both the version number and the file list. So
process_id = None a Spec that includes a BPMN, DMN, an a Word file all on the first
file_data_models = session.query(FileDataModel) \ version would be v1.1.1 [12,45,21]"""
file_id_strings = re.findall('\((.*)\)', version)[0].split(".")
file_ids = [int(i) for i in file_id_strings]
files = session.query(FileDataModel)\
.join(FileModel) \
.filter(FileModel.workflow_spec_id == workflow_spec_id)\
.filter(FileDataModel.id.in_(file_ids)).all()
if len(files) != len(file_ids):
raise ApiError("invalid_version",
"The version '%s' of workflow specification '%s' is invalid. Unable to locate the correct files to recreate it." %
(version, workflow_spec_id))
return files
@staticmethod
def __get_latest_file_models(workflow_spec_id):
"""Returns all the latest files related to a workflow specification"""
return session.query(FileDataModel) \
.join(FileModel) \ .join(FileModel) \
.filter(FileModel.workflow_spec_id == workflow_spec_id)\ .filter(FileModel.workflow_spec_id == workflow_spec_id)\
.filter(FileDataModel.version == FileModel.latest_version)\ .filter(FileDataModel.version == FileModel.latest_version)\
.order_by(FileModel.id)\ .order_by(FileModel.id)\
.all() .all()
@staticmethod
def get_spec(workflow_spec_id, version=None):
"""Returns the requested version of the specification,
or the lastest version if none is specified."""
parser = WorkflowProcessor.get_parser()
major_version = 0 # The version of the primary file.
minor_version = [] # The versions of the minor files if any.
file_ids = []
process_id = None
if version is None:
file_data_models = WorkflowProcessor.__get_latest_file_models(workflow_spec_id)
else:
file_data_models = WorkflowProcessor.__get_file_models_for_version(workflow_spec_id, version)
for file_data in file_data_models: for file_data in file_data_models:
file_ids.append(file_data.id) file_ids.append(file_data.id)
if file_data.file_model.type == FileType.bpmn: if file_data.file_model.type == FileType.bpmn:
@ -138,7 +181,7 @@ class WorkflowProcessor(object):
spec = parser.get_spec(process_id) spec = parser.get_spec(process_id)
version = ".".join(str(x) for x in minor_version) version = ".".join(str(x) for x in minor_version)
files = ".".join(str(x) for x in file_ids) files = ".".join(str(x) for x in file_ids)
spec.description = "v%s (%s) " % (version, files) spec.description = "v%s (%s)" % (version, files)
return spec return spec
@staticmethod @staticmethod
@ -155,17 +198,17 @@ class WorkflowProcessor(object):
def create(cls, study_id, workflow_spec_id): def create(cls, study_id, workflow_spec_id):
spec = WorkflowProcessor.get_spec(workflow_spec_id) spec = WorkflowProcessor.get_spec(workflow_spec_id)
bpmn_workflow = BpmnWorkflow(spec, script_engine=cls._script_engine) bpmn_workflow = BpmnWorkflow(spec, script_engine=cls._script_engine)
bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = study_id
bpmn_workflow.do_engine_steps() bpmn_workflow.do_engine_steps()
workflow_model = WorkflowModel(status=WorkflowProcessor.status_of(bpmn_workflow), workflow_model = WorkflowModel(status=WorkflowProcessor.status_of(bpmn_workflow),
study_id=study_id, study_id=study_id,
workflow_spec_id=workflow_spec_id) workflow_spec_id=workflow_spec_id,
spec_version=spec.description)
session.add(workflow_model) session.add(workflow_model)
session.commit() session.commit()
# Need to commit twice, first to get a unique id for the workflow model, and # Need to commit twice, first to get a unique id for the workflow model, and
# a second time to store the serilaization so we can maintain this link within # a second time to store the serilaization so we can maintain this link within
# the spiff-workflow process. # the spiff-workflow process.
bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = study_id
bpmn_workflow.data[WorkflowProcessor.SPEC_VERSION_KEY] = spec.description
bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id
workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(bpmn_workflow) workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(bpmn_workflow)
@ -174,10 +217,13 @@ class WorkflowProcessor(object):
processor = cls(workflow_model) processor = cls(workflow_model)
return processor return processor
def restart_with_current_task_data(self): def hard_reset(self):
"""Recreate this workflow, but keep the data from the last completed task and add it back into the first task. """Recreate this workflow, but keep the data from the last completed task and add it back into the first task.
This may be useful when a workflow specification changes, and users need to review all the This may be useful when a workflow specification changes, and users need to review all the
prior steps, but don't need to reenter all the previous data. """ prior steps, but don't need to reenter all the previous data.
Returns the new version.
"""
spec = WorkflowProcessor.get_spec(self.workflow_spec_id) spec = WorkflowProcessor.get_spec(self.workflow_spec_id)
bpmn_workflow = BpmnWorkflow(spec, script_engine=self._script_engine) bpmn_workflow = BpmnWorkflow(spec, script_engine=self._script_engine)
bpmn_workflow.data = self.bpmn_workflow.data bpmn_workflow.data = self.bpmn_workflow.data
@ -185,6 +231,7 @@ class WorkflowProcessor(object):
task.data = self.bpmn_workflow.last_task.data task.data = self.bpmn_workflow.last_task.data
bpmn_workflow.do_engine_steps() bpmn_workflow.do_engine_steps()
self.bpmn_workflow = bpmn_workflow self.bpmn_workflow = bpmn_workflow
return spec.description
def get_status(self): def get_status(self):
return self.status_of(self.bpmn_workflow) return self.status_of(self.bpmn_workflow)

View File

@ -0,0 +1,28 @@
"""empty message
Revision ID: 301c4fd103d2
Revises: 0ae9742a2b14
Create Date: 2020-03-05 15:36:06.750422
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '301c4fd103d2'
down_revision = '0ae9742a2b14'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('workflow', sa.Column('spec_version', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('workflow', 'spec_version')
# ### end Alembic commands ###

View File

@ -211,9 +211,11 @@ class TestTasksApi(BaseTest):
workflow_api = self.get_workflow_api(workflow) workflow_api = self.get_workflow_api(workflow)
self.complete_form(workflow, workflow_api.user_tasks[0], {"color": "blue"}) self.complete_form(workflow, workflow_api.user_tasks[0], {"color": "blue"})
# Modify the specification, with a major change that alters the flow and can't be serialized effectively. # Modify the specification, with a major change that alters the flow and can't be deserialized
# effectively, if it uses the latest spec files.
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn') file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path) self.replace_file("two_forms.bpmn", file_path)
with self.assertRaises(KeyError): workflow_api = self.get_workflow_api(workflow)
workflow_api = self.get_workflow_api(workflow) self.assertTrue(workflow_api.spec_version.startswith("v1 "))
self.assertTrue(workflow_api.latest_spec_version)

View File

@ -248,14 +248,14 @@ class TestWorkflowProcessor(BaseTest):
task.data = {"key": "Value"} task.data = {"key": "Value"}
processor.complete_task(task) processor.complete_task(task)
task_before_restart = processor.next_task() task_before_restart = processor.next_task()
processor.restart_with_current_task_data() processor.hard_reset()
task_after_restart = processor.next_task() task_after_restart = processor.next_task()
self.assertNotEqual(task.get_name(), task_before_restart.get_name()) self.assertNotEqual(task.get_name(), task_before_restart.get_name())
self.assertEqual(task.get_name(), task_after_restart.get_name()) self.assertEqual(task.get_name(), task_after_restart.get_name())
self.assertEqual(task.data, task_after_restart.data) self.assertEqual(task.data, task_after_restart.data)
def test_modify_spec_with_text_change_with_running_workflow(self): def test_soft_reset(self):
self.load_example_data() self.load_example_data()
# Start the two_forms workflow, and enter some data in the first form. # Start the two_forms workflow, and enter some data in the first form.
@ -272,14 +272,22 @@ class TestWorkflowProcessor(BaseTest):
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn') file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path) self.replace_file("two_forms.bpmn", file_path)
# Setting up another processor should not error out, but doesn't pick up the update.
workflow_model.bpmn_workflow_json = processor.serialize() workflow_model.bpmn_workflow_json = processor.serialize()
processor2 = WorkflowProcessor(workflow_model) processor2 = WorkflowProcessor(workflow_model)
self.assertEquals("Step 1", processor2.bpmn_workflow.last_task.task_spec.description) self.assertEquals("Step 1", processor2.bpmn_workflow.last_task.task_spec.description)
self.assertEquals("# This is some documentation I wanted to add.", self.assertNotEquals("# This is some documentation I wanted to add.",
processor2.bpmn_workflow.last_task.task_spec.documentation) processor2.bpmn_workflow.last_task.task_spec.documentation)
# You can do a soft update and get the right response.
processor3 = WorkflowProcessor(workflow_model, soft_reset=True)
self.assertEquals("Step 1", processor3.bpmn_workflow.last_task.task_spec.description)
self.assertEquals("# This is some documentation I wanted to add.",
processor3.bpmn_workflow.last_task.task_spec.documentation)
def test_modify_spec_with_structural_change_with_running_workflow(self):
def test_hard_reset(self):
self.load_example_data() self.load_example_data()
# Start the two_forms workflow, and enter some data in the first form. # Start the two_forms workflow, and enter some data in the first form.
@ -298,15 +306,17 @@ class TestWorkflowProcessor(BaseTest):
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn') file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path) self.replace_file("two_forms.bpmn", file_path)
with self.assertRaises(KeyError): # Assure that creating a new processor doesn't cause any issues, and maintains the spec version.
workflow_model.bpmn_workflow_json = processor.serialize() workflow_model.bpmn_workflow_json = processor.serialize()
processor2 = WorkflowProcessor(workflow_model) processor2 = WorkflowProcessor(workflow_model)
self.assertTrue(processor2.get_spec_version().startswith("v1 ")) # Still at version 1.
# Restart the workflow, and the error should go away # Do a hard reset, which should bring us back to the beginning, but retain the data.
processor.restart_with_current_task_data() processor3 = WorkflowProcessor(workflow_model, hard_reset=True)
self.assertEquals("Step 1", processor.next_task().task_spec.description) self.assertEquals("Step 1", processor3.next_task().task_spec.description)
processor.complete_task(processor.next_task()) self.assertEquals({"color": "blue"}, processor3.next_task().data)
self.assertEquals("New Step", processor.next_task().task_spec.description) processor3.complete_task(processor3.next_task())
self.assertEquals({"color": "blue"}, processor.next_task().data) self.assertEquals("New Step", processor3.next_task().task_spec.description)
self.assertEquals({"color": "blue"}, processor3.next_task().data)