From 0d7a114c982ace9329650643c7845de3402d22b6 Mon Sep 17 00:00:00 2001
From: jasquat <2487833+jasquat@users.noreply.github.com>
Date: Mon, 29 Apr 2024 20:50:20 +0000
Subject: [PATCH] Config turn off threads (#1466)

* added config to turn off task execution with threads w/ burnettk

* do not run tasks in parallel if they have gateway children w/ burnettk

* remove debugging code w/ burnettk

* attempting to fix flakey test w/ burnettk

---------

Co-authored-by: jasquat
---
 .../spiffworkflow_backend/config/default.py   |  3 +
 .../services/workflow_execution_service.py    | 77 +++++++++++-----
 .../threads_with_script_timers.bpmn           | 90 +++++++++++++++----
 3 files changed, 131 insertions(+), 39 deletions(-)

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
index 0b592be2..e2b926f6 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
@@ -221,3 +221,6 @@ config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False)
 # adds the ProxyFix to Flask on http by processing the 'X-Forwarded-Proto' header
 # to make SpiffWorkflow aware that it should return https for the server urls etc rather than http.
 config_from_env("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default=False)
+
+# only for DEBUGGING - turn off threaded task execution.
+config_from_env("SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION", default=True)

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index ae041bf6..b54c59bf 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -13,6 +13,7 @@ from flask import current_app
 from flask import g
 from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException  # type: ignore
 from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer  # type: ignore
+from SpiffWorkflow.bpmn.specs.control import UnstructuredJoin  # type: ignore
 from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition  # type: ignore
 from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent  # type: ignore
 from SpiffWorkflow.bpmn.workflow import BpmnWorkflow  # type: ignore
@@ -33,6 +34,7 @@ from spiffworkflow_backend.models.message_instance_correlation import MessageIns
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
 from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
 from spiffworkflow_backend.models.task_instructions_for_end_user import TaskInstructionsForEndUserModel
+from spiffworkflow_backend.models.user import UserModel
 from spiffworkflow_backend.services.assertion_service import safe_assertion
 from spiffworkflow_backend.services.jinja_service import JinjaService
 from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
@@ -143,30 +145,26 @@ class ExecutionStrategy:
             spiff_task.run()
             self.delegate.did_complete_task(spiff_task)
         elif num_steps > 1:
-            # This only works because of the GIL, and the fact that we are not actually executing
-            # code in parallel, we are just waiting for I/O in parallel. So it can run a ton of
-            # service tasks at once - many api calls, and then get those responses back without
-            # waiting for each individual task to complete.
-            futures = []
             user = None
             if hasattr(g, "user"):
                 user = g.user

-            with concurrent.futures.ThreadPoolExecutor() as executor:
-                for spiff_task in engine_steps:
-                    self.delegate.will_complete_task(spiff_task)
-                    futures.append(
-                        executor.submit(
-                            self._run,
-                            spiff_task,
-                            current_app._get_current_object(),
-                            user,
-                            process_instance_model.process_model_identifier,
-                        )
-                    )
-            for future in concurrent.futures.as_completed(futures):
-                spiff_task = future.result()
-                self.delegate.did_complete_task(spiff_task)
+            # When a task with a gateway is completed it marks the gateway as either WAITING or READY.
+            # The problem is that if two of these parent tasks mark their gateways as READY, then both are processed
+            # and end up being marked completed, when in fact only one gateway attached to the same bpmn_id
+            # is allowed to be READY/COMPLETED. If two are READY and execute, then the tasks after the gateway will
+            # be unintentionally duplicated.
+            has_gateway_children = False
+            for spiff_task in engine_steps:
+                for child_task in spiff_task.children:
+                    if isinstance(child_task.task_spec, UnstructuredJoin):
+                        has_gateway_children = True
+
+            if current_app.config["SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION"] and not has_gateway_children:
+                self._run_engine_steps_with_threads(engine_steps, process_instance_model, user)
+            else:
+                self._run_engine_steps_without_threads(engine_steps, process_instance_model, user)
+
             if self.should_break_after(engine_steps):
                 # we could call the stuff at the top of the loop again and find out, but let's not do that unless we need to
                 task_runnability = TaskRunnability.unknown_if_ready_tasks
@@ -184,6 +182,45 @@ class ExecutionStrategy:
     def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
         return [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual]
 
+    def _run_engine_steps_with_threads(
+        self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
+    ) -> None:
+        # This only works because of the GIL, and the fact that we are not actually executing
+        # code in parallel, we are just waiting for I/O in parallel. So it can run a ton of
+        # service tasks at once - many api calls, and then get those responses back without
+        # waiting for each individual task to complete.
+        futures = []
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            for spiff_task in engine_steps:
+                self.delegate.will_complete_task(spiff_task)
+                futures.append(
+                    executor.submit(
+                        self._run,
+                        spiff_task,
+                        current_app._get_current_object(),
+                        user,
+                        process_instance.process_model_identifier,
+                    )
+                )
+            for future in concurrent.futures.as_completed(futures):
+                spiff_task = future.result()
+
+        for spiff_task in engine_steps:
+            self.delegate.did_complete_task(spiff_task)
+
+    def _run_engine_steps_without_threads(
+        self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
+    ) -> None:
+        for spiff_task in engine_steps:
+            self.delegate.will_complete_task(spiff_task)
+            self._run(
+                spiff_task,
+                current_app._get_current_object(),
+                user,
+                process_instance.process_model_identifier,
+            )
+            self.delegate.did_complete_task(spiff_task)
+
 
 class TaskModelSavingDelegate(EngineStepDelegate):
     """Engine step delegate that takes care of saving a task model to the database.
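The dispatch logic added above can be hard to follow inside a diff. Below is a minimal, self-contained sketch of the pattern it introduces: run the ready engine steps in a thread pool unless any of them feeds an unstructured join (gateway), in which case fall back to sequential execution. The `Task`, `TaskSpec`, and `JoinSpec` classes here are hypothetical stand-ins for SpiffWorkflow's `SpiffTask` and `UnstructuredJoin`; only the branching logic mirrors the patch.

```python
import concurrent.futures
from dataclasses import dataclass, field


@dataclass
class TaskSpec:
    name: str


@dataclass
class JoinSpec(TaskSpec):
    """Stand-in for SpiffWorkflow's UnstructuredJoin task spec."""


@dataclass
class Task:
    task_spec: TaskSpec
    children: list["Task"] = field(default_factory=list)

    def run(self) -> "Task":
        # A real engine step would execute a script or call a service here.
        print(f"running {self.task_spec.name}")
        return self


def has_gateway_children(engine_steps: list[Task]) -> bool:
    # Mirrors the patch: if any ready task has an unstructured join as a child,
    # two siblings run in parallel could both mark the same gateway READY and
    # duplicate the tasks after it, so threads are skipped in that case.
    return any(
        isinstance(child.task_spec, JoinSpec)
        for step in engine_steps
        for child in step.children
    )


def run_engine_steps(engine_steps: list[Task], use_threads: bool) -> None:
    if use_threads and not has_gateway_children(engine_steps):
        # I/O-bound steps (e.g. service tasks making API calls) can overlap
        # while each thread waits on the network, despite the GIL.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(step.run) for step in engine_steps]
            for future in concurrent.futures.as_completed(futures):
                future.result()
    else:
        for step in engine_steps:
            step.run()


if __name__ == "__main__":
    join = Task(task_spec=JoinSpec("merge_gateway"))
    steps = [
        Task(task_spec=TaskSpec("script_a"), children=[join]),
        Task(task_spec=TaskSpec("script_b"), children=[join]),
    ]
    # Falls back to sequential execution because a gateway child is present.
    run_engine_steps(steps, use_threads=True)
```

In the real service the threaded branch also passes `current_app._get_current_object()` and the user into each worker so the Flask app context can be re-entered per thread; that plumbing is omitted from this sketch.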
diff --git a/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn b/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
index 39bc8f26..60821f5f 100644
--- a/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
+++ b/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
[BPMN XML hunks not recoverable from this copy (the markup was stripped; only text nodes survive). The visible changes: the element that previously listed Flow_08yg9t5, Flow_03k3kx2, Flow_1pm1w0h, and Flow_0e2holy as incoming (with Flow_0kguhla outgoing) now lists Flow_11fj5bn, Flow_02nckfs, Flow_0s3m7td, and Flow_1nyxldz; four new elements are added, each taking one of the old flows as incoming and one of the new flows as outgoing; the remaining hunks appear to update the corresponding diagram shapes and edges.]
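For reference, a minimal sketch of how the new flag from default.py might be exercised. The patch does not show `config_from_env` itself, so the environment lookup and truthy-string parsing below are assumptions, not the backend's actual implementation.

```python
import os

from flask import Flask

app = Flask(__name__)

# Emulates config_from_env from default.py: read the variable from the
# environment, falling back to the default (True) declared in the patch.
# The truthy-string parsing here is an assumption.
raw = os.environ.get("SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION", "true")
app.config["SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION"] = raw.lower() in ("true", "1", "yes")

if app.config["SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION"]:
    print("threaded task execution enabled (default)")
else:
    print("threaded task execution disabled - debugging only")
```

Setting `SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION=false` in the backend's environment routes every engine step through `_run_engine_steps_without_threads`, which is what the "only for DEBUGGING" comment in default.py refers to.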