cleaned up the reset code w/ burnettk

This commit is contained in:
jasquat 2023-03-31 15:42:18 -04:00
parent 343aae0628
commit 0ff54c9ce8
5 changed files with 77 additions and 94 deletions

View File

@ -693,7 +693,7 @@ def process_instance_reset(
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Reset a process instance to a particular step.""" """Reset a process instance to a particular step."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
ProcessInstanceProcessor.reset_process(process_instance, to_task_guid, commit=True) ProcessInstanceProcessor.reset_process(process_instance, to_task_guid)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")

View File

@ -1,8 +1,6 @@
"""Process_instance_processor.""" """Process_instance_processor."""
import _strptime # type: ignore import _strptime # type: ignore
import copy import copy
from sqlalchemy import or_
from sqlalchemy import and_
import decimal import decimal
import json import json
import logging import logging
@ -53,6 +51,8 @@ from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ign
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import and_
from sqlalchemy import or_
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
@ -1265,9 +1265,7 @@ class ProcessInstanceProcessor:
# they never get picked up by spiff and processed. The process instance just stops after the to_task_guid # they never get picked up by spiff and processed. The process instance just stops after the to_task_guid
# and marks itself complete without processing any of the children. # and marks itself complete without processing any of the children.
@classmethod @classmethod
def reset_process( def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None:
cls, process_instance: ProcessInstanceModel, to_task_guid: str, commit: Optional[bool] = False
) -> None:
"""Reset a process to an earlier state.""" """Reset a process to an earlier state."""
# raise Exception("This feature to reset a process instance to a given task is currently unavaiable") # raise Exception("This feature to reset a process instance to a given task is currently unavaiable")
cls.add_event_to_process_instance( cls.add_event_to_process_instance(
@ -1280,11 +1278,13 @@ class ProcessInstanceProcessor:
f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
) )
# NOTE: run ALL queries before making changes to ensure we get everything before anything changes
parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
to_task_model to_task_model
) )
task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid] task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid]
parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes] parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes]
tasks_to_update_query = db.session.query(TaskModel).filter( tasks_to_update_query = db.session.query(TaskModel).filter(
and_( and_(
or_( or_(
@ -1297,72 +1297,67 @@ class ProcessInstanceProcessor:
) )
tasks_to_update = tasks_to_update_query.all() tasks_to_update = tasks_to_update_query.all()
tasks_to_update_guids = [t.guid for t in tasks_to_update] tasks_to_update_guids = [t.guid for t in tasks_to_update]
bpmn_processes_to_update_query = db.session.query(BpmnProcessModel).filter(
tasks_to_delete_query = db.session.query(TaskModel).filter(
and_( and_(
BpmnProcessModel.guid.in_(tasks_to_update_guids), # type: ignore or_(
BpmnProcessModel.id.not_in(parent_bpmn_processes_ids), # type: ignore TaskModel.end_in_seconds > to_task_model.end_in_seconds,
TaskModel.end_in_seconds.is_not(None), # type: ignore
),
TaskModel.process_instance_id == process_instance.id,
TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
) )
) )
bpmn_processes_to_update = bpmn_processes_to_update_query.all() tasks_to_delete = tasks_to_delete_query.all()
tasks_to_delete_guids = [t.guid for t in tasks_to_delete]
tasks_to_delete_ids = [t.id for t in tasks_to_delete]
# run all queries before making changes to task_model # delete bpmn processes that are also tasks that we either deleted or will update.
if commit: # this is to force spiff to recreate those bpmn processes with the correct associated task guids.
tasks_to_delete_query = db.session.query(TaskModel).filter( bpmn_processes_to_delete_query = db.session.query(BpmnProcessModel).filter(
or_(
BpmnProcessModel.guid.in_(tasks_to_delete_guids), # type: ignore
and_( and_(
or_( BpmnProcessModel.guid.in_(tasks_to_update_guids), # type: ignore
TaskModel.end_in_seconds > to_task_model.end_in_seconds, BpmnProcessModel.id.not_in(parent_bpmn_processes_ids), # type: ignore
TaskModel.end_in_seconds.is_not(None), # type: ignore ),
),
TaskModel.process_instance_id == process_instance.id,
TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore
TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore
)
) )
)
bpmn_processes_to_delete = bpmn_processes_to_delete_query.order_by(
BpmnProcessModel.id.desc() # type: ignore
).all()
tasks_to_delete = tasks_to_delete_query.all() # delete any human task that was for a task that we deleted since they will get recreated later.
human_tasks_to_delete = HumanTaskModel.query.filter(
HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore
).all()
# delete any later tasks from to_task_model and delete bpmn processes that may be # ensure the correct order for foreign keys
# link directly to one of those tasks. for human_task_to_delete in human_tasks_to_delete:
tasks_to_delete_guids = [t.guid for t in tasks_to_delete] db.session.delete(human_task_to_delete)
tasks_to_delete_ids = [t.id for t in tasks_to_delete] db.session.commit()
bpmn_processes_to_delete = BpmnProcessModel.query.filter( for task_to_delete in tasks_to_delete:
BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore db.session.delete(task_to_delete)
).order_by(BpmnProcessModel.id.desc()).all() db.session.commit()
human_tasks_to_delete = HumanTaskModel.query.filter( for bpmn_process_to_delete in bpmn_processes_to_delete:
HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore db.session.delete(bpmn_process_to_delete)
).all() db.session.commit()
related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
if related_human_task is not None:
db.session.delete(related_human_task)
# import pdb; pdb.set_trace() tasks_to_update_ids = [t.id for t in tasks_to_update]
# ensure the correct order for foreign keys human_tasks_to_delete = HumanTaskModel.query.filter(
for human_task_to_delete in human_tasks_to_delete: HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
db.session.delete(human_task_to_delete) ).all()
db.session.commit() for human_task_to_delete in human_tasks_to_delete:
for task_to_delete in tasks_to_delete: db.session.delete(human_task_to_delete)
db.session.delete(task_to_delete) db.session.commit()
db.session.commit()
for bpmn_process_to_delete in bpmn_processes_to_delete:
db.session.delete(bpmn_process_to_delete)
db.session.commit()
related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first()
if related_human_task is not None:
db.session.delete(related_human_task)
tasks_to_update_ids = [t.id for t in tasks_to_update]
human_tasks_to_delete = HumanTaskModel.query.filter(
HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore
).all()
for human_task_to_delete in human_tasks_to_delete:
db.session.delete(human_task_to_delete)
db.session.commit()
for task_to_update in tasks_to_update: for task_to_update in tasks_to_update:
print(f"task_to_update.state: {task_to_update.state}") TaskService.reset_task_model(task_to_update, state="FUTURE")
TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit)
for bpmn_process_to_update in bpmn_processes_to_update:
db.session.delete(bpmn_process_to_update)
parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first()
if parent_task_model is None: if parent_task_model is None:
@ -1375,10 +1370,9 @@ class ProcessInstanceProcessor:
state="READY", state="READY",
json_data_hash=parent_task_model.json_data_hash, json_data_hash=parent_task_model.json_data_hash,
python_env_data_hash=parent_task_model.python_env_data_hash, python_env_data_hash=parent_task_model.python_env_data_hash,
commit=commit,
) )
for task_model in task_models_of_parent_bpmn_processes: for task_model in task_models_of_parent_bpmn_processes:
TaskService.reset_task_model(task_model, state="WAITING", commit=commit) TaskService.reset_task_model(task_model, state="WAITING")
bpmn_process = to_task_model.bpmn_process bpmn_process = to_task_model.bpmn_process
properties_json = copy.copy(bpmn_process.properties_json) properties_json = copy.copy(bpmn_process.properties_json)
@ -1387,11 +1381,9 @@ class ProcessInstanceProcessor:
db.session.add(bpmn_process) db.session.add(bpmn_process)
db.session.commit() db.session.commit()
import pdb; pdb.set_trace() processor = ProcessInstanceProcessor(process_instance)
if commit: processor.save()
processor = ProcessInstanceProcessor(process_instance) processor.suspend()
processor.save()
processor.suspend()
@staticmethod @staticmethod
def get_parser() -> MyCustomParser: def get_parser() -> MyCustomParser:

