Retry locked user input submissions (#185)
This commit is contained in:
parent
f5f0c86bf4
commit
ab75215a15
|
@ -129,5 +129,13 @@ SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB = environ.get(
|
||||||
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB", default="greedy"
|
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB", default="greedy"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES = int(
|
||||||
|
environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES", default="3")
|
||||||
|
)
|
||||||
|
|
||||||
|
SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS = int(
|
||||||
|
environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS", default="1")
|
||||||
|
)
|
||||||
|
|
||||||
# this is only used in CI. use SPIFFWORKFLOW_BACKEND_DATABASE_URI instead for real configuration
|
# this is only used in CI. use SPIFFWORKFLOW_BACKEND_DATABASE_URI instead for real configuration
|
||||||
SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD = environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD", default=None)
|
SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD = environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD", default=None)
|
||||||
|
|
|
@ -378,8 +378,13 @@ def task_submit_shared(
|
||||||
only_tasks_that_can_be_completed=True,
|
only_tasks_that_can_be_completed=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
retry_times = current_app.config["SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES"]
|
||||||
|
retry_interval_in_seconds = current_app.config[
|
||||||
|
"SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS"
|
||||||
|
]
|
||||||
|
|
||||||
with sentry_sdk.start_span(op="task", description="complete_form_task"):
|
with sentry_sdk.start_span(op="task", description="complete_form_task"):
|
||||||
processor.lock_process_instance("Web")
|
processor.lock_process_instance("Web", retry_times, retry_interval_in_seconds)
|
||||||
ProcessInstanceService.complete_form_task(
|
ProcessInstanceService.complete_form_task(
|
||||||
processor=processor,
|
processor=processor,
|
||||||
spiff_task=spiff_task,
|
spiff_task=spiff_task,
|
||||||
|
|
|
@ -91,9 +91,8 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||||
from spiffworkflow_backend.services.process_instance_lock_service import (
|
from spiffworkflow_backend.services.process_instance_lock_service import (
|
||||||
ProcessInstanceLockService,
|
ProcessInstanceLockService,
|
||||||
)
|
)
|
||||||
from spiffworkflow_backend.services.process_instance_queue_service import (
|
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
|
||||||
ProcessInstanceQueueService,
|
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
|
||||||
)
|
|
||||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||||
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
|
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
|
||||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||||
|
@ -1459,8 +1458,23 @@ class ProcessInstanceProcessor:
|
||||||
return the_status
|
return the_status
|
||||||
|
|
||||||
# TODO: replace with implicit/more granular locking in workflow execution service
|
# TODO: replace with implicit/more granular locking in workflow execution service
|
||||||
def lock_process_instance(self, lock_prefix: str) -> None:
|
# TODO: remove the retry logic once all user_input_required's don't need to be locked to check timers
|
||||||
|
def lock_process_instance(
|
||||||
|
self, lock_prefix: str, retry_count: int = 0, retry_interval_in_seconds: int = 0
|
||||||
|
) -> None:
|
||||||
|
try:
|
||||||
ProcessInstanceQueueService.dequeue(self.process_instance_model)
|
ProcessInstanceQueueService.dequeue(self.process_instance_model)
|
||||||
|
except ProcessInstanceIsAlreadyLockedError as e:
|
||||||
|
if retry_count > 0:
|
||||||
|
current_app.logger.info(
|
||||||
|
f"process_instance_id {self.process_instance_model.id} is locked. "
|
||||||
|
f"will retry {retry_count} times with delay of {retry_interval_in_seconds}."
|
||||||
|
)
|
||||||
|
if retry_interval_in_seconds > 0:
|
||||||
|
time.sleep(retry_interval_in_seconds)
|
||||||
|
self.lock_process_instance(lock_prefix, retry_count - 1, retry_interval_in_seconds)
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
|
||||||
# TODO: replace with implicit/more granular locking in workflow execution service
|
# TODO: replace with implicit/more granular locking in workflow execution service
|
||||||
def unlock_process_instance(self, lock_prefix: str) -> None:
|
def unlock_process_instance(self, lock_prefix: str) -> None:
|
||||||
|
|
Loading…
Reference in New Issue