commented out reset process code and added comment and raise until we get it actually working and fixed issue with viewing at completed task where it was not including the tasks for the parent bpmn processes

This commit is contained in:
jasquat 2023-03-23 10:44:09 -04:00
parent 722680a5ac
commit 7a14a58518
9 changed files with 234 additions and 169 deletions

View File

@ -1,5 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
@ -18,6 +20,7 @@ class BpmnProcessNotFoundError(Exception):
# "success", # boolean
# "bpmn_messages", # if top-level process
# "correlations", # if top-level process
@dataclass
class BpmnProcessModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process"
id: int = db.Column(db.Integer, primary_key=True)

View File

@ -1,5 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@ -10,6 +12,7 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
#
# each subprocess will have its own row in this table.
# there is a join table to link them together: bpmn_process_definition_relationship
@dataclass
class BpmnProcessDefinitionModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process_definition"
id: int = db.Column(db.Integer, primary_key=True)

View File

@ -1,5 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from sqlalchemy import UniqueConstraint
@ -10,6 +12,7 @@ from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@dataclass
class BpmnProcessDefinitionRelationshipModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process_definition_relationship"
__table_args__ = (

View File

@ -1,5 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from sqlalchemy import UniqueConstraint
from sqlalchemy.orm import relationship
@ -11,6 +13,7 @@ from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@dataclass
class TaskDefinitionModel(SpiffworkflowBaseDBModel):
__tablename__ = "task_definition"
__table_args__ = (

View File

@ -606,6 +606,8 @@ def process_instance_task_list(
TaskModel.process_instance_id == process_instance.id,
)
to_task_model: Optional[TaskModel] = None
task_models_of_parent_bpmn_processes_guids: list[str] = []
if to_task_guid is not None:
to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
if to_task_model is None:
@ -614,7 +616,28 @@ def process_instance_task_list(
message=f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'",
status_code=400,
)
task_model_query = task_model_query.filter(TaskModel.end_in_seconds <= to_task_model.end_in_seconds)
if to_task_model.state != "COMPLETED":
# TODO: find a better term for viewing at task state
raise ApiError(
error_code="task_cannot_be_viewed_at",
message=(
f"Desired task with guid '{to_task_guid}' for process instance '{process_instance.id}' was never"
" completed and therefore cannot be viewed at."
),
status_code=400,
)
_parent_bpmn_processes, task_models_of_parent_bpmn_processes = (
TaskService.task_models_of_parent_bpmn_processes(to_task_model)
)
task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
task_model_query = task_model_query.filter(
or_(
TaskModel.end_in_seconds <= to_task_model.end_in_seconds, # type: ignore
TaskModel.guid.in_(task_models_of_parent_bpmn_processes_guids), # type: ignore
)
)
bpmn_process_alias = aliased(BpmnProcessModel)
direct_parent_bpmn_process_alias = aliased(BpmnProcessModel)
@ -649,6 +672,9 @@ def process_instance_task_list(
TaskDefinitionModel.properties_json.label("task_definition_properties_json"), # type: ignore
TaskModel.guid,
TaskModel.state,
TaskModel.properties_json,
TaskModel.end_in_seconds,
TaskModel.start_in_seconds,
)
)
@ -656,11 +682,18 @@ def process_instance_task_list(
task_model_query = task_model_query.filter(bpmn_process_alias.id.in_(bpmn_process_ids))
task_models = task_model_query.all()
if to_task_guid is not None:
if to_task_model is not None:
task_models_dict = json.loads(current_app.json.dumps(task_models))
for task_model in task_models_dict:
if task_model["guid"] == to_task_guid and task_model["state"] == "COMPLETED":
task_model["state"] = "READY"
end_in_seconds = float(task_model["end_in_seconds"])
if to_task_model.guid == task_model["guid"] and task_model["state"] == "COMPLETED":
TaskService.reset_task_model_dict(task_model, state="READY")
elif (
end_in_seconds is None
or to_task_model.end_in_seconds is None
or to_task_model.end_in_seconds < end_in_seconds
) and task_model["guid"] in task_models_of_parent_bpmn_processes_guids:
TaskService.reset_task_model_dict(task_model, state="WAITING")
return make_response(jsonify(task_models_dict), 200)
return make_response(jsonify(task_models), 200)

View File

@ -52,8 +52,6 @@ from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ign
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import and_
from sqlalchemy import or_
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
@ -1311,114 +1309,118 @@ class ProcessInstanceProcessor:
# Saving the workflow seems to reset the status
self.suspend()
# FIXME: this currently cannot work for multi-instance tasks and loopback. It can somewhat for not those
# if we can properly handling resetting children tasks. Right now if we set them all to FUTURE then
# they never get picked up by spiff and processed. The process instance just stops after the to_task_guid
# and marks itself complete without processing any of the children.
@classmethod
def reset_process(
cls, process_instance: ProcessInstanceModel, to_task_guid: str, commit: Optional[bool] = False
) -> None:
"""Reset a process to an earlier state."""
cls.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
)
to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
if to_task_model is None:
raise TaskNotFoundError(
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
)
parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
to_task_model
)
task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes]
tasks_to_update_query = db.session.query(TaskModel).filter(
and_(
or_(
TaskModel.end_in_seconds > to_task_model.end_in_seconds,
TaskModel.end_in_seconds.is_(None), # type: ignore
),
TaskModel.process_instance_id == process_instance.id,
# TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore
)
)
tasks_to_update = tasks_to_update_query.all()
# run all queries before making changes to task_model
if commit:
# tasks_to_delete_query = db.session.query(TaskModel).filter(
raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
# cls.add_event_to_process_instance(
# process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
# )
#
# to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
# if to_task_model is None:
# raise TaskNotFoundError(
# f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
# )
#
# parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
# to_task_model
# )
# [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
# [p.id for p in parent_bpmn_processes]
# tasks_to_update_query = db.session.query(TaskModel).filter(
# and_(
# or_(
# TaskModel.end_in_seconds > to_task_model.end_in_seconds,
# TaskModel.end_in_seconds.is_not(None), # type: ignore
# TaskModel.end_in_seconds.is_(None), # type: ignore
# ),
# TaskModel.process_instance_id == process_instance.id,
# TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
# TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
# # TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore
# )
# )
# tasks_to_update = tasks_to_update_query.all()
#
# tasks_to_delete = tasks_to_delete_query.all()
# # run all queries before making changes to task_model
# if commit:
# # tasks_to_delete_query = db.session.query(TaskModel).filter(
# # and_(
# # or_(
# # TaskModel.end_in_seconds > to_task_model.end_in_seconds,
# # TaskModel.end_in_seconds.is_not(None), # type: ignore
# # ),
# # TaskModel.process_instance_id == process_instance.id,
# # TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
# # TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
# # )
# # )
# #
# # tasks_to_delete = tasks_to_delete_query.all()
# #
# # # delete any later tasks from to_task_model and delete bpmn processes that may be
# # # link directly to one of those tasks.
# # tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
# # tasks_to_delete_ids = [t.id for t in tasks_to_delete]
# # bpmn_processes_to_delete = BpmnProcessModel.query.filter(
# # BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore
# # ).order_by(BpmnProcessModel.id.desc()).all()
# # human_tasks_to_delete = HumanTaskModel.query.filter(
# # HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
# # ).all()
# #
# #
# # import pdb; pdb.set_trace()
# # # ensure the correct order for foreign keys
# # for human_task_to_delete in human_tasks_to_delete:
# # db.session.delete(human_task_to_delete)
# # db.session.commit()
# # for task_to_delete in tasks_to_delete:
# # db.session.delete(task_to_delete)
# # db.session.commit()
# # for bpmn_process_to_delete in bpmn_processes_to_delete:
# # db.session.delete(bpmn_process_to_delete)
# # db.session.commit()
#
# # delete any later tasks from to_task_model and delete bpmn processes that may be
# # link directly to one of those tasks.
# tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
# tasks_to_delete_ids = [t.id for t in tasks_to_delete]
# bpmn_processes_to_delete = BpmnProcessModel.query.filter(
# BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore
# ).order_by(BpmnProcessModel.id.desc()).all()
# related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
# if related_human_task is not None:
# db.session.delete(related_human_task)
#
# tasks_to_update_ids = [t.id for t in tasks_to_update]
# human_tasks_to_delete = HumanTaskModel.query.filter(
# HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
# HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
# ).all()
#
#
# import pdb; pdb.set_trace()
# # ensure the correct order for foreign keys
# for human_task_to_delete in human_tasks_to_delete:
# db.session.delete(human_task_to_delete)
# db.session.commit()
# for task_to_delete in tasks_to_delete:
# db.session.delete(task_to_delete)
# db.session.commit()
# for bpmn_process_to_delete in bpmn_processes_to_delete:
# db.session.delete(bpmn_process_to_delete)
# db.session.commit()
related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
if related_human_task is not None:
db.session.delete(related_human_task)
tasks_to_update_ids = [t.id for t in tasks_to_update]
human_tasks_to_delete = HumanTaskModel.query.filter(
HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
).all()
for human_task_to_delete in human_tasks_to_delete:
db.session.delete(human_task_to_delete)
db.session.commit()
for task_to_update in tasks_to_update:
TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit)
parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
if parent_task_model is None:
raise TaskNotFoundError(
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
)
TaskService.reset_task_model(
to_task_model,
state="READY",
json_data_hash=parent_task_model.json_data_hash,
python_env_data_hash=parent_task_model.python_env_data_hash,
commit=commit,
)
for task_model in task_models_of_parent_bpmn_processes:
TaskService.reset_task_model(task_model, state="WAITING", commit=commit)
if commit:
processor = ProcessInstanceProcessor(process_instance)
processor.save()
processor.suspend()
#
# for task_to_update in tasks_to_update:
# TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit)
#
# parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
# if parent_task_model is None:
# raise TaskNotFoundError(
# f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
# )
#
# TaskService.reset_task_model(
# to_task_model,
# state="READY",
# json_data_hash=parent_task_model.json_data_hash,
# python_env_data_hash=parent_task_model.python_env_data_hash,
# commit=commit,
# )
# for task_model in task_models_of_parent_bpmn_processes:
# TaskService.reset_task_model(task_model, state="WAITING", commit=commit)
#
# if commit:
# processor = ProcessInstanceProcessor(process_instance)
# processor.save()
# processor.suspend()
@staticmethod
def get_parser() -> MyCustomParser:

