Continuing a major refactor. Some important points:

* TaskEvents now contain the data for each event as it was when the task was completed.
* When loading a task for the front end, if the task was completed previously, we take that data, and overwrite it with the latest data, allowing users to see previously entered values.
* Pulling in the Admin branch, as there are changes in that branch that are critical to seeing what is happening when we do this thing.
* Moved code for converting a workflow to an API ready data structure into the Workflow service where it belongs, and out of the API.
* Hard resets just convert to using the latest spec, they don't try to keep the data from the last task.  There is a better way.
* Moving to a previous task does not attempt to keep the data from the last completed task.
* Added a function that will fix all the existing RRT data by adding critical data into the TaskEvent model. This can be called from the flask command line tool.
This commit is contained in:
Dan Funk 2020-06-17 17:11:15 -04:00
parent cf09d986ea
commit 3b57adb84c
8 changed files with 232 additions and 88 deletions

View File

@ -93,3 +93,10 @@ def clear_db():
"""Load example data into the database."""
from example_data import ExampleDataLoader
ExampleDataLoader.clean_db()
@app.cli.command()
def rrt_data_fix():
    """Back-fill task event records whose task_data was never populated.

    One-off maintenance command for legacy RRT workflows; delegates all the
    work to WorkflowService.fix_legacy_data_model_for_rrt().
    """
    # NOTE(review): local import — presumably avoids a circular import at
    # module load time; confirm before hoisting to the top of the file.
    from crc.services.workflow_service import WorkflowService
    WorkflowService.fix_legacy_data_model_for_rrt()

View File

@ -1,15 +1,18 @@
# Admin app
import json
from flask import url_for
from flask_admin import Admin
from flask_admin.contrib import sqla
from flask_admin.contrib.sqla import ModelView
from werkzeug.utils import redirect
from jinja2 import Markup
from crc import db, app
from crc.api.user import verify_token, verify_token_admin
from crc.models.approval import ApprovalModel
from crc.models.file import FileModel
from crc.models.stats import TaskEventModel
from crc.models.study import StudyModel
from crc.models.user import UserModel
from crc.models.workflow import WorkflowModel
@ -47,6 +50,18 @@ class WorkflowView(AdminModelView):
class FileView(AdminModelView):
column_filters = ['workflow_id']
def json_formatter(view, context, model, name):
    """Render a model attribute as pretty-printed JSON inside a <pre> element.

    Used by Flask-Admin column_formatters to make stored JSON blobs readable
    in the list view.
    """
    attribute_value = getattr(model, name)
    pretty_json = json.dumps(attribute_value, ensure_ascii=False, indent=2)
    return Markup('<pre>{}</pre>'.format(pretty_json))
class TaskEventView(AdminModelView):
    """Admin list view over TaskEventModel rows (the task action log)."""
    # Allow narrowing the (potentially large) log by workflow and action type.
    column_filters = ['workflow_id', 'action']
    column_list = ['study_id', 'user_id', 'workflow_id', 'action', 'task_title', 'task_data', 'date']
    column_formatters = {
        'task_data': json_formatter,  # pretty-print the stored JSON blob
    }
admin = Admin(app)
admin.add_view(StudyView(StudyModel, db.session))
@ -54,3 +69,4 @@ admin.add_view(ApprovalView(ApprovalModel, db.session))
admin.add_view(UserView(UserModel, db.session))
admin.add_view(WorkflowView(WorkflowModel, db.session))
admin.add_view(FileView(FileModel, db.session))
admin.add_view(TaskEventView(TaskEventModel, db.session))

View File

