diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py index 9b7121ff..79923717 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/__init__.py @@ -92,6 +92,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg "interval", seconds=user_input_required_polling_interval_in_seconds, ) + scheduler.add_job( + BackgroundProcessingService(app).remove_stale_locks, + "interval", + seconds=app.config["MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"], + ) scheduler.start() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py index 0b631a1d..fe342b0b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py @@ -185,6 +185,10 @@ def setup_config(app: Flask) -> None: if app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] == "": app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] = None + app.config["MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"] = int( + app.config["SPIFFWORKFLOW_BACKEND_MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"] + ) + thread_local_data = threading.local() app.config["THREAD_LOCAL_DATA"] = thread_local_data _set_up_tenant_specific_fields_as_list_of_strings(app) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py index 08e12fa7..1edebd19 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py @@ -188,3 +188,7 @@ SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR = environ.get( SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX = ( environ.get("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default="false") 
== "true" ) + +SPIFFWORKFLOW_BACKEND_MAX_INSTANCE_LOCK_DURATION_IN_SECONDS = environ.get( + "SPIFFWORKFLOW_BACKEND_MAX_INSTANCE_LOCK_DURATION_IN_SECONDS", default="300" +) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py index 697a6acf..f56eee8e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py @@ -15,10 +15,11 @@ class ProcessInstanceQueueModel(SpiffworkflowBaseDBModel): process_instance_id: int = db.Column( ForeignKey(ProcessInstanceModel.id), unique=True, nullable=False # type: ignore ) - run_at_in_seconds: int = db.Column(db.Integer) priority: int = db.Column(db.Integer) locked_by: str | None = db.Column(db.String(80), index=True, nullable=True) locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True) status: str = db.Column(db.String(50), index=True) + + run_at_in_seconds: int = db.Column(db.Integer) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py index 54efc5bc..3b78608c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/background_processing_service.py @@ -34,3 +34,8 @@ class BackgroundProcessingService: with self.app.app_context(): ProcessInstanceLockService.set_thread_local_locking_context("bg:messages") MessageService.correlate_all_message_instances() + + def remove_stale_locks(self) -> None: + """Clear stale process instance locks (held for MAX_INSTANCE_LOCK_DURATION_IN_SECONDS or more).""" + with 
self.app.app_context(): + ProcessInstanceLockService.remove_stale_locks() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py index 30725e62..1687c7f5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py @@ -1,8 +1,12 @@ import threading +import time from typing import Any from flask import current_app +from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel +from sqlalchemy import and_ +from sqlalchemy import or_ class ExpectedLockNotFoundError(Exception): @@ -62,3 +66,34 @@ class ProcessInstanceLockService: def has_lock(cls, process_instance_id: int) -> bool: ctx = cls.get_thread_local_locking_context() return process_instance_id in ctx["locks"] + + @classmethod + def remove_stale_locks(cls) -> None: + max_duration = current_app.config["MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"] + current_time = round(time.time()) + five_min_ago = current_time - max_duration + + # NOTE: five_min_ago is current_time - max_duration, i.e. five minutes only at the default of 300 seconds. + # TODO: remove the NULL locked_at_in_seconds check and the updated_at_in_seconds fallback once old entries + # on current envs are taken care of. New code should not allow locked_by set with locked_at_in_seconds null. 
+ entries_with_stale_locks = ProcessInstanceQueueModel.query.filter( + ProcessInstanceQueueModel.locked_by != None, # noqa: E711 + or_( + ProcessInstanceQueueModel.locked_at_in_seconds <= five_min_ago, + and_( + ProcessInstanceQueueModel.updated_at_in_seconds <= five_min_ago, + ProcessInstanceQueueModel.locked_at_in_seconds == None, # noqa: E711 + ), + ), + ).all() + + for entry in entries_with_stale_locks: + locked_duration = current_time - (entry.locked_at_in_seconds or entry.updated_at_in_seconds) + current_app.logger.info( + f"Removing stale lock for process instance: {entry.process_instance_id} with locked_by:" + f" '{entry.locked_by}' because it has been locked for seconds: {locked_duration}" + ) + entry.locked_by = None + entry.locked_at_in_seconds = None + db.session.add(entry) + db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index e5be53ad..16bed6f5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -51,6 +51,7 @@ class ProcessInstanceQueueService: @classmethod def _dequeue(cls, process_instance: ProcessInstanceModel) -> None: locked_by = ProcessInstanceLockService.locked_by() + current_time = round(time.time()) db.session.query(ProcessInstanceQueueModel).filter( ProcessInstanceQueueModel.process_instance_id == process_instance.id, @@ -58,6 +59,7 @@ class ProcessInstanceQueueService: ).update( { "locked_by": locked_by, + "locked_at_in_seconds": current_time, } )