added ability to retry getting a lock on task submit w/ burnettk (#1279)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
jasquat authored 2024-03-26 21:20:48 +00:00, committed by GitHub
parent ea9ac73918
commit e724cc5856
3 changed files with 52 additions and 4 deletions


@@ -462,7 +462,7 @@ def _task_submit_shared(
     # in the block causes the process instance to go into an error state. for example, when
     # AuthorizationService.assert_user_can_complete_task raises. this would have been solvable, but this seems simpler,
     # and the cost is not huge given that this function is not the most common code path in the world.
-    with ProcessInstanceQueueService.dequeued(process_instance):
+    with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3):
         ProcessInstanceMigrator.run(process_instance)
         processor = ProcessInstanceProcessor(
@@ -487,7 +487,7 @@ def _task_submit_shared(
     )
     with sentry_sdk.start_span(op="task", description="complete_form_task"):
-        with ProcessInstanceQueueService.dequeued(process_instance):
+        with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3):
             ProcessInstanceService.complete_form_task(
                 processor=processor,
                 spiff_task=spiff_task,
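
For reference, a minimal sketch (not part of this commit) of what the new max_attempts argument changes at a call site like the ones above; it assumes the spiffworkflow_backend package is importable and that process_instance is an existing ProcessInstanceModel:

from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService


def submit_with_lock(process_instance: ProcessInstanceModel) -> None:
    # hypothetical helper, not from the codebase. with max_attempts=3 a lock held by
    # another worker is retried twice (sleeping 2s, then 4s) before the error is re-raised;
    # the previous behavior was a single attempt that failed immediately.
    try:
        with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3):
            pass  # work that requires the process instance lock goes here
    except ProcessInstanceIsAlreadyLockedError:
        raise  # still locked after all attempts; let the caller decide what to do

The retry loop itself lives in ProcessInstanceQueueService, shown in the next changed file.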


@@ -92,18 +92,42 @@ class ProcessInstanceQueueService:
             process_instance.id, queue_entry, additional_processing_identifier=additional_processing_identifier
         )
 
+    @classmethod
+    def _dequeue_with_retries(
+        cls,
+        process_instance: ProcessInstanceModel,
+        additional_processing_identifier: str | None = None,
+        max_attempts: int = 1,
+    ) -> None:
+        attempt = 1
+        backoff_factor = 2
+        while True:
+            try:
+                return cls._dequeue(process_instance, additional_processing_identifier=additional_processing_identifier)
+            except ProcessInstanceIsAlreadyLockedError as exception:
+                if attempt >= max_attempts:
+                    raise exception
+                time.sleep(backoff_factor**attempt)
+                attempt += 1
+
     @classmethod
     @contextlib.contextmanager
     def dequeued(
-        cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None
+        cls,
+        process_instance: ProcessInstanceModel,
+        additional_processing_identifier: str | None = None,
+        max_attempts: int = 1,
     ) -> Generator[None, None, None]:
         reentering_lock = ProcessInstanceLockService.has_lock(
             process_instance.id, additional_processing_identifier=additional_processing_identifier
         )
         if not reentering_lock:
             # this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError
             # that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock
-            cls._dequeue(process_instance, additional_processing_identifier=additional_processing_identifier)
+            cls._dequeue_with_retries(
+                process_instance, additional_processing_identifier=additional_processing_identifier, max_attempts=max_attempts
+            )
         try:
             yield
         except Exception as ex:
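
In short, _dequeue_with_retries wraps the existing _dequeue call in plain exponential backoff: after each ProcessInstanceIsAlreadyLockedError it sleeps backoff_factor**attempt seconds (2, then 4, then 8, ...) and re-raises once max_attempts is exhausted. A self-contained sketch of the same pattern with the lock call stubbed out, purely illustrative and using made-up names:

import time


class AlreadyLockedError(Exception):
    """Stand-in for ProcessInstanceIsAlreadyLockedError."""


def acquire_with_retries(acquire, max_attempts: int = 1, backoff_factor: int = 2) -> None:
    # acquire is any callable that raises AlreadyLockedError while the lock is held elsewhere
    attempt = 1
    while True:
        try:
            return acquire()
        except AlreadyLockedError:
            if attempt >= max_attempts:
                raise
            time.sleep(backoff_factor**attempt)  # 2s after the 1st failure, 4s after the 2nd, ...
            attempt += 1

With max_attempts=3, as the task-submit call sites above now pass, that is at most three attempts and 2 + 4 = 6 seconds of waiting before the error propagates. The test in the next changed file pins down the call counts.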


@@ -1,9 +1,12 @@
 import time
 from contextlib import suppress
 
+import pytest
 from flask.app import Flask
+from pytest_mock.plugin import MockerFixture
 
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
 from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
+from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
 from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@@ -114,3 +117,24 @@ class TestProcessInstanceQueueService(BaseTest):
             assert ProcessInstanceLockService.has_lock(process_instance.id)
         assert not ProcessInstanceLockService.has_lock(process_instance.id)
 
+    def test_dequeue_with_retries_works(
+        self,
+        app: Flask,
+        mocker: MockerFixture,
+        with_db_and_bpmn_file_cleanup: None,
+    ) -> None:
+        process_instance = self._create_process_instance()
+        dequeue_mocker = mocker.patch.object(
+            ProcessInstanceQueueService, "_dequeue", side_effect=ProcessInstanceIsAlreadyLockedError
+        )
+        mocker.patch("time.sleep")
+
+        with pytest.raises(ProcessInstanceIsAlreadyLockedError):
+            with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=5):
+                pass
+        assert dequeue_mocker.call_count == 5
+
+        with pytest.raises(ProcessInstanceIsAlreadyLockedError):
+            with ProcessInstanceQueueService.dequeued(process_instance):
+                pass
+        assert dequeue_mocker.call_count == 6
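
For the arithmetic behind the mocked time.sleep above: with backoff_factor = 2 from _dequeue_with_retries, the sleeps the test suppresses can be listed directly (throwaway snippet, just to show the schedule):

backoff_factor = 2
max_attempts = 5
# a sleep happens after every failed attempt except the last, which re-raises instead
sleeps = [backoff_factor**attempt for attempt in range(1, max_attempts)]
print(sleeps)       # [2, 4, 8, 16]
print(sum(sleeps))  # 30 -> seconds of real waiting that patching time.sleep avoids

The second with block uses the default max_attempts=1, so it makes one more _dequeue call and never sleeps, which is why call_count ends at 6.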