diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index f9729604..37a7840c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -14,6 +14,27 @@ def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> b return current_app.config["SPIFFWORKFLOW_BACKEND_CELERY_ENABLED"] is True +def should_queue_process_instance(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool: + # check if the enum value is valid + if execution_mode: + ProcessInstanceExecutionMode(execution_mode) + + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + return False + + queue_enabled = queue_enabled_for_process_model(process_instance) + if execution_mode == ProcessInstanceExecutionMode.asynchronous.value and not queue_enabled: + raise ApiError( + error_code="async_mode_called_without_celery", + message="Execution mode asynchronous requested but SPIFFWORKFLOW_BACKEND_CELERY_ENABLED is not set to true.", + status_code=400, + ) + + if queue_enabled: + return True + return False + + def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str) -> bool: if queue_enabled_for_process_model(process_instance): buffer = 1 @@ -36,22 +57,7 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta # if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable. def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool: - # check if the enum value is valid - if execution_mode: - ProcessInstanceExecutionMode(execution_mode) - - if execution_mode == ProcessInstanceExecutionMode.synchronous.value: - return False - - queue_enabled = queue_enabled_for_process_model(process_instance) - if execution_mode == ProcessInstanceExecutionMode.asynchronous.value and not queue_enabled: - raise ApiError( - error_code="async_mode_called_without_celery", - message="Execution mode asynchronous requested but SPIFFWORKFLOW_BACKEND_CELERY_ENABLED is not set to true.", - status_code=400, - ) - - if queue_enabled: + if should_queue_process_instance(process_instance, execution_mode): celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) return True return False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index f0219e06..7fa2bbd0 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -8,6 +8,7 @@ from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_process_instance_if_appropriate, ) +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import should_queue_process_instance from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel @@ -57,7 +58,6 @@ class MessageService: for message_instance in available_receive_messages: if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()): message_instance_receive = message_instance - if message_instance_receive is None: # Check for a message triggerable process and start that to create a new message_instance_receive message_triggerable_process_model = MessageTriggerableProcessModel.query.filter_by( @@ -67,9 +67,7 @@ class MessageService: user: UserModel | None = message_instance_send.user if user is None: user = UserService.find_or_create_system_user() - receiving_process = MessageService.start_process_with_message( - message_triggerable_process_model, user, execution_mode=execution_mode - ) + receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, user) message_instance_receive = MessageInstanceModel.query.filter_by( process_instance_id=receiving_process.id, message_type="receive", @@ -126,7 +124,6 @@ class MessageService: cls, message_triggerable_process_model: MessageTriggerableProcessModel, user: UserModel, - execution_mode: str | None = None, ) -> ProcessInstanceModel: """Start up a process instance, so it is ready to catch the event.""" if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true": @@ -211,9 +208,12 @@ class MessageService: processor_receive.bpmn_process_instance.send_event(bpmn_event) execution_strategy_name = None - if not queue_process_instance_if_appropriate( - process_instance_receive, execution_mode=execution_mode - ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive): + if should_queue_process_instance(process_instance_receive, execution_mode=execution_mode): + # even if we are queueing, we ran a "send_event" call up above, and it updated some tasks. + # we need to serialize these task updates to the db. do_engine_steps with save does that. + processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks") + queue_process_instance_if_appropriate(process_instance_receive, execution_mode=execution_mode) + elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive): execution_strategy_name = None if execution_mode == ProcessInstanceExecutionMode.synchronous.value: execution_strategy_name = "greedy"