Don't put all the data into Spiff Tasks on a reload or backtrack; instead, store only the data that gets submitted each time in the task log, and use that.

This should correct issues with parallel tasks and other complex areas, so tasks no longer see data that isn't along their path.
Dan Funk 2020-06-01 17:42:28 -04:00
parent 8e9b847e1b
commit 1db9401166
8 changed files with 73 additions and 69 deletions
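
For orientation, the sketch below condenses the pattern this commit applies in __get_workflow_api_model (see the first diff): rather than carrying every prior submission around inside the Spiff task tree, the most recently submitted data for a task is read back out of the task event log and merged onto the live task. The helper name merge_logged_data is illustrative and not part of the commit; the imports, the query, and the DeepMerge.merge call mirror the diff.

    # Sketch only: condenses the retrieval side of this commit. merge_logged_data is a
    # hypothetical helper name; everything it calls appears in the diffs below.
    from SpiffWorkflow.util.deep_merge import DeepMerge

    from crc import session
    from crc.models.stats import TaskEventModel
    from crc.services.workflow_service import WorkflowService


    def merge_logged_data(workflow_id, spiff_task):
        """Overlay the last data submitted for this task (if any) onto the live Spiff task."""
        # Find the most recent COMPLETE event logged for this task in this workflow.
        latest_event = session.query(TaskEventModel) \
            .filter_by(workflow_id=workflow_id) \
            .filter_by(task_name=spiff_task.task_spec.name) \
            .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
            .order_by(TaskEventModel.date.desc()).first()
        if latest_event:
            # Merge the logged submission into whatever data the task already has.
            spiff_task.data = DeepMerge.merge(spiff_task.data, latest_event.task_data)
        return spiff_task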

View File

@@ -1,5 +1,7 @@
 import uuid
+from SpiffWorkflow.util.deep_merge import DeepMerge
 from crc import session
 from crc.api.common import ApiError, ApiErrorSchema
 from crc.models.api_models import WorkflowApi, WorkflowApiSchema, NavigationItem, NavigationItemSchema
@@ -132,12 +134,19 @@ def __get_workflow_api_model(processor: WorkflowProcessor, next_task = None):
                               total_tasks=processor.workflow_model.total_tasks,
                               completed_tasks=processor.workflow_model.completed_tasks,
                               last_updated=processor.workflow_model.last_updated,
-                              title=spec.display_name
                               )
     if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
         # This may or may not work, sometimes there is no next task to complete.
         next_task = processor.next_task()
     if next_task:
+        latest_event = session.query(TaskEventModel) \
+            .filter_by(workflow_id=processor.workflow_model.id) \
+            .filter_by(task_name=next_task.task_spec.name) \
+            .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
+            .order_by(TaskEventModel.date.desc()).first()
+        if latest_event:
+            next_task.data = DeepMerge.merge(next_task.data, latest_event.task_data)
         workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
     return workflow_api
@@ -158,17 +167,22 @@ def set_current_task(workflow_id, task_id):
     workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
     processor = WorkflowProcessor(workflow_model)
     task_id = uuid.UUID(task_id)
-    task = processor.bpmn_workflow.get_task(task_id)
-    if task.state != task.COMPLETED and task.state != task.READY:
+    spiff_task = processor.bpmn_workflow.get_task(task_id)
+    if spiff_task.state != spiff_task.COMPLETED and spiff_task.state != spiff_task.READY:
         raise ApiError("invalid_state", "You may not move the token to a task who's state is not "
                                         "currently set to COMPLETE or READY.")
     # Only reset the token if the task doesn't already have it.
-    if task.state == task.COMPLETED:
-        task.reset_token(reset_data=False)  # we could optionally clear the previous data.
+    if spiff_task.state == spiff_task.COMPLETED:
+        spiff_task.reset_token(reset_data=True)  # Don't try to copy the existing data back into this task.
     processor.save()
-    WorkflowService.log_task_action(processor, task, WorkflowService.TASK_ACTION_TOKEN_RESET)
-    workflow_api_model = __get_workflow_api_model(processor, task)
+    task_api = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=True)
+    WorkflowService.log_task_action(workflow_model, task_api,
+                                    WorkflowService.TASK_ACTION_TOKEN_RESET,
+                                    version = processor.get_version_string())
+    workflow_api_model = __get_workflow_api_model(processor, spiff_task)
     return WorkflowApiSchema().dump(workflow_api_model)
@@ -176,15 +190,21 @@ def update_task(workflow_id, task_id, body):
     workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
     processor = WorkflowProcessor(workflow_model)
     task_id = uuid.UUID(task_id)
-    task = processor.bpmn_workflow.get_task(task_id)
-    if task.state != task.READY:
+    spiff_task = processor.bpmn_workflow.get_task(task_id)
+    if spiff_task.state != spiff_task.READY:
         raise ApiError("invalid_state", "You may not update a task unless it is in the READY state. "
                                         "Consider calling a token reset to make this task Ready.")
-    task.update_data(body)
-    processor.complete_task(task)
+    spiff_task.update_data(body)
+    processor.complete_task(spiff_task)
     processor.do_engine_steps()
     processor.save()
-    WorkflowService.log_task_action(processor, task, WorkflowService.TASK_ACTION_COMPLETE)
+    task_api = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=True)
+    WorkflowService.log_task_action(workflow_model,
+                                    task_api,
+                                    WorkflowService.TASK_ACTION_COMPLETE,
+                                    version = processor.get_version_string(),
+                                    updated_data = spiff_task.data)
     workflow_api_model = __get_workflow_api_model(processor)
     return WorkflowApiSchema().dump(workflow_api_model)

View File

@@ -119,7 +119,7 @@ class NavigationItemSchema(ma.Schema):
 class WorkflowApi(object):
     def __init__(self, id, status, next_task, navigation,
-                 spec_version, is_latest_spec, workflow_spec_id, total_tasks, completed_tasks, last_updated, title):
+                 spec_version, is_latest_spec, workflow_spec_id, total_tasks, completed_tasks, last_updated):
         self.id = id
         self.status = status
         self.next_task = next_task  # The next task that requires user input.
@@ -130,14 +130,13 @@ class WorkflowApi(object):
         self.total_tasks = total_tasks
         self.completed_tasks = completed_tasks
         self.last_updated = last_updated
-        self.title = title

 class WorkflowApiSchema(ma.Schema):
     class Meta:
         model = WorkflowApi
         fields = ["id", "status", "next_task", "navigation",
                   "workflow_spec_id", "spec_version", "is_latest_spec", "total_tasks", "completed_tasks",
-                  "last_updated", "title"]
+                  "last_updated"]
         unknown = INCLUDE

     status = EnumField(WorkflowStatus)
@@ -148,7 +147,7 @@ class WorkflowApiSchema(ma.Schema):
     def make_workflow(self, data, **kwargs):
         keys = ['id', 'status', 'next_task', 'navigation',
                 'workflow_spec_id', 'spec_version', 'is_latest_spec', "total_tasks", "completed_tasks",
-                "last_updated", "title"]
+                "last_updated"]
         filtered_fields = {key: data[key] for key in keys}
         filtered_fields['next_task'] = TaskSchema().make_task(data['next_task'])
         return WorkflowApi(**filtered_fields)

View File

@@ -17,6 +17,7 @@ class TaskEventModel(db.Model):
     task_title = db.Column(db.String)
     task_type = db.Column(db.String)
     task_state = db.Column(db.String)
+    task_data = db.Column(db.JSON)
     mi_type = db.Column(db.String)
     mi_count = db.Column(db.Integer)
     mi_index = db.Column(db.Integer)

View File

@@ -315,7 +315,7 @@ class WorkflowProcessor(object):
         # Reset the current workflow to the beginning - which we will consider to be the first task after the root
         # element. This feels a little sketchy, but I think it is safe to assume root will have one child.
         first_task = self.bpmn_workflow.task_tree.children[0]
-        first_task.reset_token(reset_data=False)
+        first_task.reset_token(reset_data=True)  # Clear out the data.
         for task in new_bpmn_workflow.get_tasks(SpiffTask.READY):
             task.data = first_task.data
         new_bpmn_workflow.do_engine_steps()

View File

@@ -9,6 +9,7 @@ from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask
 from SpiffWorkflow.bpmn.specs.UserTask import UserTask
 from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask
 from SpiffWorkflow.specs import CancelTask, StartTask
+from SpiffWorkflow.util.deep_merge import DeepMerge
 from flask import g
 from jinja2 import Template
@@ -316,21 +317,21 @@ class WorkflowService(object):
                     field.options.append({"id": d.value, "name": d.label})

     @staticmethod
-    def log_task_action(processor, spiff_task, action):
-        task = WorkflowService.spiff_task_to_api_task(spiff_task)
-        workflow_model = processor.workflow_model
+    def log_task_action(workflow_model: WorkflowModel, task: Task,
+                        action: string, version, updated_data=None):
         task_event = TaskEventModel(
             study_id=workflow_model.study_id,
             user_uid=g.user.uid,
             workflow_id=workflow_model.id,
             workflow_spec_id=workflow_model.workflow_spec_id,
-            spec_version=processor.get_version_string(),
+            spec_version=version,
             action=action,
             task_id=task.id,
             task_name=task.name,
             task_title=task.title,
             task_type=str(task.type),
             task_state=task.state,
+            task_data=updated_data,
             mi_type=task.multi_instance_type.value,  # Some tasks have a repeat behavior.
             mi_count=task.multi_instance_count,  # This is the number of times the task could repeat.
             mi_index=task.multi_instance_index,  # And the index of the currently repeating task.

View File

@@ -0,0 +1,28 @@
+"""empty message
+
+Revision ID: 3876e130664e
+Revises: 5064b72284b7
+Create Date: 2020-06-01 15:39:53.937591
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '3876e130664e'
+down_revision = '5064b72284b7'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('task_event', sa.Column('task_data', sa.JSON(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('task_event', 'task_data')
+    # ### end Alembic commands ###

View File

@@ -10,7 +10,6 @@ from crc.models.api_models import WorkflowApiSchema, MultiInstanceType, TaskSchema
 from crc.models.file import FileModelSchema
 from crc.models.stats import TaskEventModel
 from crc.models.workflow import WorkflowStatus
-from crc.services.protocol_builder import ProtocolBuilderService
 from crc.services.workflow_service import WorkflowService
@@ -79,6 +78,9 @@ class TestTasksApi(BaseTest):
         self.assertEquals(task_in.process_name, event.process_name)
         self.assertIsNotNone(event.date)
+        # Assure that the data provided occurs in the task data log.
+        for key in dict_data.keys():
+            self.assertIn(key, event.task_data)
         workflow = WorkflowApiSchema().load(json_data)
         return workflow

View File

@@ -270,53 +270,6 @@ class TestWorkflowProcessor(BaseTest):
         processor = self.get_processor(study, workflow_spec_model)
         self.assertTrue(processor.get_version_string().startswith('v2.1.1'))

-    def test_restart_workflow(self):
-        self.load_example_data()
-        study = session.query(StudyModel).first()
-        workflow_spec_model = self.load_test_spec("two_forms")
-        processor = self.get_processor(study, workflow_spec_model)
-        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
-        task = processor.next_task()
-        task.data = {"key": "Value"}
-        processor.complete_task(task)
-        task_before_restart = processor.next_task()
-        processor.hard_reset()
-        task_after_restart = processor.next_task()
-
-        self.assertNotEqual(task.get_name(), task_before_restart.get_name())
-        self.assertEqual(task.get_name(), task_after_restart.get_name())
-        self.assertEqual(task.data, task_after_restart.data)
-
-    def test_soft_reset(self):
-        self.load_example_data()
-
-        # Start the two_forms workflow, and enter some data in the first form.
-        study = session.query(StudyModel).first()
-        workflow_spec_model = self.load_test_spec("two_forms")
-        processor = self.get_processor(study, workflow_spec_model)
-        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
-        task = processor.next_task()
-        task.data = {"color": "blue"}
-        processor.complete_task(task)
-
-        # Modify the specification, with a minor text change.
-        file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn')
-        self.replace_file("two_forms.bpmn", file_path)
-
-        # Setting up another processor should not error out, but doesn't pick up the update.
-        processor.workflow_model.bpmn_workflow_json = processor.serialize()
-        processor2 = WorkflowProcessor(processor.workflow_model)
-        self.assertEqual("Step 1", processor2.bpmn_workflow.last_task.task_spec.description)
-        self.assertNotEqual("# This is some documentation I wanted to add.",
-                            processor2.bpmn_workflow.last_task.task_spec.documentation)
-
-        # You can do a soft update and get the right response.
-        processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True)
-        self.assertEqual("Step 1", processor3.bpmn_workflow.last_task.task_spec.description)
-        self.assertEqual("# This is some documentation I wanted to add.",
-                         processor3.bpmn_workflow.last_task.task_spec.documentation)
-
     def test_hard_reset(self):
         self.load_example_data()