Handle pi lock from message service (#1278)

* handle process instance already locked in message service

* moved celery check to own method in message sevice w/ burnettk

* removed initial implementation w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-03-26 20:04:15 +00:00 committed by GitHub
parent aa7c2b587b
commit 930cfb720c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 139 additions and 25 deletions

View File

@ -22,6 +22,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import CustomBpmnScriptEngine from spiffworkflow_backend.services.process_instance_processor import CustomBpmnScriptEngine
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
@ -81,26 +82,35 @@ class MessageService:
# Assure we can send the message, otherwise keep going. # Assure we can send the message, otherwise keep going.
if message_instance_receive is None or not receiving_process.can_receive_message(): if message_instance_receive is None or not receiving_process.can_receive_message():
message_instance_send.status = "ready"
message_instance_send.status = "ready" message_instance_send.status = "ready"
db.session.add(message_instance_send) db.session.add(message_instance_send)
db.session.commit() db.session.commit()
return None return None
# Set the receiving message to running, so it is not altered elswhere ... try:
message_instance_receive.status = "running" # currently only controllers and apscheduler call this
cls.raise_if_running_in_celery("correlate_send_message")
with ProcessInstanceQueueService.dequeued(receiving_process):
# Set the receiving message to running, so it is not altered elswhere ...
message_instance_receive.status = "running"
cls.process_message_receive( cls.process_message_receive(
receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode
) )
message_instance_receive.status = "completed" message_instance_receive.status = "completed"
message_instance_receive.counterpart_id = message_instance_send.id message_instance_receive.counterpart_id = message_instance_send.id
db.session.add(message_instance_receive) db.session.add(message_instance_receive)
message_instance_send.status = "completed" message_instance_send.status = "completed"
message_instance_send.counterpart_id = message_instance_receive.id message_instance_send.counterpart_id = message_instance_receive.id
db.session.add(message_instance_send) db.session.add(message_instance_send)
db.session.commit() db.session.commit()
return message_instance_receive return message_instance_receive
except ProcessInstanceIsAlreadyLockedError:
message_instance_send.status = "ready"
db.session.add(message_instance_send)
db.session.commit()
return None
except Exception as exception: except Exception as exception:
db.session.rollback() db.session.rollback()
@ -129,12 +139,7 @@ class MessageService:
user: UserModel, user: UserModel,
) -> ProcessInstanceModel: ) -> ProcessInstanceModel:
"""Start up a process instance, so it is ready to catch the event.""" """Start up a process instance, so it is ready to catch the event."""
if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true": cls.raise_if_running_in_celery("start_process_with_message")
raise MessageServiceError(
"Calling start_process_with_message in a celery worker. This is not supported! (We may need to add"
" additional_processing_identifier to this code path."
)
process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier( process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier(
message_triggerable_process_model.process_model_identifier, message_triggerable_process_model.process_model_identifier,
user, user,
@ -300,3 +305,11 @@ class MessageService:
) )
) )
return process_instance_receive return process_instance_receive
@classmethod
def raise_if_running_in_celery(cls, method_name: str) -> None:
if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true":
raise MessageServiceError(
f"Calling {method_name} in a celery worker. This is not supported! We may need to add"
" additional_processing_identifier to this code path."
)

View File

