diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index cd6b8476b..047d93cdc 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -1,7 +1,11 @@ +from typing import Any from billiard import current_process # type: ignore from celery import shared_task from flask import current_app +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( + queue_future_task_if_appropriate, +) from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_process_instance_if_appropriate, ) @@ -23,9 +27,15 @@ class SpiffCeleryWorkerError(Exception): pass +def get_current_process_index() -> Any: + raise Exception("NOOO") + # return current_process().index + return 1 + + @shared_task(ignore_result=False, time_limit=ten_minutes) def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: - proc_index = current_process().index + proc_index = get_current_process_index() ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() @@ -71,6 +81,8 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" f" {str(exception)}" ) + # NOTE: consider exponential backoff + queue_future_task_if_appropriate(process_instance, eta_in_seconds=10, task_guid=task_guid) return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} except Exception as exception: db.session.rollback() # in case the above left the database with a bad transaction diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index a774179d9..6ffc1e3c9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -35,7 +35,9 @@ def should_queue_process_instance(process_instance: ProcessInstanceModel, execut return False -def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str) -> bool: +def queue_future_task_if_appropriate( + process_instance: ProcessInstanceModel, eta_in_seconds: float, task_guid: str | None = None +) -> bool: if queue_enabled_for_process_model(process_instance): buffer = 1 countdown = eta_in_seconds - time.time() + buffer diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_task.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_task.py new file mode 100644 index 000000000..992258fd1 --- /dev/null +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_task.py @@ -0,0 +1,59 @@ +from unittest.mock import patch +from pytest_mock.plugin import MockerFixture +from flask.app import Flask +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task import celery_task_process_instance_run + +# from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( +# queue_future_task_if_appropriate, +# ) +from spiffworkflow_backend.models.db import db +import time +from tests.spiffworkflow_backend.helpers.test_data import load_test_spec +from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus +from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel +from tests.spiffworkflow_backend.helpers.base_test import BaseTest + + +class TestProcessInstanceTask(BaseTest): + def test_queues_process_instance_if_locked( + self, + app: Flask, + mocker: MockerFixture, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): + # mocked_process = mocker.Mock() + # mocked_process.index = 12345 # Setting the pid attribute to a mock value + # mocker.patch("billiard.current_process", return_value=mocked_process) + + process_model = load_test_spec( + process_model_id="test_group/model_with_lanes", + bpmn_file_name="lanes.bpmn", + process_model_source_directory="model_with_lanes", + ) + process_instance = self.create_process_instance_from_process_model(process_model=process_model, status="waiting") + assert process_instance.status == ProcessInstanceStatus.waiting.value + queue_entry = ProcessInstanceQueueModel.query.filter_by(process_instance_id=process_instance.id).first() + assert queue_entry is not None + queue_entry.locked_by = "test:test_waiting" + queue_entry.locked_at_seconds = round(time.time()) + db.session.add(queue_entry) + db.session.commit() + # mocker.patch( + # "spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer.queue_future_task_if_appropriate" + # ) + + with patch( + "spiffworkflow_backend.background_processing.celery_tasks.process_instance_task.get_current_process_index" + ) as mock_proc_index: + mock_proc_index.return_value = 1 + # with patch( + # "spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer.queue_future_task_if_appropriate" + # ) as mock_queue: + with patch( + "spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer.queue_future_task_if_appropriate" + ) as mock_queue: + mock_queue.return_value = 1 + # mock = mocker.patch("billiard.current_process()", return_value=0) + celery_task_process_instance_run(process_instance_id=process_instance.id) + assert mock_queue.call_count == 1