run do engine steps for message start events even in async mode to ensure tasks are committed to the db properly w/ burnettk (#1191)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-03-11 21:26:37 +00:00 committed by GitHub
parent f69bcce6ee
commit 816d0607a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 30 additions and 24 deletions

View File

@ -14,6 +14,27 @@ def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> b
return current_app.config["SPIFFWORKFLOW_BACKEND_CELERY_ENABLED"] is True
def should_queue_process_instance(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
# check if the enum value is valid
if execution_mode:
ProcessInstanceExecutionMode(execution_mode)
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
return False
queue_enabled = queue_enabled_for_process_model(process_instance)
if execution_mode == ProcessInstanceExecutionMode.asynchronous.value and not queue_enabled:
raise ApiError(
error_code="async_mode_called_without_celery",
message="Execution mode asynchronous requested but SPIFFWORKFLOW_BACKEND_CELERY_ENABLED is not set to true.",
status_code=400,
)
if queue_enabled:
return True
return False
def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str) -> bool:
if queue_enabled_for_process_model(process_instance):
buffer = 1
@ -36,22 +57,7 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
# if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable.
def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
# check if the enum value is valid
if execution_mode:
ProcessInstanceExecutionMode(execution_mode)
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
return False
queue_enabled = queue_enabled_for_process_model(process_instance)
if execution_mode == ProcessInstanceExecutionMode.asynchronous.value and not queue_enabled:
raise ApiError(
error_code="async_mode_called_without_celery",
message="Execution mode asynchronous requested but SPIFFWORKFLOW_BACKEND_CELERY_ENABLED is not set to true.",
status_code=400,
)
if queue_enabled:
if should_queue_process_instance(process_instance, execution_mode):
celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
return True
return False

View File

@ -8,6 +8,7 @@ from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import should_queue_process_instance
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@ -57,7 +58,6 @@ class MessageService:
for message_instance in available_receive_messages:
if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()):
message_instance_receive = message_instance
if message_instance_receive is None:
# Check for a message triggerable process and start that to create a new message_instance_receive
message_triggerable_process_model = MessageTriggerableProcessModel.query.filter_by(
@ -67,9 +67,7 @@ class MessageService:
user: UserModel | None = message_instance_send.user
if user is None:
user = UserService.find_or_create_system_user()
receiving_process = MessageService.start_process_with_message(
message_triggerable_process_model, user, execution_mode=execution_mode
)
receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, user)
message_instance_receive = MessageInstanceModel.query.filter_by(
process_instance_id=receiving_process.id,
message_type="receive",
@ -126,7 +124,6 @@ class MessageService:
cls,
message_triggerable_process_model: MessageTriggerableProcessModel,
user: UserModel,
execution_mode: str | None = None,
) -> ProcessInstanceModel:
"""Start up a process instance, so it is ready to catch the event."""
if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true":
@ -211,9 +208,12 @@ class MessageService:
processor_receive.bpmn_process_instance.send_event(bpmn_event)
execution_strategy_name = None
if not queue_process_instance_if_appropriate(
process_instance_receive, execution_mode=execution_mode
) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
if should_queue_process_instance(process_instance_receive, execution_mode=execution_mode):
# even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
# we need to serialize these task updates to the db. do_engine_steps with save does that.
processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
queue_process_instance_if_appropriate(process_instance_receive, execution_mode=execution_mode)
elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"