From 6bb24e51bf84f32b28f903f91e4e1ee38d1bf75a Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Tue, 7 May 2024 21:15:28 +0000 Subject: [PATCH] queue the pi in the future if it is locked w/ burnettk (#1513) Co-authored-by: jasquat --- .../celery_tasks/process_instance_task.py | 5 +++++ .../celery_tasks/process_instance_task_producer.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index cd6b8476..06d701fb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -2,6 +2,9 @@ from billiard import current_process # type: ignore from celery import shared_task from flask import current_app +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( + queue_future_task_if_appropriate, +) from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_process_instance_if_appropriate, ) @@ -71,6 +74,8 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" f" {str(exception)}" ) + # NOTE: consider exponential backoff + queue_future_task_if_appropriate(process_instance, eta_in_seconds=10, task_guid=task_guid) return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} except Exception as exception: db.session.rollback() # in case the above left the database with a bad transaction diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index a774179d..6ffc1e3c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -35,7 +35,9 @@ def should_queue_process_instance(process_instance: ProcessInstanceModel, execut return False -def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str) -> bool: +def queue_future_task_if_appropriate( + process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str | None = None +) -> bool: if queue_enabled_for_process_model(process_instance): buffer = 1 countdown = eta_in_seconds - time.time() + buffer