@ -1,10 +1,16 @@
import time
from flask import Flask from flask import Flask
from pytest_mock.plugin import MockerFixture from pytest_mock.plugin import MockerFixture
from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -64,6 +70,30 @@ class TestBackgroundProcessingService(BaseTest):
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 1 assert len(future_tasks) == 1
def test_do_waiting_errors_gracefully_when_instance_already_locked(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
process_model = load_test_spec(
process_model_id="test_group/model_with_lanes",
bpmn_file_name="lanes.bpmn",
process_model_source_directory="model_with_lanes",
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model, status="waiting")
assert process_instance.status == ProcessInstanceStatus.waiting.value
queue_entry = ProcessInstanceQueueModel.query.filter_by(process_instance_id=process_instance.id).first()
assert queue_entry is not None
queue_entry.locked_by = "test:test_waiting"
queue_entry.locked_at_seconds = round(time.time())
db.session.add(queue_entry)
db.session.commit()
mocker.patch.object(ProcessInstanceQueueService, "peek_many", return_value=[process_instance.id])
ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value)
assert process_instance.status == ProcessInstanceStatus.waiting.value
def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel: def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel:
process_model = load_test_spec( process_model = load_test_spec(
process_model_id="test_group/user-task-with-timer", process_model_id="test_group/user-task-with-timer",

View File

@ -10,7 +10,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
class TestMessageInstance(BaseTest): class TestMessageInstance(BaseTest):
def setup_message_tests(self, client: FlaskClient) -> ProcessModelInfo: def setup_message_tests(self) -> ProcessModelInfo:
process_model_id = "testk_group/hello_world" process_model_id = "testk_group/hello_world"
bpmn_file_name = "hello_world.bpmn" bpmn_file_name = "hello_world.bpmn"
bpmn_file_location = "hello_world" bpmn_file_location = "hello_world"
@ -28,7 +28,7 @@ class TestMessageInstance(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
message_name = "Message Model One" message_name = "Message Model One"
process_model = self.setup_message_tests(client) process_model = self.setup_message_tests()
process_instance = self.create_process_instance_from_process_model(process_model, "waiting") process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
queued_message = MessageInstanceModel( queued_message = MessageInstanceModel(
@ -54,7 +54,7 @@ class TestMessageInstance(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
message_name = "message_model_one" message_name = "message_model_one"
process_model = self.setup_message_tests(client) process_model = self.setup_message_tests()
process_instance = self.create_process_instance_from_process_model(process_model, "waiting") process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
with pytest.raises(ValueError) as exception: with pytest.raises(ValueError) as exception:
@ -87,7 +87,7 @@ class TestMessageInstance(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
message_name = "message_model_one" message_name = "message_model_one"
process_model = self.setup_message_tests(client) process_model = self.setup_message_tests()
process_instance = self.create_process_instance_from_process_model(process_model, "waiting") process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
with pytest.raises(ValueError) as exception: with pytest.raises(ValueError) as exception:
@ -119,7 +119,7 @@ class TestMessageInstance(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
message_name = "message_model_one" message_name = "message_model_one"
process_model = self.setup_message_tests(client) process_model = self.setup_message_tests()
process_instance = self.create_process_instance_from_process_model(process_model, "waiting") process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
queued_message = MessageInstanceModel( queued_message = MessageInstanceModel(

View File

@ -1,3 +1,5 @@
import time
from flask import Flask from flask import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
@ -5,6 +7,7 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_triggerable_process_model import MessageTriggerableProcessModel from spiffworkflow_backend.models.message_triggerable_process_model import MessageTriggerableProcessModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
from spiffworkflow_backend.services.message_service import MessageService from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
@ -289,3 +292,71 @@ class TestMessageService(BaseTest):
message_name="travel_start_test_v2" message_name="travel_start_test_v2"
).first() ).first()
assert message_triggerable_process_model is None assert message_triggerable_process_model is None
def test_correlate_can_handle_process_instance_already_locked(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
# self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_sender = load_test_spec(
"test_group/message_sender",
process_model_source_directory="message_send_two_conversations",
bpmn_file_name="message_sender",
)
load_test_spec(
"test_group/message_receiver_one",
process_model_source_directory="message_send_two_conversations",
bpmn_file_name="message_receiver_one",
)
load_test_spec(
"test_group/message_receiver_two",
process_model_source_directory="message_send_two_conversations",
bpmn_file_name="message_receiver_two",
)
user = self.find_or_create_user()
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id, user
)
processor_sender = ProcessInstanceProcessor(process_instance)
processor_sender.do_engine_steps(save=True)
# At this point, the message_sender process has fired two different messages but those
# processes have not started, and it is now paused, waiting for to receive a message. so
# we should have two sends and a receive.
assert MessageInstanceModel.query.filter_by(process_instance_id=process_instance.id).count() == 3
assert MessageInstanceModel.query.count() == 3 # all messages are related to the instance
orig_send_messages = MessageInstanceModel.query.filter_by(message_type="send").all()
assert len(orig_send_messages) == 2
assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1
queue_entry = ProcessInstanceQueueModel.query.filter_by(process_instance_id=process_instance.id).first()
assert queue_entry is not None
queue_entry.locked_by = "test:test_waiting"
queue_entry.locked_at_seconds = round(time.time())
db.session.add(queue_entry)
db.session.commit()
MessageService.correlate_all_message_instances()
assert ProcessInstanceModel.query.count() == 3
MessageService.correlate_all_message_instances()
message_send_instances = MessageInstanceModel.query.filter_by(name="Message Response Two", message_type="send").all()
assert len(message_send_instances) == 1
message_send_instance = message_send_instances[0]
assert message_send_instance.status == "ready"
assert message_send_instance.failure_cause is None
message_receive_instances = MessageInstanceModel.query.filter_by(
name="Message Response Two", message_type="receive"
).all()
assert len(message_receive_instances) == 1
message_receive_instance = message_receive_instances[0]
assert message_receive_instance.status == "ready"
assert message_receive_instance.failure_cause is None