process children and tasks of parent subprocesses instead of looking for all tasks with a given state w/ burnettk

This commit is contained in:
jasquat 2023-03-27 10:37:31 -04:00
parent 199cf05960
commit 9e91ff8f91
No known key found for this signature in database
4 changed files with 149 additions and 124 deletions

View File

@ -1,5 +1,7 @@
"""Process_instance_processor."""
import _strptime # type: ignore
from sqlalchemy import or_
from sqlalchemy import and_
import decimal
import json
import logging
@ -1263,109 +1265,109 @@ class ProcessInstanceProcessor:
cls, process_instance: ProcessInstanceModel, to_task_guid: str, commit: Optional[bool] = False
) -> None:
"""Reset a process to an earlier state."""
raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
# cls.add_event_to_process_instance(
# process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
# )
#
# to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
# if to_task_model is None:
# raise TaskNotFoundError(
# f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
# )
#
# parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
# to_task_model
# )
# [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
# [p.id for p in parent_bpmn_processes]
# tasks_to_update_query = db.session.query(TaskModel).filter(
# and_(
# or_(
# TaskModel.end_in_seconds > to_task_model.end_in_seconds,
# TaskModel.end_in_seconds.is_(None), # type: ignore
# ),
# TaskModel.process_instance_id == process_instance.id,
# # TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore
# )
# )
# tasks_to_update = tasks_to_update_query.all()
#
# # run all queries before making changes to task_model
# if commit:
# # tasks_to_delete_query = db.session.query(TaskModel).filter(
# # and_(
# # or_(
# # TaskModel.end_in_seconds > to_task_model.end_in_seconds,
# # TaskModel.end_in_seconds.is_not(None), # type: ignore
# # ),
# # TaskModel.process_instance_id == process_instance.id,
# # TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
# # TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
# # )
# # )
# #
# # tasks_to_delete = tasks_to_delete_query.all()
# #
# # # delete any later tasks from to_task_model and delete bpmn processes that may be
# # # link directly to one of those tasks.
# # tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
# # tasks_to_delete_ids = [t.id for t in tasks_to_delete]
# # bpmn_processes_to_delete = BpmnProcessModel.query.filter(
# # BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore
# # ).order_by(BpmnProcessModel.id.desc()).all()
# # human_tasks_to_delete = HumanTaskModel.query.filter(
# # HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
# # ).all()
# #
# #
# # import pdb; pdb.set_trace()
# # # ensure the correct order for foreign keys
# # for human_task_to_delete in human_tasks_to_delete:
# # db.session.delete(human_task_to_delete)
# # db.session.commit()
# # for task_to_delete in tasks_to_delete:
# # db.session.delete(task_to_delete)
# # db.session.commit()
# # for bpmn_process_to_delete in bpmn_processes_to_delete:
# # db.session.delete(bpmn_process_to_delete)
# # db.session.commit()
#
# related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
# if related_human_task is not None:
# db.session.delete(related_human_task)
#
# tasks_to_update_ids = [t.id for t in tasks_to_update]
# human_tasks_to_delete = HumanTaskModel.query.filter(
# HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
# ).all()
# for human_task_to_delete in human_tasks_to_delete:
# db.session.delete(human_task_to_delete)
# db.session.commit()
#
# for task_to_update in tasks_to_update:
# TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit)
#
# parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
# if parent_task_model is None:
# raise TaskNotFoundError(
# f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
# )
#
# TaskService.reset_task_model(
# to_task_model,
# state="READY",
# json_data_hash=parent_task_model.json_data_hash,
# python_env_data_hash=parent_task_model.python_env_data_hash,
# commit=commit,
# )
# for task_model in task_models_of_parent_bpmn_processes:
# TaskService.reset_task_model(task_model, state="WAITING", commit=commit)
#
# if commit:
# processor = ProcessInstanceProcessor(process_instance)
# processor.save()
# processor.suspend()
# raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
cls.add_event_to_process_instance(
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
)
to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first()
if to_task_model is None:
raise TaskNotFoundError(
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
)
parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
to_task_model
)
[p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
[p.id for p in parent_bpmn_processes]
tasks_to_update_query = db.session.query(TaskModel).filter(
and_(
or_(
TaskModel.end_in_seconds > to_task_model.end_in_seconds,
TaskModel.end_in_seconds.is_(None), # type: ignore
),
TaskModel.process_instance_id == process_instance.id,
# TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore
)
)
tasks_to_update = tasks_to_update_query.all()
# run all queries before making changes to task_model
if commit:
# tasks_to_delete_query = db.session.query(TaskModel).filter(
# and_(
# or_(
# TaskModel.end_in_seconds > to_task_model.end_in_seconds,
# TaskModel.end_in_seconds.is_not(None), # type: ignore
# ),
# TaskModel.process_instance_id == process_instance.id,
# TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
# TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
# )
# )
#
# tasks_to_delete = tasks_to_delete_query.all()
#
# # delete any later tasks from to_task_model and delete bpmn processes that may be
# # link directly to one of those tasks.
# tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
# tasks_to_delete_ids = [t.id for t in tasks_to_delete]
# bpmn_processes_to_delete = BpmnProcessModel.query.filter(
# BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore
# ).order_by(BpmnProcessModel.id.desc()).all()
# human_tasks_to_delete = HumanTaskModel.query.filter(
# HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
# ).all()
#
#
# import pdb; pdb.set_trace()
# # ensure the correct order for foreign keys
# for human_task_to_delete in human_tasks_to_delete:
# db.session.delete(human_task_to_delete)
# db.session.commit()
# for task_to_delete in tasks_to_delete:
# db.session.delete(task_to_delete)
# db.session.commit()
# for bpmn_process_to_delete in bpmn_processes_to_delete:
# db.session.delete(bpmn_process_to_delete)
# db.session.commit()
related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
if related_human_task is not None:
db.session.delete(related_human_task)
tasks_to_update_ids = [t.id for t in tasks_to_update]
human_tasks_to_delete = HumanTaskModel.query.filter(
HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
).all()
for human_task_to_delete in human_tasks_to_delete:
db.session.delete(human_task_to_delete)
db.session.commit()
for task_to_update in tasks_to_update:
TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit)
parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
if parent_task_model is None:
raise TaskNotFoundError(
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
)
TaskService.reset_task_model(
to_task_model,
state="READY",
json_data_hash=parent_task_model.json_data_hash,
python_env_data_hash=parent_task_model.python_env_data_hash,
commit=commit,
)
for task_model in task_models_of_parent_bpmn_processes:
TaskService.reset_task_model(task_model, state="WAITING", commit=commit)
if commit:
processor = ProcessInstanceProcessor(process_instance)
processor.save()
processor.suspend()
@staticmethod
def get_parser() -> MyCustomParser:

