From 80ad92a0c323a2ccca1a53deefeeb52f4d611ab0 Mon Sep 17 00:00:00 2001
From: Dan Funk
Date: Thu, 7 Sep 2023 10:12:56 -0400
Subject: [PATCH] Threads (#439)

* WIP - threading with dan

* WIP - cleanup with dan

* Fixing a minor bug and adding a comment and taking the win, even though it
  was all actually done by @jbirddog with help from @essweine

* adding a couple of simple tests

* Threaded execution of multiple ready engine tasks is now the default
  behavior for all execution strategies (the skip-one strategy does not do it)

* Assure that tasks in the db match those in the SpiffWorkflow bpmn on save
  (remove all predicted and pruned tasks) with @jbirddog, @jlantz, @burnettk

* run_pyl

---------

Co-authored-by: Jon Herron
Co-authored-by: jasquat
---
 .../routes/extensions_controller.py           |   2 +-
 .../routes/tasks_controller.py                |  48 ++---
 .../services/process_instance_processor.py    |  11 +-
 .../services/workflow_execution_service.py    | 164 ++++++++++--------
 .../threads_multi_instance.bpmn               |  61 +++++++
 .../threads_with_script_timers.bpmn           | 137 +++++++++++++++
 .../unit/test_threaded_execution.py           | 107 ++++++++++++
 7 files changed, 433 insertions(+), 97 deletions(-)
 create mode 100644 spiffworkflow-backend/tests/data/threads_multi_instance/threads_multi_instance.bpmn
 create mode 100644 spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
 create mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_threaded_execution.py

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py
index 05838b9b5..076dad070 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/extensions_controller.py
@@ -139,7 +139,7 @@ def _run_extension(
         process_instance, script_engine=CustomBpmnScriptEngine(use_restricted_script_engine=False)
     )
     if body and "extension_input" in body:
-        processor.do_engine_steps(save=False, execution_strategy_name="one_at_a_time")
+        processor.do_engine_steps(save=False, execution_strategy_name="run_current_ready_tasks")
         next_task = processor.next_task()
         next_task.update_data(body["extension_input"])
         processor.do_engine_steps(save=False, execution_strategy_name="greedy")
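The hunk above is the shape of most call-site changes in this patch: the old
"one_at_a_time" strategy name becomes "run_current_ready_tasks", and the
semantics shift from "run exactly one READY task" to "run every currently
READY task once, threaded when there is more than one". An illustrative
fragment, not code from this patch (processor stands in for a real
ProcessInstanceProcessor):

    # Old spelling: ran exactly one READY engine task per call.
    processor.do_engine_steps(save=False, execution_strategy_name="one_at_a_time")

    # New spelling: runs every task that is READY right now, once, using
    # threads when several are ready, then returns control to the caller.
    processor.do_engine_steps(save=False, execution_strategy_name="run_current_ready_tasks")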
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
index 82128de40..d55b47dd4 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py
@@ -446,6 +446,7 @@ def _interstitial_stream(
     reported_ids = []  # A list of all the ids reported by this endpoint so far.
     tasks = get_reportable_tasks()
     while True:
+        has_ready_tasks = False
         for spiff_task in tasks:
             # ignore the instructions if they are on the EndEvent for the top level process
             if not TaskService.is_main_process_end_event(spiff_task):
@@ -465,30 +466,33 @@ def _interstitial_stream(
                     yield _render_data("task", task)
                     reported_ids.append(spiff_task.id)
             if spiff_task.state == TaskState.READY:
-                # do not do any processing if the instance is not currently active
-                if process_instance.status not in ProcessInstanceModel.active_statuses():
-                    yield _render_data("unrunnable_instance", process_instance)
-                    return
-                if execute_tasks:
-                    try:
-                        # run_until_user_message does not run tasks with instructions to use one_at_a_time
-                        # to force it to run the task.
-                        processor.do_engine_steps(execution_strategy_name="one_at_a_time")
-                        processor.do_engine_steps(execution_strategy_name="run_until_user_message")
-                        processor.save()  # Fixme - maybe find a way not to do this on every loop?
-                        processor.refresh_waiting_tasks()
+                has_ready_tasks = True

-                    except WorkflowTaskException as wfe:
-                        api_error = ApiError.from_workflow_exception(
-                            "engine_steps_error", "Failed to complete an automated task.", exp=wfe
-                        )
-                        yield _render_data("error", api_error)
-                        ErrorHandlingService.handle_error(process_instance, wfe)
-                        return
-                # return if process instance is now complete and let the frontend redirect to show page
-                if process_instance.status not in ProcessInstanceModel.active_statuses():
-                    yield _render_data("unrunnable_instance", process_instance)
+        if has_ready_tasks:
+            # do not do any processing if the instance is not currently active
+            if process_instance.status not in ProcessInstanceModel.active_statuses():
+                yield _render_data("unrunnable_instance", process_instance)
+                return
+            if execute_tasks:
+                try:
+                    # run_until_user_message does not run tasks with instructions, so run the
+                    # ready tasks first to force them to run.
+                    processor.do_engine_steps(execution_strategy_name="run_current_ready_tasks")
+                    processor.do_engine_steps(execution_strategy_name="run_until_user_message")
+                    processor.save()  # Fixme - maybe find a way not to do this on every loop?
+                    processor.refresh_waiting_tasks()
+
+                except WorkflowTaskException as wfe:
+                    api_error = ApiError.from_workflow_exception(
+                        "engine_steps_error", "Failed to complete an automated task.", exp=wfe
+                    )
+                    yield _render_data("error", api_error)
+                    ErrorHandlingService.handle_error(process_instance, wfe)
                     return
+            # return if process instance is now complete and let the frontend redirect to show page
+            if process_instance.status not in ProcessInstanceModel.active_statuses():
+                yield _render_data("unrunnable_instance", process_instance)
+                return

     # path used by the interstitial page while executing tasks - ie the background processor is not executing them
     ready_engine_task_count = _get_ready_engine_step_count(processor.bpmn_process_instance)
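The two do_engine_steps calls in the interstitial loop are deliberately
ordered: run_until_user_message refuses to execute a task that carries
end-user instructions, so anything already READY has to be forced through
first. A condensed, illustrative restatement of that contract (processor
again stands in for the real ProcessInstanceProcessor):

    # 1) Run everything READY right now, whether or not it has instructions.
    processor.do_engine_steps(execution_strategy_name="run_current_ready_tasks")
    # 2) Then advance until the next task with instructionsForEndUser (or a
    #    human task), which is where the interstitial page takes over.
    processor.do_engine_steps(execution_strategy_name="run_until_user_message")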
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 4797d2939..5f3e01c55 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -1005,6 +1005,15 @@ class ProcessInstanceProcessor:
         db.session.add(self.process_instance_model)
         db.session.commit()

+        known_task_ids = [str(t.id) for t in self.bpmn_process_instance.get_tasks()]
+        TaskModel.query.filter(TaskModel.process_instance_id == self.process_instance_model.id).filter(
+            TaskModel.guid.notin_(known_task_ids)  # type: ignore
+        ).delete()
+        HumanTaskModel.query.filter(HumanTaskModel.process_instance_id == self.process_instance_model.id).filter(
+            HumanTaskModel.task_id.notin_(known_task_ids)  # type: ignore
+        ).delete()
+        db.session.commit()
+
         human_tasks = HumanTaskModel.query.filter_by(
             process_instance_id=self.process_instance_model.id, completed=False
         ).all()
@@ -1124,7 +1133,7 @@ class ProcessInstanceProcessor:
                 f"Manually executing Task {spiff_task.task_spec.name} of process"
                 f" instance {self.process_instance_model.id}"
             )
-            self.do_engine_steps(save=True, execution_strategy_name="one_at_a_time")
+            self.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
         else:
             current_app.logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
         task_model_delegate = TaskModelSavingDelegate(
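The save-time pruning above keeps the task rows in the db in lockstep with the
in-memory SpiffWorkflow task tree: predicted or pruned tasks that the engine
has discarded (for example after a gateway resolves during a threaded run)
would otherwise linger as orphan rows. A minimal standalone sketch of the same
"delete whatever the engine no longer knows about" pattern; the Task model
here is a hypothetical stand-in for the repo's TaskModel, not its real schema:

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Task(Base):  # hypothetical stand-in for the repo's TaskModel
        __tablename__ = "task"
        id = Column(Integer, primary_key=True)
        guid = Column(String, nullable=False)
        process_instance_id = Column(Integer, nullable=False)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    def prune_unknown_tasks(session: Session, instance_id: int, known_guids: list[str]) -> None:
        # Anything the serialized workflow no longer contains is stale; delete
        # it so the db cannot disagree with the engine about which tasks exist.
        session.query(Task).filter(
            Task.process_instance_id == instance_id,
            Task.guid.notin_(known_guids),
        ).delete(synchronize_session=False)
        session.commit()

    with Session(engine) as session:
        prune_unknown_tasks(session, instance_id=1, known_guids=["guid-1", "guid-2"])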
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index 83f38f37f..55237d931 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import concurrent.futures
 import copy
 import time
 from abc import abstractmethod
@@ -7,6 +8,9 @@ from collections.abc import Callable
 from typing import Any
 from uuid import UUID

+import flask.app
+from flask import current_app
+from flask import g
 from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException  # type: ignore
 from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer  # type: ignore
 from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition  # type: ignore
@@ -84,9 +88,77 @@ class ExecutionStrategy:
         self.subprocess_spec_loader = subprocess_spec_loader
         self.options = options

-    @abstractmethod
+    def should_break_before(self, tasks: list[SpiffTask]) -> bool:
+        return False
+
+    def should_break_after(self, tasks: list[SpiffTask]) -> bool:
+        return False
+
+    def _run(
+        self,
+        spiff_task: SpiffTask,
+        app: flask.app.Flask,
+        process_instance_id: Any | None,
+        process_model_identifier: Any | None,
+        user: Any | None,
+    ) -> SpiffTask:
+        with app.app_context():
+            app.config["THREAD_LOCAL_DATA"].process_instance_id = process_instance_id
+            app.config["THREAD_LOCAL_DATA"].process_model_identifier = process_model_identifier
+            g.user = user
+            spiff_task.run()
+        return spiff_task
+
     def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
-        pass
+        # Note: run rounds of READY engine tasks until none remain, or until a
+        # subclass hook (should_break_before/should_break_after) stops us.
+        while True:
+            bpmn_process_instance.refresh_waiting_tasks()
+            engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
+            if self.should_break_before(engine_steps):
+                break
+            num_steps = len(engine_steps)
+            if num_steps == 0:
+                break
+            elif num_steps == 1:
+                spiff_task = engine_steps[0]
+                self.delegate.will_complete_task(spiff_task)
+                spiff_task.run()
+                self.delegate.did_complete_task(spiff_task)
+            elif num_steps > 1:
+                # This only works because of the GIL and the fact that we are not actually
+                # executing code in parallel; we are just waiting for I/O in parallel. So it
+                # can run a ton of service tasks at once - many api calls - and then get those
+                # responses back without waiting for each individual task to complete.
+                futures = []
+                process_instance_id = None
+                process_model_identifier = None
+                if hasattr(current_app.config["THREAD_LOCAL_DATA"], "process_instance_id"):
+                    process_instance_id = current_app.config["THREAD_LOCAL_DATA"].process_instance_id
+                    process_model_identifier = current_app.config["THREAD_LOCAL_DATA"].process_model_identifier
+                user = None
+                if hasattr(g, "user"):
+                    user = g.user
+
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    for spiff_task in engine_steps:
+                        self.delegate.will_complete_task(spiff_task)
+                        futures.append(
+                            executor.submit(
+                                self._run,
+                                spiff_task,
+                                current_app._get_current_object(),
+                                process_instance_id,
+                                process_model_identifier,
+                                user,
+                            )
+                        )
+                    for future in concurrent.futures.as_completed(futures):
+                        spiff_task = future.result()
+                        self.delegate.did_complete_task(spiff_task)
+            if self.should_break_after(engine_steps):
+                break
+
+        self.delegate.after_engine_steps(bpmn_process_instance)

     def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
         self.delegate.on_exception(bpmn_process_instance)
@@ -99,7 +171,9 @@ class ExecutionStrategy:
         if len(tasks) > 0:
             self.subprocess_spec_loader()
-            tasks = [tasks[0]]
+
+            # TODO: verify the other execution strategies still work, then delete the
+            # commented line below so this method behaves the way its name suggests
+            # tasks = [tasks[0]]

         return tasks
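The gain from the ThreadPoolExecutor above is concurrency of blocking waits,
not CPU parallelism: under CPython's GIL only one thread executes bytecode at
a time, but a thread blocked on a socket (a service-task API call) or on
time.sleep releases the GIL, so N waits overlap. A self-contained sketch of
the pattern, including the Flask detail that trips people up: each worker
thread must push its own app context from a real app object, because
current_app is a proxy that cannot cross threads (hence _get_current_object()
in the code above). Names below are illustrative, not from the repo:

    import concurrent.futures
    import time

    import flask

    app = flask.Flask(__name__)

    def run_one(app: flask.Flask, task_id: int) -> int:
        # Each worker pushes its own app context; using the proxy-based
        # current_app from the parent thread would raise "Working outside
        # of application context" here.
        with app.app_context():
            time.sleep(0.1)  # stands in for a blocking service-task API call
            return task_id

    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(run_one, app, i) for i in range(10)]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
    elapsed = time.perf_counter() - start

    # Ten 0.1 s waits overlap, so this prints roughly 0.1 s, not 1.0 s.
    print(f"{sorted(results)} in {elapsed:.2f}s")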
- """ - engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance) - while engine_steps: - for spiff_task in engine_steps: - self.delegate.will_complete_task(spiff_task) - spiff_task.run() - self.delegate.did_complete_task(spiff_task) - self.bpmn_process_instance.refresh_waiting_tasks() - engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance) - - -class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): - """For illustration purposes, not currently integrated. - - Would allow the `run` from the UI to execute until a service task then - return (to an interstitial page). The background processor would then take over. + """ + This is what the base class does by default. """ - def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: - engine_steps = self.get_ready_engine_steps(bpmn_process_instance) - while engine_steps: - for spiff_task in engine_steps: - if spiff_task.task_spec.description == "Service Task": - return - self.delegate.will_complete_task(spiff_task) - spiff_task.run() - self.delegate.did_complete_task(spiff_task) - bpmn_process_instance.refresh_waiting_tasks() - engine_steps = self.get_ready_engine_steps(bpmn_process_instance) - self.delegate.after_engine_steps(bpmn_process_instance) + pass class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy): @@ -311,40 +349,22 @@ class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy): but will stop if it hits instructions after the first task. """ - def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: - should_continue = True - bpmn_process_instance.refresh_waiting_tasks() - engine_steps = self.get_ready_engine_steps(bpmn_process_instance) - while engine_steps and should_continue: - for task in engine_steps: - if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get( - "instructionsForEndUser", None - ): - should_continue = False - break - self.delegate.will_complete_task(task) - task.run() - self.delegate.did_complete_task(task) - bpmn_process_instance.refresh_waiting_tasks() - engine_steps = self.get_ready_engine_steps(bpmn_process_instance) - self.delegate.after_engine_steps(bpmn_process_instance) + def should_break_before(self, tasks: list[SpiffTask]) -> bool: + for task in tasks: + if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get("instructionsForEndUser", None): + return True + return False -class OneAtATimeExecutionStrategy(ExecutionStrategy): +class RunCurrentReadyTasksExecutionStrategy(ExecutionStrategy): """When you want to run only one engine step at a time.""" - def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: - engine_steps = self.get_ready_engine_steps(bpmn_process_instance) - if len(engine_steps) > 0: - spiff_task = engine_steps[0] - self.delegate.will_complete_task(spiff_task) - spiff_task.run() - self.delegate.did_complete_task(spiff_task) - self.delegate.after_engine_steps(bpmn_process_instance) + def should_break_after(self, tasks: list[SpiffTask]) -> bool: + return True class SkipOneExecutionStrategy(ExecutionStrategy): - """When you want to to skip over the next task, rather than execute it.""" + """When you want to skip over the next task, rather than execute it.""" def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: spiff_task = None @@ -366,13 +386,12 @@ def execution_strategy_named( ) -> ExecutionStrategy: cls = { "greedy": GreedyExecutionStrategy, - "run_until_service_task": 

 ProcessInstanceCompleter = Callable[[BpmnWorkflow], None]

@@ -409,7 +428,6 @@ class WorkflowExecutionService:
                 "The current thread has not obtained a lock for this process"
                 f" instance ({self.process_instance_model.id})."
             )
-
         try:
             self.bpmn_process_instance.refresh_waiting_tasks()

diff --git a/spiffworkflow-backend/tests/data/threads_multi_instance/threads_multi_instance.bpmn b/spiffworkflow-backend/tests/data/threads_multi_instance/threads_multi_instance.bpmn
new file mode 100644
index 000000000..c357ff8b0
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/threads_multi_instance/threads_multi_instance.bpmn
@@ -0,0 +1,61 @@
+[61 lines of BPMN XML; the markup was lost in extraction and is not
+reconstructed here. Recoverable content: a start event; a script task that
+sets letters = ['a', 'b', 'c', ..., 'z'] (all 26 letters); a parallel
+multi-instance script task (bpmn name beginning with "multi") that iterates
+over letters with the script "upper_letter = letter.upper()" plus
+"time.sleep(0.1)", collecting results into upper_letters; and an end event.
+Flow ids include Flow_13c96f7, Flow_043nok3, and Flow_0pe3a6u.]

diff --git a/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn b/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
new file mode 100644
index 000000000..39bc8f266
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
@@ -0,0 +1,137 @@
+[137 lines of BPMN XML; the markup was lost in extraction and is not
+reconstructed here. Recoverable content: a start event; a parallel gateway
+fanning out to four script tasks (bpmn names beginning with "ThreadTask"),
+each of which runs time.sleep(0.1) and sets one of a=1, b=1, c=1, or d=1; a
+joining parallel gateway; and an end event. Flow ids include Flow_0v5xitk,
+Flow_1lhzzw7, Flow_1gjknxb, Flow_0cj2tia, Flow_0pjsxm2, and Flow_0kguhla.]
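Both fixtures exist purely to prove that several READY script tasks overlap in
time: if the 0.1-second sleeps ran serially, the processes would take 0.4 and
2.6 seconds respectively. A miniature pure-Python version of the
multi-instance fixture's behavior, separate from BPMN and the engine, which
also shows why completion order cannot be trusted (the reason the test below
sorts upper_letters before asserting):

    import concurrent.futures
    import string
    import time

    def work(letter: str) -> str:
        time.sleep(0.1)  # same artificial wait as the fixture's script task
        return letter.upper()

    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=26) as executor:
        futures = [executor.submit(work, letter) for letter in string.ascii_lowercase]
        upper_letters = [f.result() for f in concurrent.futures.as_completed(futures)]
    elapsed = time.perf_counter() - start  # roughly 0.1 s, not 2.6 s

    # Completion order is not submission order, so sort before comparing.
    assert sorted(upper_letters) == list(string.ascii_uppercase)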
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_threaded_execution.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_threaded_execution.py
new file mode 100644
index 000000000..06ae18859
--- /dev/null
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_threaded_execution.py
@@ -0,0 +1,107 @@
+from flask import Flask
+from flask.testing import FlaskClient
+from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
+from spiffworkflow_backend.models.task import TaskModel
+from spiffworkflow_backend.models.user import UserModel
+from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
+
+from tests.spiffworkflow_backend.helpers.base_test import BaseTest
+from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
+
+
+class TestThreadedExecution(BaseTest):
+    def test_four_parallel_script_tasks_that_wait_one_tenth_second(
+        self,
+        app: Flask,
+        with_db_and_bpmn_file_cleanup: None,
+        with_super_admin_user: UserModel,
+    ) -> None:
+        self.create_process_group("test_group", "test_group")
+        process_model = load_test_spec(
+            process_model_id="test_group/threads_with_script_timers",
+            process_model_source_directory="threads_with_script_timers",
+        )
+        process_instance = self.create_process_instance_from_process_model(
+            process_model=process_model, user=with_super_admin_user
+        )
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.do_engine_steps(save=True)
+
+        self.assert_same_start_times(process_instance, "ThreadTask", 4)
+        assert processor.bpmn_process_instance.is_completed()
+        assert processor.bpmn_process_instance.last_task.data == {"a": 1, "b": 1, "c": 1, "d": 1}
+
+    def test_multi_instance_can_run_in_parallel(
+        self,
+        app: Flask,
+        client: FlaskClient,
+        with_db_and_bpmn_file_cleanup: None,
+        with_super_admin_user: UserModel,
+    ) -> None:
+        self.create_process_group("test_group", "test_group")
+        process_model = load_test_spec(
+            process_model_id="test_group/threads_multi_instance",
+            process_model_source_directory="threads_multi_instance",
+        )
+        process_instance = self.create_process_instance_from_process_model(
+            process_model=process_model, user=with_super_admin_user
+        )
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.do_engine_steps(save=True)
+        self.assert_same_start_times(process_instance, "multi", 26)
+        assert processor.bpmn_process_instance.is_completed()
+        upper_letters = processor.bpmn_process_instance.last_task.data["upper_letters"]
+        # A sort is required here because the threaded execution completes the list out of order.
+        upper_letters.sort()
+        assert upper_letters == [
+            "A",
+            "B",
+            "C",
+            "D",
+            "E",
+            "F",
+            "G",
+            "H",
+            "I",
+            "J",
+            "K",
+            "L",
+            "M",
+            "N",
+            "O",
+            "P",
+            "Q",
+            "R",
+            "S",
+            "T",
+            "U",
+            "V",
+            "W",
+            "X",
+            "Y",
+            "Z",
+        ]
+
+    def assert_same_start_times(
+        self, process_instance: ProcessInstanceModel, task_name_starts_with: str, expected_size: int
+    ) -> None:
+        # The start_time recorded for each task should be nearly identical.
+        bpmn_process_id = process_instance.bpmn_process_id
+        task_models = TaskModel.query.filter_by(bpmn_process_id=bpmn_process_id).all()
+        script_tasks = list(
+            filter(
+                lambda tm: tm.task_definition.bpmn_name is not None
+                and tm.task_definition.bpmn_name.startswith(task_name_starts_with),
+                task_models,
+            )
+        )
+
+        assert len(script_tasks) == expected_size
+        last_task = None
+        for task_model in script_tasks:
+            if last_task is None:
+                last_task = task_model
+                continue
+            # Even though the script tasks sleep for .1 second, the difference in start times
+            # should be less than 0.001 seconds - they should all start at the same time.
+            assert abs(last_task.start_in_seconds - task_model.start_in_seconds) < 0.001  # type: ignore
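The timing assertion compares against the first row the query happens to
return, and TaskModel.query guarantees no ordering, so the raw difference can
be negative; wrapping it in abs() is what makes "less than 0.001 seconds" mean
what the comment says. A tiny self-contained restatement of the invariant
being tested, with made-up timestamps:

    # start_in_seconds values recorded for four tasks, in arbitrary row order:
    starts = [1694096000.0003, 1694096000.0001, 1694096000.0004, 1694096000.0002]

    # "They all started together" means every value is within the tolerance of
    # the reference, regardless of which row the query returned first.
    reference = starts[0]
    assert all(abs(s - reference) < 0.001 for s in starts)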