Feature/unlock stale locks (#360)
* Set locked_at_in_seconds so we can find stale locks (w/ burnettk) * Added a background job to clean up old locks (w/ burnettk) --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
c936c02db0
commit
c2706c5bde
|
@ -92,6 +92,11 @@ def start_scheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Backg
|
||||||
"interval",
|
"interval",
|
||||||
seconds=user_input_required_polling_interval_in_seconds,
|
seconds=user_input_required_polling_interval_in_seconds,
|
||||||
)
|
)
|
||||||
|
scheduler.add_job(
|
||||||
|
BackgroundProcessingService(app).remove_stale_locks,
|
||||||
|
"interval",
|
||||||
|
seconds=app.config["MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"],
|
||||||
|
)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -185,6 +185,10 @@ def setup_config(app: Flask) -> None:
|
||||||
if app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] == "":
|
if app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] == "":
|
||||||
app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] = None
|
app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] = None
|
||||||
|
|
||||||
|
app.config["MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"] = int(
|
||||||
|
app.config["SPIFFWORKFLOW_BACKEND_MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"]
|
||||||
|
)
|
||||||
|
|
||||||
thread_local_data = threading.local()
|
thread_local_data = threading.local()
|
||||||
app.config["THREAD_LOCAL_DATA"] = thread_local_data
|
app.config["THREAD_LOCAL_DATA"] = thread_local_data
|
||||||
_set_up_tenant_specific_fields_as_list_of_strings(app)
|
_set_up_tenant_specific_fields_as_list_of_strings(app)
|
||||||
|
|
|
@ -188,3 +188,7 @@ SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR = environ.get(
|
||||||
SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX = (
|
SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX = (
|
||||||
environ.get("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default="false") == "true"
|
environ.get("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default="false") == "true"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
SPIFFWORKFLOW_BACKEND_MAX_INSTANCE_LOCK_DURATION_IN_SECONDS = environ.get(
|
||||||
|
"SPIFFWORKFLOW_BACKEND_MAX_INSTANCE_LOCK_DURATION_IN_SECONDS", default="300"
|
||||||
|
)
|
||||||
|
|
|
@ -15,10 +15,11 @@ class ProcessInstanceQueueModel(SpiffworkflowBaseDBModel):
|
||||||
process_instance_id: int = db.Column(
|
process_instance_id: int = db.Column(
|
||||||
ForeignKey(ProcessInstanceModel.id), unique=True, nullable=False # type: ignore
|
ForeignKey(ProcessInstanceModel.id), unique=True, nullable=False # type: ignore
|
||||||
)
|
)
|
||||||
run_at_in_seconds: int = db.Column(db.Integer)
|
|
||||||
priority: int = db.Column(db.Integer)
|
priority: int = db.Column(db.Integer)
|
||||||
locked_by: str | None = db.Column(db.String(80), index=True, nullable=True)
|
locked_by: str | None = db.Column(db.String(80), index=True, nullable=True)
|
||||||
locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True)
|
locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True)
|
||||||
status: str = db.Column(db.String(50), index=True)
|
status: str = db.Column(db.String(50), index=True)
|
||||||
|
|
||||||
|
run_at_in_seconds: int = db.Column(db.Integer)
|
||||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||||
created_at_in_seconds: int = db.Column(db.Integer)
|
created_at_in_seconds: int = db.Column(db.Integer)
|
||||||
|
|
|
@ -34,3 +34,8 @@ class BackgroundProcessingService:
|
||||||
with self.app.app_context():
|
with self.app.app_context():
|
||||||
ProcessInstanceLockService.set_thread_local_locking_context("bg:messages")
|
ProcessInstanceLockService.set_thread_local_locking_context("bg:messages")
|
||||||
MessageService.correlate_all_message_instances()
|
MessageService.correlate_all_message_instances()
|
||||||
|
|
||||||
|
def remove_stale_locks(self) -> None:
    """Unlock anything that has stayed locked long enough to be considered stale.

    Runs ProcessInstanceLockService.remove_stale_locks inside this service's
    Flask application context.
    """
    app_context = self.app.app_context()
    with app_context:
        ProcessInstanceLockService.remove_stale_locks()
|
||||||
|
|
|
@ -1,8 +1,12 @@
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
|
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
|
||||||
|
from sqlalchemy import and_
|
||||||
|
from sqlalchemy import or_
|
||||||
|
|
||||||
|
|
||||||
class ExpectedLockNotFoundError(Exception):
|
class ExpectedLockNotFoundError(Exception):
|
||||||
|
@ -62,3 +66,34 @@ class ProcessInstanceLockService:
|
||||||
def has_lock(cls, process_instance_id: int) -> bool:
|
def has_lock(cls, process_instance_id: int) -> bool:
|
||||||
ctx = cls.get_thread_local_locking_context()
|
ctx = cls.get_thread_local_locking_context()
|
||||||
return process_instance_id in ctx["locks"]
|
return process_instance_id in ctx["locks"]
|
||||||
|
|
||||||
|
@classmethod
def remove_stale_locks(cls) -> None:
    """Release queue locks that have been held longer than the configured maximum.

    An entry counts as stale when ``locked_by`` is set and its lock timestamp
    is at or before ``now - MAX_INSTANCE_LOCK_DURATION_IN_SECONDS`` (read from
    ``current_app.config``). Each stale entry is logged and then has
    ``locked_by`` and ``locked_at_in_seconds`` cleared. All releases are
    committed together after the loop; nothing is committed when no stale
    entries are found.
    """
    max_duration = current_app.config["MAX_INSTANCE_LOCK_DURATION_IN_SECONDS"]
    current_time = round(time.time())
    # Locks taken at or before this timestamp have exceeded the allowed duration.
    stale_cutoff = current_time - max_duration

    # TODO: remove check for NULL locked_at_in_seconds and fallback to updated_at_in_seconds
    # once we can confirm that old entries have been taken care of on current envs.
    # New code should not allow rows where locked_by has a value but locked_at_in_seconds is null.
    entries_with_stale_locks = ProcessInstanceQueueModel.query.filter(
        ProcessInstanceQueueModel.locked_by != None,  # noqa: E711
        or_(
            ProcessInstanceQueueModel.locked_at_in_seconds <= stale_cutoff,
            and_(
                ProcessInstanceQueueModel.updated_at_in_seconds <= stale_cutoff,
                ProcessInstanceQueueModel.locked_at_in_seconds == None,  # noqa: E711
            ),
        ),
    ).all()

    if not entries_with_stale_locks:
        return

    for entry in entries_with_stale_locks:
        # Explicit None check so a timestamp of 0 is not mistaken for "missing".
        locked_since = entry.locked_at_in_seconds
        if locked_since is None:
            locked_since = entry.updated_at_in_seconds
        locked_duration = current_time - locked_since
        current_app.logger.info(
            f"Removing stale lock for process instance: {entry.process_instance_id} with locked_by:"
            f" '{entry.locked_by}' because it has been locked for seconds: {locked_duration}"
        )
        entry.locked_by = None
        entry.locked_at_in_seconds = None
        db.session.add(entry)

    # Commit once so every release lands in a single transaction.
    db.session.commit()
|
||||||
|
|
|
@ -51,6 +51,7 @@ class ProcessInstanceQueueService:
|
||||||
@classmethod
|
@classmethod
|
||||||
def _dequeue(cls, process_instance: ProcessInstanceModel) -> None:
|
def _dequeue(cls, process_instance: ProcessInstanceModel) -> None:
|
||||||
locked_by = ProcessInstanceLockService.locked_by()
|
locked_by = ProcessInstanceLockService.locked_by()
|
||||||
|
current_time = round(time.time())
|
||||||
|
|
||||||
db.session.query(ProcessInstanceQueueModel).filter(
|
db.session.query(ProcessInstanceQueueModel).filter(
|
||||||
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
|
ProcessInstanceQueueModel.process_instance_id == process_instance.id,
|
||||||
|
@ -58,6 +59,7 @@ class ProcessInstanceQueueService:
|
||||||
).update(
|
).update(
|
||||||
{
|
{
|
||||||
"locked_by": locked_by,
|
"locked_by": locked_by,
|
||||||
|
"locked_at_in_seconds": current_time,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue