diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py index a9cf7dfb..cb94aa16 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py @@ -462,7 +462,7 @@ def _task_submit_shared( # in the block causes the process instance to go into an error state. for example, when # AuthorizationService.assert_user_can_complete_task raises. this would have been solvable, but this seems simpler, # and the cost is not huge given that this function is not the most common code path in the world. - with ProcessInstanceQueueService.dequeued(process_instance): + with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3): ProcessInstanceMigrator.run(process_instance) processor = ProcessInstanceProcessor( @@ -487,7 +487,7 @@ def _task_submit_shared( ) with sentry_sdk.start_span(op="task", description="complete_form_task"): - with ProcessInstanceQueueService.dequeued(process_instance): + with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3): ProcessInstanceService.complete_form_task( processor=processor, spiff_task=spiff_task, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index 35ddeb75..5616e9b7 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -92,18 +92,42 @@ class ProcessInstanceQueueService: process_instance.id, queue_entry, additional_processing_identifier=additional_processing_identifier ) + @classmethod + def _dequeue_with_retries( + cls, + process_instance: ProcessInstanceModel, + additional_processing_identifier: str | None = None, + max_attempts: int = 1, + ) -> None: + attempt = 1 + backoff_factor = 2 + while True: + try: + return cls._dequeue(process_instance, additional_processing_identifier=additional_processing_identifier) + except ProcessInstanceIsAlreadyLockedError as exception: + if attempt >= max_attempts: + raise exception + time.sleep(backoff_factor**attempt) + attempt += 1 + @classmethod @contextlib.contextmanager def dequeued( - cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None + cls, + process_instance: ProcessInstanceModel, + additional_processing_identifier: str | None = None, + max_attempts: int = 1, ) -> Generator[None, None, None]: reentering_lock = ProcessInstanceLockService.has_lock( process_instance.id, additional_processing_identifier=additional_processing_identifier ) + if not reentering_lock: # this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError # that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock - cls._dequeue(process_instance, additional_processing_identifier=additional_processing_identifier) + cls._dequeue_with_retries( + process_instance, additional_processing_identifier=additional_processing_identifier, max_attempts=max_attempts + ) try: yield except Exception as ex: diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py index e844351c..a2d8a2ae 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py @@ -1,9 +1,12 @@ import time from contextlib import suppress +import pytest from flask.app import Flask +from pytest_mock.plugin import MockerFixture from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService +from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from tests.spiffworkflow_backend.helpers.base_test import BaseTest @@ -114,3 +117,24 @@ class TestProcessInstanceQueueService(BaseTest): assert ProcessInstanceLockService.has_lock(process_instance.id) assert not ProcessInstanceLockService.has_lock(process_instance.id) + + def test_dequeue_with_retries_works( + self, + app: Flask, + mocker: MockerFixture, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + dequeue_mocker = mocker.patch.object( + ProcessInstanceQueueService, "_dequeue", side_effect=ProcessInstanceIsAlreadyLockedError + ) + mocker.patch("time.sleep") + with pytest.raises(ProcessInstanceIsAlreadyLockedError): + with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=5): + pass + assert dequeue_mocker.call_count == 5 + + with pytest.raises(ProcessInstanceIsAlreadyLockedError): + with ProcessInstanceQueueService.dequeued(process_instance): + pass + assert dequeue_mocker.call_count == 6