From 05b50df2b395809a0780e5d0069e346a8a9efa02 Mon Sep 17 00:00:00 2001 From: burnettk Date: Fri, 2 Feb 2024 17:59:37 -0500 Subject: [PATCH] Revert "future tasks should not cause anything to happen if the instance is suspended" This reverts commit b627567addbb911447299641dee43f7081b6213d. --- .../migrations/versions/acf20342181e_.py | 34 -------- .../background_processing_service.py | 51 ++++-------- .../process_instance_task_producer.py | 7 +- .../models/future_task.py | 1 - .../models/process_instance.py | 21 +---- .../services/process_instance_processor.py | 19 +---- .../services/process_instance_service.py | 1 - .../test_background_processing_service.py | 80 ------------------- 8 files changed, 19 insertions(+), 195 deletions(-) delete mode 100644 spiffworkflow-backend/migrations/versions/acf20342181e_.py delete mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py diff --git a/spiffworkflow-backend/migrations/versions/acf20342181e_.py b/spiffworkflow-backend/migrations/versions/acf20342181e_.py deleted file mode 100644 index 74ddcd4d..00000000 --- a/spiffworkflow-backend/migrations/versions/acf20342181e_.py +++ /dev/null @@ -1,34 +0,0 @@ -"""empty message - -Revision ID: acf20342181e -Revises: 343b406f723d -Create Date: 2024-02-02 16:47:00.942504 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'acf20342181e' -down_revision = '343b406f723d' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('future_task', schema=None) as batch_op: - batch_op.add_column(sa.Column('archived_for_process_instance_status', sa.Boolean(), nullable=False)) - batch_op.create_index(batch_op.f('ix_future_task_archived_for_process_instance_status'), ['archived_for_process_instance_status'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('future_task', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_future_task_archived_for_process_instance_status')) - batch_op.drop_column('archived_for_process_instance_status') - - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py index 9a8dfa82..a444c219 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py @@ -6,7 +6,6 @@ from sqlalchemy import and_ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_future_task_if_appropriate, ) -from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.future_task import FutureTaskModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus @@ -58,46 +57,24 @@ class BackgroundProcessingService: ProcessInstanceLockService.remove_stale_locks() def process_future_tasks(self) -> None: - """Timer related tasks go in the future_task table. - - Celery is not great at scheduling things in the distant future. So this function periodically checks the future_task - table and puts tasks into the queue that are imminently ready to run. Imminently is configurable and defaults to those - that are 5 minutes away or less. - """ - + """If something has been locked for a certain amount of time it is probably stale so unlock it.""" with self.app.app_context(): future_task_lookahead_in_seconds = self.app.config[ "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS" ] - self.__class__.do_process_future_tasks(future_task_lookahead_in_seconds) - - @classmethod - def do_process_future_tasks(cls, future_task_lookahead_in_seconds: int) -> None: - future_tasks = cls.imminent_future_tasks(future_task_lookahead_in_seconds) - for future_task in future_tasks: - process_instance = ( - ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id) - .filter(TaskModel.guid == future_task.guid) - .first() - ) - if process_instance.allowed_to_run(): + lookahead = time.time() + future_task_lookahead_in_seconds + future_tasks = FutureTaskModel.query.filter( + and_( + FutureTaskModel.completed == False, # noqa: E712 + FutureTaskModel.run_at_in_seconds < lookahead, + ) + ).all() + for future_task in future_tasks: + process_instance = ( + ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id) + .filter(TaskModel.guid == future_task.guid) + .first() + ) queue_future_task_if_appropriate( process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid ) - else: - # if we are not allowed to run the process instance, we should not keep processing the future task - future_task.archived_for_process_instance_status = True - db.session.add(future_task) - db.session.commit() - - @classmethod - def imminent_future_tasks(cls, future_task_lookahead_in_seconds: int) -> list[FutureTaskModel]: - lookahead = time.time() + future_task_lookahead_in_seconds - future_tasks: list[FutureTaskModel] = FutureTaskModel.query.filter( - and_( - FutureTaskModel.completed == False, # noqa: E712 - FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712 - FutureTaskModel.run_at_in_seconds < lookahead, - ) - ).all() - return future_tasks diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index 0a585b20..05c73d1b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -16,12 +16,7 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta if queue_enabled_for_process_model(process_instance): buffer = 1 countdown = eta_in_seconds - time.time() + buffer - args_to_celery = { - "process_instance_id": process_instance.id, - "task_guid": task_guid, - # the producer_identifier is so we can know what is putting messages in the queue - "producer_identifier": "future_task", - } + args_to_celery = {"process_instance_id": process_instance.id, "task_guid": task_guid} # add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where # we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library, # expecting the timer to be ready, but the library considered it ready a little after that time diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py index 07ac155e..d9d418ad 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/future_task.py @@ -17,7 +17,6 @@ class FutureTaskModel(SpiffworkflowBaseDBModel): guid: str = db.Column(db.String(36), primary_key=True) run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True) completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True) - archived_for_process_instance_status: bool = db.Column(db.Boolean, default=False, nullable=False, index=True) updated_at_in_seconds: int = db.Column(db.Integer, nullable=False) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index 27f7c529..3c542768 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -1,7 +1,4 @@ from __future__ import annotations -from flask_sqlalchemy.query import Query -from spiffworkflow_backend.models.task import TaskModel # noqa: F401 -from spiffworkflow_backend.models.future_task import FutureTaskModel from typing import Any @@ -51,9 +48,7 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True) process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore bpmn_process_definition_id: int | None = db.Column( - ForeignKey(BpmnProcessDefinitionModel.id), # type: ignore - nullable=True, - index=True, + ForeignKey(BpmnProcessDefinitionModel.id), nullable=True, index=True # type: ignore ) bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore @@ -122,16 +117,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): """ return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None - def future_tasks_query(self) -> Query: - future_tasks: Query = ( - FutureTaskModel.query.filter( - FutureTaskModel.completed == False, # noqa: E712 - ) - .join(TaskModel, TaskModel.guid == FutureTaskModel.guid) - .filter(TaskModel.process_instance_id == self.id) - ) - return future_tasks - def serialized(self) -> dict[str, Any]: """Return object data in serializeable format.""" return { @@ -167,10 +152,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): def can_submit_task(self) -> bool: return not self.has_terminal_status() and self.status != "suspended" - def allowed_to_run(self) -> bool: - """If this process can currently move forward with things like do_engine_steps.""" - return not self.has_terminal_status() and self.status != "suspended" - def can_receive_message(self) -> bool: """If this process can currently accept messages.""" return not self.has_terminal_status() and self.status != "suspended" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index db82378e..73176cd3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,5 +1,3 @@ -from spiffworkflow_backend.models.future_task import FutureTaskModel - # TODO: clean up this service for a clear distinction between it and the process_instance_service # where this points to the pi service import copy @@ -611,9 +609,9 @@ class ProcessInstanceProcessor: bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier] = bpmn_process_definition_dict spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {} - bpmn_subprocess_definition_bpmn_identifiers[ - bpmn_subprocess_definition.id - ] = bpmn_subprocess_definition.bpmn_identifier + bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = ( + bpmn_subprocess_definition.bpmn_identifier + ) task_definitions = TaskDefinitionModel.query.filter( TaskDefinitionModel.bpmn_process_definition_id.in_(bpmn_subprocess_definition_bpmn_identifiers.keys()) # type: ignore @@ -1812,20 +1810,9 @@ class ProcessInstanceProcessor: ) db.session.commit() - def bring_archived_future_tasks_back_to_life(self) -> None: - archived_future_tasks = ( - self.process_instance_model.future_tasks_query() - .filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712 - .all() - ) - for archived_future_task in archived_future_tasks: - archived_future_task.archived_for_process_instance_status = False - db.session.add(archived_future_task) - def resume(self) -> None: self.process_instance_model.status = ProcessInstanceStatus.waiting.value db.session.add(self.process_instance_model) - self.bring_archived_future_tasks_back_to_life() ProcessInstanceTmpService.add_event_to_process_instance( self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index bba7b50b..24a013e1 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -42,7 +42,6 @@ from spiffworkflow_backend.models.process_instance_file_data import ProcessInsta from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel from spiffworkflow_backend.models.task import Task -from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.git_service import GitCommandError diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py deleted file mode 100644 index c2326378..00000000 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_background_processing_service.py +++ /dev/null @@ -1,80 +0,0 @@ -from flask import Flask -from pytest_mock.plugin import MockerFixture -from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService -from spiffworkflow_backend.models.db import db -from spiffworkflow_backend.models.future_task import FutureTaskModel -from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor - -from tests.spiffworkflow_backend.helpers.base_test import BaseTest -from tests.spiffworkflow_backend.helpers.test_data import load_test_spec - - -class TestBackgroundProcessingService(BaseTest): - def test_process_future_tasks_with_no_future_tasks( - self, - app: Flask, - with_db_and_bpmn_file_cleanup: None, - ) -> None: - BackgroundProcessingService(app).process_future_tasks() - - def test_do_process_future_tasks_with_processable_future_task( - self, - app: Flask, - mocker: MockerFixture, - with_db_and_bpmn_file_cleanup: None, - ) -> None: - with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): - mock = mocker.patch("celery.current_app.send_task") - process_instance = self._load_up_a_future_task_and_return_instance() - assert mock.call_count == 0 - BackgroundProcessingService.do_process_future_tasks(99999999999999999) - assert mock.call_count == 1 - future_tasks = FutureTaskModel.query.all() - assert len(future_tasks) == 1 - assert future_tasks[0].archived_for_process_instance_status is False - - def test_do_process_future_tasks_with_unprocessable_future_task( - self, - app: Flask, - mocker: MockerFixture, - with_db_and_bpmn_file_cleanup: None, - ) -> None: - with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): - mock = mocker.patch("celery.current_app.send_task") - process_instance = self._load_up_a_future_task_and_return_instance() - assert mock.call_count == 0 - process_instance.status = "suspended" - db.session.add(process_instance) - db.session.commit() - future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) - assert len(future_tasks) == 1 - BackgroundProcessingService.do_process_future_tasks(99999999999999999) - # should not process anything, so nothing goes to queue - assert mock.call_count == 0 - future_tasks = FutureTaskModel.query.all() - assert len(future_tasks) == 1 - assert future_tasks[0].archived_for_process_instance_status is True - - # the next time do_process_future_tasks runs, it will not consider this task, which is nice - future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) - assert len(future_tasks) == 0 - processor = ProcessInstanceProcessor(process_instance) - processor.resume() - future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) - assert len(future_tasks) == 1 - - def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel: - process_model = load_test_spec( - process_model_id="test_group/user-task-with-timer", - process_model_source_directory="user-task-with-timer", - ) - process_instance = self.create_process_instance_from_process_model(process_model=process_model) - processor = ProcessInstanceProcessor(process_instance) - processor.do_engine_steps(save=True) - - assert process_instance.status == "user_input_required" - - future_tasks = FutureTaskModel.query.all() - assert len(future_tasks) == 1 - return process_instance