diff --git a/spiffworkflow-backend/bin/clear_celery_queue b/spiffworkflow-backend/bin/clear_celery_queue new file mode 100755 index 000000000..c94f563e7 --- /dev/null +++ b/spiffworkflow-backend/bin/clear_celery_queue @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +function error_handler() { + echo >&2 "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}." + exit "$2" +} +trap 'error_handler ${LINENO} $?' ERR +set -o errtrace -o errexit -o nounset -o pipefail + +script_dir="$( + cd -- "$(dirname "$0")" >/dev/null 2>&1 + pwd -P +)" +. "${script_dir}/local_development_environment_setup" + +export SPIFFWORKFLOW_BACKEND_CELERY_ENABLED=true + +poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker purge -f + +# poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker inspect active diff --git a/spiffworkflow-backend/migrations/versions/fc5815a9d482_.py b/spiffworkflow-backend/migrations/versions/fc5815a9d482_.py new file mode 100644 index 000000000..136f4ed99 --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/fc5815a9d482_.py @@ -0,0 +1,34 @@ +"""empty message + +Revision ID: fc5815a9d482 +Revises: 7eaec0e12079 +Create Date: 2024-06-25 16:05:40.787119 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'fc5815a9d482' +down_revision = '7eaec0e12079' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('future_task', schema=None) as batch_op: + batch_op.add_column(sa.Column('queued_to_run_at_in_seconds', sa.Integer(), nullable=True)) + batch_op.create_index(batch_op.f('ix_future_task_queued_to_run_at_in_seconds'), ['queued_to_run_at_in_seconds'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table('future_task', schema=None) as batch_op: + batch_op.drop_index(batch_op.f('ix_future_task_queued_to_run_at_in_seconds')) + batch_op.drop_column('queued_to_run_at_in_seconds') + + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py index ab70089fc..6ad0eb180 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py @@ -2,6 +2,7 @@ import time import flask from sqlalchemy import and_ +from sqlalchemy import or_ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_future_task_if_appropriate, @@ -98,6 +99,10 @@ class BackgroundProcessingService: FutureTaskModel.completed == False, # noqa: E712 FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712 FutureTaskModel.run_at_in_seconds < lookahead, + or_( + FutureTaskModel.queued_to_run_at_in_seconds != FutureTaskModel.run_at_in_seconds, + FutureTaskModel.queued_to_run_at_in_seconds == None, # noqa: E711 + ), ) ).all() return future_tasks diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery.py index 56c7c8302..9d9b293e7 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery.py @@ -23,6 +23,7 @@ def celery_init_app(app: flask.app.Flask) -> Celery: "result_serializer": "json", "accept_content": ["json"], "enable_utc": True, + "worker_redirect_stdouts_level": "DEBUG", } celery_app = Celery(app.name) diff --git 
a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index 038ff8d14..4321deaec 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -2,9 +2,6 @@ from billiard import current_process # type: ignore from celery import shared_task from flask import current_app -from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( - queue_future_task_if_appropriate, -) from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_process_instance_if_appropriate, ) @@ -26,31 +23,44 @@ class SpiffCeleryWorkerError(Exception): pass -@shared_task(ignore_result=False, time_limit=ten_minutes) -def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: +# ignore types so we can use self and get the celery task id from self.request.id. 
+@shared_task(ignore_result=False, time_limit=ten_minutes, bind=True) +def celery_task_process_instance_run(self, process_instance_id: int, task_guid: str | None = None) -> dict: # type: ignore proc_index = current_process().index - message = f"celery_task_process_instance_run: process_instance_id: {process_instance_id}" + celery_task_id = self.request.id + logger_prefix = f"celery_task_process_instance_run[{celery_task_id}]" + worker_intro_log_message = f"{logger_prefix}: process_instance_id: {process_instance_id}" if task_guid: - message += f" task_guid: {task_guid}" - current_app.logger.info(message) + worker_intro_log_message += f" task_guid: {task_guid}" + current_app.logger.info(worker_intro_log_message) ProcessInstanceLockService.set_thread_local_locking_context("celery:worker") process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() - if task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): + skipped_mesage = None + if process_instance is None: + skipped_mesage = "Skipped because the process instance no longer exists in the database. It could have been deleted." + elif task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): + skipped_mesage = "Skipped because the process instance is set to run in the future." + if skipped_mesage is not None: return { "ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid, - "message": "Skipped because the process instance is set to run in the future.", + "message": skipped_mesage, } + try: task_guid_for_requeueing = task_guid with ProcessInstanceQueueService.dequeued(process_instance): + # run ready tasks to force them to run in case they have instructions on them since queue_instructions_for_end_user + # has a should_break_before that will exit if there are instructions. 
ProcessInstanceService.run_process_instance_with_processor( - process_instance, execution_strategy_name="run_current_ready_tasks" + process_instance, execution_strategy_name="run_current_ready_tasks", should_schedule_waiting_timer_events=False ) + # we need to save instructions to the db so the frontend progress page can view them, + # and this is the only way to do it _processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor( process_instance, execution_strategy_name="queue_instructions_for_end_user", @@ -76,17 +86,14 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid} except ProcessInstanceIsAlreadyLockedError as exception: current_app.logger.info( - f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" - f" {str(exception)}" + f"{logger_prefix}: Could not run process instance with worker: {current_app.config['PROCESS_UUID']}" + f" - {proc_index}. Error was: {str(exception)}" ) - # NOTE: consider exponential backoff - queue_future_task_if_appropriate(process_instance, eta_in_seconds=10, task_guid=task_guid) return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} except Exception as exception: db.session.rollback() # in case the above left the database with a bad transaction error_message = ( - f"Error running process_instance {process_instance.id} " - + f"({process_instance.process_model_identifier}) and task_guid {task_guid}. {str(exception)}" + f"{logger_prefix}: Error running process_instance {process_instance_id} task_guid {task_guid}. 
{str(exception)}" ) current_app.logger.error(error_message) db.session.add(process_instance) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index 6ffc1e3c9..6cb7a0f08 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -52,7 +52,11 @@ def queue_future_task_if_appropriate( # celery_task_process_instance_run.apply_async(kwargs=args_to_celery, countdown=countdown + 1) # type: ignore async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown) - current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})") + message = ( + f"Queueing process instance ({process_instance.id}) for future task ({task_guid}). 
" + f"new celery task id: ({async_result.task_id})" + ) + current_app.logger.info(message) return True return False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py index cda4f3676..ab737d3c5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py @@ -1,3 +1,4 @@ +import copy import time from dataclasses import dataclass @@ -19,6 +20,7 @@ class FutureTaskModel(SpiffworkflowBaseDBModel): guid: str = db.Column(ForeignKey(TaskModel.guid, ondelete="CASCADE", name="future_task_task_guid_fk"), primary_key=True) run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True) + queued_to_run_at_in_seconds: int = db.Column(db.Integer, nullable=True, index=True) completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True) archived_for_process_instance_status: bool = db.Column( db.Boolean, @@ -31,20 +33,23 @@ class FutureTaskModel(SpiffworkflowBaseDBModel): updated_at_in_seconds: int = db.Column(db.Integer, nullable=False) @classmethod - def insert_or_update(cls, guid: str, run_at_in_seconds: int) -> None: - task_info = [ - { - "guid": guid, - "run_at_in_seconds": run_at_in_seconds, - "updated_at_in_seconds": round(time.time()), - } - ] + def insert_or_update(cls, guid: str, run_at_in_seconds: int, queued_to_run_at_in_seconds: int | None = None) -> None: + task_info: dict[str, int | str | None] = { + "guid": guid, + "run_at_in_seconds": run_at_in_seconds, + "updated_at_in_seconds": round(time.time()), + } + + if queued_to_run_at_in_seconds is not None: + task_info["queued_to_run_at_in_seconds"] = queued_to_run_at_in_seconds + + new_values = copy.copy(task_info) + del new_values["guid"] + on_duplicate_key_stmt = None if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": insert_stmt = 
mysql_insert(FutureTaskModel).values(task_info) - on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( - run_at_in_seconds=insert_stmt.inserted.run_at_in_seconds, updated_at_in_seconds=round(time.time()) - ) + on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(**new_values) else: insert_stmt = None if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "sqlite": @@ -53,6 +58,6 @@ class FutureTaskModel(SpiffworkflowBaseDBModel): insert_stmt = postgres_insert(FutureTaskModel).values(task_info) on_duplicate_key_stmt = insert_stmt.on_conflict_do_update( index_elements=["guid"], - set_={"run_at_in_seconds": run_at_in_seconds, "updated_at_in_seconds": round(time.time())}, + set_=new_values, ) db.session.execute(on_duplicate_key_stmt) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py index a16177a2d..babd8c42a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance_queue.py @@ -18,6 +18,12 @@ class ProcessInstanceQueueModel(SpiffworkflowBaseDBModel): locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True) status: str = db.Column(db.String(50), index=True) + # for timers. right now the apscheduler jobs without celery check for waiting process instances. + # if the instance's run_at_in_seconds is now or earlier, the instance will run. + # so we can save some effort if we detect that it is scheduled to run later. 
+ # note that we still run an apscheduler job to manage timer start events, even if + # SPIFFWORKFLOW_BACKEND_CELERY_ENABLED=true run_at_in_seconds: int = db.Column(db.Integer) + updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 22f37da7e..60daa5169 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1438,19 +1438,27 @@ class ProcessInstanceProcessor: save: bool = False, execution_strategy_name: str | None = None, execution_strategy: ExecutionStrategy | None = None, + should_schedule_waiting_timer_events: bool = True, ) -> TaskRunnability: if self.process_instance_model.persistence_level != "none": with ProcessInstanceQueueService.dequeued(self.process_instance_model): # TODO: ideally we just lock in the execution service, but not sure # about _add_bpmn_process_definitions and if that needs to happen in # the same lock like it does on main - return self._do_engine_steps(exit_at, save, execution_strategy_name, execution_strategy) + return self._do_engine_steps( + exit_at, + save, + execution_strategy_name, + execution_strategy, + should_schedule_waiting_timer_events=should_schedule_waiting_timer_events, + ) else: return self._do_engine_steps( exit_at, save=False, execution_strategy_name=execution_strategy_name, execution_strategy=execution_strategy, + should_schedule_waiting_timer_events=should_schedule_waiting_timer_events, ) def _do_engine_steps( @@ -1459,6 +1467,7 @@ class ProcessInstanceProcessor: save: bool = False, execution_strategy_name: str | None = None, execution_strategy: ExecutionStrategy | None = None, + should_schedule_waiting_timer_events: bool = True, ) -> 
TaskRunnability: self._add_bpmn_process_definitions( self.serialize(), @@ -1488,7 +1497,11 @@ class ProcessInstanceProcessor: self._script_engine.environment.finalize_result, self.save, ) - task_runnability = execution_service.run_and_save(exit_at, save) + task_runnability = execution_service.run_and_save( + exit_at, + save, + should_schedule_waiting_timer_events=should_schedule_waiting_timer_events, + ) self.check_all_tasks() return task_runnability diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 81169709e..3926f5f00 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -38,7 +38,6 @@ from spiffworkflow_backend.models.process_instance_file_data import ProcessInsta from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel from spiffworkflow_backend.models.task import Task -from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService @@ -280,6 +279,7 @@ class ProcessInstanceService: process_instance: ProcessInstanceModel, status_value: str | None = None, execution_strategy_name: str | None = None, + should_schedule_waiting_timer_events: bool = True, ) -> tuple[ProcessInstanceProcessor | None, TaskRunnability]: processor = None task_runnability = TaskRunnability.unknown_if_ready_tasks @@ -302,6 +302,7 @@ class ProcessInstanceService: task_runnability = processor.do_engine_steps( save=True, execution_strategy_name=execution_strategy_name, + 
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events, ) return (processor, task_runnability) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index d2bd9d69d..02a586718 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -358,6 +358,8 @@ class QueueInstructionsForEndUserExecutionStrategy(ExecutionStrategy): JinjaService.add_instruction_for_end_user_if_appropriate(tasks, process_instance_model.id, self.tasks_that_have_been_seen) def should_break_before(self, tasks: list[SpiffTask], process_instance_model: ProcessInstanceModel) -> bool: + # exit if there are instructionsForEndUser so the instructions can be committed to the db using the normal save method + # for the process instance. for spiff_task in tasks: if hasattr(spiff_task.task_spec, "extensions") and spiff_task.task_spec.extensions.get( "instructionsForEndUser", None ) @@ -451,7 +453,12 @@ class WorkflowExecutionService: # run # execution_strategy.spiff_run # spiff.[some_run_task_method] - def run_and_save(self, exit_at: None = None, save: bool = False) -> TaskRunnability: + def run_and_save( + self, + exit_at: None = None, + save: bool = False, + should_schedule_waiting_timer_events: bool = True, + ) -> TaskRunnability: if self.process_instance_model.persistence_level != "none": with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: if tripped: @@ -492,7 +499,8 @@ class WorkflowExecutionService: self.execution_strategy.add_object_to_db_session(self.bpmn_process_instance) if save: self.process_instance_saver() - self.schedule_waiting_timer_events() + if should_schedule_waiting_timer_events: + self.schedule_waiting_timer_events() def is_happening_soon(self, 
time_in_seconds: int) -> bool: # if it is supposed to happen in less than the amount of time we take between polling runs @@ -509,11 +517,17 @@ class WorkflowExecutionService: if "Time" in event.event_type: time_string = event.value run_at_in_seconds = round(datetime.fromisoformat(time_string).timestamp()) - FutureTaskModel.insert_or_update(guid=str(spiff_task.id), run_at_in_seconds=run_at_in_seconds) + queued_to_run_at_in_seconds = None if self.is_happening_soon(run_at_in_seconds): - queue_future_task_if_appropriate( + if queue_future_task_if_appropriate( self.process_instance_model, eta_in_seconds=run_at_in_seconds, task_guid=str(spiff_task.id) - ) + ): + queued_to_run_at_in_seconds = run_at_in_seconds + FutureTaskModel.insert_or_update( + guid=str(spiff_task.id), + run_at_in_seconds=run_at_in_seconds, + queued_to_run_at_in_seconds=queued_to_run_at_in_seconds, + ) def process_bpmn_messages(self) -> None: # FIXE: get_events clears out the events so if we have other events we care about @@ -588,12 +602,19 @@ class WorkflowExecutionService: class ProfiledWorkflowExecutionService(WorkflowExecutionService): """A profiled version of the workflow execution service.""" - def run_and_save(self, exit_at: None = None, save: bool = False) -> TaskRunnability: + def run_and_save( + self, + exit_at: None = None, + save: bool = False, + should_schedule_waiting_timer_events: bool = True, + ) -> TaskRunnability: import cProfile from pstats import SortKey task_runnability = TaskRunnability.unknown_if_ready_tasks with cProfile.Profile() as pr: - task_runnability = super().run_and_save(exit_at=exit_at, save=save) + task_runnability = super().run_and_save( + exit_at=exit_at, save=save, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events + ) pr.print_stats(sort=SortKey.CUMULATIVE) return task_runnability diff --git a/spiffworkflow-backend/tests/data/user-task-with-timer/user_task_with_short_timer.bpmn 
b/spiffworkflow-backend/tests/data/user-task-with-timer/user_task_with_short_timer.bpmn new file mode 100644 index 000000000..14163527a --- /dev/null +++ b/spiffworkflow-backend/tests/data/user-task-with-timer/user_task_with_short_timer.bpmn @@ -0,0 +1,69 @@ + + + + + Flow_0903e0h + + + + Flow_1yn50r0 + + + + Flow_0903e0h + Flow_1yn50r0 + + + Flow_1ky2hak + + "PT4M" + + + + Flow_1ky2hak + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py index 9d9509bd6..c7017710a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py @@ -8,6 +8,7 @@ from spiffworkflow_backend.models.future_task import FutureTaskModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel +from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService @@ -94,17 +95,67 @@ class TestBackgroundProcessingService(BaseTest): ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value) assert process_instance.status == ProcessInstanceStatus.waiting.value - def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel: - process_model = load_test_spec( - process_model_id="test_group/user-task-with-timer", - 
process_model_source_directory="user-task-with-timer", - ) - process_instance = self.create_process_instance_from_process_model(process_model=process_model) + def test_does_not_queue_future_tasks_if_requested( + self, + app: Flask, + mocker: MockerFixture, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): + mock = mocker.patch("celery.current_app.send_task") + self._load_up_a_future_task_and_return_instance(should_schedule_waiting_timer_events=False) + assert mock.call_count == 0 + BackgroundProcessingService.do_process_future_tasks(99999999999999999) + assert mock.call_count == 0 + future_tasks = FutureTaskModel.query.all() + assert len(future_tasks) == 0 + + def test_does_not_requeue_if_recently_queued( + self, + app: Flask, + mocker: MockerFixture, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): + mock = mocker.patch("celery.current_app.send_task") + assert mock.call_count == 0 + process_model = load_test_spec( + process_model_id="test_group/user-task-with-timer", + process_model_source_directory="user-task-with-timer", + bpmn_file_name="user_task_with_short_timer.bpmn", + ) + + # it should queue only when it runs the process model + self._load_up_a_future_task_and_return_instance(process_model=process_model) + assert mock.call_count == 1 + BackgroundProcessingService.do_process_future_tasks(99999999999999999) + assert mock.call_count == 1 + future_tasks = FutureTaskModel.query.all() + assert len(future_tasks) == 1 + assert future_tasks[0].archived_for_process_instance_status is False + + BackgroundProcessingService.do_process_future_tasks(99999999999999999) + assert mock.call_count == 1 + future_tasks = FutureTaskModel.query.all() + assert len(future_tasks) == 1 + assert future_tasks[0].archived_for_process_instance_status is False + + def _load_up_a_future_task_and_return_instance( + self, 
process_model: ProcessModelInfo | None = None, should_schedule_waiting_timer_events: bool = True + ) -> ProcessInstanceModel: + process_model_to_use = process_model + if process_model_to_use is None: + process_model_to_use = load_test_spec( + process_model_id="test_group/user-task-with-timer", + process_model_source_directory="user-task-with-timer", + bpmn_file_name="user_task_with_timer.bpmn", + ) + process_instance = self.create_process_instance_from_process_model(process_model=process_model_to_use) processor = ProcessInstanceProcessor(process_instance) - processor.do_engine_steps(save=True) + processor.do_engine_steps(save=True, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events) assert process_instance.status == "user_input_required" future_tasks = FutureTaskModel.query.all() - assert len(future_tasks) == 1 + assert len(future_tasks) == (1 if should_schedule_waiting_timer_events else 0) return process_instance diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_future_task.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_future_task.py index 654b69d48..316c987c3 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_future_task.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_future_task.py @@ -22,6 +22,7 @@ class TestFutureTask(BaseTest): process_model = load_test_spec( process_model_id="test_group/user-task-with-timer", process_model_source_directory="user-task-with-timer", + bpmn_file_name="user_task_with_timer.bpmn", ) process_instance = self.create_process_instance_from_process_model(process_model=process_model) processor = ProcessInstanceProcessor(process_instance)