@ -96,66 +96,10 @@ def delete_workflow_specification(spec_id):
session.commit()
def __get_workflow_api_model(processor: WorkflowProcessor, next_task = None):
    """Returns an API model representing the state of the current workflow, if requested, and
    possible, next_task is set to the current_task."""
    # NOTE(review): duplicated by WorkflowService.processor_to_workflow_api
    # (which callers below now use) — this copy appears slated for removal.
    # Build the navigation outline the front end renders for the workflow.
    nav_dict = processor.bpmn_workflow.get_nav_list()
    navigation = []
    for nav_item in nav_dict:
        spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
        if 'description' in nav_item:
            nav_item['title'] = nav_item.pop('description')
            # fixme: duplicate code from the workflow_service. Should only do this in one place.
            # Drop everything before the first space in the description.
            if ' ' in nav_item['title']:
                nav_item['title'] = nav_item['title'].partition(' ')[2]
        else:
            nav_item['title'] = ""
        if spiff_task:
            nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
            nav_item['title'] = nav_item['task'].title  # Prefer the task title.
        else:
            nav_item['task'] = None
        if not 'is_decision' in nav_item:
            nav_item['is_decision'] = False
        navigation.append(NavigationItem(**nav_item))
        NavigationItemSchema().dump(nav_item)
    # NOTE(review): 'spec' is queried but never used in this (old) version — verify.
    spec = session.query(WorkflowSpecModel).filter_by(id=processor.workflow_spec_id).first()
    workflow_api = WorkflowApi(
        id=processor.get_workflow_id(),
        status=processor.get_status(),
        next_task=None,
        navigation=navigation,
        workflow_spec_id=processor.workflow_spec_id,
        spec_version=processor.get_version_string(),
        is_latest_spec=processor.is_latest_spec,
        total_tasks=len(navigation),
        completed_tasks=processor.workflow_model.completed_tasks,
        last_updated=processor.workflow_model.last_updated,
    )
    if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
        # This may or may not work, sometimes there is no next task to complete.
        next_task = processor.next_task()
    if next_task:
        # Overlay previously submitted values (latest COMPLETE event) so users
        # see what they entered before.
        latest_event = session.query(TaskEventModel) \
            .filter_by(workflow_id=processor.workflow_model.id) \
            .filter_by(task_name=next_task.task_spec.name) \
            .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
            .order_by(TaskEventModel.date.desc()).first()
        if latest_event:
            next_task.data = DeepMerge.merge(next_task.data, latest_event.task_data)
        workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
    return workflow_api
def get_workflow(workflow_id, soft_reset=False, hard_reset=False):
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
workflow_api_model = __get_workflow_api_model(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)
@ -181,7 +125,7 @@ def set_current_task(workflow_id, task_id):
WorkflowService.log_task_action(user_uid, workflow_model, spiff_task,
WorkflowService.TASK_ACTION_TOKEN_RESET,
version=processor.get_version_string())
workflow_api_model = __get_workflow_api_model(processor, spiff_task)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor, spiff_task)
return WorkflowApiSchema().dump(workflow_api_model)
@ -209,7 +153,7 @@ def update_task(workflow_id, task_id, body):
WorkflowService.log_task_action(user_uid, workflow_model, spiff_task, WorkflowService.TASK_ACTION_COMPLETE,
version=processor.get_version_string(),
updated_data=spiff_task.data)
workflow_api_model = __get_workflow_api_model(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)

View File

@ -36,6 +36,7 @@ class Task(object):
PROP_OPTIONS_FILE = "spreadsheet.name"
PROP_OPTIONS_VALUE_COLUMN = "spreadsheet.value.column"
PROP_OPTIONS_LABEL_COL = "spreadsheet.label.column"
PROP_OPTIONS_READ_ONLY = "read_only"
PROP_LDAP_LOOKUP = "ldap.lookup"
VALIDATION_REQUIRED = "required"
FIELD_TYPE_AUTO_COMPLETE = "autocomplete"

View File

