diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
index 0b592be2..e2b926f6 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
@@ -221,3 +221,6 @@ config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False)
# adds the ProxyFix to Flask on http by processing the 'X-Forwarded-Proto' header
# to make SpiffWorkflow aware that it should return https for the server urls etc rather than http.
config_from_env("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default=False)
+
+# Only for DEBUGGING: set to false to turn off threaded task execution.
+config_from_env("SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION", default=True)
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index ae041bf6..b54c59bf 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -13,6 +13,7 @@ from flask import current_app
from flask import g
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
+from SpiffWorkflow.bpmn.specs.control import UnstructuredJoin # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition # type: ignore
from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
@@ -33,6 +34,7 @@ from spiffworkflow_backend.models.message_instance_correlation import MessageIns
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task_instructions_for_end_user import TaskInstructionsForEndUserModel
+from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.jinja_service import JinjaService
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
@@ -143,30 +145,26 @@ class ExecutionStrategy:
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
elif num_steps > 1:
- # This only works because of the GIL, and the fact that we are not actually executing
- # code in parallel, we are just waiting for I/O in parallel. So it can run a ton of
- # service tasks at once - many api calls, and then get those responses back without
- # waiting for each individual task to complete.
- futures = []
user = None
if hasattr(g, "user"):
user = g.user
- with concurrent.futures.ThreadPoolExecutor() as executor:
- for spiff_task in engine_steps:
- self.delegate.will_complete_task(spiff_task)
- futures.append(
- executor.submit(
- self._run,
- spiff_task,
- current_app._get_current_object(),
- user,
- process_instance_model.process_model_identifier,
- )
- )
- for future in concurrent.futures.as_completed(futures):
- spiff_task = future.result()
- self.delegate.did_complete_task(spiff_task)
+ # When a task with a gateway child is completed, it marks the gateway as either WAITING or READY.
+ # The problem: if two of these parent tasks mark their gateways as READY, both gateway task instances
+ # are processed and end up marked COMPLETED, when in fact only one gateway task attached to the same
+ # BPMN id is allowed to be READY/COMPLETED. If two are READY and execute, the tasks after the gateway
+ # will be unintentionally duplicated.
+ has_gateway_children = False
+ for spiff_task in engine_steps:
+ for child_task in spiff_task.children:
+ if isinstance(child_task.task_spec, UnstructuredJoin):
+ has_gateway_children = True
+
+ if current_app.config["SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION"] and not has_gateway_children:
+ self._run_engine_steps_with_threads(engine_steps, process_instance_model, user)
+ else:
+ self._run_engine_steps_without_threads(engine_steps, process_instance_model, user)
+
if self.should_break_after(engine_steps):
# we could call the stuff at the top of the loop again and find out, but let's not do that unless we need to
task_runnability = TaskRunnability.unknown_if_ready_tasks
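The dispatch above hinges on detecting whether any ready engine step feeds an unstructured join. A standalone sketch of that check, using the same isinstance test as the hunk and assuming only the .children / .task_spec attributes already relied on there:

from SpiffWorkflow.bpmn.specs.control import UnstructuredJoin  # type: ignore


def any_step_feeds_a_join(engine_steps) -> bool:
    # True when at least one ready engine step has an UnstructuredJoin child,
    # meaning completing it could mark a shared join/gateway task READY.
    # Such batches are run serially so two parents cannot both complete the
    # same join and duplicate the tasks that follow it.
    return any(
        isinstance(child.task_spec, UnstructuredJoin)
        for spiff_task in engine_steps
        for child in spiff_task.children
    )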
@@ -184,6 +182,45 @@ class ExecutionStrategy:
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
return [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual]
+ def _run_engine_steps_with_threads(
+ self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
+ ) -> None:
+ # This only works because of the GIL and the fact that we are not actually executing
+ # code in parallel; we are just waiting for I/O in parallel. So it can run a ton of
+ # service tasks at once - many api calls - and then get those responses back without
+ # waiting for each individual task to complete.
+ futures = []
+ with concurrent.futures.ThreadPoolExecutor() as executor:
+ for spiff_task in engine_steps:
+ self.delegate.will_complete_task(spiff_task)
+ futures.append(
+ executor.submit(
+ self._run,
+ spiff_task,
+ current_app._get_current_object(),
+ user,
+ process_instance.process_model_identifier,
+ )
+ )
+ for future in concurrent.futures.as_completed(futures):
+ spiff_task = future.result()
+
+ for spiff_task in engine_steps:
+ self.delegate.did_complete_task(spiff_task)
+
+ def _run_engine_steps_without_threads(
+ self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
+ ) -> None:
+ for spiff_task in engine_steps:
+ self.delegate.will_complete_task(spiff_task)
+ self._run(
+ spiff_task,
+ current_app._get_current_object(),
+ user,
+ process_instance.process_model_identifier,
+ )
+ self.delegate.did_complete_task(spiff_task)
+
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.
diff --git a/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn b/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
index 39bc8f26..60821f5f 100644
--- a/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
+++ b/spiffworkflow-backend/tests/data/threads_with_script_timers/threads_with_script_timers.bpmn
[BPMN XML hunks for threads_with_script_timers.bpmn: four new elements are inserted between the existing script tasks and the downstream join. The join's incoming flows Flow_08yg9t5, Flow_03k3kx2, Flow_1pm1w0h, and Flow_0e2holy are replaced by Flow_11fj5bn, Flow_02nckfs, Flow_0s3m7td, and Flow_1nyxldz, with each old flow now feeding one of the new elements, and the matching BPMNDiagram shapes and edges are added.]