View File

@ -504,7 +504,6 @@ class TaskService:
cls, cls,
task_model: TaskModel, task_model: TaskModel,
state: str, state: str,
commit: Optional[bool] = True,
json_data_hash: Optional[str] = None, json_data_hash: Optional[str] = None,
python_env_data_hash: Optional[str] = None, python_env_data_hash: Optional[str] = None,
) -> None: ) -> None:
@ -522,18 +521,16 @@ class TaskService:
task_model.start_in_seconds = None task_model.start_in_seconds = None
task_model.end_in_seconds = None task_model.end_in_seconds = None
if commit: db.session.add(task_model)
db.session.add(task_model) db.session.commit()
db.session.commit()
new_properties_json["state"] = getattr(TaskState, state) new_properties_json["state"] = getattr(TaskState, state)
task_model.properties_json = new_properties_json task_model.properties_json = new_properties_json
if commit: # if we commit the properties json at the same time as the other items
# if we commit the properties json at the same time as the other items # the json gets reset for some reason.
# the json gets reset for some reason. db.session.add(task_model)
db.session.add(task_model) db.session.commit()
db.session.commit()
@classmethod @classmethod
def _create_task( def _create_task(

View File

@ -1,6 +1,5 @@
"""Test_process_instance_processor.""" """Test_process_instance_processor."""
from uuid import UUID from uuid import UUID
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
import pytest import pytest
from flask import g from flask import g
@ -17,6 +16,7 @@ from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import ( from spiffworkflow_backend.services.authorization_service import (
@ -298,7 +298,7 @@ class TestProcessInstanceProcessor(BaseTest):
assert spiff_manual_task is not None assert spiff_manual_task is not None
processor.suspend() processor.suspend()
ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id))
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
@ -341,15 +341,12 @@ class TestProcessInstanceProcessor(BaseTest):
assert len(process_instance.active_human_tasks) == 1 assert len(process_instance.active_human_tasks) == 1
assert initial_human_task_id == process_instance.active_human_tasks[0].id assert initial_human_task_id == process_instance.active_human_tasks[0].id
# import pdb; pdb.set_trace()
human_task_one = process_instance.active_human_tasks[0] human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
# import pdb; pdb.set_trace()
human_task_one = process_instance.active_human_tasks[0] human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
# import pdb; pdb.set_trace()
# NOTES: # NOTES:
# somehow we are hosing the task state so that when completing tasks of a subprocess, the task AFTER the subprocess task # somehow we are hosing the task state so that when completing tasks of a subprocess, the task AFTER the subprocess task
@ -367,15 +364,12 @@ class TestProcessInstanceProcessor(BaseTest):
.first() .first()
) )
assert task_model_to_reset_to is not None assert task_model_to_reset_to is not None
import pdb; pdb.set_trace() ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid)
ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid, commit=True)
import pdb; pdb.set_trace()
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
processor.resume() processor.resume()
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True)
import pdb; pdb.set_trace()
assert len(process_instance.active_human_tasks) == 1 assert len(process_instance.active_human_tasks) == 1
human_task_one = process_instance.active_human_tasks[0] human_task_one = process_instance.active_human_tasks[0]

View File

@ -674,16 +674,16 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
); );
}; };
const canResetProcess = (_task: Task) => { const canResetProcess = (task: Task) => {
// disabling this feature for now // disabling this feature for now
return false; // return false;
// return ( return (
// ability.can('POST', targetUris.processInstanceResetPath) && ability.can('POST', targetUris.processInstanceResetPath) &&
// processInstance && processInstance &&
// processInstance.status === 'suspended' && processInstance.status === 'suspended' &&
// task.state === 'READY' && task.state === 'READY' &&
// !showingActiveTask() !showingActiveTask()
// ); );
}; };
const getEvents = (task: Task) => { const getEvents = (task: Task) => {