@ -102,14 +102,15 @@ class WorkflowProcessor(object):
def __init__(self, workflow_model: WorkflowModel, soft_reset=False, hard_reset=False, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model.
If soft_reset is set to true, it will try to use the latest version of the workflow specification.
If hard_reset is set to true, it will create a new Workflow, but embed the data from the last
completed task in the previous workflow.
If soft_reset is set to true, it will try to use the latest version of the workflow specification
without resetting to the beginning of the workflow. This will work for some minor changes to the spec.
If hard_reset is set to true, it will use the latest spec, and start the workflow over from the beginning.
which should work in casees where a soft reset fails.
If neither flag is set, it will use the same version of the specification that was used to originally
create the workflow model. """
self.workflow_model = workflow_model
if soft_reset or len(workflow_model.dependencies) == 0:
if soft_reset or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started.
self.spec_data_files = FileService.get_spec_data_files(
workflow_spec_id=workflow_model.workflow_spec_id)
else:
@ -216,8 +217,6 @@ class WorkflowProcessor(object):
full_version = "v%s (%s)" % (version, files)
return full_version
def update_dependencies(self, spec_data_files):
existing_dependencies = FileService.get_spec_data_files(
workflow_spec_id=self.workflow_model.workflow_spec_id,
@ -299,25 +298,12 @@ class WorkflowProcessor(object):
return WorkflowStatus.waiting
def hard_reset(self):
"""Recreate this workflow, but keep the data from the last completed task and add
it back into the first task. This may be useful when a workflow specification changes,
and users need to review all the prior steps, but they don't need to reenter all the previous data.
Returns the new version.
"""Recreate this workflow. This will be useful when a workflow specification changes.
"""
# Create a new workflow based on the latest specs.
self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
new_bpmn_workflow.data = self.bpmn_workflow.data
# Reset the current workflow to the beginning - which we will consider to be the first task after the root
# element. This feels a little sketchy, but I think it is safe to assume root will have one child.
first_task = self.bpmn_workflow.task_tree.children[0]
first_task.reset_token(reset_data=True) # Clear out the data.
for task in new_bpmn_workflow.get_tasks(SpiffTask.READY):
task.data = first_task.data
new_bpmn_workflow.do_engine_steps()
self.bpmn_workflow = new_bpmn_workflow

View File

@ -1,3 +1,4 @@
import copy
import string
from datetime import datetime
import random
@ -9,16 +10,17 @@ from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask
from SpiffWorkflow.bpmn.specs.UserTask import UserTask
from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask
from SpiffWorkflow.specs import CancelTask, StartTask
from SpiffWorkflow.util.deep_merge import DeepMerge
from jinja2 import Template
from crc import db, app
from crc.api.common import ApiError
from crc.models.api_models import Task, MultiInstanceType
from crc.models.api_models import Task, MultiInstanceType, NavigationItem, NavigationItemSchema, WorkflowApi
from crc.models.file import LookupDataModel
from crc.models.stats import TaskEventModel
from crc.models.study import StudyModel
from crc.models.user import UserModel
from crc.models.workflow import WorkflowModel, WorkflowStatus
from crc.models.workflow import WorkflowModel, WorkflowStatus, WorkflowSpecModel
from crc.services.file_service import FileService
from crc.services.lookup_service import LookupService
from crc.services.study_service import StudyService
@ -179,13 +181,81 @@ class WorkflowService(object):
def __get_options(self):
    """Unimplemented stub; returns None."""
    # NOTE(review): body is just `pass` — presumably a placeholder; confirm
    # whether any caller expects options from this before implementing/removing.
    pass
@staticmethod
def _random_string(string_length=10):
    """Return a random string of lowercase ASCII letters.

    :param string_length: number of characters to generate (default 10).
    """
    alphabet = string.ascii_lowercase
    return ''.join(random.choice(alphabet) for _ in range(string_length))
@staticmethod
def processor_to_workflow_api(processor: WorkflowProcessor, next_task=None):
    """Return a WorkflowApi model describing the processor's current state.

    If next_task is not supplied, the processor's own next task is used; the
    caller may pass a specific task (useful for parallel sections).
    """
    # Build the navigation outline the front end renders for the workflow.
    nav_dict = processor.bpmn_workflow.get_nav_list()
    navigation = []
    for nav_item in nav_dict:
        spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
        if 'description' in nav_item:
            nav_item['title'] = nav_item.pop('description')
            # fixme: duplicate code from the workflow_service. Should only do this in one place.
            # Drop everything before the first space in the description.
            if ' ' in nav_item['title']:
                nav_item['title'] = nav_item['title'].partition(' ')[2]
        else:
            nav_item['title'] = ""
        if spiff_task:
            nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
            nav_item['title'] = nav_item['task'].title  # Prefer the task title.
        else:
            nav_item['task'] = None
        if not 'is_decision' in nav_item:
            nav_item['is_decision'] = False
        navigation.append(NavigationItem(**nav_item))
        NavigationItemSchema().dump(nav_item)
    # Spec is looked up only to supply the human-readable workflow title.
    spec = db.session.query(WorkflowSpecModel).filter_by(id=processor.workflow_spec_id).first()
    workflow_api = WorkflowApi(
        id=processor.get_workflow_id(),
        status=processor.get_status(),
        next_task=None,
        navigation=navigation,
        workflow_spec_id=processor.workflow_spec_id,
        spec_version=processor.get_version_string(),
        is_latest_spec=processor.is_latest_spec,
        total_tasks=len(navigation),
        completed_tasks=processor.workflow_model.completed_tasks,
        last_updated=processor.workflow_model.last_updated,
        title=spec.display_name
    )
    if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
        # This may or may not work, sometimes there is no next task to complete.
        next_task = processor.next_task()
    if next_task:
        workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
    return workflow_api
@staticmethod
def get_previously_submitted_data(workflow_id, task):
    """Return the data the user previously submitted for this task, if any.

    Looks up the most recent COMPLETE task event for (workflow_id,
    task.task_spec.name) and returns its stored task_data dict.  Returns {}
    when the task was never completed, or when the stored data is missing
    (which is logged as an error, since the back-fill should have populated it).
    """
    latest_event = db.session.query(TaskEventModel) \
        .filter_by(workflow_id=workflow_id) \
        .filter_by(task_name=task.task_spec.name) \
        .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
        .order_by(TaskEventModel.date.desc()).first()
    if latest_event is None:
        return {}  # Task was never completed before; nothing to restore.
    if latest_event.task_data is not None:
        return latest_event.task_data
    # Bug fix: the original call passed the message as a positional argument
    # after a placeholder-free format string ("missing_task_data"), which makes
    # logging's record.getMessage() raise TypeError and the message is lost.
    # Use lazy %-style args so the logger formats it correctly.
    app.logger.error("missing_task_data: We have lost data for workflow %i, task %s, "
                     "it is not in the task event model, and it should be.",
                     workflow_id, task.task_spec.name)
    return {}
@staticmethod
def spiff_task_to_api_task(spiff_task, add_docs_and_forms=False):
task_type = spiff_task.task_spec.__class__.__name__
@ -342,3 +412,67 @@ class WorkflowService(object):
db.session.add(task_event)
db.session.commit()
@staticmethod
def fix_legacy_data_model_for_rrt():
    """ Remove this after use! This is just to fix RRT so the data is handled correctly.

    Utility that is likely called via the flask command line, it will loop through all the
    workflows in the system and attempt to add the right data into the task action log so that
    users do not have to re fill out all of the forms if they start over or go back in the workflow.
    Viciously inefficient, but should only have to run one time for RRT"""
    workflows = db.session.query(WorkflowModel).all()
    for workflow_model in workflows:
        # All COMPLETE events for this workflow, newest first.
        task_logs = db.session.query(TaskEventModel) \
            .filter(TaskEventModel.workflow_id == workflow_model.id) \
            .filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
            .order_by(TaskEventModel.date.desc()).all()
        processor = WorkflowProcessor(workflow_model)
        # Grab all the data from last task completed, which will be everything in this
        # rrt situation because of how we were keeping all the data at the time.
        latest_data = processor.next_task().data
        # Move forward in the task spec tree, dropping any data that would have been
        # added in subsequent tasks, just looking at form data, will not track the automated
        # task data additions, hopefully this doesn't hang us.
        for log in task_logs:
            if log.task_data is not None:  # Only do this if the task event does not have data populated in it.
                continue
            data = copy.deepcopy(latest_data)  # Or you end up with insane crazy issues.
            # In the simple case of RRT, there is exactly one task for the given task_spec
            task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
            # Strip everything contributed by tasks after this one, leaving only
            # what the user would have seen when completing it.
            data = WorkflowService.__remove_data_added_by_children(data, task.children[0])
            log.task_data = data
            db.session.add(log)
            db.session.commit()
@staticmethod
def __remove_data_added_by_children(latest_data, child_task):
    """Remove from latest_data every key that would be added by child_task or any of its children.

    Walks the task tree depth-first, popping form-field keys, repeat-group
    entries, and DMN decision-table outputs.  Read-only fields are preserved.
    Mutates and returns latest_data.
    """
    if hasattr(child_task.task_spec, 'form'):
        for field in child_task.task_spec.form.fields:
            # Bug fix: the read-only check must run BEFORE the pop.  The
            # original popped field.id first, so the `continue` guarding
            # read-only fields was dead code and their data was lost anyway.
            if field.has_property(Task.PROP_OPTIONS_READ_ONLY) and \
                    field.get_property(Task.PROP_OPTIONS_READ_ONLY).lower().strip() == "true":
                continue  # Don't pop off read only fields.
            latest_data.pop(field.id, None)
            if field.has_property(Task.PROP_OPTIONS_REPEAT):
                # Repeating fields live inside a named group list; strip this
                # field from each entry and drop entries/groups left empty.
                group = field.get_property(Task.PROP_OPTIONS_REPEAT)
                group_data = []
                if group in latest_data:
                    for item in latest_data[group]:
                        item.pop(field.id, None)
                        if item:
                            group_data.append(item)
                latest_data[group] = group_data
                if not latest_data[group]:
                    latest_data.pop(group, None)
    if isinstance(child_task.task_spec, BusinessRuleTask):
        # Decision tables write their outputs into task data; remove those too.
        for output in child_task.task_spec.dmnEngine.decisionTable.outputs:
            latest_data.pop(output.name, None)
    for child in child_task.children:
        latest_data = WorkflowService.__remove_data_added_by_children(latest_data, child)
    return latest_data

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 3876e130664e
Revises: 5064b72284b7
Create Date: 2020-06-01 15:39:53.937591
Revision ID: 1fdd1bdb600e
Revises: 17597692d0b0
Create Date: 2020-06-17 16:44:16.427988
"""
from alembic import op
@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3876e130664e'
down_revision = '5064b72284b7'
revision = '1fdd1bdb600e'
down_revision = '17597692d0b0'
branch_labels = None
depends_on = None

View File

@ -1,7 +1,14 @@
import json
from tests.base_test import BaseTest
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
from SpiffWorkflow import Task as SpiffTask, WorkflowException
from example_data import ExampleDataLoader
from crc import db
from crc.models.stats import TaskEventModel
from crc.models.api_models import Task
class TestWorkflowService(BaseTest):
@ -78,4 +85,53 @@ class TestWorkflowService(BaseTest):
task = processor.next_task()
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
WorkflowService.populate_form_with_random_data(task, task_api, required_only=False)
self.assertTrue(isinstance(task.data["sponsor"], dict))
self.assertTrue(isinstance(task.data["sponsor"], dict))
def test_fix_legacy_data_model_for_rrt(self):
    """Complete an RRT workflow with empty task events, then verify the
    back-fill populates each event with the data visible at that task."""
    ExampleDataLoader().load_rrt()  # Make sure the research_rampup is loaded, as it's not a test spec.
    workflow = self.create_workflow('research_rampup')
    processor = WorkflowProcessor(workflow, validate_only=True)
    # Use the test spec code to complete the workflow of research rampup.
    while not processor.bpmn_workflow.is_completed():
        processor.bpmn_workflow.do_engine_steps()
        tasks = processor.bpmn_workflow.get_tasks(SpiffTask.READY)
        for task in tasks:
            task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
            WorkflowService.populate_form_with_random_data(task, task_api, False)
            task.complete()
            # create the task events with no task_data in them.
            WorkflowService.log_task_action('dhf8r', workflow, task,
                                            WorkflowService.TASK_ACTION_COMPLETE,
                                            version=processor.get_version_string(),
                                            updated_data=None)
    processor.save()
    db.session.commit()
    # Run the one-off repair utility under test.
    WorkflowService.fix_legacy_data_model_for_rrt()
    # All tasks should now have data associated with them.
    task_logs = db.session.query(TaskEventModel) \
        .filter(TaskEventModel.workflow_id == workflow.id) \
        .filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
        .order_by(TaskEventModel.date).all()  # Get them back in order.
    self.assertEqual(17, len(task_logs))
    for log in task_logs:
        task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
        self.assertIsNotNone(log.task_data)
        # Each task should have the data in the form for that task in the task event.
        if hasattr(task.task_spec, 'form'):
            for field in task.task_spec.form.fields:
                if field.has_property(Task.PROP_OPTIONS_REPEAT):
                    self.assertIn(field.get_property(Task.PROP_OPTIONS_REPEAT), log.task_data)
                else:
                    self.assertIn(field.id, log.task_data)
    # Some spot checks:
    # The first task should be empty, with all the data removed.
    self.assertEqual({}, task_logs[0].task_data)
    # The last task should have all the data.
    self.assertDictEqual(processor.bpmn_workflow.last_task.data, task_logs[16].task_data)