diff --git a/spiffworkflow-backend/bin/query_tasks b/spiffworkflow-backend/bin/query_tasks index 037cb5ff..8cd6984e 100755 --- a/spiffworkflow-backend/bin/query_tasks +++ b/spiffworkflow-backend/bin/query_tasks @@ -7,18 +7,18 @@ function error_handler() { trap 'error_handler ${LINENO} $?' ERR set -o errtrace -o errexit -o nounset -o pipefail -mysql -uroot spiffworkflow_backend_local_development -e ' +mysql -uroot spiffworkflow_backend_unit_testing -e ' select * from process_instance; select t.guid as task_guid, t.state as task_state, td.bpmn_identifier as task_id from task t join task_definition td on td.id = t.task_definition_id - where process_instance_id=(select max(id) from process_instance) and td.bpmn_identifier = "top_level_subprocess"; + where process_instance_id=(select max(id) from process_instance); select bp.guid as bp_guid, bpd.bpmn_identifier as bp_identifier from bpmn_process bp join bpmn_process_definition bpd on bpd.id = bp.bpmn_process_definition_id join bpmn_process bpb on bpb.id = bp.direct_parent_process_id join process_instance pi on bpb.id = pi.bpmn_process_id - where bpd.bpmn_identifier = "top_level_subprocess" and pi.id = (select max(id) from process_instance); + where pi.id = (select max(id) from process_instance); ' # mysql -uroot spiffworkflow_backend_local_development -e '\ diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml index c7edf450..81f12deb 100644 --- a/spiffworkflow-backend/pyproject.toml +++ b/spiffworkflow-backend/pyproject.toml @@ -29,7 +29,7 @@ flask-restful = "*" werkzeug = "*" SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} # SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "6cad2981712bb61eca23af1adfafce02d3277cb9"} -# SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow" } +# SpiffWorkflow = {develop = true, path = "../../spiffworkflow/" } sentry-sdk = "^1.10" sphinx-autoapi = "^2.0" flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"} diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py index d61fa085..a7115745 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py @@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None: if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None: database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}" if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite": - app.config["SQLALCHEMY_DATABASE_URI"] = ( - f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3" - ) + app.config[ + "SQLALCHEMY_DATABASE_URI" + ] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3" elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres": - app.config["SQLALCHEMY_DATABASE_URI"] = ( - f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}" - ) + app.config[ + "SQLALCHEMY_DATABASE_URI" + ] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}" else: # use pswd to trick flake8 with hardcoded passwords db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index 1668565c..078925e7 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -129,9 +129,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): def serialized_with_metadata(self) -> dict[str, Any]: process_instance_attributes = self.serialized process_instance_attributes["process_metadata"] = self.process_metadata - process_instance_attributes["process_model_with_diagram_identifier"] = ( - self.process_model_with_diagram_identifier - ) + process_instance_attributes[ + "process_model_with_diagram_identifier" + ] = self.process_model_with_diagram_identifier return process_instance_attributes @property diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index d15a6e80..2a740697 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -2,7 +2,6 @@ # TODO: clean up this service for a clear distinction between it and the process_instance_service # where this points to the pi service import _strptime # type: ignore -import copy import decimal import json import logging @@ -53,7 +52,6 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from sqlalchemy import and_ -from sqlalchemy import or_ from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel @@ -423,9 +421,9 @@ class ProcessInstanceProcessor: tld.process_instance_id = process_instance_model.id # we want this to be the fully qualified path to the process model including all group subcomponents - current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = ( - f"{process_instance_model.process_model_identifier}" - ) + current_app.config[ + "THREAD_LOCAL_DATA" + ].process_model_identifier = f"{process_instance_model.process_model_identifier}" self.process_instance_model = process_instance_model self.process_model_service = ProcessModelService() @@ -585,9 +583,9 @@ class ProcessInstanceProcessor: bpmn_subprocess_definition.bpmn_identifier ] = bpmn_process_definition_dict spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {} - bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = ( - bpmn_subprocess_definition.bpmn_identifier - ) + bpmn_subprocess_definition_bpmn_identifiers[ + bpmn_subprocess_definition.id + ] = bpmn_subprocess_definition.bpmn_identifier task_definitions = TaskDefinitionModel.query.filter( TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore @@ -1199,140 +1197,27 @@ class ProcessInstanceProcessor: # Saving the workflow seems to reset the status self.suspend() - # FIXME: this currently cannot work for multi-instance tasks and loopback. It can somewhat for not those - # if we can properly handling resetting children tasks. Right now if we set them all to FUTURE then - # they never get picked up by spiff and processed. The process instance just stops after the to_task_guid - # and marks itself complete without processing any of the children. @classmethod def reset_process(cls, process_instance: ProcessInstanceModel, to_task_guid: str) -> None: """Reset a process to an earlier state.""" - # raise Exception("This feature to reset a process instance to a given task is currently unavaiable") + start_time = time.time() + + # Log the event that we are moving back to a previous task. ProcessInstanceTmpService.add_event_to_process_instance( process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid ) - - to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first() - if to_task_model is None: - raise TaskNotFoundError( - f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" - ) - # If this task model has a parent boundary event, reset to that point instead, - # so we can reset all the boundary timers, etc... - parent_id = to_task_model.properties_json.get("parent", "") - parent = TaskModel.query.filter_by(guid=parent_id).first() - is_boundary_parent = False - if parent and parent.task_definition.typename == "_BoundaryEventParent": - to_task_model = parent - is_boundary_parent = True # Will need to complete this task at the end so we are on the correct process. - - # NOTE: run ALL queries before making changes to ensure we get everything before anything changes - parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( - to_task_model - ) - task_models_of_parent_bpmn_processes_guids = [p.guid for p in task_models_of_parent_bpmn_processes if p.guid] - parent_bpmn_processes_ids = [p.id for p in parent_bpmn_processes] - - tasks_to_update_query = db.session.query(TaskModel).filter( - and_( - or_( - TaskModel.end_in_seconds > to_task_model.end_in_seconds, - TaskModel.end_in_seconds.is_(None), # type: ignore - ), - TaskModel.process_instance_id == process_instance.id, - TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore - ) - ) - tasks_to_update = tasks_to_update_query.all() - tasks_to_update_guids = [t.guid for t in tasks_to_update] - - tasks_to_delete_query = db.session.query(TaskModel).filter( - and_( - or_( - TaskModel.end_in_seconds > to_task_model.end_in_seconds, - TaskModel.end_in_seconds.is_(None), # type: ignore - ), - TaskModel.process_instance_id == process_instance.id, - TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore - TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore - ) - ) - tasks_to_delete = tasks_to_delete_query.all() - tasks_to_delete_guids = [t.guid for t in tasks_to_delete] - tasks_to_delete_ids = [t.id for t in tasks_to_delete] - - # delete bpmn processes that are also tasks that we either deleted or will update. - # this is to force spiff to recreate those bpmn processes with the correct associated task guids. - bpmn_processes_to_delete_query = db.session.query(BpmnProcessModel).filter( - or_( - BpmnProcessModel.guid.in_(tasks_to_delete_guids), # type: ignore - and_( - BpmnProcessModel.guid.in_(tasks_to_update_guids), # type: ignore - BpmnProcessModel.id.not_in(parent_bpmn_processes_ids), # type: ignore - ), - ) - ) - bpmn_processes_to_delete = bpmn_processes_to_delete_query.order_by( - BpmnProcessModel.id.desc() # type: ignore - ).all() - - # delete any human task that was for a task that we deleted since they will get recreated later. - human_tasks_to_delete = HumanTaskModel.query.filter( - HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore - ).all() - - # ensure the correct order for foreign keys - for human_task_to_delete in human_tasks_to_delete: - db.session.delete(human_task_to_delete) - for task_to_delete in tasks_to_delete: - db.session.delete(task_to_delete) - for bpmn_process_to_delete in bpmn_processes_to_delete: - db.session.delete(bpmn_process_to_delete) - - related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first() - if related_human_task is not None: - db.session.delete(related_human_task) - - tasks_to_update_ids = [t.id for t in tasks_to_update] - human_tasks_to_delete = HumanTaskModel.query.filter( - HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore - ).all() - for human_task_to_delete in human_tasks_to_delete: - db.session.delete(human_task_to_delete) - - for task_to_update in tasks_to_update: - TaskService.reset_task_model(task_to_update, state="FUTURE") - db.session.bulk_save_objects(tasks_to_update) - - parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() - if parent_task_model is None: - raise TaskNotFoundError( - f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" - ) - - TaskService.reset_task_model( - to_task_model, - state="READY", - json_data_hash=parent_task_model.json_data_hash, - python_env_data_hash=parent_task_model.python_env_data_hash, - ) - db.session.add(to_task_model) - for task_model in task_models_of_parent_bpmn_processes: - TaskService.reset_task_model(task_model, state="WAITING") - db.session.bulk_save_objects(task_models_of_parent_bpmn_processes) - - bpmn_process = to_task_model.bpmn_process - properties_json = copy.copy(bpmn_process.properties_json) - properties_json["last_task"] = parent_task_model.guid - bpmn_process.properties_json = properties_json - db.session.add(bpmn_process) - db.session.commit() - processor = ProcessInstanceProcessor(process_instance) + deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid)) + spiff_tasks = processor.bpmn_process_instance.get_tasks() - # If this as a boundary event parent, run it, so we get back to an active task. - if is_boundary_parent: - processor.do_engine_steps(execution_strategy_name="one_at_a_time") + task_service = TaskService( + process_instance=processor.process_instance_model, + serializer=processor._serializer, + bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings, + ) + task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time) + # Save the process processor.save() processor.suspend() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 659242e5..d0b74d1d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -602,31 +602,6 @@ class TaskService: task_model["start_in_seconds"] = None task_model["end_in_seconds"] = None - @classmethod - def reset_task_model( - cls, - task_model: TaskModel, - state: str, - json_data_hash: Optional[str] = None, - python_env_data_hash: Optional[str] = None, - ) -> None: - if json_data_hash is None: - cls.update_task_data_on_task_model_and_return_dict_if_updated(task_model, {}, "json_data_hash") - else: - task_model.json_data_hash = json_data_hash - if python_env_data_hash is None: - cls.update_task_data_on_task_model_and_return_dict_if_updated(task_model, {}, "python_env_data") - else: - task_model.python_env_data_hash = python_env_data_hash - - task_model.state = state - task_model.start_in_seconds = None - task_model.end_in_seconds = None - - new_properties_json = copy.copy(task_model.properties_json) - new_properties_json["state"] = getattr(TaskState, state) - task_model.properties_json = new_properties_json - @classmethod def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict: task_definition = task_model.task_definition diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 92e19919..aca824ce 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -469,10 +469,11 @@ class TestProcessInstanceProcessor(BaseTest): processor.suspend() processor = ProcessInstanceProcessor(process_instance) ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id)) + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() human_task_one = process_instance.active_human_tasks[0] assert human_task_one.task_title == "Manual Task #1" processor = ProcessInstanceProcessor(process_instance) - processor.manual_complete_task(str(spiff_manual_task.id), execute=True) + processor.manual_complete_task(str(human_task_one.task_id), execute=True) processor = ProcessInstanceProcessor(process_instance) processor.resume() processor.do_engine_steps(save=True)