From f24cdecf6de7a6e3567aac6e8be775e19255ddab Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 29 Mar 2024 13:56:26 -0400 Subject: [PATCH] add the pi id and task guid to errors in celery worker w/ burnettk --- .../celery_tasks/process_instance_task.py | 11 +++++++--- .../process_instance_task_producer.py | 22 ++++++++++--------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index e86c6aa1a..4c30ffbdd 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -17,6 +17,10 @@ from spiffworkflow_backend.services.workflow_execution_service import TaskRunnab ten_minutes = 60 * 10 +class SpiffCeleryWorkerError(Exception): + pass + + @shared_task(ignore_result=False, time_limit=ten_minutes) def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: proc_index = current_process().index @@ -47,12 +51,13 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | f" {str(exception)}" ) return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} - except Exception as e: + except Exception as exception: db.session.rollback() # in case the above left the database with a bad transaction error_message = ( - f"Error running process_instance {process_instance.id}" + f"({process_instance.process_model_identifier}). {str(e)}" + f"Error running process_instance {process_instance.id} " + + f"({process_instance.process_model_identifier}) and task_guid {task_guid}. 
{str(exception)}" ) current_app.logger.error(error_message) db.session.add(process_instance) db.session.commit() - raise e + raise SpiffCeleryWorkerError(error_message) from exception diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index e16082a3d..80cd519b1 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -5,10 +5,8 @@ from flask import current_app from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN from spiffworkflow_backend.exceptions.api_error import ApiError -from spiffworkflow_backend.exceptions.error import PublishingAttemptWhileLockedError from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> bool: @@ -59,14 +57,18 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta # if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable. def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool: - # ideally, if this code were run from the backgrond processing celery worker, - # we should be passing in the additional processing identifier, - # but we don't have it, so basically this assertion won't help there. at least it will help find issues with non-celery code. 
- if ProcessInstanceLockService.has_lock(process_instance_id=process_instance.id): - raise PublishingAttemptWhileLockedError( - f"Attempted to queue task for process instance {process_instance.id} while the process already has it locked. This" - " can lead to further locking issues." - ) + # FIXME: we should only run this check if we are NOT in a celery worker + # + # # ideally, if this code were run from the background processing celery worker, + # # we should be passing in the additional processing identifier, + # # but we don't have it, so basically this assertion won't help there. + # # at least it will help find issues with non-celery code. + # if ProcessInstanceLockService.has_lock(process_instance_id=process_instance.id): + # raise PublishingAttemptWhileLockedError( + # f"Attempted to queue task for process instance {process_instance.id} while the process already has it locked. This" + # " can lead to further locking issues." + # ) + if should_queue_process_instance(process_instance, execution_mode): celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) return True