diff --git a/crc/__init__.py b/crc/__init__.py
index a211b0fa..59ffeac7 100644
--- a/crc/__init__.py
+++ b/crc/__init__.py
@@ -93,3 +93,10 @@ def clear_db():
     """Load example data into the database."""
     from example_data import ExampleDataLoader
     ExampleDataLoader.clean_db()
+
+@app.cli.command()
+def rrt_data_fix():
+    """Finds all the empty task event logs, and populates
+    them with good wholesome data."""
+    from crc.services.workflow_service import WorkflowService
+    WorkflowService.fix_legacy_data_model_for_rrt()
diff --git a/crc/api/admin.py b/crc/api/admin.py
index 26a1b181..6a27b6da 100644
--- a/crc/api/admin.py
+++ b/crc/api/admin.py
@@ -1,15 +1,18 @@
 # Admin app
+import json
 
 from flask import url_for
 from flask_admin import Admin
 from flask_admin.contrib import sqla
 from flask_admin.contrib.sqla import ModelView
 from werkzeug.utils import redirect
+from jinja2 import Markup
 
 from crc import db, app
 from crc.api.user import verify_token, verify_token_admin
 from crc.models.approval import ApprovalModel
 from crc.models.file import FileModel
+from crc.models.stats import TaskEventModel
 from crc.models.study import StudyModel
 from crc.models.user import UserModel
 from crc.models.workflow import WorkflowModel
@@ -47,6 +50,18 @@ class WorkflowView(AdminModelView):
     column_filters = ['workflow_id']
 
 class FileView(AdminModelView):
     column_filters = ['workflow_id']
+def json_formatter(view, context, model, name):
+    value = getattr(model, name)
+    json_value = json.dumps(value, ensure_ascii=False, indent=2)
+    return Markup('<pre>{}</pre>'.format(json_value))
+
+class TaskEventView(AdminModelView):
+    column_filters = ['workflow_id', 'action']
+    column_list = ['study_id', 'user_id', 'workflow_id', 'action', 'task_title', 'task_data', 'date']
+    column_formatters = {
+        'task_data': json_formatter,
+    }
+
 admin = Admin(app)
 admin.add_view(StudyView(StudyModel, db.session))
@@ -54,3 +69,4 @@ admin.add_view(ApprovalView(ApprovalModel, db.session))
 admin.add_view(UserView(UserModel, db.session))
 admin.add_view(WorkflowView(WorkflowModel, db.session))
 admin.add_view(FileView(FileModel, db.session))
+admin.add_view(TaskEventView(TaskEventModel, db.session))
diff --git a/crc/api/workflow.py b/crc/api/workflow.py
index 4d1667dd..14c40df5 100644
--- a/crc/api/workflow.py
+++ b/crc/api/workflow.py
@@ -96,66 +96,10 @@ def delete_workflow_specification(spec_id):
     session.commit()
 
-def __get_workflow_api_model(processor: WorkflowProcessor, next_task = None):
-    """Returns an API model representing the state of the current workflow, if requested, and
-    possible, next_task is set to the current_task."""
-
-    nav_dict = processor.bpmn_workflow.get_nav_list()
-    navigation = []
-    for nav_item in nav_dict:
-        spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
-        if 'description' in nav_item:
-            nav_item['title'] = nav_item.pop('description')
-            # fixme: duplicate code from the workflow_service. Should only do this in one place.
-            if ' ' in nav_item['title']:
-                nav_item['title'] = nav_item['title'].partition(' ')[2]
-        else:
-            nav_item['title'] = ""
-        if spiff_task:
-            nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
-            nav_item['title'] = nav_item['task'].title  # Prefer the task title.
-        else:
-            nav_item['task'] = None
-        if not 'is_decision' in nav_item:
-            nav_item['is_decision'] = False
-
-        navigation.append(NavigationItem(**nav_item))
-        NavigationItemSchema().dump(nav_item)
-
-    spec = session.query(WorkflowSpecModel).filter_by(id=processor.workflow_spec_id).first()
-    workflow_api = WorkflowApi(
-        id=processor.get_workflow_id(),
-        status=processor.get_status(),
-        next_task=None,
-        navigation=navigation,
-        workflow_spec_id=processor.workflow_spec_id,
-        spec_version=processor.get_version_string(),
-        is_latest_spec=processor.is_latest_spec,
-        total_tasks=len(navigation),
-        completed_tasks=processor.workflow_model.completed_tasks,
-        last_updated=processor.workflow_model.last_updated,
-    )
-    if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
-        # This may or may not work, sometimes there is no next task to complete.
-        next_task = processor.next_task()
-    if next_task:
-        latest_event = session.query(TaskEventModel) \
-            .filter_by(workflow_id=processor.workflow_model.id) \
-            .filter_by(task_name=next_task.task_spec.name) \
-            .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
-            .order_by(TaskEventModel.date.desc()).first()
-        if latest_event:
-            next_task.data = DeepMerge.merge(next_task.data, latest_event.task_data)
-
-        workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
-
-    return workflow_api
-
-
 def get_workflow(workflow_id, soft_reset=False, hard_reset=False):
     workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
     processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
-    workflow_api_model = __get_workflow_api_model(processor)
+    workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
     return WorkflowApiSchema().dump(workflow_api_model)
@@ -181,7 +125,7 @@ def set_current_task(workflow_id, task_id):
     WorkflowService.log_task_action(user_uid, workflow_model, spiff_task, WorkflowService.TASK_ACTION_TOKEN_RESET,
                                     version=processor.get_version_string())
-    workflow_api_model = __get_workflow_api_model(processor, spiff_task)
+    workflow_api_model = WorkflowService.processor_to_workflow_api(processor, spiff_task)
     return WorkflowApiSchema().dump(workflow_api_model)
@@ -209,7 +153,7 @@ def update_task(workflow_id, task_id, body):
     WorkflowService.log_task_action(user_uid, workflow_model, spiff_task, WorkflowService.TASK_ACTION_COMPLETE,
                                     version=processor.get_version_string(), updated_data=spiff_task.data)
-    workflow_api_model = __get_workflow_api_model(processor)
+    workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
     return WorkflowApiSchema().dump(workflow_api_model)
diff --git a/crc/models/api_models.py b/crc/models/api_models.py
index 53706a75..361b9183 100644
--- a/crc/models/api_models.py
+++ b/crc/models/api_models.py
@@ -36,6 +36,7 @@ class Task(object):
     PROP_OPTIONS_FILE = "spreadsheet.name"
     PROP_OPTIONS_VALUE_COLUMN = "spreadsheet.value.column"
     PROP_OPTIONS_LABEL_COL = "spreadsheet.label.column"
+    PROP_OPTIONS_READ_ONLY = "read_only"
     PROP_LDAP_LOOKUP = "ldap.lookup"
     VALIDATION_REQUIRED = "required"
     FIELD_TYPE_AUTO_COMPLETE = "autocomplete"
diff --git a/crc/services/workflow_processor.py b/crc/services/workflow_processor.py
index e5cbe0a3..c84aa3fa 100644
--- a/crc/services/workflow_processor.py
+++ b/crc/services/workflow_processor.py
@@ -102,14 +102,15 @@ class WorkflowProcessor(object):
     def __init__(self, workflow_model: WorkflowModel, soft_reset=False, hard_reset=False, validate_only=False):
         """Create a Workflow Processor based on the serialized information available in the workflow model.
-        If soft_reset is set to true, it will try to use the latest version of the workflow specification.
-        If hard_reset is set to true, it will create a new Workflow, but embed the data from the last
-        completed task in the previous workflow.
+        If soft_reset is set to true, it will try to use the latest version of the workflow specification
+        without resetting to the beginning of the workflow. This will work for some minor changes to the spec.
+        If hard_reset is set to true, it will use the latest spec, and start the workflow over from the beginning,
+        which should work in cases where a soft reset fails.
         If neither flag is set, it will use the same version of the specification that was used to originally
         create the workflow model. """
""" self.workflow_model = workflow_model - if soft_reset or len(workflow_model.dependencies) == 0: + if soft_reset or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started. self.spec_data_files = FileService.get_spec_data_files( workflow_spec_id=workflow_model.workflow_spec_id) else: @@ -216,8 +217,6 @@ class WorkflowProcessor(object): full_version = "v%s (%s)" % (version, files) return full_version - - def update_dependencies(self, spec_data_files): existing_dependencies = FileService.get_spec_data_files( workflow_spec_id=self.workflow_model.workflow_spec_id, @@ -299,25 +298,12 @@ class WorkflowProcessor(object): return WorkflowStatus.waiting def hard_reset(self): - """Recreate this workflow, but keep the data from the last completed task and add - it back into the first task. This may be useful when a workflow specification changes, - and users need to review all the prior steps, but they don't need to reenter all the previous data. - - Returns the new version. + """Recreate this workflow. This will be useful when a workflow specification changes. """ - - # Create a new workflow based on the latest specs. self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id) new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id) new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine) new_bpmn_workflow.data = self.bpmn_workflow.data - - # Reset the current workflow to the beginning - which we will consider to be the first task after the root - # element. This feels a little sketchy, but I think it is safe to assume root will have one child. - first_task = self.bpmn_workflow.task_tree.children[0] - first_task.reset_token(reset_data=True) # Clear out the data. 
-        for task in new_bpmn_workflow.get_tasks(SpiffTask.READY):
-            task.data = first_task.data
         new_bpmn_workflow.do_engine_steps()
         self.bpmn_workflow = new_bpmn_workflow
diff --git a/crc/services/workflow_service.py b/crc/services/workflow_service.py
index a8860886..3b064954 100644
--- a/crc/services/workflow_service.py
+++ b/crc/services/workflow_service.py
@@ -1,3 +1,4 @@
+import copy
 import string
 from datetime import datetime
 import random
@@ -9,16 +10,17 @@ from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask
 from SpiffWorkflow.bpmn.specs.UserTask import UserTask
 from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask
 from SpiffWorkflow.specs import CancelTask, StartTask
+from SpiffWorkflow.util.deep_merge import DeepMerge
 from jinja2 import Template
 
 from crc import db, app
 from crc.api.common import ApiError
-from crc.models.api_models import Task, MultiInstanceType
+from crc.models.api_models import Task, MultiInstanceType, NavigationItem, NavigationItemSchema, WorkflowApi
 from crc.models.file import LookupDataModel
 from crc.models.stats import TaskEventModel
 from crc.models.study import StudyModel
 from crc.models.user import UserModel
-from crc.models.workflow import WorkflowModel, WorkflowStatus
+from crc.models.workflow import WorkflowModel, WorkflowStatus, WorkflowSpecModel
 from crc.services.file_service import FileService
 from crc.services.lookup_service import LookupService
 from crc.services.study_service import StudyService
@@ -179,13 +181,81 @@ class WorkflowService(object):
     def __get_options(self):
         pass
 
-
     @staticmethod
     def _random_string(string_length=10):
         """Generate a random string of fixed length """
         letters = string.ascii_lowercase
         return ''.join(random.choice(letters) for i in range(string_length))
 
+    @staticmethod
+    def processor_to_workflow_api(processor: WorkflowProcessor, next_task=None):
+        """Returns an API model representing the state of the current workflow, if requested, and
+        possible, next_task is set to the current_task."""
+
+        nav_dict = processor.bpmn_workflow.get_nav_list()
+        navigation = []
+        for nav_item in nav_dict:
+            spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
+            if 'description' in nav_item:
+                nav_item['title'] = nav_item.pop('description')
+                # fixme: duplicate code from the workflow_service. Should only do this in one place.
+                if ' ' in nav_item['title']:
+                    nav_item['title'] = nav_item['title'].partition(' ')[2]
+            else:
+                nav_item['title'] = ""
+            if spiff_task:
+                nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
+                nav_item['title'] = nav_item['task'].title  # Prefer the task title.
+            else:
+                nav_item['task'] = None
+            if not 'is_decision' in nav_item:
+                nav_item['is_decision'] = False
+
+            navigation.append(NavigationItem(**nav_item))
+            NavigationItemSchema().dump(nav_item)
+
+        spec = db.session.query(WorkflowSpecModel).filter_by(id=processor.workflow_spec_id).first()
+        workflow_api = WorkflowApi(
+            id=processor.get_workflow_id(),
+            status=processor.get_status(),
+            next_task=None,
+            navigation=navigation,
+            workflow_spec_id=processor.workflow_spec_id,
+            spec_version=processor.get_version_string(),
+            is_latest_spec=processor.is_latest_spec,
+            total_tasks=len(navigation),
+            completed_tasks=processor.workflow_model.completed_tasks,
+            last_updated=processor.workflow_model.last_updated,
+            title=spec.display_name
+        )
+        if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
+            # This may or may not work, sometimes there is no next task to complete.
+            next_task = processor.next_task()
+        if next_task:
+            workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
+
+        return workflow_api
+
+    @staticmethod
+    def get_previously_submitted_data(workflow_id, task):
+        """ If the user has completed this task previously, find that data in the task events table, and return it."""
+        latest_event = db.session.query(TaskEventModel) \
+            .filter_by(workflow_id=workflow_id) \
+            .filter_by(task_name=task.task_spec.name) \
+            .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
+            .order_by(TaskEventModel.date.desc()).first()
+        if latest_event:
+            if latest_event.task_data is not None:
+                return latest_event.task_data
+            else:
+                app.logger.error("missing_task_data", "We have lost data for workflow %i, task %s, it is not "
+                                                      "in the task event model, "
+                                                      "and it should be." % (workflow_id, task.task_spec.name))
+                return {}
+        else:
+            return {}
+
     @staticmethod
     def spiff_task_to_api_task(spiff_task, add_docs_and_forms=False):
         task_type = spiff_task.task_spec.__class__.__name__
@@ -342,3 +412,67 @@ class WorkflowService(object):
 
         db.session.add(task_event)
         db.session.commit()
+
+    @staticmethod
+    def fix_legacy_data_model_for_rrt():
+        """ Remove this after use! This is just to fix RRT so the data is handled correctly.
+
+        Utility that is likely called via the flask command line, it will loop through all the
+        workflows in the system and attempt to add the right data into the task action log so that
+        users do not have to re-fill out all of the forms if they start over or go back in the workflow.
+        Viciously inefficient, but should only have to run one time for RRT"""
+        workflows = db.session.query(WorkflowModel).all()
+        for workflow_model in workflows:
+            task_logs = db.session.query(TaskEventModel) \
+                .filter(TaskEventModel.workflow_id == workflow_model.id) \
+                .filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
+                .order_by(TaskEventModel.date.desc()).all()
+
+            processor = WorkflowProcessor(workflow_model)
+            # Grab all the data from last task completed, which will be everything in this
+            # rrt situation because of how we were keeping all the data at the time.
+            latest_data = processor.next_task().data
+
+            # Move forward in the task spec tree, dropping any data that would have been
+            # added in subsequent tasks, just looking at form data, will not track the automated
+            # task data additions, hopefully this doesn't hang us.
+            for log in task_logs:
+                if log.task_data is not None:  # Only do this if the task event does not have data populated in it.
+                    continue
+                data = copy.deepcopy(latest_data)  # Or you end up with insane crazy issues.
+                # In the simple case of RRT, there is exactly one task for the given task_spec
+                task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
+                data = WorkflowService.__remove_data_added_by_children(data, task.children[0])
+                log.task_data = data
+                db.session.add(log)
+
+            db.session.commit()
+
+    @staticmethod
+    def __remove_data_added_by_children(latest_data, child_task):
+        """Removes data from latest_data that would be added by the child task or any of its children."""
+        if hasattr(child_task.task_spec, 'form'):
+            for field in child_task.task_spec.form.fields:
+                latest_data.pop(field.id, None)
+                if field.has_property(Task.PROP_OPTIONS_READ_ONLY) and \
+                        field.get_property(Task.PROP_OPTIONS_READ_ONLY).lower().strip() == "true":
+                    continue  # Don't pop off read only fields.
+                if field.has_property(Task.PROP_OPTIONS_REPEAT):
+                    group = field.get_property(Task.PROP_OPTIONS_REPEAT)
+                    group_data = []
+                    if group in latest_data:
+                        for item in latest_data[group]:
+                            item.pop(field.id, None)
+                            if item:
+                                group_data.append(item)
+                        latest_data[group] = group_data
+                        if not latest_data[group]:
+                            latest_data.pop(group, None)
+        if isinstance(child_task.task_spec, BusinessRuleTask):
+            for output in child_task.task_spec.dmnEngine.decisionTable.outputs:
+                latest_data.pop(output.name, None)
+        for child in child_task.children:
+            latest_data = WorkflowService.__remove_data_added_by_children(latest_data, child)
+        return latest_data
+
+
diff --git a/migrations/versions/3876e130664e_.py b/migrations/versions/1fdd1bdb600e_.py
similarity index 78%
rename from migrations/versions/3876e130664e_.py
rename to migrations/versions/1fdd1bdb600e_.py
index 31e7ce13..dff1fdae 100644
--- a/migrations/versions/3876e130664e_.py
+++ b/migrations/versions/1fdd1bdb600e_.py
@@ -1,8 +1,8 @@
 """empty message
 
-Revision ID: 3876e130664e
-Revises: 5064b72284b7
-Create Date: 2020-06-01 15:39:53.937591
+Revision ID: 1fdd1bdb600e
+Revises: 17597692d0b0
+Create Date: 2020-06-17 16:44:16.427988
 
 """
 from alembic import op
@@ -10,8 +10,8 @@ import sqlalchemy as sa
 
 
 # revision identifiers, used by Alembic.
-revision = '3876e130664e'
-down_revision = '5064b72284b7'
+revision = '1fdd1bdb600e'
+down_revision = '17597692d0b0'
 branch_labels = None
 depends_on = None
diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py
index 9f3ceda1..6f0fa5e3 100644
--- a/tests/test_workflow_service.py
+++ b/tests/test_workflow_service.py
@@ -1,7 +1,14 @@
+import json
+
 from tests.base_test import BaseTest
 
 from crc.services.workflow_processor import WorkflowProcessor
 from crc.services.workflow_service import WorkflowService
+from SpiffWorkflow import Task as SpiffTask, WorkflowException
+from example_data import ExampleDataLoader
+from crc import db
+from crc.models.stats import TaskEventModel
+from crc.models.api_models import Task
 
 
 class TestWorkflowService(BaseTest):
@@ -78,4 +85,53 @@ class TestWorkflowService(BaseTest):
         task = processor.next_task()
         task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
         WorkflowService.populate_form_with_random_data(task, task_api, required_only=False)
-        self.assertTrue(isinstance(task.data["sponsor"], dict))
\ No newline at end of file
+        self.assertTrue(isinstance(task.data["sponsor"], dict))
+
+    def test_fix_legacy_data_model_for_rrt(self):
+        ExampleDataLoader().load_rrt()  # Make sure the research_rampup is loaded, as it's not a test spec.
+        workflow = self.create_workflow('research_rampup')
+        processor = WorkflowProcessor(workflow, validate_only=True)
+
+        # Use the test spec code to complete the workflow of research rampup.
+        while not processor.bpmn_workflow.is_completed():
+            processor.bpmn_workflow.do_engine_steps()
+            tasks = processor.bpmn_workflow.get_tasks(SpiffTask.READY)
+            for task in tasks:
+                task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
+                WorkflowService.populate_form_with_random_data(task, task_api, False)
+                task.complete()
+                # create the task events with no task_data in them.
+                WorkflowService.log_task_action('dhf8r', workflow, task,
+                                                WorkflowService.TASK_ACTION_COMPLETE,
+                                                version=processor.get_version_string(),
+                                                updated_data=None)
+        processor.save()
+        db.session.commit()
+
+        WorkflowService.fix_legacy_data_model_for_rrt()
+
+        # All tasks should now have data associated with them.
+        task_logs = db.session.query(TaskEventModel) \
+            .filter(TaskEventModel.workflow_id == workflow.id) \
+            .filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
+            .order_by(TaskEventModel.date).all()  # Get them back in order.
+
+        self.assertEqual(17, len(task_logs))
+        for log in task_logs:
+            task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
+            self.assertIsNotNone(log.task_data)
+            # Each task should have the data in the form for that task in the task event.
+            if hasattr(task.task_spec, 'form'):
+                for field in task.task_spec.form.fields:
+                    if field.has_property(Task.PROP_OPTIONS_REPEAT):
+                        self.assertIn(field.get_property(Task.PROP_OPTIONS_REPEAT), log.task_data)
+                    else:
+                        self.assertIn(field.id, log.task_data)
+
+        # Some spot checks:
+        # The first task should be empty, with all the data removed.
+        self.assertEqual({}, task_logs[0].task_data)
+
+        # The last task should have all the data.
+        self.assertDictEqual(processor.bpmn_workflow.last_task.data, task_logs[16].task_data)
+
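For reference, the `rrt_data_fix` command added in `crc/__init__.py` above is only a thin wrapper around `WorkflowService.fix_legacy_data_model_for_rrt()`, so the one-off back-fill can also be run from a Python shell. The sketch below is illustrative rather than part of the patch; it assumes the usual Flask application context for the `crc` package and an already-configured database containing the legacy task events:

    # Minimal sketch of running the one-time RRT back-fill directly,
    # equivalent to invoking the rrt_data_fix command through the flask CLI.
    from crc import app
    from crc.services.workflow_service import WorkflowService

    with app.app_context():
        # Re-populates task_data on historical TASK_ACTION_COMPLETE task events.
        WorkflowService.fix_legacy_data_model_for_rrt()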