From 8e0324df632cb2cf63ca7f1582ddbdf73b17ade4 Mon Sep 17 00:00:00 2001 From: jbirddog <100367399+jbirddog@users.noreply.github.com> Date: Wed, 15 Mar 2023 12:32:55 -0400 Subject: [PATCH] Smaller locking window for the background processor (#183) --- .../process_instance_queue_service.py | 37 ++++++++++++++----- .../services/process_instance_service.py | 6 +-- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index d75d903f..a0aceb94 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -1,5 +1,6 @@ import time from typing import List +from typing import Optional from flask import current_app @@ -84,8 +85,33 @@ class ProcessInstanceQueueService: ProcessInstanceLockService.lock(process_instance.id, queue_entry) - @staticmethod + @classmethod + def entries_with_status( + cls, + status_value: str = ProcessInstanceStatus.waiting.value, + locked_by: Optional[str] = None, + ) -> List[ProcessInstanceQueueModel]: + return ( + db.session.query(ProcessInstanceQueueModel) + .filter( + ProcessInstanceQueueModel.status == status_value, + ProcessInstanceQueueModel.locked_by == locked_by, + ) + .all() + ) + + @classmethod + def peek_many( + cls, + status_value: str = ProcessInstanceStatus.waiting.value, + ) -> List[int]: + queue_entries = cls.entries_with_status(status_value, None) + ids_with_status = [entry.process_instance_id for entry in queue_entries] + return ids_with_status + + @classmethod def dequeue_many( + cls, status_value: str = ProcessInstanceStatus.waiting.value, ) -> List[int]: locked_by = ProcessInstanceLockService.locked_by() @@ -102,14 +128,7 @@ class ProcessInstanceQueueService: db.session.commit() - queue_entries = ( - db.session.query(ProcessInstanceQueueModel) - .filter( - ProcessInstanceQueueModel.status == status_value, - ProcessInstanceQueueModel.locked_by == locked_by, - ) - .all() - ) + queue_entries = cls.entries_with_status(status_value, locked_by) locked_ids = ProcessInstanceLockService.lock_many(queue_entries) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index dfeb2bde..23ce9a22 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -84,15 +84,15 @@ class ProcessInstanceService: @staticmethod def do_waiting(status_value: str = ProcessInstanceStatus.waiting.value) -> None: """Do_waiting.""" - locked_process_instance_ids = ProcessInstanceQueueService.dequeue_many( + process_instance_ids_to_check = ProcessInstanceQueueService.peek_many( status_value ) - if len(locked_process_instance_ids) == 0: + if len(process_instance_ids_to_check) == 0: return records = ( db.session.query(ProcessInstanceModel) - .filter(ProcessInstanceModel.id.in_(locked_process_instance_ids)) # type: ignore + .filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore .all() ) process_instance_lock_prefix = "Background"