From 7ae81f47cffb34413b2cd66f55d73ee5d8e63176 Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Thu, 4 Apr 2024 20:24:58 +0000 Subject: [PATCH] Celery start timer fix (#1337) * check if pi is enqueued to run in the future before running it in celery w/ burnettk * only mark future tasks as complete if the corresponding task is complete w/ burnettk --------- Co-authored-by: jasquat --- .../celery_tasks/process_instance_task.py | 30 +++++++++++++++---- .../process_instance_task_producer.py | 6 ++-- .../routes/process_instances_controller.py | 2 +- .../routes/tasks_controller.py | 6 ++-- .../services/message_service.py | 3 +- .../process_instance_queue_service.py | 14 --------- .../services/process_instance_service.py | 3 +- .../services/process_instance_tmp_service.py | 15 ++++++++++ 8 files changed, 52 insertions(+), 27 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index d25190b97..cd6b8476b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -8,10 +8,12 @@ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_t from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.future_task import FutureTaskModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService +from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService from spiffworkflow_backend.services.workflow_execution_service import TaskRunnability ten_minutes = 60 * 10 @@ -26,7 +28,16 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | proc_index = current_process().index ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() + + if task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): + return { + "ok": True, + "process_instance_id": process_instance_id, + "task_guid": task_guid, + "message": "Skipped because the process instance is set to run in the future.", + } try: + task_guid_for_requeueing = task_guid with ProcessInstanceQueueService.dequeued(process_instance, additional_processing_identifier=proc_index): ProcessInstanceService.run_process_instance_with_processor( process_instance, execution_strategy_name="run_current_ready_tasks", additional_processing_identifier=proc_index @@ -40,13 +51,20 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | # there is an assumption that it was successfully processed by run_process_instance_with_processor above. # we might want to check that assumption. if task_guid is not None: - future_task = FutureTaskModel.query.filter_by(completed=False, guid=task_guid).first() - if future_task is not None: - future_task.completed = True - db.session.add(future_task) - db.session.commit() + completed_task_model = ( + TaskModel.query.filter_by(guid=task_guid) + .filter(TaskModel.state.in_(["COMPLETED", "ERROR", "CANCELLED"])) # type: ignore + .first() + ) + if completed_task_model is not None: + future_task = FutureTaskModel.query.filter_by(completed=False, guid=task_guid).first() + if future_task is not None: + future_task.completed = True + db.session.add(future_task) + db.session.commit() + task_guid_for_requeueing = None if task_runnability == TaskRunnability.has_ready_tasks: - queue_process_instance_if_appropriate(process_instance) + queue_process_instance_if_appropriate(process_instance, task_guid=task_guid_for_requeueing) return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid} except ProcessInstanceIsAlreadyLockedError as exception: current_app.logger.info( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index 7d54e1ff8..a774179d9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -57,7 +57,9 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta # if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable. -def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool: +def queue_process_instance_if_appropriate( + process_instance: ProcessInstanceModel, execution_mode: str | None = None, task_guid: str | None = None +) -> bool: # FIXME: we should only run this check if we are NOT in a celery worker # # # ideally, if this code were run from the backgrond processing celery worker, @@ -71,7 +73,7 @@ def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel # ) if should_queue_process_instance(process_instance, execution_mode): - async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) + async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id, task_guid)) current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})") return True return False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index dd0df0481..6175f3273 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -662,7 +662,7 @@ def _process_instance_run( ProcessInstanceTmpService.add_event_to_process_instance(process_instance, "process_instance_force_run") if not queue_process_instance_if_appropriate( process_instance, execution_mode=execution_mode - ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance): + ) and not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): execution_strategy_name = None if execution_mode == ProcessInstanceExecutionMode.synchronous.value: execution_strategy_name = "greedy" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index eded01c11..523c85981 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -455,7 +455,9 @@ def process_instance_progress( if next_human_task_assigned_to_me: response["task"] = HumanTaskModel.to_task(next_human_task_assigned_to_me) # this may not catch all times we should redirect to instance show page - elif not process_instance.is_immediately_runnable(): + elif not process_instance.is_immediately_runnable() or ProcessInstanceTmpService.is_enqueued_to_run_in_the_future( + process_instance + ): # any time we assign this process_instance, the frontend progress page will redirect to process instance show response["process_instance"] = process_instance @@ -615,7 +617,7 @@ def _dequeued_interstitial_stream( # need something better to show? if execute_tasks: try: - if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance): + if not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): with ProcessInstanceQueueService.dequeued(process_instance): ProcessInstanceMigrator.run(process_instance) yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index f0d95082e..461c42d58 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -25,6 +25,7 @@ from spiffworkflow_backend.services.process_instance_processor import ProcessIns from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService +from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService from spiffworkflow_backend.services.user_service import UserService @@ -190,7 +191,7 @@ class MessageService: # even if we are queueing, we ran a "send_event" call up above, and it updated some tasks. # we need to serialize these task updates to the db. do_engine_steps with save does that. processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks") - elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(receiving_process_instance): + elif not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(receiving_process_instance): execution_strategy_name = None if execution_mode == ProcessInstanceExecutionMode.synchronous.value: execution_strategy_name = "greedy" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index 1fe2d1cb4..d5ecb3440 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -174,17 +174,3 @@ class ProcessInstanceQueueService: queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold, min_age_in_seconds) ids_with_status = [entry.process_instance_id for entry in queue_entries] return ids_with_status - - @staticmethod - def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool: - queue_entry = ( - db.session.query(ProcessInstanceQueueModel) - .filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id) - .first() - ) - - if queue_entry is None: - return False - - current_time = round(time.time()) - return queue_entry.run_at_in_seconds > current_time diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 0950d24c5..3ea5deebb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -49,6 +49,7 @@ from spiffworkflow_backend.services.process_instance_processor import ProcessIns from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsNotEnqueuedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService +from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.workflow_execution_service import TaskRunnability from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError @@ -480,7 +481,7 @@ class ProcessInstanceService: # the caller needs to handle the actual queueing of the process instance for better dequeueing ability if not should_queue_process_instance(processor.process_instance_model, execution_mode): - if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model): + if not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(processor.process_instance_model): with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"): execution_strategy_name = None if execution_mode == ProcessInstanceExecutionMode.synchronous.value: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_tmp_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_tmp_service.py index b59dc3ba2..b9dab35da 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_tmp_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_tmp_service.py @@ -8,6 +8,7 @@ from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel +from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel class ProcessInstanceTmpService: @@ -76,3 +77,17 @@ class ProcessInstanceTmpService: if add_to_db_session: db.session.add(process_instance_error_detail) return (process_instance_event, process_instance_error_detail) + + @staticmethod + def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool: + queue_entry = ( + db.session.query(ProcessInstanceQueueModel) + .filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id) + .first() + ) + + if queue_entry is None: + return False + + current_time = round(time.time()) + return queue_entry.run_at_in_seconds > current_time