View File

@ -110,9 +110,9 @@ class TaskService:
for sp_id, sp in top_level_workflow.subprocesses.items():
if sp == my_wf:
my_sp = sp
my_sp_id = sp_id
my_sp_id = str(sp_id)
break
return (str(my_sp_id), my_sp)
return (my_sp_id, my_sp)
@classmethod
def task_bpmn_process(

View File

@ -1,5 +1,6 @@
import logging
import time
from uuid import UUID
from typing import Callable
from typing import Optional
@ -67,6 +68,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.task_models: dict[str, TaskModel] = {}
self.json_data_dicts: dict[str, JsonDataDict] = {}
self.process_instance_events: dict[str, ProcessInstanceEventModel] = {}
self.last_completed_spiff_task: Optional[SpiffTask] = None
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model():
@ -81,6 +83,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
task_model.start_in_seconds = self.current_task_start_in_seconds
task_model.end_in_seconds = time.time()
self.last_completed_spiff_task= spiff_task
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
@ -104,10 +107,27 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# TODO: also include children of the last task processed. This may help with task resets
# if we have to set their states to FUTURE.
# excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion.
for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY
):
self._update_task_model_with_spiff_task(waiting_spiff_task)
# for waiting_spiff_task in bpmn_process_instance.get_tasks(
# TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY
# ):
# self._update_task_model_with_spiff_task(waiting_spiff_task)
if self.last_completed_spiff_task is not None:
self._process_spiff_task_children(self.last_completed_spiff_task)
self._process_spiff_task_parents(self.last_completed_spiff_task)
def _process_spiff_task_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self._update_task_model_with_spiff_task(child_spiff_task)
self._process_spiff_task_children(child_spiff_task)
def _process_spiff_task_parents(self, spiff_task: SpiffTask) -> None:
(parent_subprocess_guid, _parent_subprocess) = TaskService.task_subprocess(spiff_task)
if parent_subprocess_guid is not None:
spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task(UUID(parent_subprocess_guid))
if spiff_task_of_parent_subprocess is not None:
self._update_task_model_with_spiff_task(spiff_task_of_parent_subprocess)
self._process_spiff_task_parents(spiff_task_of_parent_subprocess)
def _should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance.

View File

@ -260,7 +260,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
return !taskToTimeTravelTo;
};
const completionViewLink = (label: any, taskGuid: string) => {
const queryParams = () => {
const processIdentifier = searchParams.get('process_identifier');
const callActivityTaskId = searchParams.get('bpmn_process_guid');
const queryParamArray = [];
@ -270,16 +270,19 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
if (callActivityTaskId) {
queryParamArray.push(`bpmn_process_guid=${callActivityTaskId}`);
}
let queryParams = '';
let queryParamString = '';
if (queryParamArray.length > 0) {
queryParams = `?${queryParamArray.join('&')}`;
queryParamString = `?${queryParamArray.join('&')}`;
}
return queryParamString;
};
const completionViewLink = (label: any, taskGuid: string) => {
return (
<Link
reloadDocument
data-qa="process-instance-step-link"
to={`${processInstanceShowPageBaseUrl}/${taskGuid}${queryParams}`}
to={`${processInstanceShowPageBaseUrl}/${taskGuid}${queryParams()}`}
>
{label}
</Link>
@ -287,7 +290,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
};
const returnToProcessInstance = () => {
window.location.href = processInstanceShowPageBaseUrl;
window.location.href = `${processInstanceShowPageBaseUrl}${queryParams()}`;
};
const resetProcessInstance = () => {
@ -671,16 +674,16 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
);
};
const canResetProcess = (_task: Task) => {
// disabling this feature for now
return false;
// return (
// ability.can('POST', targetUris.processInstanceResetPath) &&
// processInstance &&
// processInstance.status === 'suspended' &&
// task.state === 'READY' &&
// !showingActiveTask()
// );
const canResetProcess = (task: Task) => {
// // disabling this feature for now
// return false;
return (
ability.can('POST', targetUris.processInstanceResetPath) &&
processInstance &&
processInstance.status === 'suspended' &&
task.state === 'READY' &&
!showingActiveTask()
);
};
const getEvents = (task: Task) => {