Celery start timer fix (#1337)
* check if the process instance is enqueued to run in the future before running it in celery w/ burnettk
* only mark future tasks as complete if the corresponding task is complete w/ burnettk

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
parent 6c1f141848
commit 7ae81f47cf
@@ -8,10 +8,12 @@ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_t
 from spiffworkflow_backend.models.db import db
 from spiffworkflow_backend.models.future_task import FutureTaskModel
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
+from spiffworkflow_backend.models.task import TaskModel  # noqa: F401
 from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
 from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
+from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
 from spiffworkflow_backend.services.workflow_execution_service import TaskRunnability

 ten_minutes = 60 * 10

@@ -26,7 +28,16 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
     proc_index = current_process().index
     ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index)
     process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
+
+    if task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance):
+        return {
+            "ok": True,
+            "process_instance_id": process_instance_id,
+            "task_guid": task_guid,
+            "message": "Skipped because the process instance is set to run in the future.",
+        }
     try:
+        task_guid_for_requeueing = task_guid
         with ProcessInstanceQueueService.dequeued(process_instance, additional_processing_identifier=proc_index):
             ProcessInstanceService.run_process_instance_with_processor(
                 process_instance, execution_strategy_name="run_current_ready_tasks", additional_processing_identifier=proc_index

@@ -40,13 +51,20 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
         # there is an assumption that it was successfully processed by run_process_instance_with_processor above.
         # we might want to check that assumption.
         if task_guid is not None:
-            future_task = FutureTaskModel.query.filter_by(completed=False, guid=task_guid).first()
-            if future_task is not None:
-                future_task.completed = True
-                db.session.add(future_task)
-                db.session.commit()
+            completed_task_model = (
+                TaskModel.query.filter_by(guid=task_guid)
+                .filter(TaskModel.state.in_(["COMPLETED", "ERROR", "CANCELLED"]))  # type: ignore
+                .first()
+            )
+            if completed_task_model is not None:
+                future_task = FutureTaskModel.query.filter_by(completed=False, guid=task_guid).first()
+                if future_task is not None:
+                    future_task.completed = True
+                    db.session.add(future_task)
+                    db.session.commit()
+                task_guid_for_requeueing = None
         if task_runnability == TaskRunnability.has_ready_tasks:
-            queue_process_instance_if_appropriate(process_instance)
+            queue_process_instance_if_appropriate(process_instance, task_guid=task_guid_for_requeueing)
         return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid}
     except ProcessInstanceIsAlreadyLockedError as exception:
         current_app.logger.info(
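
Taken together, the hunks above change two worker-side behaviors: skip a process instance that is scheduled to run in the future, and only mark a future task complete once its task has actually finished. A minimal, self-contained sketch of just that control flow (no celery, no database; FakeInstance, celery_task_sketch, and the helper names are illustrative stand-ins, not the project's API):

import time
from dataclasses import dataclass, field

TERMINAL_TASK_STATES = {"COMPLETED", "ERROR", "CANCELLED"}


@dataclass
class FakeInstance:
    # stand-in for ProcessInstanceModel plus its queue entry and future-task rows
    run_at_in_seconds: int
    task_states: dict = field(default_factory=dict)            # task guid -> state
    future_task_completed: dict = field(default_factory=dict)  # task guid -> bool


def is_enqueued_to_run_in_the_future(instance: FakeInstance) -> bool:
    # mirrors the comparison ProcessInstanceTmpService performs in the diff
    return instance.run_at_in_seconds > round(time.time())


def celery_task_sketch(instance: FakeInstance, task_guid: str | None = None) -> dict:
    # 1) skip instances whose queue entry says "run later" (e.g. an unfired start timer)
    if task_guid is None and is_enqueued_to_run_in_the_future(instance):
        return {"ok": True, "message": "Skipped because the process instance is set to run in the future."}

    task_guid_for_requeueing = task_guid
    # ... run_process_instance_with_processor(...) would execute ready tasks here ...

    # 2) only mark the future task complete once its task reached a terminal state;
    #    otherwise keep the guid so the instance gets requeued with it
    if task_guid is not None and instance.task_states.get(task_guid) in TERMINAL_TASK_STATES:
        instance.future_task_completed[task_guid] = True
        task_guid_for_requeueing = None

    return {"ok": True, "task_guid": task_guid_for_requeueing}


# a timer task that has not completed yet: the guid is kept for requeueing
pending = FakeInstance(run_at_in_seconds=round(time.time()) - 1, task_states={"abc-123": "WAITING"})
print(celery_task_sketch(pending, task_guid="abc-123"))  # {'ok': True, 'task_guid': 'abc-123'}

Whenever the completion check fails, the task guid stays set and is passed back to queue_process_instance_if_appropriate, so the timer gets another chance instead of being silently marked done.
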
@@ -57,7 +57,9 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta


 # if waiting, check all waiting tasks and see if theyt are timers. if they are timers, it's not runnable.
-def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel, execution_mode: str | None = None) -> bool:
+def queue_process_instance_if_appropriate(
+    process_instance: ProcessInstanceModel, execution_mode: str | None = None, task_guid: str | None = None
+) -> bool:
     # FIXME: we should only run this check if we are NOT in a celery worker
     #
     # # ideally, if this code were run from the backgrond processing celery worker,

@@ -71,7 +73,7 @@ def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel
     # )

     if should_queue_process_instance(process_instance, execution_mode):
-        async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
+        async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id, task_guid))
         current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})")
         return True
     return False
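
On the producer side the change is just threading task_guid through as a second positional argument to send_task so it lines up with the worker signature. A rough sketch of that mapping under assumed wiring (the in-memory broker and the literal task name are placeholders; the real code uses the CELERY_TASK_PROCESS_INSTANCE_RUN constant):

from celery import Celery

app = Celery("sketch", broker="memory://")


@app.task(name="celery_task_process_instance_run")  # placeholder name for illustration
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
    return {"process_instance_id": process_instance_id, "task_guid": task_guid}


if __name__ == "__main__":
    # old call: (process_instance.id,)           -> task_guid defaults to None
    # new call: (process_instance.id, task_guid) -> the guid survives the requeue
    app.send_task("celery_task_process_instance_run", (42, "future-task-guid"))
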
@@ -662,7 +662,7 @@ def _process_instance_run(
         ProcessInstanceTmpService.add_event_to_process_instance(process_instance, "process_instance_force_run")
     if not queue_process_instance_if_appropriate(
         process_instance, execution_mode=execution_mode
-    ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
+    ) and not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance):
         execution_strategy_name = None
         if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
             execution_strategy_name = "greedy"
@@ -455,7 +455,9 @@ def process_instance_progress(
     if next_human_task_assigned_to_me:
         response["task"] = HumanTaskModel.to_task(next_human_task_assigned_to_me)
     # this may not catch all times we should redirect to instance show page
-    elif not process_instance.is_immediately_runnable():
+    elif not process_instance.is_immediately_runnable() or ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(
+        process_instance
+    ):
         # any time we assign this process_instance, the frontend progress page will redirect to process instance show
         response["process_instance"] = process_instance

@@ -615,7 +617,7 @@ def _dequeued_interstitial_stream(
     # need something better to show?
     if execute_tasks:
         try:
-            if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance):
+            if not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance):
                 with ProcessInstanceQueueService.dequeued(process_instance):
                     ProcessInstanceMigrator.run(process_instance)
             yield from _interstitial_stream(process_instance, execute_tasks=execute_tasks)
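
The progress endpoint now also redirects when the instance is merely waiting on a future run. The decision reduces to a small predicate; a sketch with the two inputs pulled out (the function name is illustrative):

def should_redirect_to_instance_show_page(is_immediately_runnable: bool, enqueued_to_run_in_the_future: bool) -> bool:
    # mirrors the elif condition above: nothing can run right now, or the run is scheduled for later
    return (not is_immediately_runnable) or enqueued_to_run_in_the_future


assert should_redirect_to_instance_show_page(True, True) is True    # start timer still pending
assert should_redirect_to_instance_show_page(True, False) is False  # keep polling for progress
assert should_redirect_to_instance_show_page(False, True) is True
assert should_redirect_to_instance_show_page(False, False) is True
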
@@ -25,6 +25,7 @@ from spiffworkflow_backend.services.process_instance_processor import ProcessIns
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
 from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
+from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
 from spiffworkflow_backend.services.user_service import UserService


@@ -190,7 +191,7 @@ class MessageService:
                 # even if we are queueing, we ran a "send_event" call up above, and it updated some tasks.
                 # we need to serialize these task updates to the db. do_engine_steps with save does that.
                 processor_receive.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
-            elif not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(receiving_process_instance):
+            elif not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(receiving_process_instance):
                 execution_strategy_name = None
                 if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
                     execution_strategy_name = "greedy"
@@ -174,17 +174,3 @@ class ProcessInstanceQueueService:
         queue_entries = cls.entries_with_status(status_value, None, run_at_in_seconds_threshold, min_age_in_seconds)
         ids_with_status = [entry.process_instance_id for entry in queue_entries]
         return ids_with_status
-
-    @staticmethod
-    def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool:
-        queue_entry = (
-            db.session.query(ProcessInstanceQueueModel)
-            .filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id)
-            .first()
-        )
-
-        if queue_entry is None:
-            return False
-
-        current_time = round(time.time())
-        return queue_entry.run_at_in_seconds > current_time
@@ -49,6 +49,7 @@ from spiffworkflow_backend.services.process_instance_processor import ProcessIns
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsNotEnqueuedError
 from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
+from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.workflow_execution_service import TaskRunnability
 from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError

@@ -480,7 +481,7 @@ class ProcessInstanceService:

         # the caller needs to handle the actual queueing of the process instance for better dequeueing ability
         if not should_queue_process_instance(processor.process_instance_model, execution_mode):
-            if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
+            if not ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
                 with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
                     execution_strategy_name = None
                     if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
@@ -8,6 +8,7 @@ from spiffworkflow_backend.models.db import db
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
 from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel
 from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel
+from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel


 class ProcessInstanceTmpService:

@@ -76,3 +77,17 @@ class ProcessInstanceTmpService:
         if add_to_db_session:
             db.session.add(process_instance_error_detail)
         return (process_instance_event, process_instance_error_detail)
+
+    @staticmethod
+    def is_enqueued_to_run_in_the_future(process_instance: ProcessInstanceModel) -> bool:
+        queue_entry = (
+            db.session.query(ProcessInstanceQueueModel)
+            .filter(ProcessInstanceQueueModel.process_instance_id == process_instance.id)
+            .first()
+        )
+
+        if queue_entry is None:
+            return False
+
+        current_time = round(time.time())
+        return queue_entry.run_at_in_seconds > current_time
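
The helper that moved here boils down to a single timestamp comparison against the queue entry's run_at_in_seconds. A standalone sanity check of that comparison with the SQLAlchemy query stubbed out (is_run_at_in_the_future is an illustrative name):

import time


def is_run_at_in_the_future(run_at_in_seconds: int, now: int | None = None) -> bool:
    # same comparison the service performs after fetching the queue entry
    current_time = now if now is not None else round(time.time())
    return run_at_in_seconds > current_time


now = round(time.time())
assert is_run_at_in_the_future(now + 600, now=now) is True   # timer ten minutes out: not yet runnable
assert is_run_at_in_the_future(now - 5, now=now) is False    # already due: safe to run
assert is_run_at_in_the_future(now, now=now) is False        # due exactly now counts as runnable
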