From a1a54c54bb96a69d3614ff98851f25e0fe7b85eb Mon Sep 17 00:00:00 2001 From: jbirddog <100367399+jbirddog@users.noreply.github.com> Date: Fri, 31 Mar 2023 10:59:09 -0400 Subject: [PATCH] Trip safe asserts in tests, various process instance queue improvements (#199) --- .gitignore | 1 + .../spiffworkflow_backend/config/default.py | 8 -- .../routes/process_instances_controller.py | 29 +--- .../routes/tasks_controller.py | 25 ++-- .../services/assertion_service.py | 2 +- .../services/process_instance_processor.py | 38 ++---- .../process_instance_queue_service.py | 82 +++++------- .../services/process_instance_service.py | 8 +- .../helpers/base_test.py | 2 +- .../unit/test_process_instance_processor.py | 40 ------ .../test_process_instance_queue_service.py | 124 ++++++++++++++++++ 11 files changed, 193 insertions(+), 166 deletions(-) create mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py diff --git a/.gitignore b/.gitignore index d391cd85..24a0ada5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ pyrightconfig.json .idea/ t .dccache +*~ \ No newline at end of file diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py index 5c51e294..4ba0efd9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py @@ -139,13 +139,5 @@ SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB = environ.get( "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB", default="greedy" ) -SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES = int( - environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES", default="3") -) - -SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS = int( - environ.get("SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS", default="1") -) - # this is only used in CI. use SPIFFWORKFLOW_BACKEND_DATABASE_URI instead for real configuration SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD = environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD", default=None) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index e27f68a5..51c304c5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -56,9 +56,6 @@ from spiffworkflow_backend.services.error_handling_service import ErrorHandlingS from spiffworkflow_backend.services.git_service import GitCommandError from spiffworkflow_backend.services.git_service import GitService from spiffworkflow_backend.services.message_service import MessageService -from spiffworkflow_backend.services.process_instance_lock_service import ( - ProcessInstanceLockService, -) from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) @@ -105,7 +102,6 @@ def process_instance_create( process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( process_model_identifier, g.user ) - ProcessInstanceQueueService.enqueue(process_instance) return Response( json.dumps(ProcessInstanceModelSchema().dump(process_instance)), status=201, @@ -131,7 +127,6 @@ def process_instance_run( if do_engine_steps: try: - processor.lock_process_instance("Web") processor.do_engine_steps(save=True) except ( ApiError, @@ -150,9 +145,6 @@ def process_instance_run( status_code=400, task=task, ) from e - finally: - if ProcessInstanceLockService.has_lock(process_instance.id): - processor.unlock_process_instance("Web") if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: MessageService.correlate_all_message_instances() @@ -173,14 +165,11 @@ def process_instance_terminate( processor = ProcessInstanceProcessor(process_instance) try: - processor.lock_process_instance("Web") - processor.terminate() + with ProcessInstanceQueueService.dequeued(process_instance): + processor.terminate() except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e: ErrorHandlingService().handle_error(processor, e) raise e - finally: - if ProcessInstanceLockService.has_lock(process_instance.id): - processor.unlock_process_instance("Web") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -194,14 +183,11 @@ def process_instance_suspend( processor = ProcessInstanceProcessor(process_instance) try: - processor.lock_process_instance("Web") - processor.suspend() + with ProcessInstanceQueueService.dequeued(process_instance): + processor.suspend() except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e: ErrorHandlingService().handle_error(processor, e) raise e - finally: - if ProcessInstanceLockService.has_lock(process_instance.id): - processor.unlock_process_instance("Web") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") @@ -215,14 +201,11 @@ def process_instance_resume( processor = ProcessInstanceProcessor(process_instance) try: - processor.lock_process_instance("Web") - processor.resume() + with ProcessInstanceQueueService.dequeued(process_instance): + processor.resume() except (ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError) as e: ErrorHandlingService().handle_error(processor, e) raise e - finally: - if ProcessInstanceLockService.has_lock(process_instance.id): - processor.unlock_process_instance("Web") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index 3d0eac40..7145dcce 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -56,6 +56,9 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) +from spiffworkflow_backend.services.process_instance_queue_service import ( + ProcessInstanceQueueService, +) from spiffworkflow_backend.services.process_instance_service import ( ProcessInstanceService, ) @@ -426,21 +429,15 @@ def task_submit_shared( only_tasks_that_can_be_completed=True, ) - retry_times = current_app.config["SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_TIMES"] - retry_interval_in_seconds = current_app.config[ - "SPIFFWORKFLOW_BACKEND_USER_INPUT_REQUIRED_LOCK_RETRY_INTERVAL_IN_SECONDS" - ] - with sentry_sdk.start_span(op="task", description="complete_form_task"): - processor.lock_process_instance("Web", retry_times, retry_interval_in_seconds) - ProcessInstanceService.complete_form_task( - processor=processor, - spiff_task=spiff_task, - data=body, - user=g.user, - human_task=human_task, - ) - processor.unlock_process_instance("Web") + with ProcessInstanceQueueService.dequeued(process_instance): + ProcessInstanceService.complete_form_task( + processor=processor, + spiff_task=spiff_task, + data=body, + user=g.user, + human_task=human_task, + ) # If we need to update all tasks, then get the next ready task and if it a multi-instance with the same # task spec, complete that form as well. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/assertion_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/assertion_service.py index b9f7c61b..e8d534b8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/assertion_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/assertion_service.py @@ -14,5 +14,5 @@ def safe_assertion(condition: bool) -> Generator[bool, None, None]: if not condition: sentry_sdk.capture_exception(e) current_app.logger.exception(e) - if current_app.config["ENV_IDENTIFIER"] == "local_development": + if current_app.config["ENV_IDENTIFIER"] in ["local_development", "unit_testing"]: raise e diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 93cd64fb..d2579357 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -89,7 +89,6 @@ from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.services.custom_parser import MyCustomParser from spiffworkflow_backend.services.file_system_service import FileSystemService -from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate @@ -1544,29 +1543,6 @@ class ProcessInstanceProcessor: # current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}") return the_status - # TODO: replace with implicit/more granular locking in workflow execution service - # TODO: remove the retry logic once all user_input_required's don't need to be locked to check timers - def lock_process_instance( - self, lock_prefix: str, retry_count: int = 0, retry_interval_in_seconds: int = 0 - ) -> None: - try: - ProcessInstanceQueueService.dequeue(self.process_instance_model) - except ProcessInstanceIsAlreadyLockedError as e: - if retry_count > 0: - current_app.logger.info( - f"process_instance_id {self.process_instance_model.id} is locked. " - f"will retry {retry_count} times with delay of {retry_interval_in_seconds}." - ) - if retry_interval_in_seconds > 0: - time.sleep(retry_interval_in_seconds) - self.lock_process_instance(lock_prefix, retry_count - 1, retry_interval_in_seconds) - else: - raise e - - # TODO: replace with implicit/more granular locking in workflow execution service - def unlock_process_instance(self, lock_prefix: str) -> None: - ProcessInstanceQueueService.enqueue(self.process_instance_model) - def process_bpmn_messages(self) -> None: """Process_bpmn_messages.""" bpmn_messages = self.bpmn_process_instance.get_bpmn_messages() @@ -1622,6 +1598,18 @@ class ProcessInstanceProcessor: exit_at: None = None, save: bool = False, execution_strategy_name: Optional[str] = None, + ) -> None: + with ProcessInstanceQueueService.dequeued(self.process_instance_model): + # TODO: ideally we just lock in the execution service, but not sure + # about _add_bpmn_process_definitions and if that needs to happen in + # the same lock like it does on main + self._do_engine_steps(exit_at, save, execution_strategy_name) + + def _do_engine_steps( + self, + exit_at: None = None, + save: bool = False, + execution_strategy_name: Optional[str] = None, ) -> None: self._add_bpmn_process_definitions() @@ -1646,7 +1634,7 @@ class ProcessInstanceProcessor: execution_service.do_engine_steps(exit_at, save) finally: # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the - # script engine on a class variable. + # script engine on a class variable. if ( hasattr(self._script_engine, "failing_spiff_task") and self._script_engine.failing_spiff_task is not None diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index 2d2bc4df..9021ab4d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -1,9 +1,9 @@ +import contextlib import time +from typing import Generator from typing import List from typing import Optional -from flask import current_app - from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus @@ -26,28 +26,32 @@ class ProcessInstanceIsAlreadyLockedError(Exception): class ProcessInstanceQueueService: """TODO: comment.""" - @staticmethod - def enqueue(process_instance: ProcessInstanceModel) -> None: - queue_item = ProcessInstanceLockService.try_unlock(process_instance.id) - - if queue_item is None: - queue_item = ProcessInstanceQueueModel(process_instance_id=process_instance.id) - + @classmethod + def _configure_and_save_queue_entry( + cls, process_instance: ProcessInstanceModel, queue_entry: ProcessInstanceQueueModel + ) -> None: # TODO: configurable params (priority/run_at) - queue_item.run_at_in_seconds = round(time.time()) - queue_item.priority = 2 - queue_item.status = process_instance.status - queue_item.locked_by = None - queue_item.locked_at_in_seconds = None + queue_entry.run_at_in_seconds = round(time.time()) + queue_entry.priority = 2 + queue_entry.status = process_instance.status + queue_entry.locked_by = None + queue_entry.locked_at_in_seconds = None - db.session.add(queue_item) + db.session.add(queue_entry) db.session.commit() - @staticmethod - def dequeue(process_instance: ProcessInstanceModel) -> None: - if ProcessInstanceLockService.has_lock(process_instance.id): - return + @classmethod + def enqueue_new_process_instance(cls, process_instance: ProcessInstanceModel) -> None: + queue_entry = ProcessInstanceQueueModel(process_instance_id=process_instance.id) + cls._configure_and_save_queue_entry(process_instance, queue_entry) + @classmethod + def _enqueue(cls, process_instance: ProcessInstanceModel) -> None: + queue_entry = ProcessInstanceLockService.unlock(process_instance.id) + cls._configure_and_save_queue_entry(process_instance, queue_entry) + + @classmethod + def _dequeue(cls, process_instance: ProcessInstanceModel) -> None: locked_by = ProcessInstanceLockService.locked_by() db.session.query(ProcessInstanceQueueModel).filter( @@ -82,6 +86,18 @@ class ProcessInstanceQueueService: ProcessInstanceLockService.lock(process_instance.id, queue_entry) + @classmethod + @contextlib.contextmanager + def dequeued(cls, process_instance: ProcessInstanceModel) -> Generator[None, None, None]: + reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id) + try: + if not reentering_lock: + cls._dequeue(process_instance) + yield + finally: + if not reentering_lock: + cls._enqueue(process_instance) + @classmethod def entries_with_status( cls, @@ -105,31 +121,3 @@ class ProcessInstanceQueueService: queue_entries = cls.entries_with_status(status_value, None) ids_with_status = [entry.process_instance_id for entry in queue_entries] return ids_with_status - - @classmethod - def dequeue_many( - cls, - status_value: str = ProcessInstanceStatus.waiting.value, - ) -> List[int]: - locked_by = ProcessInstanceLockService.locked_by() - - # TODO: configurable params (priority/run_at/limit) - db.session.query(ProcessInstanceQueueModel).filter( - ProcessInstanceQueueModel.status == status_value, - ProcessInstanceQueueModel.locked_by.is_(None), # type: ignore - ).update( - { - "locked_by": locked_by, - } - ) - - db.session.commit() - - queue_entries = cls.entries_with_status(status_value, locked_by) - - locked_ids = ProcessInstanceLockService.lock_many(queue_entries) - - if len(locked_ids) > 0: - current_app.logger.info(f"{locked_by} dequeued_many: {locked_ids}") - - return locked_ids diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 0da39886..39f6de15 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -70,6 +70,7 @@ class ProcessInstanceService: ) db.session.add(process_instance_model) db.session.commit() + ProcessInstanceQueueService.enqueue_new_process_instance(process_instance_model) return process_instance_model @classmethod @@ -111,9 +112,7 @@ class ProcessInstanceService: .filter(ProcessInstanceModel.id.in_(process_instance_ids_to_check)) # type: ignore .all() ) - process_instance_lock_prefix = "Background" for process_instance in records: - locked = False processor = None try: current_app.logger.info(f"Processing process_instance {process_instance.id}") @@ -122,8 +121,6 @@ class ProcessInstanceService: current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") continue - processor.lock_process_instance(process_instance_lock_prefix) - locked = True db.session.refresh(process_instance) if process_instance.status == status_value: execution_strategy_name = current_app.config[ @@ -142,9 +139,6 @@ class ProcessInstanceService: + f"({process_instance.process_model_identifier}). {str(e)}" ) current_app.logger.error(error_message) - finally: - if locked and processor: - processor.unlock_process_instance(process_instance_lock_prefix) @staticmethod def processor_to_process_instance_api( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py b/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py index 6b4d0143..03620228 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py @@ -304,7 +304,7 @@ class BaseTest: db.session.add(process_instance) db.session.commit() - ProcessInstanceQueueService.enqueue(process_instance) + ProcessInstanceQueueService.enqueue_new_process_instance(process_instance) return process_instance diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 1caa952d..f4f9d538 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -24,9 +24,6 @@ from spiffworkflow_backend.services.authorization_service import ( from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) -from spiffworkflow_backend.services.process_instance_queue_service import ( - ProcessInstanceIsAlreadyLockedError, -) from spiffworkflow_backend.services.process_instance_service import ( ProcessInstanceService, ) @@ -632,43 +629,6 @@ class TestProcessInstanceProcessor(BaseTest): assert len(process_instance.active_human_tasks) == 1 assert initial_human_task_id == process_instance.active_human_tasks[0].id - # TODO: port this test to queue_service test - def xxx_test_it_can_lock_and_unlock_a_process_instance( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - ) -> None: - initiator_user = self.find_or_create_user("initiator_user") - process_model = load_test_spec( - process_model_id="test_group/model_with_lanes", - bpmn_file_name="lanes_with_owner_dict.bpmn", - process_model_source_directory="model_with_lanes", - ) - process_instance = self.create_process_instance_from_process_model( - process_model=process_model, user=initiator_user - ) - processor = ProcessInstanceProcessor(process_instance) - assert process_instance.locked_by is None - assert process_instance.locked_at_in_seconds is None - processor.lock_process_instance("TEST") - - process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - assert process_instance.locked_by is not None - assert process_instance.locked_at_in_seconds is not None - - with pytest.raises(ProcessInstanceIsAlreadyLockedError): - processor.lock_process_instance("TEST") - - # with pytest.raises(ProcessInstanceLockedBySomethingElseError): - # processor.unlock_process_instance("TEST2") - - processor.unlock_process_instance("TEST") - - process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - assert process_instance.locked_by is None - assert process_instance.locked_at_in_seconds is None - def test_it_can_loopback_to_previous_bpmn_task_with_gateway( self, app: Flask, diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py new file mode 100644 index 00000000..f676479f --- /dev/null +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_queue_service.py @@ -0,0 +1,124 @@ +"""Test_process_instance_queue_service.""" +from contextlib import suppress + +from flask.app import Flask +from tests.spiffworkflow_backend.helpers.base_test import BaseTest +from tests.spiffworkflow_backend.helpers.test_data import load_test_spec + +from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.services.process_instance_lock_service import ( + ProcessInstanceLockService, +) +from spiffworkflow_backend.services.process_instance_queue_service import ( + ProcessInstanceQueueService, +) + + +class TestProcessInstanceQueueService(BaseTest): + """TestProcessInstanceQueueService.""" + + def _create_process_instance(self) -> ProcessInstanceModel: + initiator_user = self.find_or_create_user("initiator_user") + process_model = load_test_spec( + process_model_id="test_group/model_with_lanes", + bpmn_file_name="lanes.bpmn", + process_model_source_directory="model_with_lanes", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=initiator_user + ) + return process_instance + + def test_newly_created_process_instances_are_not_locked_when_added_to_the_queue( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + assert not ProcessInstanceLockService.has_lock(process_instance.id) + queue_entries = ProcessInstanceQueueService.entries_with_status("not_started", None) + check_passed = False + for entry in queue_entries: + if entry.process_instance_id == process_instance.id: + assert entry.locked_by is None + check_passed = True + break + assert check_passed + + def test_peek_many_can_see_queue_entries_with_a_given_status( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + queue_entry_ids = ProcessInstanceQueueService.peek_many("not_started") + assert process_instance.id in queue_entry_ids + + def test_can_run_some_code_with_a_dequeued_process_instance( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + check_passed = False + with ProcessInstanceQueueService.dequeued(process_instance): + check_passed = True + assert check_passed + + def test_holds_a_lock_for_dequeued_process_instance( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + assert not ProcessInstanceLockService.has_lock(process_instance.id) + with ProcessInstanceQueueService.dequeued(process_instance): + assert ProcessInstanceLockService.has_lock(process_instance.id) + assert not ProcessInstanceLockService.has_lock(process_instance.id) + + def test_unlocks_if_an_exception_is_thrown_with_a__dequeued_process_instance( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + + with suppress(Exception): + with ProcessInstanceQueueService.dequeued(process_instance): + assert ProcessInstanceLockService.has_lock(process_instance.id) + raise Exception("just testing") + + assert not ProcessInstanceLockService.has_lock(process_instance.id) + + def test_can_call_dequeued_mulitple_times( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + + with ProcessInstanceQueueService.dequeued(process_instance): + assert ProcessInstanceLockService.has_lock(process_instance.id) + + with ProcessInstanceQueueService.dequeued(process_instance): + assert ProcessInstanceLockService.has_lock(process_instance.id) + + with ProcessInstanceQueueService.dequeued(process_instance): + assert ProcessInstanceLockService.has_lock(process_instance.id) + + def test_can_nest_multiple_dequeued_calls( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + process_instance = self._create_process_instance() + + with ProcessInstanceQueueService.dequeued(process_instance): + with ProcessInstanceQueueService.dequeued(process_instance): + with ProcessInstanceQueueService.dequeued(process_instance): + assert ProcessInstanceLockService.has_lock(process_instance.id) + + assert ProcessInstanceLockService.has_lock(process_instance.id) + assert ProcessInstanceLockService.has_lock(process_instance.id) + + assert not ProcessInstanceLockService.has_lock(process_instance.id)