View File

@ -332,6 +332,17 @@ class TaskService:
return (bpmn_processes + b, [parent_task_model] + t)
return (bpmn_processes, task_models)
@classmethod
def reset_task_model_dict(
cls,
task_model: dict,
state: str,
) -> None:
task_model["state"] = state
task_model["start_in_seconds"] = None
task_model["end_in_seconds"] = None
task_model["properties_json"]["state"] = getattr(TaskState, state)
@classmethod
def reset_task_model(
cls,

View File

@ -107,6 +107,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self._should_update_task_model():
# TODO: also include children of the last task processed. This may help with task resets
# if we have to set their states to FUTURE.
# excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion.
for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY

View File

@ -256,63 +256,60 @@ class TestProcessInstanceProcessor(BaseTest):
assert spiff_task is not None
assert spiff_task.state == TaskState.COMPLETED
def test_properly_resets_process_to_given_task(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
assert finance_user_three.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
assert finance_group is not None
process_model = load_test_spec(
process_model_id="test_group/manual_task_with_subprocesses",
process_model_source_directory="manual_task_with_subprocesses",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
initial_human_task_id = process_instance.active_human_tasks[0].id
# save again to ensure we go attempt to process the human tasks again
processor.save()
assert len(process_instance.active_human_tasks) == 1
assert initial_human_task_id == process_instance.active_human_tasks[0].id
processor = ProcessInstanceProcessor(process_instance)
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier(
human_task_one.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
processor.suspend()
ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True)
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
processor = ProcessInstanceProcessor(process_instance)
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id))
import pdb; pdb.set_trace()
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
import pdb; pdb.set_trace()
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
import pdb; pdb.set_trace()
# TODO: FIX resetting a process instance to a task
# def test_properly_resets_process_to_given_task(
# self,
# app: Flask,
# client: FlaskClient,
# with_db_and_bpmn_file_cleanup: None,
# with_super_admin_user: UserModel,
# ) -> None:
# self.create_process_group(client, with_super_admin_user, "test_group", "test_group")
# initiator_user = self.find_or_create_user("initiator_user")
# finance_user_three = self.find_or_create_user("testuser3")
# assert initiator_user.principal is not None
# assert finance_user_three.principal is not None
# AuthorizationService.import_permissions_from_yaml_file()
#
# finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
# assert finance_group is not None
#
# process_model = load_test_spec(
# process_model_id="test_group/manual_task_with_subprocesses",
# process_model_source_directory="manual_task_with_subprocesses",
# )
# process_instance = self.create_process_instance_from_process_model(
# process_model=process_model, user=initiator_user
# )
# processor = ProcessInstanceProcessor(process_instance)
# processor.do_engine_steps(save=True)
# assert len(process_instance.active_human_tasks) == 1
# initial_human_task_id = process_instance.active_human_tasks[0].id
#
# # save again to ensure we go attempt to process the human tasks again
# processor.save()
#
# assert len(process_instance.active_human_tasks) == 1
# assert initial_human_task_id == process_instance.active_human_tasks[0].id
#
# processor = ProcessInstanceProcessor(process_instance)
# human_task_one = process_instance.active_human_tasks[0]
# spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier(
# human_task_one.task_name, processor.bpmn_process_instance
# )
# ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
#
# processor.suspend()
# ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True)
#
# process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
# processor = ProcessInstanceProcessor(process_instance)
# human_task_one = process_instance.active_human_tasks[0]
# spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id))
# ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
# human_task_one = process_instance.active_human_tasks[0]
# spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id))
# ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
def test_properly_saves_tasks_when_running(
self,
@ -374,7 +371,11 @@ class TestProcessInstanceProcessor(BaseTest):
}
third_data_set = {
**second_data_set,
**{"set_in_test_process_to_call_script": 1, "set_in_test_process_to_call_subprocess_subprocess_script": 1, "set_in_test_process_to_call_subprocess_script": 1},
**{
"set_in_test_process_to_call_script": 1,
"set_in_test_process_to_call_subprocess_subprocess_script": 1,
"set_in_test_process_to_call_subprocess_script": 1,
},
}
fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}}
fifth_data_set = {**fourth_data_set, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}}
@ -401,7 +402,10 @@ class TestProcessInstanceProcessor(BaseTest):
expected_python_env_data = expected_task_data[expected_task_data_key]
base_failure_message = f"Failed on {bpmn_process_identifier} - {spiff_task_identifier} - task data key {expected_task_data_key}."
base_failure_message = (
f"Failed on {bpmn_process_identifier} - {spiff_task_identifier} - task data key"
f" {expected_task_data_key}."
)
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
assert task_model.start_in_seconds is not None
@ -414,7 +418,8 @@ class TestProcessInstanceProcessor(BaseTest):
assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier
message = (
f"{base_failure_message} Expected: {sorted(expected_python_env_data)}. Received: {sorted(task_model.json_data())}"
f"{base_failure_message} Expected: {sorted(expected_python_env_data)}. Received:"
f" {sorted(task_model.json_data())}"
)
# TODO: if we split out env data again we will need to use it here instead of json_data
# assert task_model.python_env_data() == expected_python_env_data, message