From a9fc10aa55846cb1d1300d0696a32eb61ae5c702 Mon Sep 17 00:00:00 2001 From: jasquat Date: Tue, 11 Apr 2023 14:40:38 -0400 Subject: [PATCH] dequeue in background processor instead of init to avoid lock issues when a read only object is necessary w/ burnettk jbirddog --- .../services/process_instance_lock_service.py | 10 ++++++++-- .../services/process_instance_processor.py | 19 +++++++++---------- .../process_instance_queue_service.py | 6 ++++-- .../services/process_instance_service.py | 4 ++-- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py index 866c073b3..70333aea2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py @@ -10,6 +10,10 @@ from spiffworkflow_backend.models.process_instance_queue import ( ) +class ExpectedLockNotFoundError(Exception): + pass + + class ProcessInstanceLockService: """TODO: comment.""" @@ -49,8 +53,10 @@ class ProcessInstanceLockService: @classmethod def unlock(cls, process_instance_id: int) -> ProcessInstanceQueueModel: - ctx = cls.get_thread_local_locking_context() - return ctx["locks"].pop(process_instance_id) # type: ignore + queue_model = cls.try_unlock(process_instance_id) + if queue_model is None: + raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance_id}") + return queue_model @classmethod def try_unlock(cls, process_instance_id: int) -> Optional[ProcessInstanceQueueModel]: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 658426c40..dc0849817 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -419,16 +419,15 @@ class ProcessInstanceProcessor: # * __get_bpmn_process_instance, which takes spec and subprocesses and instantiates and returns a BpmnWorkflow def __init__(self, process_instance_model: ProcessInstanceModel, validate_only: bool = False) -> None: """Create a Workflow Processor based on the serialized information available in the process_instance model.""" - with ProcessInstanceQueueService.dequeued(process_instance_model): - try: - self.setup_processor_with_process_instance( - process_instance_model=process_instance_model, validate_only=validate_only - ) - except Exception as ex: - process_instance_model.status = ProcessInstanceStatus.error.value - db.session.add(process_instance_model) - db.session.commit() - raise ex + try: + self.setup_processor_with_process_instance( + process_instance_model=process_instance_model, validate_only=validate_only + ) + except Exception as ex: + process_instance_model.status = ProcessInstanceStatus.error.value + db.session.add(process_instance_model) + db.session.commit() + raise ex def setup_processor_with_process_instance( self, process_instance_model: ProcessInstanceModel, validate_only: bool = False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index 9021ab4d8..23ba0e48d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -90,9 +90,11 @@ class ProcessInstanceQueueService: @contextlib.contextmanager def dequeued(cls, process_instance: ProcessInstanceModel) -> Generator[None, None, None]: reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id) + if not reentering_lock: + # this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError + # that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock + cls._dequeue(process_instance) try: - if not reentering_lock: - cls._dequeue(process_instance) yield finally: if not reentering_lock: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 3ec3ab4da..3a0307f18 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -113,10 +113,10 @@ class ProcessInstanceService: .all() ) for process_instance in records: - processor = None try: current_app.logger.info(f"Processing process_instance {process_instance.id}") - processor = ProcessInstanceProcessor(process_instance) + with ProcessInstanceQueueService.dequeued(process_instance): + processor = ProcessInstanceProcessor(process_instance) if cls.can_optimistically_skip(processor, status_value): current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") continue