add the pi id and task guid to errors in celery worker w/ burnettk

This commit is contained in:
jasquat 2024-03-29 13:56:26 -04:00
parent ef945c458c
commit f24cdecf6d
No known key found for this signature in database
2 changed files with 20 additions and 13 deletions

View File

@ -17,6 +17,10 @@ from spiffworkflow_backend.services.workflow_execution_service import TaskRunnab
ten_minutes = 60 * 10 ten_minutes = 60 * 10
class SpiffCeleryWorkerError(Exception):
pass
@shared_task(ignore_result=False, time_limit=ten_minutes) @shared_task(ignore_result=False, time_limit=ten_minutes)
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
proc_index = current_process().index proc_index = current_process().index
@ -47,12 +51,13 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
f" {str(exception)}" f" {str(exception)}"
) )
return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)}
except Exception as e: except Exception as exception:
db.session.rollback() # in case the above left the database with a bad transaction db.session.rollback() # in case the above left the database with a bad transaction
error_message = ( error_message = (
f"Error running process_instance {process_instance.id}" + f"({process_instance.process_model_identifier}). {str(e)}" f"Error running process_instance {process_instance.id} "
+ f"({process_instance.process_model_identifier}) and task_guid {task_guid}. {str(exception)}"
) )
current_app.logger.error(error_message) current_app.logger.error(error_message)
db.session.add(process_instance) db.session.add(process_instance)
db.session.commit() db.session.commit()
raise e raise SpiffCeleryWorkerError(error_message) from exception

View File

@ -5,10 +5,8 @@ from flask import current_app
from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import PublishingAttemptWhileLockedError
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> bool: def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> bool:
@ -59,14 +57,18 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
# if waiting, check all waiting tasks and see if they are timers. if they are timers, it's not runnable. # if waiting, check all waiting tasks and see if they are timers. if they are timers, it's not runnable.
def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool: def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
# ideally, if this code were run from the background processing celery worker, # FIXME: we should only run this check if we are NOT in a celery worker
# we should be passing in the additional processing identifier, #
# but we don't have it, so basically this assertion won't help there. at least it will help find issues with non-celery code. # # ideally, if this code were run from the background processing celery worker,
if ProcessInstanceLockService.has_lock(process_instance_id=process_instance.id): # # we should be passing in the additional processing identifier,
raise PublishingAttemptWhileLockedError( # # but we don't have it, so basically this assertion won't help there.
f"Attempted to queue task for process instance {process_instance.id} while the process already has it locked. This" # # at least it will help find issues with non-celery code.
" can lead to further locking issues." # if ProcessInstanceLockService.has_lock(process_instance_id=process_instance.id):
) # raise PublishingAttemptWhileLockedError(
# f"Attempted to queue task for process instance {process_instance.id} while the process already has it locked. This"
# " can lead to further locking issues."
# )
if should_queue_process_instance(process_instance, execution_mode): if should_queue_process_instance(process_instance, execution_mode):
celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
return True return True