diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 8a389164..fef97417 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -22,6 +22,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.process_instance_processor import CustomBpmnScriptEngine from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor +from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from spiffworkflow_backend.services.user_service import UserService @@ -81,26 +82,35 @@ class MessageService: # Assure we can send the message, otherwise keep going. if message_instance_receive is None or not receiving_process.can_receive_message(): - message_instance_send.status = "ready" message_instance_send.status = "ready" db.session.add(message_instance_send) db.session.commit() return None - # Set the receiving message to running, so it is not altered elswhere ... - message_instance_receive.status = "running" + try: + # currently only controllers and apscheduler call this + cls.raise_if_running_in_celery("correlate_send_message") + with ProcessInstanceQueueService.dequeued(receiving_process): + # Set the receiving message to running, so it is not altered elswhere ... + message_instance_receive.status = "running" - cls.process_message_receive( - receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode - ) - message_instance_receive.status = "completed" - message_instance_receive.counterpart_id = message_instance_send.id - db.session.add(message_instance_receive) - message_instance_send.status = "completed" - message_instance_send.counterpart_id = message_instance_receive.id - db.session.add(message_instance_send) - db.session.commit() - return message_instance_receive + cls.process_message_receive( + receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode + ) + message_instance_receive.status = "completed" + message_instance_receive.counterpart_id = message_instance_send.id + db.session.add(message_instance_receive) + message_instance_send.status = "completed" + message_instance_send.counterpart_id = message_instance_receive.id + db.session.add(message_instance_send) + db.session.commit() + return message_instance_receive + + except ProcessInstanceIsAlreadyLockedError: + message_instance_send.status = "ready" + db.session.add(message_instance_send) + db.session.commit() + return None except Exception as exception: db.session.rollback() @@ -129,12 +139,7 @@ class MessageService: user: UserModel, ) -> ProcessInstanceModel: """Start up a process instance, so it is ready to catch the event.""" - if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true": - raise MessageServiceError( - "Calling start_process_with_message in a celery worker. This is not supported! (We may need to add" - " additional_processing_identifier to this code path." - ) - + cls.raise_if_running_in_celery("start_process_with_message") process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier( message_triggerable_process_model.process_model_identifier, user, @@ -300,3 +305,11 @@ class MessageService: ) ) return process_instance_receive + + @classmethod + def raise_if_running_in_celery(cls, method_name: str) -> None: + if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true": + raise MessageServiceError( + f"Calling {method_name} in a celery worker. This is not supported! We may need to add" + " additional_processing_identifier to this code path." + ) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py index 959c126e..9d9509bd 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py @@ -1,10 +1,16 @@ +import time + from flask import Flask from pytest_mock.plugin import MockerFixture from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.future_task import FutureTaskModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor +from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService +from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec @@ -64,6 +70,30 @@ class TestBackgroundProcessingService(BaseTest): future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) assert len(future_tasks) == 1 + def test_do_waiting_errors_gracefully_when_instance_already_locked( + self, + app: Flask, + mocker: MockerFixture, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_model = load_test_spec( + process_model_id="test_group/model_with_lanes", + bpmn_file_name="lanes.bpmn", + process_model_source_directory="model_with_lanes", + ) + process_instance = self.create_process_instance_from_process_model(process_model=process_model, status="waiting") + assert process_instance.status == ProcessInstanceStatus.waiting.value + queue_entry = ProcessInstanceQueueModel.query.filter_by(process_instance_id=process_instance.id).first() + assert queue_entry is not None + queue_entry.locked_by = "test:test_waiting" + queue_entry.locked_at_seconds = round(time.time()) + db.session.add(queue_entry) + db.session.commit() + + mocker.patch.object(ProcessInstanceQueueService, "peek_many", return_value=[process_instance.id]) + ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value) + assert process_instance.status == ProcessInstanceStatus.waiting.value + def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel: process_model = load_test_spec( process_model_id="test_group/user-task-with-timer", diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py index c4967588..1fc9ad54 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py @@ -10,7 +10,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec class TestMessageInstance(BaseTest): - def setup_message_tests(self, client: FlaskClient) -> ProcessModelInfo: + def setup_message_tests(self) -> ProcessModelInfo: process_model_id = "testk_group/hello_world" bpmn_file_name = "hello_world.bpmn" bpmn_file_location = "hello_world" @@ -28,7 +28,7 @@ class TestMessageInstance(BaseTest): with_db_and_bpmn_file_cleanup: None, ) -> None: message_name = "Message Model One" - process_model = self.setup_message_tests(client) + process_model = self.setup_message_tests() process_instance = self.create_process_instance_from_process_model(process_model, "waiting") queued_message = MessageInstanceModel( @@ -54,7 +54,7 @@ class TestMessageInstance(BaseTest): with_db_and_bpmn_file_cleanup: None, ) -> None: message_name = "message_model_one" - process_model = self.setup_message_tests(client) + process_model = self.setup_message_tests() process_instance = self.create_process_instance_from_process_model(process_model, "waiting") with pytest.raises(ValueError) as exception: @@ -87,7 +87,7 @@ class TestMessageInstance(BaseTest): with_db_and_bpmn_file_cleanup: None, ) -> None: message_name = "message_model_one" - process_model = self.setup_message_tests(client) + process_model = self.setup_message_tests() process_instance = self.create_process_instance_from_process_model(process_model, "waiting") with pytest.raises(ValueError) as exception: @@ -119,7 +119,7 @@ class TestMessageInstance(BaseTest): with_db_and_bpmn_file_cleanup: None, ) -> None: message_name = "message_model_one" - process_model = self.setup_message_tests(client) + process_model = self.setup_message_tests() process_instance = self.create_process_instance_from_process_model(process_model, "waiting") queued_message = MessageInstanceModel( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py index d440e6b8..50dbdb6d 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py @@ -1,3 +1,5 @@ +import time + from flask import Flask from flask.testing import FlaskClient from spiffworkflow_backend.models.db import db @@ -5,6 +7,7 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_triggerable_process_model import MessageTriggerableProcessModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel from spiffworkflow_backend.services.message_service import MessageService from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService @@ -289,3 +292,71 @@ class TestMessageService(BaseTest): message_name="travel_start_test_v2" ).first() assert message_triggerable_process_model is None + + def test_correlate_can_handle_process_instance_already_locked( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + # self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id) + + process_model_sender = load_test_spec( + "test_group/message_sender", + process_model_source_directory="message_send_two_conversations", + bpmn_file_name="message_sender", + ) + load_test_spec( + "test_group/message_receiver_one", + process_model_source_directory="message_send_two_conversations", + bpmn_file_name="message_receiver_one", + ) + load_test_spec( + "test_group/message_receiver_two", + process_model_source_directory="message_send_two_conversations", + bpmn_file_name="message_receiver_two", + ) + + user = self.find_or_create_user() + + process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( + process_model_sender.id, user + ) + + processor_sender = ProcessInstanceProcessor(process_instance) + processor_sender.do_engine_steps(save=True) + + # At this point, the message_sender process has fired two different messages but those + # processes have not started, and it is now paused, waiting for to receive a message. so + # we should have two sends and a receive. + assert MessageInstanceModel.query.filter_by(process_instance_id=process_instance.id).count() == 3 + assert MessageInstanceModel.query.count() == 3 # all messages are related to the instance + orig_send_messages = MessageInstanceModel.query.filter_by(message_type="send").all() + assert len(orig_send_messages) == 2 + assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1 + + queue_entry = ProcessInstanceQueueModel.query.filter_by(process_instance_id=process_instance.id).first() + assert queue_entry is not None + queue_entry.locked_by = "test:test_waiting" + queue_entry.locked_at_seconds = round(time.time()) + db.session.add(queue_entry) + db.session.commit() + + MessageService.correlate_all_message_instances() + + assert ProcessInstanceModel.query.count() == 3 + MessageService.correlate_all_message_instances() + + message_send_instances = MessageInstanceModel.query.filter_by(name="Message Response Two", message_type="send").all() + assert len(message_send_instances) == 1 + message_send_instance = message_send_instances[0] + assert message_send_instance.status == "ready" + assert message_send_instance.failure_cause is None + + message_receive_instances = MessageInstanceModel.query.filter_by( + name="Message Response Two", message_type="receive" + ).all() + assert len(message_receive_instances) == 1 + message_receive_instance = message_receive_instances[0] + assert message_receive_instance.status == "ready" + assert message_receive_instance.failure_cause is None