Celery avoid requeueing (#1828)

* some debug code and potential fixes to avoid requeueing future tasks w/ burnettk

* actually insert the queued time into the future task table w/ burnettk

* fixed tests w/ burnettk

* handle missing process instances better from celery worker and updated some comments w/ burnettk

* added comment w/ burnettk

* minor fixes w/ burnettk

* added test for should_schedule_waiting_timer_events w/ burnettk

* added test to ensure we do not queue recently queued tasks again w/ burnettk

* remove traceback code

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
jasquat 2024-06-26 13:48:28 -04:00 committed by GitHub
parent e0b4a48905
commit 1d79d77686
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 286 additions and 48 deletions

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
function error_handler() {
echo >&2 "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
exit "$2"
}
trap 'error_handler ${LINENO} $?' ERR
set -o errtrace -o errexit -o nounset -o pipefail
script_dir="$(
cd -- "$(dirname "$0")" >/dev/null 2>&1
pwd -P
)"
. "${script_dir}/local_development_environment_setup"
export SPIFFWORKFLOW_BACKEND_CELERY_ENABLED=true
poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker purge -f
# poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker inspect active

View File

@ -0,0 +1,34 @@
"""empty message
Revision ID: fc5815a9d482
Revises: 7eaec0e12079
Create Date: 2024-06-25 16:05:40.787119
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'fc5815a9d482'
down_revision = '7eaec0e12079'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.add_column(sa.Column('queued_to_run_at_in_seconds', sa.Integer(), nullable=True))
batch_op.create_index(batch_op.f('ix_future_task_queued_to_run_at_in_seconds'), ['queued_to_run_at_in_seconds'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_future_task_queued_to_run_at_in_seconds'))
batch_op.drop_column('queued_to_run_at_in_seconds')
# ### end Alembic commands ###

View File

@ -2,6 +2,7 @@ import time
import flask import flask
from sqlalchemy import and_ from sqlalchemy import and_
from sqlalchemy import or_
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate, queue_future_task_if_appropriate,
@ -98,6 +99,10 @@ class BackgroundProcessingService:
FutureTaskModel.completed == False, # noqa: E712 FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712 FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead, FutureTaskModel.run_at_in_seconds < lookahead,
or_(
FutureTaskModel.queued_to_run_at_in_seconds != FutureTaskModel.run_at_in_seconds,
FutureTaskModel.queued_to_run_at_in_seconds == None, # noqa: E711
),
) )
).all() ).all()
return future_tasks return future_tasks

View File

@ -23,6 +23,7 @@ def celery_init_app(app: flask.app.Flask) -> Celery:
"result_serializer": "json", "result_serializer": "json",
"accept_content": ["json"], "accept_content": ["json"],
"enable_utc": True, "enable_utc": True,
"worker_redirect_stdouts_level": "DEBUG",
} }
celery_app = Celery(app.name) celery_app = Celery(app.name)

View File

@ -2,9 +2,6 @@ from billiard import current_process # type: ignore
from celery import shared_task from celery import shared_task
from flask import current_app from flask import current_app
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate, queue_process_instance_if_appropriate,
) )
@ -26,31 +23,44 @@ class SpiffCeleryWorkerError(Exception):
pass pass
@shared_task(ignore_result=False, time_limit=ten_minutes) # ignore types so we can use self and get the celery task id from self.request.id.
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: @shared_task(ignore_result=False, time_limit=ten_minutes, bind=True)
def celery_task_process_instance_run(self, process_instance_id: int, task_guid: str | None = None) -> dict: # type: ignore
proc_index = current_process().index proc_index = current_process().index
message = f"celery_task_process_instance_run: process_instance_id: {process_instance_id}" celery_task_id = self.request.id
logger_prefix = f"celery_task_process_instance_run[{celery_task_id}]"
worker_intro_log_message = f"{logger_prefix}: process_instance_id: {process_instance_id}"
if task_guid: if task_guid:
message += f" task_guid: {task_guid}" worker_intro_log_message += f" task_guid: {task_guid}"
current_app.logger.info(message) current_app.logger.info(worker_intro_log_message)
ProcessInstanceLockService.set_thread_local_locking_context("celery:worker") ProcessInstanceLockService.set_thread_local_locking_context("celery:worker")
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
if task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance): skipped_mesage = None
if process_instance is None:
skipped_mesage = "Skipped because the process instance no longer exists in the database. It could have been deleted."
elif task_guid is None and ProcessInstanceTmpService.is_enqueued_to_run_in_the_future(process_instance):
skipped_mesage = "Skipped because the process instance is set to run in the future."
if skipped_mesage is not None:
return { return {
"ok": True, "ok": True,
"process_instance_id": process_instance_id, "process_instance_id": process_instance_id,
"task_guid": task_guid, "task_guid": task_guid,
"message": "Skipped because the process instance is set to run in the future.", "message": skipped_mesage,
} }
try: try:
task_guid_for_requeueing = task_guid task_guid_for_requeueing = task_guid
with ProcessInstanceQueueService.dequeued(process_instance): with ProcessInstanceQueueService.dequeued(process_instance):
# run ready tasks to force them to run in case they have instructions on them since queue_instructions_for_end_user
# has a should_break_before that will exit if there are instructions.
ProcessInstanceService.run_process_instance_with_processor( ProcessInstanceService.run_process_instance_with_processor(
process_instance, execution_strategy_name="run_current_ready_tasks" process_instance, execution_strategy_name="run_current_ready_tasks", should_schedule_waiting_timer_events=False
) )
# we need to save instructions to the db so the frontend progress page can view them,
# and this is the only way to do it
_processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor( _processor, task_runnability = ProcessInstanceService.run_process_instance_with_processor(
process_instance, process_instance,
execution_strategy_name="queue_instructions_for_end_user", execution_strategy_name="queue_instructions_for_end_user",
@ -76,17 +86,14 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid} return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid}
except ProcessInstanceIsAlreadyLockedError as exception: except ProcessInstanceIsAlreadyLockedError as exception:
current_app.logger.info( current_app.logger.info(
f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" f"{logger_prefix}: Could not run process instance with worker: {current_app.config['PROCESS_UUID']}"
f" {str(exception)}" f" - {proc_index}. Error was: {str(exception)}"
) )
# NOTE: consider exponential backoff
queue_future_task_if_appropriate(process_instance, eta_in_seconds=10, task_guid=task_guid)
return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)}
except Exception as exception: except Exception as exception:
db.session.rollback() # in case the above left the database with a bad transaction db.session.rollback() # in case the above left the database with a bad transaction
error_message = ( error_message = (
f"Error running process_instance {process_instance.id} " f"{logger_prefix}: Error running process_instance {process_instance_id} task_guid {task_guid}. {str(exception)}"
+ f"({process_instance.process_model_identifier}) and task_guid {task_guid}. {str(exception)}"
) )
current_app.logger.error(error_message) current_app.logger.error(error_message)
db.session.add(process_instance) db.session.add(process_instance)

View File

@ -52,7 +52,11 @@ def queue_future_task_if_appropriate(
# celery_task_process_instance_run.apply_async(kwargs=args_to_celery, countdown=countdown + 1) # type: ignore # celery_task_process_instance_run.apply_async(kwargs=args_to_celery, countdown=countdown + 1) # type: ignore
async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown) async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown)
current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})") message = (
f"Queueing process instance ({process_instance.id}) for future task ({task_guid}). "
f"new celery task id: ({async_result.task_id})"
)
current_app.logger.info(message)
return True return True
return False return False

View File

@ -1,3 +1,4 @@
import copy
import time import time
from dataclasses import dataclass from dataclasses import dataclass
@ -19,6 +20,7 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
guid: str = db.Column(ForeignKey(TaskModel.guid, ondelete="CASCADE", name="future_task_task_guid_fk"), primary_key=True) guid: str = db.Column(ForeignKey(TaskModel.guid, ondelete="CASCADE", name="future_task_task_guid_fk"), primary_key=True)
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True) run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
queued_to_run_at_in_seconds: int = db.Column(db.Integer, nullable=True, index=True)
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True) completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
archived_for_process_instance_status: bool = db.Column( archived_for_process_instance_status: bool = db.Column(
db.Boolean, db.Boolean,
@ -31,20 +33,23 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
updated_at_in_seconds: int = db.Column(db.Integer, nullable=False) updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)
@classmethod @classmethod
def insert_or_update(cls, guid: str, run_at_in_seconds: int) -> None: def insert_or_update(cls, guid: str, run_at_in_seconds: int, queued_to_run_at_in_seconds: int | None = None) -> None:
task_info = [ task_info: dict[str, int | str | None] = {
{ "guid": guid,
"guid": guid, "run_at_in_seconds": run_at_in_seconds,
"run_at_in_seconds": run_at_in_seconds, "updated_at_in_seconds": round(time.time()),
"updated_at_in_seconds": round(time.time()), }
}
] if queued_to_run_at_in_seconds is not None:
task_info["queued_to_run_at_in_seconds"] = queued_to_run_at_in_seconds
new_values = copy.copy(task_info)
del new_values["guid"]
on_duplicate_key_stmt = None on_duplicate_key_stmt = None
if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql":
insert_stmt = mysql_insert(FutureTaskModel).values(task_info) insert_stmt = mysql_insert(FutureTaskModel).values(task_info)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(**new_values)
run_at_in_seconds=insert_stmt.inserted.run_at_in_seconds, updated_at_in_seconds=round(time.time())
)
else: else:
insert_stmt = None insert_stmt = None
if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "sqlite": if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "sqlite":
@ -53,6 +58,6 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
insert_stmt = postgres_insert(FutureTaskModel).values(task_info) insert_stmt = postgres_insert(FutureTaskModel).values(task_info)
on_duplicate_key_stmt = insert_stmt.on_conflict_do_update( on_duplicate_key_stmt = insert_stmt.on_conflict_do_update(
index_elements=["guid"], index_elements=["guid"],
set_={"run_at_in_seconds": run_at_in_seconds, "updated_at_in_seconds": round(time.time())}, set_=new_values,
) )
db.session.execute(on_duplicate_key_stmt) db.session.execute(on_duplicate_key_stmt)

View File

@ -18,6 +18,12 @@ class ProcessInstanceQueueModel(SpiffworkflowBaseDBModel):
locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True) locked_at_in_seconds: int | None = db.Column(db.Integer, index=True, nullable=True)
status: str = db.Column(db.String(50), index=True) status: str = db.Column(db.String(50), index=True)
# for timers. right now the apscheduler jobs without celery check for waiting process instances.
# if the instance's run_at_in_seconds is now or earlier, the instance will run.
# so we can save some effort if we detect that it is scheduled to run later.
# note that we still run an apscheduler job to manage timer start events, even if
# SPIFFWORKFLOW_BACKEND_CELERY_ENABLED=true
run_at_in_seconds: int = db.Column(db.Integer) run_at_in_seconds: int = db.Column(db.Integer)
updated_at_in_seconds: int = db.Column(db.Integer) updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -1438,19 +1438,27 @@ class ProcessInstanceProcessor:
save: bool = False, save: bool = False,
execution_strategy_name: str | None = None, execution_strategy_name: str | None = None,
execution_strategy: ExecutionStrategy | None = None, execution_strategy: ExecutionStrategy | None = None,
should_schedule_waiting_timer_events: bool = True,
) -> TaskRunnability: ) -> TaskRunnability:
if self.process_instance_model.persistence_level != "none": if self.process_instance_model.persistence_level != "none":
with ProcessInstanceQueueService.dequeued(self.process_instance_model): with ProcessInstanceQueueService.dequeued(self.process_instance_model):
# TODO: ideally we just lock in the execution service, but not sure # TODO: ideally we just lock in the execution service, but not sure
# about _add_bpmn_process_definitions and if that needs to happen in # about _add_bpmn_process_definitions and if that needs to happen in
# the same lock like it does on main # the same lock like it does on main
return self._do_engine_steps(exit_at, save, execution_strategy_name, execution_strategy) return self._do_engine_steps(
exit_at,
save,
execution_strategy_name,
execution_strategy,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
)
else: else:
return self._do_engine_steps( return self._do_engine_steps(
exit_at, exit_at,
save=False, save=False,
execution_strategy_name=execution_strategy_name, execution_strategy_name=execution_strategy_name,
execution_strategy=execution_strategy, execution_strategy=execution_strategy,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
) )
def _do_engine_steps( def _do_engine_steps(
@ -1459,6 +1467,7 @@ class ProcessInstanceProcessor:
save: bool = False, save: bool = False,
execution_strategy_name: str | None = None, execution_strategy_name: str | None = None,
execution_strategy: ExecutionStrategy | None = None, execution_strategy: ExecutionStrategy | None = None,
should_schedule_waiting_timer_events: bool = True,
) -> TaskRunnability: ) -> TaskRunnability:
self._add_bpmn_process_definitions( self._add_bpmn_process_definitions(
self.serialize(), self.serialize(),
@ -1488,7 +1497,11 @@ class ProcessInstanceProcessor:
self._script_engine.environment.finalize_result, self._script_engine.environment.finalize_result,
self.save, self.save,
) )
task_runnability = execution_service.run_and_save(exit_at, save) task_runnability = execution_service.run_and_save(
exit_at,
save,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
)
self.check_all_tasks() self.check_all_tasks()
return task_runnability return task_runnability

View File

@ -38,7 +38,6 @@ from spiffworkflow_backend.models.process_instance_file_data import ProcessInsta
from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
from spiffworkflow_backend.models.task import Task from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
@ -280,6 +279,7 @@ class ProcessInstanceService:
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
status_value: str | None = None, status_value: str | None = None,
execution_strategy_name: str | None = None, execution_strategy_name: str | None = None,
should_schedule_waiting_timer_events: bool = True,
) -> tuple[ProcessInstanceProcessor | None, TaskRunnability]: ) -> tuple[ProcessInstanceProcessor | None, TaskRunnability]:
processor = None processor = None
task_runnability = TaskRunnability.unknown_if_ready_tasks task_runnability = TaskRunnability.unknown_if_ready_tasks
@ -302,6 +302,7 @@ class ProcessInstanceService:
task_runnability = processor.do_engine_steps( task_runnability = processor.do_engine_steps(
save=True, save=True,
execution_strategy_name=execution_strategy_name, execution_strategy_name=execution_strategy_name,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
) )
return (processor, task_runnability) return (processor, task_runnability)

View File

@ -358,6 +358,8 @@ class QueueInstructionsForEndUserExecutionStrategy(ExecutionStrategy):
JinjaService.add_instruction_for_end_user_if_appropriate(tasks, process_instance_model.id, self.tasks_that_have_been_seen) JinjaService.add_instruction_for_end_user_if_appropriate(tasks, process_instance_model.id, self.tasks_that_have_been_seen)
def should_break_before(self, tasks: list[SpiffTask], process_instance_model: ProcessInstanceModel) -> bool: def should_break_before(self, tasks: list[SpiffTask], process_instance_model: ProcessInstanceModel) -> bool:
# exit if there are instructionsForEndUser so the instructions can be comitted to the db using the normal save method
# for the process instance.
for spiff_task in tasks: for spiff_task in tasks:
if hasattr(spiff_task.task_spec, "extensions") and spiff_task.task_spec.extensions.get( if hasattr(spiff_task.task_spec, "extensions") and spiff_task.task_spec.extensions.get(
"instructionsForEndUser", None "instructionsForEndUser", None
@ -451,7 +453,12 @@ class WorkflowExecutionService:
# run # run
# execution_strategy.spiff_run # execution_strategy.spiff_run
# spiff.[some_run_task_method] # spiff.[some_run_task_method]
def run_and_save(self, exit_at: None = None, save: bool = False) -> TaskRunnability: def run_and_save(
self,
exit_at: None = None,
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
) -> TaskRunnability:
if self.process_instance_model.persistence_level != "none": if self.process_instance_model.persistence_level != "none":
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
if tripped: if tripped:
@ -492,7 +499,8 @@ class WorkflowExecutionService:
self.execution_strategy.add_object_to_db_session(self.bpmn_process_instance) self.execution_strategy.add_object_to_db_session(self.bpmn_process_instance)
if save: if save:
self.process_instance_saver() self.process_instance_saver()
self.schedule_waiting_timer_events() if should_schedule_waiting_timer_events:
self.schedule_waiting_timer_events()
def is_happening_soon(self, time_in_seconds: int) -> bool: def is_happening_soon(self, time_in_seconds: int) -> bool:
# if it is supposed to happen in less than the amount of time we take between polling runs # if it is supposed to happen in less than the amount of time we take between polling runs
@ -509,11 +517,17 @@ class WorkflowExecutionService:
if "Time" in event.event_type: if "Time" in event.event_type:
time_string = event.value time_string = event.value
run_at_in_seconds = round(datetime.fromisoformat(time_string).timestamp()) run_at_in_seconds = round(datetime.fromisoformat(time_string).timestamp())
FutureTaskModel.insert_or_update(guid=str(spiff_task.id), run_at_in_seconds=run_at_in_seconds) queued_to_run_at_in_seconds = None
if self.is_happening_soon(run_at_in_seconds): if self.is_happening_soon(run_at_in_seconds):
queue_future_task_if_appropriate( if queue_future_task_if_appropriate(
self.process_instance_model, eta_in_seconds=run_at_in_seconds, task_guid=str(spiff_task.id) self.process_instance_model, eta_in_seconds=run_at_in_seconds, task_guid=str(spiff_task.id)
) ):
queued_to_run_at_in_seconds = run_at_in_seconds
FutureTaskModel.insert_or_update(
guid=str(spiff_task.id),
run_at_in_seconds=run_at_in_seconds,
queued_to_run_at_in_seconds=queued_to_run_at_in_seconds,
)
def process_bpmn_messages(self) -> None: def process_bpmn_messages(self) -> None:
# FIXE: get_events clears out the events so if we have other events we care about # FIXE: get_events clears out the events so if we have other events we care about
@ -588,12 +602,19 @@ class WorkflowExecutionService:
class ProfiledWorkflowExecutionService(WorkflowExecutionService): class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service.""" """A profiled version of the workflow execution service."""
def run_and_save(self, exit_at: None = None, save: bool = False) -> TaskRunnability: def run_and_save(
self,
exit_at: None = None,
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
) -> TaskRunnability:
import cProfile import cProfile
from pstats import SortKey from pstats import SortKey
task_runnability = TaskRunnability.unknown_if_ready_tasks task_runnability = TaskRunnability.unknown_if_ready_tasks
with cProfile.Profile() as pr: with cProfile.Profile() as pr:
task_runnability = super().run_and_save(exit_at=exit_at, save=save) task_runnability = super().run_and_save(
exit_at=exit_at, save=save, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events
)
pr.print_stats(sort=SortKey.CUMULATIVE) pr.print_stats(sort=SortKey.CUMULATIVE)
return task_runnability return task_runnability

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_zaes0vi" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0903e0h</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0903e0h" sourceRef="StartEvent_1" targetRef="user_task_one" />
<bpmn:endEvent id="user_task_path_end_event" name="User task path end event">
<bpmn:incoming>Flow_1yn50r0</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1yn50r0" sourceRef="user_task_one" targetRef="user_task_path_end_event" />
<bpmn:manualTask id="user_task_one" name="User task one">
<bpmn:incoming>Flow_0903e0h</bpmn:incoming>
<bpmn:outgoing>Flow_1yn50r0</bpmn:outgoing>
</bpmn:manualTask>
<bpmn:boundaryEvent id="user_task_timer_event" name="User task timer event" attachedToRef="user_task_one">
<bpmn:outgoing>Flow_1ky2hak</bpmn:outgoing>
<bpmn:timerEventDefinition id="TimerEventDefinition_12rb24v">
<bpmn:timeDuration xsi:type="bpmn:tFormalExpression">"PT4M"</bpmn:timeDuration>
</bpmn:timerEventDefinition>
</bpmn:boundaryEvent>
<bpmn:endEvent id="timer_event_path_end_event" name="Timer event path end event">
<bpmn:incoming>Flow_1ky2hak</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1ky2hak" sourceRef="user_task_timer_event" targetRef="timer_event_path_end_event" />
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_zaes0vi">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0668ivs_di" bpmnElement="user_task_path_end_event">
<dc:Bounds x="432" y="159" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="416" y="202" width="71" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0kov31h_di" bpmnElement="user_task_one">
<dc:Bounds x="270" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0nz6n0j_di" bpmnElement="timer_event_path_end_event">
<dc:Bounds x="402" y="282" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="379" y="325" width="83" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1aw81go_di" bpmnElement="user_task_timer_event">
<dc:Bounds x="312" y="199" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="293" y="242" width="75" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0903e0h_di" bpmnElement="Flow_0903e0h">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1yn50r0_di" bpmnElement="Flow_1yn50r0">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ky2hak_di" bpmnElement="Flow_1ky2hak">
<di:waypoint x="330" y="235" />
<di:waypoint x="330" y="300" />
<di:waypoint x="402" y="300" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -8,6 +8,7 @@ from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel from spiffworkflow_backend.models.process_instance_queue import ProcessInstanceQueueModel
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
@ -94,17 +95,67 @@ class TestBackgroundProcessingService(BaseTest):
ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value) ProcessInstanceService.do_waiting(ProcessInstanceStatus.waiting.value)
assert process_instance.status == ProcessInstanceStatus.waiting.value assert process_instance.status == ProcessInstanceStatus.waiting.value
def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel: def test_does_not_queue_future_tasks_if_requested(
process_model = load_test_spec( self,
process_model_id="test_group/user-task-with-timer", app: Flask,
process_model_source_directory="user-task-with-timer", mocker: MockerFixture,
) with_db_and_bpmn_file_cleanup: None,
process_instance = self.create_process_instance_from_process_model(process_model=process_model) ) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
mock = mocker.patch("celery.current_app.send_task")
self._load_up_a_future_task_and_return_instance(should_schedule_waiting_timer_events=False)
assert mock.call_count == 0
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
assert mock.call_count == 0
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 0
def test_does_not_requeue_if_recently_queued(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
mock = mocker.patch("celery.current_app.send_task")
assert mock.call_count == 0
process_model = load_test_spec(
process_model_id="test_group/user-task-with-timer",
process_model_source_directory="user-task-with-timer",
bpmn_file_name="user_task_with_short_timer.bpmn",
)
# it should queue only when it runs the process model
self._load_up_a_future_task_and_return_instance(process_model=process_model)
assert mock.call_count == 1
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
assert mock.call_count == 1
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
assert future_tasks[0].archived_for_process_instance_status is False
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
assert mock.call_count == 1
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
assert future_tasks[0].archived_for_process_instance_status is False
def _load_up_a_future_task_and_return_instance(
self, process_model: ProcessModelInfo | None = None, should_schedule_waiting_timer_events: bool = True
) -> ProcessInstanceModel:
process_model_to_use = process_model
if process_model_to_use is None:
process_model_to_use = load_test_spec(
process_model_id="test_group/user-task-with-timer",
process_model_source_directory="user-task-with-timer",
bpmn_file_name="user_task_with_timer.bpmn",
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model_to_use)
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True) processor.do_engine_steps(save=True, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events)
assert process_instance.status == "user_input_required" assert process_instance.status == "user_input_required"
future_tasks = FutureTaskModel.query.all() future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1 assert len(future_tasks) == (1 if should_schedule_waiting_timer_events else 0)
return process_instance return process_instance

View File

@ -22,6 +22,7 @@ class TestFutureTask(BaseTest):
process_model = load_test_spec( process_model = load_test_spec(
process_model_id="test_group/user-task-with-timer", process_model_id="test_group/user-task-with-timer",
process_model_source_directory="user-task-with-timer", process_model_source_directory="user-task-with-timer",
bpmn_file_name="user_task_with_timer.bpmn",
) )
process_instance = self.create_process_instance_from_process_model(process_model=process_model) process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)