do not queue a process instance for celery if the instance is currently locked w/ burnettk (#1288)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-03-27 18:06:12 +00:00 committed by GitHub
parent 4a373be939
commit f508722b5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 62 additions and 43 deletions

View File

@ -27,7 +27,7 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
ProcessInstanceService.run_process_instance_with_processor(
process_instance, execution_strategy_name="run_current_ready_tasks", additional_processing_identifier=proc_index
)
processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor(
_processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor(
process_instance,
execution_strategy_name="queue_instructions_for_end_user",
additional_processing_identifier=proc_index,

View File

@ -5,8 +5,10 @@ from flask import current_app
from spiffworkflow_backend.background_processing import CELERY_TASK_PROCESS_INSTANCE_RUN
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import PublishingAttemptWhileLockedError
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
def queue_enabled_for_process_model(process_instance: ProcessInstanceModel) -> bool:
@ -57,6 +59,14 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
# if waiting, check all waiting tasks and see if they are timers. if they are timers, it's not runnable.
def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
# ideally, if this code were run from the background processing celery worker,
# we should be passing in the additional processing identifier,
# but we don't have it, so basically this assertion won't help there. at least it will help find issues with non-celery code.
if ProcessInstanceLockService.has_lock(process_instance_id=process_instance.id):
raise PublishingAttemptWhileLockedError(
f"Attempted to queue task for process instance {process_instance.id} while the process already has it locked. This"
" can lead to further locking issues."
)
if should_queue_process_instance(process_instance, execution_mode):
celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
return True

View File

@ -59,3 +59,7 @@ class InvalidRedirectUrlError(Exception):
class TaskMismatchError(Exception):
pass
class PublishingAttemptWhileLockedError(Exception):
pass

View File

@ -487,15 +487,14 @@ def _task_submit_shared(
)
with sentry_sdk.start_span(op="task", description="complete_form_task"):
with ProcessInstanceQueueService.dequeued(process_instance, max_attempts=3):
ProcessInstanceService.complete_form_task(
processor=processor,
spiff_task=spiff_task,
data=body,
user=g.user,
human_task=human_task,
execution_mode=execution_mode,
)
ProcessInstanceService.complete_form_task(
processor=processor,
spiff_task=spiff_task,
data=body,
user=g.user,
human_task=human_task,
execution_mode=execution_mode,
)
# currently task_model has the potential to be None. This should be removable once
# we backfill the human_task table for task_guid and make that column not nullable

View File

@ -71,17 +71,19 @@ class MessageService:
user: UserModel | None = message_instance_send.user
if user is None:
user = UserService.find_or_create_system_user()
receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, user)
receiving_process_instance = MessageService.start_process_with_message(
message_triggerable_process_model, user
)
message_instance_receive = MessageInstanceModel.query.filter_by(
process_instance_id=receiving_process.id,
process_instance_id=receiving_process_instance.id,
message_type="receive",
status="ready",
).first()
else:
receiving_process = MessageService.get_process_instance_for_message_instance(message_instance_receive)
receiving_process_instance = MessageService.get_process_instance_for_message_instance(message_instance_receive)
# Assure we can send the message, otherwise keep going.
if message_instance_receive is None or not receiving_process.can_receive_message():
if message_instance_receive is None or not receiving_process_instance.can_receive_message():
message_instance_send.status = "ready"
db.session.add(message_instance_send)
db.session.commit()
@ -90,12 +92,12 @@ class MessageService:
try:
# currently only controllers and apscheduler call this
cls.raise_if_running_in_celery("correlate_send_message")
with ProcessInstanceQueueService.dequeued(receiving_process):
with ProcessInstanceQueueService.dequeued(receiving_process_instance):
# Set the receiving message to running, so it is not altered elsewhere ...
message_instance_receive.status = "running"
cls.process_message_receive(
receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode
receiving_process_instance, message_instance_receive, message_instance_send, execution_mode=execution_mode
)
message_instance_receive.status = "completed"
message_instance_receive.counterpart_id = message_instance_send.id
@ -104,7 +106,9 @@ class MessageService:
message_instance_send.counterpart_id = message_instance_receive.id
db.session.add(message_instance_send)
db.session.commit()
return message_instance_receive
if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
queue_process_instance_if_appropriate(receiving_process_instance, execution_mode=execution_mode)
return message_instance_receive
except ProcessInstanceIsAlreadyLockedError:
message_instance_send.status = "ready"
@ -140,22 +144,22 @@ class MessageService:
) -> ProcessInstanceModel:
"""Start up a process instance, so it is ready to catch the event."""
cls.raise_if_running_in_celery("start_process_with_message")
process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier(
receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
message_triggerable_process_model.process_model_identifier,
user,
)
with ProcessInstanceQueueService.dequeued(process_instance_receive):
processor_receive = ProcessInstanceProcessor(process_instance_receive)
with ProcessInstanceQueueService.dequeued(receiving_process_instance):
processor_receive = ProcessInstanceProcessor(receiving_process_instance)
cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
processor_receive.save()
processor_receive.do_engine_steps(save=True)
return process_instance_receive
return receiving_process_instance
@staticmethod
def process_message_receive(
process_instance_receive: ProcessInstanceModel,
receiving_process_instance: ProcessInstanceModel,
message_instance_receive: MessageInstanceModel,
message_instance_send: MessageInstanceModel,
execution_mode: str | None = None,
@ -178,16 +182,15 @@ class MessageService:
payload=message_instance_send.payload,
correlations=message_instance_send.correlation_keys,
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive = ProcessInstanceProcessor(receiving_process_instance)
processor_receive.bpmn_process_instance.send_event(bpmn_event)
execution_strategy_name = None
if should_queue_process_instance(process_instance_receive, execution_mode=execution_mode):
if should_queue_process_instance(receiving_process_instance, execution_mode=execution_mode):
# even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
# we need to serialize these task updates to the db. do_engine_steps with save does that.
processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
queue_process_instance_if_appropriate(process_instance_receive, execution_mode=execution_mode)
elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(receiving_process_instance):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
@ -291,10 +294,10 @@ class MessageService:
def get_process_instance_for_message_instance(
message_instance_receive: MessageInstanceModel,
) -> ProcessInstanceModel:
process_instance_receive: ProcessInstanceModel = ProcessInstanceModel.query.filter_by(
receiving_process_instance: ProcessInstanceModel = ProcessInstanceModel.query.filter_by(
id=message_instance_receive.process_instance_id
).first()
if process_instance_receive is None:
if receiving_process_instance is None:
raise MessageServiceError(
(
(
@ -304,7 +307,7 @@ class MessageService:
),
)
)
return process_instance_receive
return receiving_process_instance
@classmethod
def raise_if_running_in_celery(cls, method_name: str) -> None:

View File

@ -66,7 +66,6 @@ class ProcessInstanceQueueService:
"locked_at_in_seconds": current_time,
}
)
db.session.commit()
queue_entry = (
@ -83,9 +82,11 @@ class ProcessInstanceQueueService:
)
if queue_entry.locked_by != locked_by:
message = f"It has already been locked by {queue_entry.locked_by}."
if queue_entry.locked_by is None:
message = "It was locked by something else when we tried to lock it in the db, but it has since been unlocked."
raise ProcessInstanceIsAlreadyLockedError(
f"{locked_by} cannot lock process instance {process_instance.id}. "
f"It has already been locked by {queue_entry.locked_by}."
f"{locked_by} cannot lock process instance {process_instance.id}. {message}"
)
ProcessInstanceLockService.lock(

View File

@ -477,26 +477,28 @@ class ProcessInstanceService:
Abstracted here because we need to do it multiple times when completing all tasks in
a multi-instance task.
"""
ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
# ProcessInstanceService.post_process_form(spiff_task) # some properties may update the data store.
processor.complete_task(spiff_task, human_task, user=user)
with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
processor.complete_task(spiff_task, human_task, user=user)
if queue_process_instance_if_appropriate(processor.process_instance_model, execution_mode):
return
elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
else:
with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
# maybe move this out once we have the interstitial page since this is here just so we can get the next human task
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
# maybe move this out once we have the interstitial page since this is
# here just so we can get the next human task
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
@staticmethod
def spiff_task_to_api_task(
processor: ProcessInstanceProcessor,
spiff_task: SpiffTask,
add_docs_and_forms: bool = False,
) -> Task:
task_type = spiff_task.task_spec.description
task_guid = str(spiff_task.id)