Revert "future tasks should not cause anything to happen if the instance is suspended"
This reverts commit b627567add
.
This commit is contained in:
parent
b627567add
commit
05b50df2b3
|
@ -1,34 +0,0 @@
|
||||||
"""empty message
|
|
||||||
|
|
||||||
Revision ID: acf20342181e
|
|
||||||
Revises: 343b406f723d
|
|
||||||
Create Date: 2024-02-02 16:47:00.942504
|
|
||||||
|
|
||||||
"""
|
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
|
||||||
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision = 'acf20342181e'
|
|
||||||
down_revision = '343b406f723d'
|
|
||||||
branch_labels = None
|
|
||||||
depends_on = None
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
|
||||||
# ### commands auto generated by Alembic - please adjust! ###
|
|
||||||
with op.batch_alter_table('future_task', schema=None) as batch_op:
|
|
||||||
batch_op.add_column(sa.Column('archived_for_process_instance_status', sa.Boolean(), nullable=False))
|
|
||||||
batch_op.create_index(batch_op.f('ix_future_task_archived_for_process_instance_status'), ['archived_for_process_instance_status'], unique=False)
|
|
||||||
|
|
||||||
# ### end Alembic commands ###
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
|
||||||
# ### commands auto generated by Alembic - please adjust! ###
|
|
||||||
with op.batch_alter_table('future_task', schema=None) as batch_op:
|
|
||||||
batch_op.drop_index(batch_op.f('ix_future_task_archived_for_process_instance_status'))
|
|
||||||
batch_op.drop_column('archived_for_process_instance_status')
|
|
||||||
|
|
||||||
# ### end Alembic commands ###
|
|
|
@ -6,7 +6,6 @@ from sqlalchemy import and_
|
||||||
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
|
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
|
||||||
queue_future_task_if_appropriate,
|
queue_future_task_if_appropriate,
|
||||||
)
|
)
|
||||||
from spiffworkflow_backend.models.db import db
|
|
||||||
from spiffworkflow_backend.models.future_task import FutureTaskModel
|
from spiffworkflow_backend.models.future_task import FutureTaskModel
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||||
|
@ -58,46 +57,24 @@ class BackgroundProcessingService:
|
||||||
ProcessInstanceLockService.remove_stale_locks()
|
ProcessInstanceLockService.remove_stale_locks()
|
||||||
|
|
||||||
def process_future_tasks(self) -> None:
|
def process_future_tasks(self) -> None:
|
||||||
"""Timer related tasks go in the future_task table.
|
"""If something has been locked for a certain amount of time it is probably stale so unlock it."""
|
||||||
|
|
||||||
Celery is not great at scheduling things in the distant future. So this function periodically checks the future_task
|
|
||||||
table and puts tasks into the queue that are imminently ready to run. Imminently is configurable and defaults to those
|
|
||||||
that are 5 minutes away or less.
|
|
||||||
"""
|
|
||||||
|
|
||||||
with self.app.app_context():
|
with self.app.app_context():
|
||||||
future_task_lookahead_in_seconds = self.app.config[
|
future_task_lookahead_in_seconds = self.app.config[
|
||||||
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS"
|
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS"
|
||||||
]
|
]
|
||||||
self.__class__.do_process_future_tasks(future_task_lookahead_in_seconds)
|
lookahead = time.time() + future_task_lookahead_in_seconds
|
||||||
|
future_tasks = FutureTaskModel.query.filter(
|
||||||
@classmethod
|
and_(
|
||||||
def do_process_future_tasks(cls, future_task_lookahead_in_seconds: int) -> None:
|
FutureTaskModel.completed == False, # noqa: E712
|
||||||
future_tasks = cls.imminent_future_tasks(future_task_lookahead_in_seconds)
|
FutureTaskModel.run_at_in_seconds < lookahead,
|
||||||
|
)
|
||||||
|
).all()
|
||||||
for future_task in future_tasks:
|
for future_task in future_tasks:
|
||||||
process_instance = (
|
process_instance = (
|
||||||
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
|
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
|
||||||
.filter(TaskModel.guid == future_task.guid)
|
.filter(TaskModel.guid == future_task.guid)
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
if process_instance.allowed_to_run():
|
|
||||||
queue_future_task_if_appropriate(
|
queue_future_task_if_appropriate(
|
||||||
process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid
|
process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
# if we are not allowed to run the process instance, we should not keep processing the future task
|
|
||||||
future_task.archived_for_process_instance_status = True
|
|
||||||
db.session.add(future_task)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def imminent_future_tasks(cls, future_task_lookahead_in_seconds: int) -> list[FutureTaskModel]:
|
|
||||||
lookahead = time.time() + future_task_lookahead_in_seconds
|
|
||||||
future_tasks: list[FutureTaskModel] = FutureTaskModel.query.filter(
|
|
||||||
and_(
|
|
||||||
FutureTaskModel.completed == False, # noqa: E712
|
|
||||||
FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712
|
|
||||||
FutureTaskModel.run_at_in_seconds < lookahead,
|
|
||||||
)
|
|
||||||
).all()
|
|
||||||
return future_tasks
|
|
||||||
|
|
|
@ -16,12 +16,7 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
|
||||||
if queue_enabled_for_process_model(process_instance):
|
if queue_enabled_for_process_model(process_instance):
|
||||||
buffer = 1
|
buffer = 1
|
||||||
countdown = eta_in_seconds - time.time() + buffer
|
countdown = eta_in_seconds - time.time() + buffer
|
||||||
args_to_celery = {
|
args_to_celery = {"process_instance_id": process_instance.id, "task_guid": task_guid}
|
||||||
"process_instance_id": process_instance.id,
|
|
||||||
"task_guid": task_guid,
|
|
||||||
# the producer_identifier is so we can know what is putting messages in the queue
|
|
||||||
"producer_identifier": "future_task",
|
|
||||||
}
|
|
||||||
# add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where
|
# add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where
|
||||||
# we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library,
|
# we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library,
|
||||||
# expecting the timer to be ready, but the library considered it ready a little after that time
|
# expecting the timer to be ready, but the library considered it ready a little after that time
|
||||||
|
|
|
@ -17,7 +17,6 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
|
||||||
guid: str = db.Column(db.String(36), primary_key=True)
|
guid: str = db.Column(db.String(36), primary_key=True)
|
||||||
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
|
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
|
||||||
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
|
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
|
||||||
archived_for_process_instance_status: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
|
|
||||||
|
|
||||||
updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from flask_sqlalchemy.query import Query
|
|
||||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
|
||||||
from spiffworkflow_backend.models.future_task import FutureTaskModel
|
|
||||||
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
@ -51,9 +48,7 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True)
|
process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True)
|
||||||
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore
|
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore
|
||||||
bpmn_process_definition_id: int | None = db.Column(
|
bpmn_process_definition_id: int | None = db.Column(
|
||||||
ForeignKey(BpmnProcessDefinitionModel.id), # type: ignore
|
ForeignKey(BpmnProcessDefinitionModel.id), nullable=True, index=True # type: ignore
|
||||||
nullable=True,
|
|
||||||
index=True,
|
|
||||||
)
|
)
|
||||||
bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore
|
bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore
|
||||||
|
|
||||||
|
@ -122,16 +117,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
"""
|
"""
|
||||||
return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None
|
return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None
|
||||||
|
|
||||||
def future_tasks_query(self) -> Query:
|
|
||||||
future_tasks: Query = (
|
|
||||||
FutureTaskModel.query.filter(
|
|
||||||
FutureTaskModel.completed == False, # noqa: E712
|
|
||||||
)
|
|
||||||
.join(TaskModel, TaskModel.guid == FutureTaskModel.guid)
|
|
||||||
.filter(TaskModel.process_instance_id == self.id)
|
|
||||||
)
|
|
||||||
return future_tasks
|
|
||||||
|
|
||||||
def serialized(self) -> dict[str, Any]:
|
def serialized(self) -> dict[str, Any]:
|
||||||
"""Return object data in serializeable format."""
|
"""Return object data in serializeable format."""
|
||||||
return {
|
return {
|
||||||
|
@ -167,10 +152,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
def can_submit_task(self) -> bool:
|
def can_submit_task(self) -> bool:
|
||||||
return not self.has_terminal_status() and self.status != "suspended"
|
return not self.has_terminal_status() and self.status != "suspended"
|
||||||
|
|
||||||
def allowed_to_run(self) -> bool:
|
|
||||||
"""If this process can currently move forward with things like do_engine_steps."""
|
|
||||||
return not self.has_terminal_status() and self.status != "suspended"
|
|
||||||
|
|
||||||
def can_receive_message(self) -> bool:
|
def can_receive_message(self) -> bool:
|
||||||
"""If this process can currently accept messages."""
|
"""If this process can currently accept messages."""
|
||||||
return not self.has_terminal_status() and self.status != "suspended"
|
return not self.has_terminal_status() and self.status != "suspended"
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
from spiffworkflow_backend.models.future_task import FutureTaskModel
|
|
||||||
|
|
||||||
# TODO: clean up this service for a clear distinction between it and the process_instance_service
|
# TODO: clean up this service for a clear distinction between it and the process_instance_service
|
||||||
# where this points to the pi service
|
# where this points to the pi service
|
||||||
import copy
|
import copy
|
||||||
|
@ -611,9 +609,9 @@ class ProcessInstanceProcessor:
|
||||||
bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json
|
bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json
|
||||||
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier] = bpmn_process_definition_dict
|
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier] = bpmn_process_definition_dict
|
||||||
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
|
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
|
||||||
bpmn_subprocess_definition_bpmn_identifiers[
|
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
|
||||||
bpmn_subprocess_definition.id
|
bpmn_subprocess_definition.bpmn_identifier
|
||||||
] = bpmn_subprocess_definition.bpmn_identifier
|
)
|
||||||
|
|
||||||
task_definitions = TaskDefinitionModel.query.filter(
|
task_definitions = TaskDefinitionModel.query.filter(
|
||||||
TaskDefinitionModel.bpmn_process_definition_id.in_(bpmn_subprocess_definition_bpmn_identifiers.keys()) # type: ignore
|
TaskDefinitionModel.bpmn_process_definition_id.in_(bpmn_subprocess_definition_bpmn_identifiers.keys()) # type: ignore
|
||||||
|
@ -1812,20 +1810,9 @@ class ProcessInstanceProcessor:
|
||||||
)
|
)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
def bring_archived_future_tasks_back_to_life(self) -> None:
|
|
||||||
archived_future_tasks = (
|
|
||||||
self.process_instance_model.future_tasks_query()
|
|
||||||
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712
|
|
||||||
.all()
|
|
||||||
)
|
|
||||||
for archived_future_task in archived_future_tasks:
|
|
||||||
archived_future_task.archived_for_process_instance_status = False
|
|
||||||
db.session.add(archived_future_task)
|
|
||||||
|
|
||||||
def resume(self) -> None:
|
def resume(self) -> None:
|
||||||
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
|
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
|
||||||
db.session.add(self.process_instance_model)
|
db.session.add(self.process_instance_model)
|
||||||
self.bring_archived_future_tasks_back_to_life()
|
|
||||||
ProcessInstanceTmpService.add_event_to_process_instance(
|
ProcessInstanceTmpService.add_event_to_process_instance(
|
||||||
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
|
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
|
||||||
)
|
)
|
||||||
|
|
|
@ -42,7 +42,6 @@ from spiffworkflow_backend.models.process_instance_file_data import ProcessInsta
|
||||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||||
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
|
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
|
||||||
from spiffworkflow_backend.models.task import Task
|
from spiffworkflow_backend.models.task import Task
|
||||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||||
from spiffworkflow_backend.services.git_service import GitCommandError
|
from spiffworkflow_backend.services.git_service import GitCommandError
|
||||||
|
|
|
@ -1,80 +0,0 @@
|
||||||
from flask import Flask
|
|
||||||
from pytest_mock.plugin import MockerFixture
|
|
||||||
from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService
|
|
||||||
from spiffworkflow_backend.models.db import db
|
|
||||||
from spiffworkflow_backend.models.future_task import FutureTaskModel
|
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|
||||||
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
|
||||||
|
|
||||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
|
||||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
|
||||||
|
|
||||||
|
|
||||||
class TestBackgroundProcessingService(BaseTest):
|
|
||||||
def test_process_future_tasks_with_no_future_tasks(
|
|
||||||
self,
|
|
||||||
app: Flask,
|
|
||||||
with_db_and_bpmn_file_cleanup: None,
|
|
||||||
) -> None:
|
|
||||||
BackgroundProcessingService(app).process_future_tasks()
|
|
||||||
|
|
||||||
def test_do_process_future_tasks_with_processable_future_task(
|
|
||||||
self,
|
|
||||||
app: Flask,
|
|
||||||
mocker: MockerFixture,
|
|
||||||
with_db_and_bpmn_file_cleanup: None,
|
|
||||||
) -> None:
|
|
||||||
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
|
|
||||||
mock = mocker.patch("celery.current_app.send_task")
|
|
||||||
process_instance = self._load_up_a_future_task_and_return_instance()
|
|
||||||
assert mock.call_count == 0
|
|
||||||
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
|
|
||||||
assert mock.call_count == 1
|
|
||||||
future_tasks = FutureTaskModel.query.all()
|
|
||||||
assert len(future_tasks) == 1
|
|
||||||
assert future_tasks[0].archived_for_process_instance_status is False
|
|
||||||
|
|
||||||
def test_do_process_future_tasks_with_unprocessable_future_task(
|
|
||||||
self,
|
|
||||||
app: Flask,
|
|
||||||
mocker: MockerFixture,
|
|
||||||
with_db_and_bpmn_file_cleanup: None,
|
|
||||||
) -> None:
|
|
||||||
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
|
|
||||||
mock = mocker.patch("celery.current_app.send_task")
|
|
||||||
process_instance = self._load_up_a_future_task_and_return_instance()
|
|
||||||
assert mock.call_count == 0
|
|
||||||
process_instance.status = "suspended"
|
|
||||||
db.session.add(process_instance)
|
|
||||||
db.session.commit()
|
|
||||||
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
|
|
||||||
assert len(future_tasks) == 1
|
|
||||||
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
|
|
||||||
# should not process anything, so nothing goes to queue
|
|
||||||
assert mock.call_count == 0
|
|
||||||
future_tasks = FutureTaskModel.query.all()
|
|
||||||
assert len(future_tasks) == 1
|
|
||||||
assert future_tasks[0].archived_for_process_instance_status is True
|
|
||||||
|
|
||||||
# the next time do_process_future_tasks runs, it will not consider this task, which is nice
|
|
||||||
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
|
|
||||||
assert len(future_tasks) == 0
|
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
|
||||||
processor.resume()
|
|
||||||
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
|
|
||||||
assert len(future_tasks) == 1
|
|
||||||
|
|
||||||
def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel:
|
|
||||||
process_model = load_test_spec(
|
|
||||||
process_model_id="test_group/user-task-with-timer",
|
|
||||||
process_model_source_directory="user-task-with-timer",
|
|
||||||
)
|
|
||||||
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
|
|
||||||
processor = ProcessInstanceProcessor(process_instance)
|
|
||||||
processor.do_engine_steps(save=True)
|
|
||||||
|
|
||||||
assert process_instance.status == "user_input_required"
|
|
||||||
|
|
||||||
future_tasks = FutureTaskModel.query.all()
|
|
||||||
assert len(future_tasks) == 1
|
|
||||||
return process_instance
|
|
Loading…
Reference in New Issue