From f508722b5be21e89dc8cc0fd149f21887dd22a6c Mon Sep 17 00:00:00 2001
From: jasquat <2487833+jasquat@users.noreply.github.com>
Date: Wed, 27 Mar 2024 18:06:12 +0000
Subject: [PATCH] do not queue a process instance for celery if the instance
 is currently locked w/ burnettk (#1288)

Co-authored-by: jasquat
---
 .../celery_tasks/process_instance_task.py     |  2 +-
 .../process_instance_task_producer.py         | 10 +++++
 .../spiffworkflow_backend/exceptions/error.py |  4 ++
 .../routes/process_api_blueprint.py           | 17 ++++----
 .../services/message_service.py               | 41 ++++++++++---------
 .../process_instance_queue_service.py         |  7 ++--
 .../services/process_instance_service.py      | 24 ++++++-----
 7 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py
index 3484790d9..e86c6aa1a 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py
@@ -27,7 +27,7 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
             ProcessInstanceService.run_process_instance_with_processor(
                 process_instance, execution_strategy_name="run_current_ready_tasks", additional_processing_identifier=proc_index
             )
-        processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor(
+        _processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor(
             process_instance,
             execution_strategy_name="queue_instructions_for_end_user",
             additional_processing_identifier=proc_index,
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py
index 37a7840ca..e16082a3d 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py
@@ -5,8 +5,10 @@ from flask import current_app
 
 from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN
 from spiffworkflow_backend.exceptions.api_error import ApiError
+from spiffworkflow_backend.exceptions.error import PublishingAttemptWhileLockedError
 from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
+from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
 
 
 def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> bool:
@@ -57,6 +59,14 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
 
 # if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable.
 def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
+    # ideally, if this code were run from the background processing celery worker,
+    # we should be passing in the additional processing identifier,
+    # but we don't have it, so basically this assertion won't help there. at least it will help find issues with non-celery code.
+    if ProcessInstanceLockService.has_lock(process_instance_id=process_instance.id):
+        raise PublishingAttemptWhileLockedError(
+            f"Attempted to queue task for process instance {process_instance.id} while the process already has it locked. This"
+            " can lead to further locking issues."
+        )
     if should_queue_process_instance(process_instance, execution_mode):
         celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
         return True
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/error.py b/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/error.py
index 61a391431..c9107f1ef 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/error.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/error.py
@@ -59,3 +59,7 @@ class InvalidRedirectUrlError(Exception):
 
 class TaskMismatchError(Exception):
     pass
+
+
+class PublishingAttemptWhileLockedError(Exception):
+    pass
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py
index cb94aa168..249be47df 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py
@@ -487,15 +487,14 @@ def _task_submit_shared(
     )
 
     with sentry_sdk.start_span(op="task", description="complete_form_task"):
-        with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3):
-            ProcessInstanceService.complete_form_task(
-                processor=processor,
-                spiff_task=spiff_task,
-                data=body,
-                user=g.user,
-                human_task=human_task,
-                execution_mode=execution_mode,
-            )
+        ProcessInstanceService.complete_form_task(
+            processor=processor,
+            spiff_task=spiff_task,
+            data=body,
+            user=g.user,
+            human_task=human_task,
+            execution_mode=execution_mode,
+        )
 
     # currently task_model has the potential to be None. This should be removable once
     # we backfill the human_task table for task_guid and make that column not nullable
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
index fef97417f..f0d95082e 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
@@ -71,17 +71,19 @@ class MessageService:
                 user: UserModel | None = message_instance_send.user
                 if user is None:
                     user = UserService.find_or_create_system_user()
-                receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, user)
+                receiving_process_instance = MessageService.start_process_with_message(
+                    message_triggerable_process_model, user
+                )
                 message_instance_receive = MessageInstanceModel.query.filter_by(
-                    process_instance_id=receiving_process.id,
+                    process_instance_id=receiving_process_instance.id,
                     message_type="receive",
                     status="ready",
                 ).first()
         else:
-            receiving_process = MessageService.get_process_instance_for_message_instance(message_instance_receive)
+            receiving_process_instance = MessageService.get_process_instance_for_message_instance(message_instance_receive)
 
         # Assure we can send the message, otherwise keep going.
-        if message_instance_receive is None or not receiving_process.can_receive_message():
+        if message_instance_receive is None or not receiving_process_instance.can_receive_message():
             message_instance_send.status = "ready"
             db.session.add(message_instance_send)
             db.session.commit()
@@ -90,12 +92,12 @@ class MessageService:
         try:
             # currently only controllers and apscheduler call this
             cls.raise_if_running_in_celery("correlate_send_message")
-            with ProcessInstanceQueueService.dequeued(receiving_process):
+            with ProcessInstanceQueueService.dequeued(receiving_process_instance):
                 # Set the receiving message to running, so it is not altered elswhere ...
                 message_instance_receive.status = "running"
 
                 cls.process_message_receive(
-                    receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode
+                    receiving_process_instance, message_instance_receive, message_instance_send, execution_mode=execution_mode
                 )
                 message_instance_receive.status = "completed"
                 message_instance_receive.counterpart_id = message_instance_send.id
@@ -104,7 +106,9 @@ class MessageService:
                 message_instance_send.counterpart_id = message_instance_receive.id
                 db.session.add(message_instance_send)
                 db.session.commit()
-                return message_instance_receive
+                if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
+                    queue_process_instance_if_appropriate(receiving_process_instance, execution_mode=execution_mode)
+                return message_instance_receive
 
         except ProcessInstanceIsAlreadyLockedError:
             message_instance_send.status = "ready"
@@ -140,22 +144,22 @@ class MessageService:
     ) -> ProcessInstanceModel:
         """Start up a process instance, so it is ready to catch the event."""
         cls.raise_if_running_in_celery("start_process_with_message")
-        process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier(
+        receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
             message_triggerable_process_model.process_model_identifier,
             user,
         )
 
-        with ProcessInstanceQueueService.dequeued(process_instance_receive):
-            processor_receive = ProcessInstanceProcessor(process_instance_receive)
+        with ProcessInstanceQueueService.dequeued(receiving_process_instance):
+            processor_receive = ProcessInstanceProcessor(receiving_process_instance)
             cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
             processor_receive.save()
             processor_receive.do_engine_steps(save=True)
 
-        return process_instance_receive
+        return receiving_process_instance
 
     @staticmethod
     def process_message_receive(
-        process_instance_receive: ProcessInstanceModel,
+        receiving_process_instance: ProcessInstanceModel,
         message_instance_receive: MessageInstanceModel,
         message_instance_send: MessageInstanceModel,
         execution_mode: str | None = None,
@@ -178,16 +182,15 @@ class MessageService:
             payload=message_instance_send.payload,
             correlations=message_instance_send.correlation_keys,
         )
-        processor_receive = ProcessInstanceProcessor(process_instance_receive)
+        processor_receive = ProcessInstanceProcessor(receiving_process_instance)
        processor_receive.bpmn_process_instance.send_event(bpmn_event)
 
         execution_strategy_name = None
-        if should_queue_process_instance(process_instance_receive, execution_mode=execution_mode):
+        if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
             # even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
             # we need to serialize these task updates to the db. do_engine_steps with save does that.
             processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
-            queue_process_instance_if_appropriate(process_instance_receive, execution_mode=execution_mode)
-        elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
+        elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(receiving_process_instance):
             execution_strategy_name = None
             if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
                 execution_strategy_name = "greedy"
@@ -291,10 +294,10 @@ class MessageService:
     def get_process_instance_for_message_instance(
         message_instance_receive: MessageInstanceModel,
     ) -> ProcessInstanceModel:
-        process_instance_receive: ProcessInstanceModel = ProcessInstanceModel.query.filter_by(
+        receiving_process_instance: ProcessInstanceModel = ProcessInstanceModel.query.filter_by(
             id=message_instance_receive.process_instance_id
         ).first()
-        if process_instance_receive is None:
+        if receiving_process_instance is None:
             raise MessageServiceError(
                 (
                     (
@@ -304,7 +307,7 @@ class MessageService:
                     ),
                 )
             )
-        return process_instance_receive
+        return receiving_process_instance
 
     @classmethod
     def raise_if_running_in_celery(cls, method_name: str) -> None:
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py
index 5616e9b7b..1fe2d1cb4 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py
@@ -66,7 +66,6 @@ class ProcessInstanceQueueService:
                 "locked_at_in_seconds": current_time,
             }
         )
-
         db.session.commit()
 
         queue_entry = (
@@ -83,9 +82,11 @@ class ProcessInstanceQueueService:
         )
 
         if queue_entry.locked_by != locked_by:
+            message = f"It has already been locked by {queue_entry.locked_by}."
+            if queue_entry.locked_by is None:
+                message = "It was locked by something else when we tried to lock it in the db, but it has since been unlocked."
             raise ProcessInstanceIsAlreadyLockedError(
-                f"{locked_by} cannot lock process instance {process_instance.id}. "
-                f"It has already been locked by {queue_entry.locked_by}."
+                f"{locked_by} cannot lock process instance {process_instance.id}. {message}"
             )
 
         ProcessInstanceLockService.lock(
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py
index 6c9bfd559..33374657a 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py
@@ -477,26 +477,28 @@ class ProcessInstanceService:
 
         Abstracted here because we need to do it multiple times when completing all tasks in a multi-instance task.
         """
-        ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
-        # ProcessInstanceService.post_process_form(spiff_task)  # some properties may update the data store.
-        processor.complete_task(spiff_task, human_task, user=user)
+        with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
+            ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
+            processor.complete_task(spiff_task, human_task, user=user)
 
         if queue_process_instance_if_appropriate(processor.process_instance_model, execution_mode):
             return
-        elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
-            with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
-                execution_strategy_name = None
-                if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
-                    execution_strategy_name = "greedy"
+        else:
+            with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
+                if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
+                    with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
+                        execution_strategy_name = None
+                        if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
+                            execution_strategy_name = "greedy"
 
-                # maybe move this out once we have the interstitial page since this is here just so we can get the next human task
-                processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
+                        # maybe move this out once we have the interstitial page since this is
+                        # here just so we can get the next human task
+                        processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
 
     @staticmethod
     def spiff_task_to_api_task(
         processor: ProcessInstanceProcessor,
         spiff_task: SpiffTask,
-        add_docs_and_forms: bool = False,
     ) -> Task:
         task_type = spiff_task.task_spec.description
         task_guid = str(spiff_task.id)
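
Note (reviewer sketch, not part of the patch): the producer change above boils down to refusing to publish a celery task while the calling thread still holds the process instance lock, because the worker that picks the task up would immediately contend for that same lock. The snippet below is a minimal, self-contained illustration of that guard; LockRegistry, publish_run_task, and PublishWhileLockedError are invented stand-ins for illustration, not the real spiffworkflow_backend APIs.

import threading


class PublishWhileLockedError(Exception):
    """Raised when code tries to enqueue work for an instance it has locked itself."""


class LockRegistry:
    # thread-local bookkeeping of which instance ids this thread has locked,
    # loosely mirroring what a per-thread lock service might track
    _local = threading.local()

    @classmethod
    def _ids(cls) -> set[int]:
        if not hasattr(cls._local, "ids"):
            cls._local.ids = set()
        return cls._local.ids

    @classmethod
    def lock(cls, instance_id: int) -> None:
        cls._ids().add(instance_id)

    @classmethod
    def unlock(cls, instance_id: int) -> None:
        cls._ids().discard(instance_id)

    @classmethod
    def has_lock(cls, instance_id: int) -> bool:
        return instance_id in cls._ids()


def publish_run_task(instance_id: int, send_task) -> None:
    # the guard from the patch, in miniature: publishing while we still hold the
    # lock means the worker that receives the message immediately contends for it
    if LockRegistry.has_lock(instance_id):
        raise PublishWhileLockedError(f"instance {instance_id} is locked by the caller")
    send_task("process_instance_run", (instance_id,))


if __name__ == "__main__":
    sent: list[tuple[str, tuple[int, ...]]] = []
    LockRegistry.lock(42)
    try:
        publish_run_task(42, lambda name, args: sent.append((name, args)))
    except PublishWhileLockedError as error:
        print(error)  # refused: instance 42 is locked by the caller
    LockRegistry.unlock(42)
    publish_run_task(42, lambda name, args: sent.append((name, args)))
    print(sent)  # [('process_instance_run', (42,))]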
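A second sketch, in the same spirit, of the ordering that the process_instance_service.py hunk establishes: hold the lock only while the form data is applied and the task completed, release it before deciding whether to hand the instance to celery (so the new guard is not tripped), and re-acquire it only when engine steps run in-process. The dequeued and complete_form_task helpers here are simplified stand-ins with assumed signatures, not the actual service code.

from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def dequeued(instance_id: int, locks: set[int]) -> Iterator[None]:
    # stand-in for the backend's "dequeued" lock context manager
    if instance_id in locks:
        raise RuntimeError(f"instance {instance_id} is already locked")
    locks.add(instance_id)
    try:
        yield
    finally:
        locks.discard(instance_id)


def complete_form_task(
    instance_id: int,
    locks: set[int],
    apply_form_data: Callable[[], None],
    try_queue_to_celery: Callable[[int], bool],
    run_engine_steps: Callable[[], None],
) -> None:
    # 1. mutate the task only while holding the lock
    with dequeued(instance_id, locks):
        apply_form_data()

    # 2. lock released: publishing to celery now cannot trip the publish guard
    if try_queue_to_celery(instance_id):
        return

    # 3. otherwise re-acquire the lock and run engine steps in-process
    with dequeued(instance_id, locks):
        run_engine_steps()


if __name__ == "__main__":
    held: set[int] = set()
    complete_form_task(
        7,
        held,
        apply_form_data=lambda: print("form data saved"),
        try_queue_to_celery=lambda _instance_id: False,  # pretend celery is disabled
        run_engine_steps=lambda: print("engine steps ran locally"),
    )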