diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py index 8acffc424..aaa0a8fa2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py @@ -84,7 +84,6 @@ def _add_jobs_that_should_run_regardless_of_celery_config(app: flask.app.Flask, ] # TODO: see if we can queue with celery instead on celery based configuration - # NOTE: pass in additional_processing_identifier if we move to celery scheduler.add_job( BackgroundProcessingService(app).process_message_instances_with_app_context, "interval", diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index 06d701fb8..6b26c2a4d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -29,7 +29,7 @@ class SpiffCeleryWorkerError(Exception): @shared_task(ignore_result=False, time_limit=ten_minutes) def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: proc_index = current_process().index - ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index) + ProcessInstanceLockService.set_thread_local_locking_context("celery:worker") process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() if task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): @@ -41,14 +41,13 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str 
| } try: task_guid_for_requeueing = task_guid - with ProcessInstanceQueueService.dequeued(process_instance, additional_processing_identifier=proc_index): + with ProcessInstanceQueueService.dequeued(process_instance): ProcessInstanceService.run_process_instance_with_processor( - process_instance, execution_strategy_name="run_current_ready_tasks", additional_processing_identifier=proc_index + process_instance, execution_strategy_name="run_current_ready_tasks" ) _processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor( process_instance, execution_strategy_name="queue_instructions_for_end_user", - additional_processing_identifier=proc_index, ) # currently, whenever we get a task_guid, that means that that task, which was a future task, is ready to run. # there is an assumption that it was successfully processed by run_process_instance_with_processor above. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 461c42d58..b302506f4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -1,4 +1,3 @@ -import os from typing import Any from flask import g @@ -91,8 +90,6 @@ class MessageService: return None try: - # currently only controllers and apscheduler call this - cls.raise_if_running_in_celery("correlate_send_message") with ProcessInstanceQueueService.dequeued(receiving_process_instance): # Set the receiving message to running, so it is not altered elswhere ... 
message_instance_receive.status = "running" @@ -144,7 +141,6 @@ class MessageService: user: UserModel, ) -> ProcessInstanceModel: """Start up a process instance, so it is ready to catch the event.""" - cls.raise_if_running_in_celery("start_process_with_message") receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( message_triggerable_process_model.process_model_identifier, user, @@ -309,11 +305,3 @@ class MessageService: ) ) return receiving_process_instance - - @classmethod - def raise_if_running_in_celery(cls, method_name: str) -> None: - if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true": - raise MessageServiceError( - f"Calling {method_name} in a celery worker. This is not supported! We may need to add" - " additional_processing_identifier to this code path." - ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py index 156bf990e..997356310 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py @@ -2,6 +2,7 @@ import threading import time from typing import Any +from billiard import current_process # type: ignore from flask import current_app from sqlalchemy import and_ from sqlalchemy import or_ @@ -15,14 +16,27 @@ class ExpectedLockNotFoundError(Exception): class ProcessInstanceLockService: - """TODO: comment.""" + # when we lock process instances, we need to make sure we do not use the same locking identifier + # as anything else, or else we will use their lock and be unintentionally stomping on the same + # process instance as them. this happened with celery workers. we generated a guid on startup + # in backend, but this same guid was used by all concurrent celery workers. 
to mitigate this, + # and make sure they weren't trying to use each other's locks, we found out about billiard.current_process(), + # which can give us a unique index for each worker even if they are running in the same python process. + # if we are not in celery, get_current_process_index will return None, and that is also fine, since + # if we are not in celery, there is no concern about multiple things happening at once in a process (other than + # threading, which is accounted for by the thread_id). + @classmethod + def get_current_process_index(cls) -> Any: + process = current_process() + index = getattr(process, "index", None) + return index @classmethod - def set_thread_local_locking_context(cls, domain: str, additional_processing_identifier: str | None = None) -> None: + def set_thread_local_locking_context(cls, domain: str) -> None: tld = current_app.config["THREAD_LOCAL_DATA"] if not hasattr(tld, "lock_service_context"): tld.lock_service_context = {} - tld.lock_service_context[additional_processing_identifier] = { + tld.lock_service_context[cls.get_current_process_index()] = { "domain": domain, "uuid": current_app.config["PROCESS_UUID"], "thread_id": threading.get_ident(), @@ -30,39 +44,37 @@ class ProcessInstanceLockService: } @classmethod - def get_thread_local_locking_context(cls, additional_processing_identifier: str | None = None) -> dict[str, Any]: + def get_thread_local_locking_context(cls) -> dict[str, Any]: tld = current_app.config["THREAD_LOCAL_DATA"] if not hasattr(tld, "lock_service_context"): - cls.set_thread_local_locking_context("web", additional_processing_identifier=additional_processing_identifier) - return tld.lock_service_context[additional_processing_identifier] # type: ignore + cls.set_thread_local_locking_context("web") + return tld.lock_service_context[cls.get_current_process_index()] # type: ignore @classmethod - def locked_by(cls, additional_processing_identifier: str | None = None) -> str: - ctx = 
cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier) - return f"{ctx['domain']}:{ctx['uuid']}:{ctx['thread_id']}:{additional_processing_identifier}" + def locked_by(cls) -> str: + ctx = cls.get_thread_local_locking_context() + return f"{ctx['domain']}:{ctx['uuid']}:{ctx['thread_id']}:{cls.get_current_process_index()}" @classmethod - def lock( - cls, process_instance_id: int, queue_entry: ProcessInstanceQueueModel, additional_processing_identifier: str | None = None - ) -> None: - ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier) + def lock(cls, process_instance_id: int, queue_entry: ProcessInstanceQueueModel) -> None: + ctx = cls.get_thread_local_locking_context() ctx["locks"][process_instance_id] = queue_entry.id @classmethod - def unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> int: - queue_model_id = cls.try_unlock(process_instance_id, additional_processing_identifier=additional_processing_identifier) + def unlock(cls, process_instance_id: int) -> int: + queue_model_id = cls.try_unlock(process_instance_id) if queue_model_id is None: raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance_id}") return queue_model_id @classmethod - def try_unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> int | None: - ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier) + def try_unlock(cls, process_instance_id: int) -> int | None: + ctx = cls.get_thread_local_locking_context() return ctx["locks"].pop(process_instance_id, None) # type: ignore @classmethod - def has_lock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> bool: - ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier) + def 
has_lock(cls, process_instance_id: int) -> bool: + ctx = cls.get_thread_local_locking_context() current_app.logger.info(f"THREAD LOCK: {ctx}") return process_instance_id in ctx["locks"] diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 1bd577db9..438ddf847 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -419,14 +419,12 @@ class ProcessInstanceProcessor: script_engine: PythonScriptEngine | None = None, workflow_completed_handler: WorkflowCompletedHandler | None = None, process_id_to_run: str | None = None, - additional_processing_identifier: str | None = None, include_task_data_for_completed_tasks: bool = False, include_completed_subprocesses: bool = False, ) -> None: """Create a Workflow Processor based on the serialized information available in the process_instance model.""" self._script_engine = script_engine or self.__class__._default_script_engine self._workflow_completed_handler = workflow_completed_handler - self.additional_processing_identifier = additional_processing_identifier self.setup_processor_with_process_instance( process_instance_model=process_instance_model, process_id_to_run=process_id_to_run, @@ -1419,9 +1417,7 @@ class ProcessInstanceProcessor: execution_strategy: ExecutionStrategy | None = None, ) -> TaskRunnability: if self.process_instance_model.persistence_level != "none": - with ProcessInstanceQueueService.dequeued( - self.process_instance_model, additional_processing_identifier=self.additional_processing_identifier - ): + with ProcessInstanceQueueService.dequeued(self.process_instance_model): # TODO: ideally we just lock in the execution service, but not sure # about _add_bpmn_process_definitions and if that needs to happen in # the same lock like it 
does on main @@ -1468,7 +1464,6 @@ class ProcessInstanceProcessor: execution_strategy, self._script_engine.environment.finalize_result, self.save, - additional_processing_identifier=self.additional_processing_identifier, ) task_runnability = execution_service.run_and_save(exit_at, save) self.check_all_tasks() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index d5ecb3440..f0845a1c6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -40,10 +40,8 @@ class ProcessInstanceQueueService: cls._configure_and_save_queue_entry(process_instance, queue_entry) @classmethod - def _enqueue(cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None) -> None: - queue_entry_id = ProcessInstanceLockService.unlock( - process_instance.id, additional_processing_identifier=additional_processing_identifier - ) + def _enqueue(cls, process_instance: ProcessInstanceModel) -> None: + queue_entry_id = ProcessInstanceLockService.unlock(process_instance.id) queue_entry = ProcessInstanceQueueModel.query.filter_by(id=queue_entry_id).first() if queue_entry is None: raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance.id}") @@ -53,8 +51,8 @@ class ProcessInstanceQueueService: cls._configure_and_save_queue_entry(process_instance, queue_entry) @classmethod - def _dequeue(cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None) -> None: - locked_by = ProcessInstanceLockService.locked_by(additional_processing_identifier=additional_processing_identifier) + def _dequeue(cls, process_instance: ProcessInstanceModel) -> None: + locked_by = ProcessInstanceLockService.locked_by() current_time 
= round(time.time()) db.session.query(ProcessInstanceQueueModel).filter( @@ -89,22 +87,19 @@ class ProcessInstanceQueueService: f"{locked_by} cannot lock process instance {process_instance.id}. {message}" ) - ProcessInstanceLockService.lock( - process_instance.id, queue_entry, additional_processing_identifier=additional_processing_identifier - ) + ProcessInstanceLockService.lock(process_instance.id, queue_entry) @classmethod def _dequeue_with_retries( cls, process_instance: ProcessInstanceModel, - additional_processing_identifier: str | None = None, max_attempts: int = 1, ) -> None: attempt = 1 backoff_factor = 2 while True: try: - return cls._dequeue(process_instance, additional_processing_identifier=additional_processing_identifier) + return cls._dequeue(process_instance) except ProcessInstanceIsAlreadyLockedError as exception: if attempt >= max_attempts: raise exception @@ -116,19 +111,14 @@ class ProcessInstanceQueueService: def dequeued( cls, process_instance: ProcessInstanceModel, - additional_processing_identifier: str | None = None, max_attempts: int = 1, ) -> Generator[None, None, None]: - reentering_lock = ProcessInstanceLockService.has_lock( - process_instance.id, additional_processing_identifier=additional_processing_identifier - ) + reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id) if not reentering_lock: # this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError # that's fine, let it bubble up. 
and in that case, there's no need to _enqueue / unlock - cls._dequeue_with_retries( - process_instance, additional_processing_identifier=additional_processing_identifier, max_attempts=max_attempts - ) + cls._dequeue_with_retries(process_instance, max_attempts=max_attempts) try: yield except Exception as ex: @@ -142,7 +132,7 @@ class ProcessInstanceQueueService: raise ex finally: if not reentering_lock: - cls._enqueue(process_instance, additional_processing_identifier=additional_processing_identifier) + cls._enqueue(process_instance) @classmethod def entries_with_status( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index e4abe2979..ead3d5ec0 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -278,18 +278,14 @@ class ProcessInstanceService: process_instance: ProcessInstanceModel, status_value: str | None = None, execution_strategy_name: str | None = None, - additional_processing_identifier: str | None = None, ) -> tuple[ProcessInstanceProcessor | None, TaskRunnability]: processor = None task_runnability = TaskRunnability.unknown_if_ready_tasks - with ProcessInstanceQueueService.dequeued( - process_instance, additional_processing_identifier=additional_processing_identifier - ): + with ProcessInstanceQueueService.dequeued(process_instance): ProcessInstanceMigrator.run(process_instance) processor = ProcessInstanceProcessor( process_instance, workflow_completed_handler=cls.schedule_next_process_model_cycle, - additional_processing_identifier=additional_processing_identifier, ) # if status_value is user_input_required (we are processing instances with that status from background processor), diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py 
b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index b54c59bfa..58a7464c5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -442,14 +442,12 @@ class WorkflowExecutionService: execution_strategy: ExecutionStrategy, process_instance_completer: ProcessInstanceCompleter, process_instance_saver: ProcessInstanceSaver, - additional_processing_identifier: str | None = None, ): self.bpmn_process_instance = bpmn_process_instance self.process_instance_model = process_instance_model self.execution_strategy = execution_strategy self.process_instance_completer = process_instance_completer self.process_instance_saver = process_instance_saver - self.additional_processing_identifier = additional_processing_identifier # names of methods that do spiff stuff: # processor.do_engine_steps calls: @@ -458,11 +456,7 @@ class WorkflowExecutionService: # spiff.[some_run_task_method] def run_and_save(self, exit_at: None = None, save: bool = False) -> TaskRunnability: if self.process_instance_model.persistence_level != "none": - with safe_assertion( - ProcessInstanceLockService.has_lock( - self.process_instance_model.id, additional_processing_identifier=self.additional_processing_identifier - ) - ) as tripped: + with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: if tripped: raise AssertionError( "The current thread has not obtained a lock for this process"