diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
index ca808564..2af3e7df 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
@@ -129,5 +129,13 @@ SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB = environ.get(
     "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB", default="greedy"
 )
 
+SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES = int(
+    environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES", default="3")
+)
+
+SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS = int(
+    environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS", default="1")
+)
+
 # this is only used in CI. use SPIFFWORKFLOW_BACKEND_DATABASE_URI instead for real configuration
 SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD = environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD", default=None)
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
index 7ca7c6eb..ad9868e6 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
@@ -378,8 +378,13 @@
         only_tasks_that_can_be_completed=True,
     )
 
+    retry_times = current_app.config["SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES"]
+    retry_interval_in_seconds = current_app.config[
+        "SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS"
+    ]
+
     with sentry_sdk.start_span(op="task", description="complete_form_task"):
-        processor.lock_process_instance("Web")
+        processor.lock_process_instance("Web", retry_times, retry_interval_in_seconds)
         ProcessInstanceService.complete_form_task(
             processor=processor,
             spiff_task=spiff_task,
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 6c8b64fc..5e771c12 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -91,9 +91,8 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
 from spiffworkflow_backend.services.process_instance_lock_service import (
     ProcessInstanceLockService,
 )
-from spiffworkflow_backend.services.process_instance_queue_service import (
-    ProcessInstanceQueueService,
-)
+from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
+from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
 from spiffworkflow_backend.services.spec_file_service import SpecFileService
@@ -1459,8 +1458,23 @@
         return the_status
 
     # TODO: replace with implicit/more granular locking in workflow execution service
-    def lock_process_instance(self, lock_prefix: str) -> None:
-        ProcessInstanceQueueService.dequeue(self.process_instance_model)
+    # TODO: remove the retry logic once all user_input_required's don't need to be locked to check timers
+    def lock_process_instance(
+        self, lock_prefix: str, retry_count: int = 0, retry_interval_in_seconds: int = 0
+    ) -> None:
+        try:
+            ProcessInstanceQueueService.dequeue(self.process_instance_model)
+        except ProcessInstanceIsAlreadyLockedError as e:
+            if retry_count > 0:
+                current_app.logger.info(
+                    f"process_instance_id {self.process_instance_model.id} is locked. "
+                    f"will retry {retry_count} times with delay of {retry_interval_in_seconds}."
+                )
+                if retry_interval_in_seconds > 0:
+                    time.sleep(retry_interval_in_seconds)
+                self.lock_process_instance(lock_prefix, retry_count - 1, retry_interval_in_seconds)
+            else:
+                raise e
 
     # TODO: replace with implicit/more granular locking in workflow execution service
     def unlock_process_instance(self, lock_prefix: str) -> None: