Remove additional processing info (#1526)

* call the proc index when we need it rather than passing it through w/ burnettk

* removed all uses of additional_processing_identifier w/ burnettk

* added comment about why we need the current proc index and applied a coderabbit suggestion w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
jasquat 2024-05-09 20:53:36 +00:00 committed by GitHub
parent ac83a4e639
commit 50754b9d33
8 changed files with 46 additions and 73 deletions
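
The crux of the change: instead of computing a worker identifier once in the celery task and threading it through every call as additional_processing_identifier, the lock service now asks billiard for the worker index at the moment it needs one. A minimal standalone sketch of the idea (mirroring the helper added in the diff below, not part of it):

    from billiard import current_process  # billiard backs celery's prefork pool

    def get_current_process_index():
        # Each prefork worker child carries a unique .index attribute; outside
        # of celery the attribute is absent, so fall back to None and let the
        # thread id disambiguate concurrent work instead.
        return getattr(current_process(), "index", None)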

View File

@@ -84,7 +84,6 @@ def _add_jobs_that_should_run_regardless_of_celery_config(app: flask.app.Flask,
     ]
 
     # TODO: see if we can queue with celery instead on celery based configuration
-    # NOTE: pass in additional_processing_identifier if we move to celery
     scheduler.add_job(
         BackgroundProcessingService(app).process_message_instances_with_app_context,
         "interval",

View File

@@ -29,7 +29,7 @@ class SpiffCeleryWorkerError(Exception):
 
 @shared_task(ignore_result=False, time_limit=ten_minutes)
 def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
     proc_index = current_process().index
-    ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index)
+    ProcessInstanceLockService.set_thread_local_locking_context("celery:worker")
     process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
     if task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance):
@@ -41,14 +41,13 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
         }
     try:
         task_guid_for_requeueing = task_guid
-        with ProcessInstanceQueueService.dequeued(process_instance, additional_processing_identifier=proc_index):
+        with ProcessInstanceQueueService.dequeued(process_instance):
             ProcessInstanceService.run_process_instance_with_processor(
-                process_instance, execution_strategy_name="run_current_ready_tasks", additional_processing_identifier=proc_index
+                process_instance, execution_strategy_name="run_current_ready_tasks"
             )
             _processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor(
                 process_instance,
                 execution_strategy_name="queue_instructions_for_end_user",
-                additional_processing_identifier=proc_index,
             )
         # currently, whenever we get a task_guid, that means that that task, which was a future task, is ready to run.
         # there is an assumption that it was successfully processed by run_process_instance_with_processor above.

View File

@@ -1,4 +1,3 @@
-import os
 from typing import Any
 
 from flask import g
@@ -91,8 +90,6 @@ class MessageService:
             return None
 
         try:
-            # currently only controllers and apscheduler call this
-            cls.raise_if_running_in_celery("correlate_send_message")
             with ProcessInstanceQueueService.dequeued(receiving_process_instance):
                 # Set the receiving message to running, so it is not altered elsewhere ...
                 message_instance_receive.status = "running"
@@ -144,7 +141,6 @@
         user: UserModel,
     ) -> ProcessInstanceModel:
         """Start up a process instance, so it is ready to catch the event."""
-        cls.raise_if_running_in_celery("start_process_with_message")
         receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
             message_triggerable_process_model.process_model_identifier,
             user,
@@ -309,11 +305,3 @@
             )
         )
         return receiving_process_instance
-
-    @classmethod
-    def raise_if_running_in_celery(cls, method_name: str) -> None:
-        if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true":
-            raise MessageServiceError(
-                f"Calling {method_name} in a celery worker. This is not supported! We may need to add"
-                " additional_processing_identifier to this code path."
-            )

View File

@@ -2,6 +2,7 @@ import threading
 import time
 from typing import Any
 
+from billiard import current_process  # type: ignore
 from flask import current_app
 from sqlalchemy import and_
 from sqlalchemy import or_
@@ -15,14 +16,27 @@ class ExpectedLockNotFoundError(Exception):
 class ProcessInstanceLockService:
     """TODO: comment."""
 
+    # when we lock process instances, we need to make sure we do not use the same locking identifier
+    # as anything else, or else we will use their lock and be unintentionally stomping on the same
+    # process instance as them. this happened with celery workers. we generated a guid on startup
+    # in backend, but this same guid was used by all concurrent celery workers. to mitigate this,
+    # and make sure they weren't trying to use each others' locks, we found out about billiard.current_process(),
+    # which can give us a unique index for each worker even if they are running in the same python process.
+    # if we are not in celery, get_current_process_index will return None, and that is also fine, since
+    # if we are not in celery, there is no concern about multiple things happening at once in a process (other than
+    # threading, which is accounted for by the thread_id).
+    @classmethod
+    def get_current_process_index(cls) -> Any:
+        process = current_process()
+        index = getattr(process, "index", None)
+        return index
+
     @classmethod
-    def set_thread_local_locking_context(cls, domain: str, additional_processing_identifier: str | None = None) -> None:
+    def set_thread_local_locking_context(cls, domain: str) -> None:
         tld = current_app.config["THREAD_LOCAL_DATA"]
         if not hasattr(tld, "lock_service_context"):
             tld.lock_service_context = {}
-        tld.lock_service_context[additional_processing_identifier] = {
+        tld.lock_service_context[cls.get_current_process_index()] = {
             "domain": domain,
             "uuid": current_app.config["PROCESS_UUID"],
             "thread_id": threading.get_ident(),
@@ -30,39 +44,37 @@ class ProcessInstanceLockService:
         }
 
     @classmethod
-    def get_thread_local_locking_context(cls, additional_processing_identifier: str | None = None) -> dict[str, Any]:
+    def get_thread_local_locking_context(cls) -> dict[str, Any]:
         tld = current_app.config["THREAD_LOCAL_DATA"]
         if not hasattr(tld, "lock_service_context"):
-            cls.set_thread_local_locking_context("web", additional_processing_identifier=additional_processing_identifier)
-        return tld.lock_service_context[additional_processing_identifier]  # type: ignore
+            cls.set_thread_local_locking_context("web")
+        return tld.lock_service_context[cls.get_current_process_index()]  # type: ignore
 
     @classmethod
-    def locked_by(cls, additional_processing_identifier: str | None = None) -> str:
-        ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
-        return f"{ctx['domain']}:{ctx['uuid']}:{ctx['thread_id']}:{additional_processing_identifier}"
+    def locked_by(cls) -> str:
+        ctx = cls.get_thread_local_locking_context()
+        return f"{ctx['domain']}:{ctx['uuid']}:{ctx['thread_id']}:{cls.get_current_process_index()}"
 
     @classmethod
-    def lock(
-        cls, process_instance_id: int, queue_entry: ProcessInstanceQueueModel, additional_processing_identifier: str | None = None
-    ) -> None:
-        ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
+    def lock(cls, process_instance_id: int, queue_entry: ProcessInstanceQueueModel) -> None:
+        ctx = cls.get_thread_local_locking_context()
         ctx["locks"][process_instance_id] = queue_entry.id
 
     @classmethod
-    def unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> int:
-        queue_model_id = cls.try_unlock(process_instance_id, additional_processing_identifier=additional_processing_identifier)
+    def unlock(cls, process_instance_id: int) -> int:
+        queue_model_id = cls.try_unlock(process_instance_id)
         if queue_model_id is None:
             raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance_id}")
         return queue_model_id
 
     @classmethod
-    def try_unlock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> int | None:
-        ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
+    def try_unlock(cls, process_instance_id: int) -> int | None:
+        ctx = cls.get_thread_local_locking_context()
         return ctx["locks"].pop(process_instance_id, None)  # type: ignore
 
     @classmethod
-    def has_lock(cls, process_instance_id: int, additional_processing_identifier: str | None = None) -> bool:
-        ctx = cls.get_thread_local_locking_context(additional_processing_identifier=additional_processing_identifier)
+    def has_lock(cls, process_instance_id: int) -> bool:
+        ctx = cls.get_thread_local_locking_context()
         current_app.logger.info(f"THREAD LOCK: {ctx}")
         return process_instance_id in ctx["locks"]
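
Net effect in this file: the lock owner string is derived entirely from ambient context. An illustrative sketch of what locked_by now produces, using made-up placeholder values for the app's PROCESS_UUID and the worker index:

    import threading

    domain = "celery:worker"
    process_uuid = "1f9c0a"   # placeholder for current_app.config["PROCESS_UUID"]
    process_index = 3         # billiard worker index; None outside celery

    # e.g. celery:worker:1f9c0a:140301234567890:3
    print(f"{domain}:{process_uuid}:{threading.get_ident()}:{process_index}")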

View File

@@ -419,14 +419,12 @@ class ProcessInstanceProcessor:
         script_engine: PythonScriptEngine | None = None,
         workflow_completed_handler: WorkflowCompletedHandler | None = None,
         process_id_to_run: str | None = None,
-        additional_processing_identifier: str | None = None,
         include_task_data_for_completed_tasks: bool = False,
         include_completed_subprocesses: bool = False,
     ) -> None:
         """Create a Workflow Processor based on the serialized information available in the process_instance model."""
         self._script_engine = script_engine or self.__class__._default_script_engine
         self._workflow_completed_handler = workflow_completed_handler
-        self.additional_processing_identifier = additional_processing_identifier
         self.setup_processor_with_process_instance(
             process_instance_model=process_instance_model,
             process_id_to_run=process_id_to_run,
@@ -1419,9 +1417,7 @@
         execution_strategy: ExecutionStrategy | None = None,
     ) -> TaskRunnability:
         if self.process_instance_model.persistence_level != "none":
-            with ProcessInstanceQueueService.dequeued(
-                self.process_instance_model, additional_processing_identifier=self.additional_processing_identifier
-            ):
+            with ProcessInstanceQueueService.dequeued(self.process_instance_model):
                 # TODO: ideally we just lock in the execution service, but not sure
                 # about _add_bpmn_process_definitions and if that needs to happen in
                 # the same lock like it does on main
@@ -1468,7 +1464,6 @@
                 execution_strategy,
                 self._script_engine.environment.finalize_result,
                 self.save,
-                additional_processing_identifier=self.additional_processing_identifier,
             )
             task_runnability = execution_service.run_and_save(exit_at, save)
             self.check_all_tasks()

View File

@@ -40,10 +40,8 @@ class ProcessInstanceQueueService:
         cls._configure_and_save_queue_entry(process_instance, queue_entry)
 
     @classmethod
-    def _enqueue(cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None) -> None:
-        queue_entry_id = ProcessInstanceLockService.unlock(
-            process_instance.id, additional_processing_identifier=additional_processing_identifier
-        )
+    def _enqueue(cls, process_instance: ProcessInstanceModel) -> None:
+        queue_entry_id = ProcessInstanceLockService.unlock(process_instance.id)
         queue_entry = ProcessInstanceQueueModel.query.filter_by(id=queue_entry_id).first()
         if queue_entry is None:
             raise ExpectedLockNotFoundError(f"Could not find a lock for process instance: {process_instance.id}")
@@ -53,8 +51,8 @@ class ProcessInstanceQueueService:
         cls._configure_and_save_queue_entry(process_instance, queue_entry)
 
     @classmethod
-    def _dequeue(cls, process_instance: ProcessInstanceModel, additional_processing_identifier: str | None = None) -> None:
-        locked_by = ProcessInstanceLockService.locked_by(additional_processing_identifier=additional_processing_identifier)
+    def _dequeue(cls, process_instance: ProcessInstanceModel) -> None:
+        locked_by = ProcessInstanceLockService.locked_by()
         current_time = round(time.time())
 
         db.session.query(ProcessInstanceQueueModel).filter(
@@ -89,22 +87,19 @@ class ProcessInstanceQueueService:
                 f"{locked_by} cannot lock process instance {process_instance.id}. {message}"
             )
 
-        ProcessInstanceLockService.lock(
-            process_instance.id, queue_entry, additional_processing_identifier=additional_processing_identifier
-        )
+        ProcessInstanceLockService.lock(process_instance.id, queue_entry)
 
     @classmethod
     def _dequeue_with_retries(
         cls,
         process_instance: ProcessInstanceModel,
-        additional_processing_identifier: str | None = None,
         max_attempts: int = 1,
     ) -> None:
         attempt = 1
         backoff_factor = 2
         while True:
             try:
-                return cls._dequeue(process_instance, additional_processing_identifier=additional_processing_identifier)
+                return cls._dequeue(process_instance)
             except ProcessInstanceIsAlreadyLockedError as exception:
                 if attempt >= max_attempts:
                     raise exception
@@ -116,19 +111,14 @@ class ProcessInstanceQueueService:
     def dequeued(
         cls,
         process_instance: ProcessInstanceModel,
-        additional_processing_identifier: str | None = None,
         max_attempts: int = 1,
     ) -> Generator[None, None, None]:
-        reentering_lock = ProcessInstanceLockService.has_lock(
-            process_instance.id, additional_processing_identifier=additional_processing_identifier
-        )
+        reentering_lock = ProcessInstanceLockService.has_lock(process_instance.id)
 
         if not reentering_lock:
             # this can blow up with ProcessInstanceIsNotEnqueuedError or ProcessInstanceIsAlreadyLockedError
             # that's fine, let it bubble up. and in that case, there's no need to _enqueue / unlock
-            cls._dequeue_with_retries(
-                process_instance, additional_processing_identifier=additional_processing_identifier, max_attempts=max_attempts
-            )
+            cls._dequeue_with_retries(process_instance, max_attempts=max_attempts)
         try:
             yield
         except Exception as ex:
@@ -142,7 +132,7 @@ class ProcessInstanceQueueService:
             raise ex
         finally:
             if not reentering_lock:
-                cls._enqueue(process_instance, additional_processing_identifier=additional_processing_identifier)
+                cls._enqueue(process_instance)
 
     @classmethod
     def entries_with_status(
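
For context, the retry loop in _dequeue_with_retries is unchanged in spirit by this commit, just simpler to call. A sketch of the pattern, assuming (the hunk elides this) that the loop sleeps backoff_factor ** attempt seconds between attempts:

    import time

    class AlreadyLockedError(Exception):
        """Stand-in for ProcessInstanceIsAlreadyLockedError."""

    def dequeue_with_retries(dequeue, max_attempts=1):
        attempt = 1
        backoff_factor = 2
        while True:
            try:
                return dequeue()
            except AlreadyLockedError:
                if attempt >= max_attempts:
                    raise
                # assumed backoff between attempts; the sleep is not shown in the hunk
                time.sleep(backoff_factor**attempt)
                attempt += 1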

View File

@@ -278,18 +278,14 @@ class ProcessInstanceService:
         process_instance: ProcessInstanceModel,
         status_value: str | None = None,
         execution_strategy_name: str | None = None,
-        additional_processing_identifier: str | None = None,
     ) -> tuple[ProcessInstanceProcessor | None, TaskRunnability]:
         processor = None
         task_runnability = TaskRunnability.unknown_if_ready_tasks
-        with ProcessInstanceQueueService.dequeued(
-            process_instance, additional_processing_identifier=additional_processing_identifier
-        ):
+        with ProcessInstanceQueueService.dequeued(process_instance):
             ProcessInstanceMigrator.run(process_instance)
             processor = ProcessInstanceProcessor(
                 process_instance,
                 workflow_completed_handler=cls.schedule_next_process_model_cycle,
-                additional_processing_identifier=additional_processing_identifier,
             )
 
         # if status_value is user_input_required (we are processing instances with that status from background processor),

View File

@@ -442,14 +442,12 @@ class WorkflowExecutionService:
         execution_strategy: ExecutionStrategy,
         process_instance_completer: ProcessInstanceCompleter,
         process_instance_saver: ProcessInstanceSaver,
-        additional_processing_identifier: str | None = None,
     ):
         self.bpmn_process_instance = bpmn_process_instance
         self.process_instance_model = process_instance_model
         self.execution_strategy = execution_strategy
         self.process_instance_completer = process_instance_completer
         self.process_instance_saver = process_instance_saver
-        self.additional_processing_identifier = additional_processing_identifier
 
         # names of methods that do spiff stuff:
         # processor.do_engine_steps calls:
@@ -458,11 +456,7 @@ class WorkflowExecutionService:
     #   spiff.[some_run_task_method]
     def run_and_save(self, exit_at: None = None, save: bool = False) -> TaskRunnability:
         if self.process_instance_model.persistence_level != "none":
-            with safe_assertion(
-                ProcessInstanceLockService.has_lock(
-                    self.process_instance_model.id, additional_processing_identifier=self.additional_processing_identifier
-                )
-            ) as tripped:
+            with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
                 if tripped:
                     raise AssertionError(
                         "The current thread has not obtained a lock for this process"