queue the pi in the future if it is locked w/ burnettk (#1513)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-05-07 21:15:28 +00:00 committed by GitHub
parent a7370cd293
commit 6bb24e51bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 8 additions and 1 deletions

View File

@ -2,6 +2,9 @@ from billiard import current_process # type: ignore
from celery import shared_task from celery import shared_task
from flask import current_app from flask import current_app
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate, queue_process_instance_if_appropriate,
) )
@ -71,6 +74,8 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:"
f" {str(exception)}" f" {str(exception)}"
) )
# NOTE: consider exponential backoff
queue_future_task_if_appropriate(process_instance, eta_in_seconds=10, task_guid=task_guid)
return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)}
except Exception as exception: except Exception as exception:
db.session.rollback() # in case the above left the database with a bad transaction db.session.rollback() # in case the above left the database with a bad transaction

View File

@ -35,7 +35,9 @@ def should_queue_process_instance(process_instance: ProcessInstanceModel, execut
return False return False
def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str) -> bool: def queue_future_task_if_appropriate(
process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str | None = None
) -> bool:
if queue_enabled_for_process_model(process_instance): if queue_enabled_for_process_model(process_instance):
buffer = 1 buffer = 1
countdown = eta_in_seconds - time.time() + buffer countdown = eta_in_seconds - time.time() + buffer