diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index 489b710c7..a29727933 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -170,7 +170,17 @@ def process_instance_terminate( """Process_instance_run.""" process_instance = _find_process_instance_by_id_or_raise(process_instance_id) processor = ProcessInstanceProcessor(process_instance) - processor.terminate() + + try: + processor.lock_process_instance("Web") + processor.terminate() + except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e: + ErrorHandlingService().handle_error(processor, e) + raise e + finally: + if ProcessInstanceLockService.has_lock(process_instance.id): + processor.unlock_process_instance("Web") + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -180,7 +190,18 @@ def process_instance_suspend( ) -> flask.wrappers.Response: """Process_instance_suspend.""" process_instance = _find_process_instance_by_id_or_raise(process_instance_id) - ProcessInstanceProcessor.suspend(process_instance) + processor = ProcessInstanceProcessor(process_instance) + + try: + processor.lock_process_instance("Web") + processor.suspend() + except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e: + ErrorHandlingService().handle_error(processor, e) + raise e + finally: + if ProcessInstanceLockService.has_lock(process_instance.id): + processor.unlock_process_instance("Web") + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -190,7 +211,18 @@ def process_instance_resume( ) -> flask.wrappers.Response: """Process_instance_resume.""" process_instance = _find_process_instance_by_id_or_raise(process_instance_id) - ProcessInstanceProcessor.resume(process_instance) + processor = ProcessInstanceProcessor(process_instance) + + try: + processor.lock_process_instance("Web") + processor.resume() + except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e: + ErrorHandlingService().handle_error(processor, e) + raise e + finally: + if ProcessInstanceLockService.has_lock(process_instance.id): + processor.unlock_process_instance("Web") + return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 5e771c12a..fda3d395b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1249,7 +1249,7 @@ class ProcessInstanceProcessor: self.add_step() self.save() # Saving the workflow seems to reset the status - self.suspend(self.process_instance_model) + self.suspend() def reset_process(self, spiff_step: int) -> None: """Reset a process to an earlier state.""" @@ -1292,7 +1292,7 @@ class ProcessInstanceProcessor: db.session.delete(row) self.save() - self.suspend(self.process_instance_model) + self.suspend() @staticmethod def get_parser() -> MyCustomParser: @@ -1900,16 +1900,14 @@ class ProcessInstanceProcessor: db.session.add(self.process_instance_model) db.session.commit() - @classmethod - def suspend(cls, process_instance: ProcessInstanceModel) -> None: + def suspend(self) -> None: """Suspend.""" - process_instance.status = ProcessInstanceStatus.suspended.value - db.session.add(process_instance) + self.process_instance_model.status = ProcessInstanceStatus.suspended.value + db.session.add(self.process_instance_model) db.session.commit() - @classmethod - def resume(cls, process_instance: ProcessInstanceModel) -> None: + def resume(self) -> None: """Resume.""" - process_instance.status = ProcessInstanceStatus.waiting.value - db.session.add(process_instance) + self.process_instance_model.status = ProcessInstanceStatus.waiting.value + db.session.add(self.process_instance_model) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 5e149965d..d7ea5613b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -25,6 +25,7 @@ from spiffworkflow_backend.models.process_instance_file_data import ( from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.task import Task from spiffworkflow_backend.models.user import UserModel +from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.git_service import GitCommandError from spiffworkflow_backend.services.git_service import GitService @@ -95,6 +96,13 @@ class ProcessInstanceService: ) process_instance_lock_prefix = "Background" for process_instance in records: + with safe_assertion(process_instance.status == status_value) as false_assumption: + if false_assumption: + raise AssertionError( + f"Queue assumed process instance {process_instance.id} has status of {status_value} " + f"when it really is {process_instance.status}" + ) + locked = False processor = None try: diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 086841c08..f7f644ddb 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -1418,7 +1418,7 @@ class TestProcessApi(BaseTest): ) processor.save() - processor.suspend(process_instance) + processor.suspend() payload["description"] = "Message To Suspended" response = client.post( f"/v1.0/messages/{message_model_identifier}", @@ -1430,7 +1430,7 @@ class TestProcessApi(BaseTest): assert response.json assert response.json["error_code"] == "message_not_accepted" - processor.resume(process_instance) + processor.resume() payload["description"] = "Message To Resumed" response = client.post( f"/v1.0/messages/{message_model_identifier}",