Feature/reset pi go button (#978)

* delete human task for task that is being reset to in a process instance w/ burnettk

* added script to remove duplicate human tasks from the database w/ burnettk

* test that human tasks are not duplicated during pi reset w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-02-06 15:50:00 -05:00 committed by GitHub
parent a61f2d6927
commit 5ae59367cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 43 additions and 4 deletions

View File

@ -5,6 +5,7 @@ from spiffworkflow_backend import create_app
from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
from spiffworkflow_backend.data_migrations.version_2 import Version2 from spiffworkflow_backend.data_migrations.version_2 import Version2
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from sqlalchemy import update from sqlalchemy import update
@ -39,6 +40,27 @@ def put_serializer_version_onto_numeric_track() -> None:
db.session.commit() db.session.commit()
@benchmark_log_func
def remove_duplicate_human_task_rows() -> None:
result = (
db.session.query(HumanTaskModel.process_instance_id, HumanTaskModel.task_guid, db.func.count().label("ct"))
.group_by(HumanTaskModel.task_guid, HumanTaskModel.process_instance_id)
.having(db.func.count() > 1)
.all()
)
# Process the result as needed
rows_to_delete = []
for row in result:
human_tasks = (
HumanTaskModel.query.filter_by(task_guid=row.task_guid).order_by(HumanTaskModel.created_at_in_seconds.desc()).all()
)
rows_to_delete = rows_to_delete + human_tasks[1:]
for row in rows_to_delete:
db.session.delete(row)
db.session.commit()
def all_potentially_relevant_process_instances() -> list[ProcessInstanceModel]: def all_potentially_relevant_process_instances() -> list[ProcessInstanceModel]:
return ProcessInstanceModel.query.filter( return ProcessInstanceModel.query.filter(
ProcessInstanceModel.spiff_serializer_version < Version2.version(), ProcessInstanceModel.spiff_serializer_version < Version2.version(),
@ -65,6 +87,7 @@ def main() -> None:
current_app.logger.debug(f"data_migrations/run_all::create_app took {end_time - start_time} seconds") current_app.logger.debug(f"data_migrations/run_all::create_app took {end_time - start_time} seconds")
start_time = time.time() start_time = time.time()
put_serializer_version_onto_numeric_track() put_serializer_version_onto_numeric_track()
remove_duplicate_human_task_rows()
process_instances = all_potentially_relevant_process_instances() process_instances = all_potentially_relevant_process_instances()
potentially_relevant_instance_count = len(process_instances) potentially_relevant_instance_count = len(process_instances)
current_app.logger.debug(f"Found potentially relevant process_instances: {potentially_relevant_instance_count}") current_app.logger.debug(f"Found potentially relevant process_instances: {potentially_relevant_instance_count}")

View File

@ -1230,7 +1230,7 @@ class ProcessInstanceProcessor:
serializer=processor._serializer, serializer=processor._serializer,
bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings,
) )
task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time) task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time, to_task_guid=to_task_guid)
# Save the process # Save the process
processor.save() processor.save()

View File

@ -446,13 +446,25 @@ class TaskService:
self.task_models[task_model.guid] = task_model self.task_models[task_model.guid] = task_model
def update_all_tasks_from_spiff_tasks( def update_all_tasks_from_spiff_tasks(
self, spiff_tasks: list[SpiffTask], deleted_spiff_tasks: list[SpiffTask], start_time: float self,
spiff_tasks: list[SpiffTask],
deleted_spiff_tasks: list[SpiffTask],
start_time: float,
to_task_guid: str | None = None,
) -> None: ) -> None:
"""Update given spiff tasks in the database and remove deleted tasks.""" """Update given spiff tasks in the database and remove deleted tasks."""
# Remove all the deleted/pruned tasks from the database. # Remove all the deleted/pruned tasks from the database.
deleted_task_guids = [str(t.id) for t in deleted_spiff_tasks] deleted_task_guids = [str(t.id) for t in deleted_spiff_tasks]
tasks_to_clear = TaskModel.query.filter(TaskModel.guid.in_(deleted_task_guids)).all() # type: ignore tasks_to_clear = TaskModel.query.filter(TaskModel.guid.in_(deleted_task_guids)).all() # type: ignore
human_tasks_to_clear = HumanTaskModel.query.filter(HumanTaskModel.task_id.in_(deleted_task_guids)).all() # type: ignore
human_task_guids_to_clear = deleted_task_guids
# ensure we clear out any human tasks that were associated with this guid in case it was a human task
if to_task_guid is not None:
human_task_guids_to_clear.append(to_task_guid)
human_tasks_to_clear = HumanTaskModel.query.filter(
HumanTaskModel.task_id.in_(human_task_guids_to_clear) # type: ignore
).all()
# delete human tasks first to avoid potential conflicts when deleting tasks. # delete human tasks first to avoid potential conflicts when deleting tasks.
# otherwise sqlalchemy returns several warnings. # otherwise sqlalchemy returns several warnings.

View File

@ -257,14 +257,18 @@ class TestProcessInstanceProcessor(BaseTest):
human_task_one.task_name, processor.bpmn_process_instance human_task_one.task_name, processor.bpmn_process_instance
) )
assert spiff_manual_task is not None assert spiff_manual_task is not None
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)
processor.suspend() processor.suspend()
ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id)) ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id))
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
processor.resume() processor.resume()
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True)
# if if there are more human tasks then they were duplicated in the reset process method
assert len(process_instance.human_tasks) == 1
human_task_one = process_instance.active_human_tasks[0] human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one)