Config turn off threads (#1466)

* added config to turn off task execution with threads w/ burnettk

* do not run tasks in parallel if they have gateway children w/ burnettk

* remove debugging code w/ burnettk

* attempting to fix flakey test w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-04-29 20:50:20 +00:00 committed by GitHub
parent 48200283b5
commit 0d7a114c98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 131 additions and 39 deletions

View File

@ -221,3 +221,6 @@ config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False)
# adds the ProxyFix to Flask on http by processing the 'X-Forwarded-Proto' header
# to make SpiffWorkflow aware that it should return https for the server urls etc rather than http.
config_from_env("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default=False)
# only for DEBUGGING - turn off threaded task execution.
# Defaults to True. When false, the execution strategy runs multiple ready engine steps
# one at a time on the calling thread instead of submitting them to a thread pool.
config_from_env("SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION", default=True)

View File

@ -13,6 +13,7 @@ from flask import current_app
from flask import g
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.specs.control import UnstructuredJoin # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition # type: ignore
from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
@ -33,6 +34,7 @@ from spiffworkflow_backend.models.message_instance_correlation import MessageIns
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task_instructions_for_end_user import TaskInstructionsForEndUserModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.jinja_service import JinjaService
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
@ -143,30 +145,26 @@ class ExecutionStrategy:
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
elif num_steps > 1:
# This only works because of the GIL, and the fact that we are not actually executing
# code in parallel, we are just waiting for I/O in parallel. So it can run a ton of
# service tasks at once - many api calls, and then get those responses back without
# waiting for each individual task to complete.
futures = []
user = None
if hasattr(g, "user"):
user = g.user
with concurrent.futures.ThreadPoolExecutor() as executor:
# When a task with a gateway is completed it marks the gateway as either WAITING or READY.
# The problem is if two of these parent tasks mark their gateways as READY then both are processed
# and end up being marked completed, when in fact only one gateway attached to the same bpmn_id
# is allowed to be READY/COMPLETED. If two are READY and execute, then the tasks after the gateway will
# be unintentionally duplicated.
has_gateway_children = False
for spiff_task in engine_steps:
self.delegate.will_complete_task(spiff_task)
futures.append(
executor.submit(
self._run,
spiff_task,
current_app._get_current_object(),
user,
process_instance_model.process_model_identifier,
)
)
for future in concurrent.futures.as_completed(futures):
spiff_task = future.result()
self.delegate.did_complete_task(spiff_task)
for child_task in spiff_task.children:
if isinstance(child_task.task_spec, UnstructuredJoin):
has_gateway_children = True
if current_app.config["SPIFFWORKFLOW_BACKEND_USE_THREADS_FOR_TASK_EXECUTION"] and not has_gateway_children:
self._run_engine_steps_with_threads(engine_steps, process_instance_model, user)
else:
self._run_engine_steps_without_threads(engine_steps, process_instance_model, user)
if self.should_break_after(engine_steps):
# we could call the stuff at the top of the loop again and find out, but let's not do that unless we need to
task_runnability = TaskRunnability.unknown_if_ready_tasks
@ -184,6 +182,45 @@ class ExecutionStrategy:
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
    """Return the READY tasks of the workflow whose task spec is not manual."""
    engine_steps = []
    for ready_task in bpmn_process_instance.get_tasks(state=TaskState.READY):
        # Tasks with a manual task spec are excluded from the engine steps.
        if ready_task.task_spec.manual:
            continue
        engine_steps.append(ready_task)
    return engine_steps
def _run_engine_steps_with_threads(
    self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
) -> None:
    """Run the given engine steps through a thread pool, then notify the delegate.

    For each step the delegate's ``will_complete_task`` is called before the step is
    submitted to the pool. Once every submitted step has finished, the delegate's
    ``did_complete_task`` is called for each step in ``engine_steps`` order.

    Args:
        engine_steps: the tasks to run.
        process_instance: the model whose ``process_model_identifier`` is passed to ``_run``.
        user: passed through to ``_run``; may be None.

    Raises:
        Whatever exception a worker raised, re-raised by ``future.result()``.
    """
    # This only works because of the GIL, and the fact that we are not actually executing
    # code in parallel, we are just waiting for I/O in parallel. So it can run a ton of
    # service tasks at once - many api calls, and then get those responses back without
    # waiting for each individual task to complete.
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for spiff_task in engine_steps:
            self.delegate.will_complete_task(spiff_task)
            futures.append(
                executor.submit(
                    self._run,
                    spiff_task,
                    current_app._get_current_object(),
                    user,
                    process_instance.process_model_identifier,
                )
            )
        # result() is called only to wait for each step and to re-raise any exception
        # from its worker thread; the returned value is not used.
        for future in concurrent.futures.as_completed(futures):
            future.result()
        for spiff_task in engine_steps:
            self.delegate.did_complete_task(spiff_task)
def _run_engine_steps_without_threads(
    self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
) -> None:
    """Run the given engine steps one after another on the calling thread.

    Each step is bracketed by the delegate's ``will_complete_task`` and
    ``did_complete_task`` calls, so a step is fully handled before the next one starts.
    """
    for step in engine_steps:
        self.delegate.will_complete_task(step)
        # Same arguments the threaded variant hands to _run.
        app = current_app._get_current_object()
        model_identifier = process_instance.process_model_identifier
        self._run(step, app, user, model_identifier)
        self.delegate.did_complete_task(step)
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.

View File

@ -16,17 +16,17 @@
<bpmn:sequenceFlow id="Flow_1gjknxb" sourceRef="Gateway_0to34re" targetRef="Activity_0mj64s2" />
<bpmn:sequenceFlow id="Flow_0cj2tia" sourceRef="Gateway_0to34re" targetRef="Activity_1dlqr5f" />
<bpmn:sequenceFlow id="Flow_0pjsxm2" sourceRef="Gateway_0to34re" targetRef="Activity_1pndt6s" />
<bpmn:sequenceFlow id="Flow_08yg9t5" sourceRef="Activity_1srrljd" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_08yg9t5" sourceRef="Activity_1srrljd" targetRef="Activity_0femqz9" />
<bpmn:parallelGateway id="Gateway_0ee2g9g">
<bpmn:incoming>Flow_08yg9t5</bpmn:incoming>
<bpmn:incoming>Flow_03k3kx2</bpmn:incoming>
<bpmn:incoming>Flow_1pm1w0h</bpmn:incoming>
<bpmn:incoming>Flow_0e2holy</bpmn:incoming>
<bpmn:incoming>Flow_11fj5bn</bpmn:incoming>
<bpmn:incoming>Flow_02nckfs</bpmn:incoming>
<bpmn:incoming>Flow_0s3m7td</bpmn:incoming>
<bpmn:incoming>Flow_1nyxldz</bpmn:incoming>
<bpmn:outgoing>Flow_0kguhla</bpmn:outgoing>
</bpmn:parallelGateway>
<bpmn:sequenceFlow id="Flow_03k3kx2" sourceRef="Activity_0mj64s2" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_1pm1w0h" sourceRef="Activity_1dlqr5f" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_0e2holy" sourceRef="Activity_1pndt6s" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_03k3kx2" sourceRef="Activity_0mj64s2" targetRef="Activity_0tchoum" />
<bpmn:sequenceFlow id="Flow_1pm1w0h" sourceRef="Activity_1dlqr5f" targetRef="Activity_0btwzqo" />
<bpmn:sequenceFlow id="Flow_0e2holy" sourceRef="Activity_1pndt6s" targetRef="Activity_0ggu1e4" />
<bpmn:endEvent id="Event_0g4hezy">
<bpmn:incoming>Flow_0kguhla</bpmn:incoming>
</bpmn:endEvent>
@ -55,6 +55,26 @@ c=1</bpmn:script>
<bpmn:script>time.sleep(0.1)
d=1</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_11fj5bn" sourceRef="Activity_0femqz9" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_02nckfs" sourceRef="Activity_0tchoum" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_0s3m7td" sourceRef="Activity_0btwzqo" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_1nyxldz" sourceRef="Activity_0ggu1e4" targetRef="Gateway_0ee2g9g" />
<bpmn:scriptTask id="Activity_0ggu1e4" name="PaddingD">
<bpmn:incoming>Flow_0e2holy</bpmn:incoming>
<bpmn:outgoing>Flow_1nyxldz</bpmn:outgoing>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_0btwzqo" name="PaddingC">
<bpmn:incoming>Flow_1pm1w0h</bpmn:incoming>
<bpmn:outgoing>Flow_0s3m7td</bpmn:outgoing>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_0tchoum" name="PaddingB">
<bpmn:incoming>Flow_03k3kx2</bpmn:incoming>
<bpmn:outgoing>Flow_02nckfs</bpmn:outgoing>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_0femqz9" name="PaddingA">
<bpmn:incoming>Flow_08yg9t5</bpmn:incoming>
<bpmn:outgoing>Flow_11fj5bn</bpmn:outgoing>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_pvfr8r8">
@ -65,10 +85,10 @@ d=1</bpmn:script>
<dc:Bounds x="45" y="15" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0wvflyk_di" bpmnElement="Gateway_0ee2g9g">
<dc:Bounds x="325" y="15" width="50" height="50" />
<dc:Bounds x="455" y="15" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0g4hezy_di" bpmnElement="Event_0g4hezy">
<dc:Bounds x="442" y="22" width="36" height="36" />
<dc:Bounds x="572" y="22" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1cr4mii_di" bpmnElement="Activity_1srrljd">
<dc:Bounds x="160" y="0" width="100" height="80" />
@ -86,6 +106,22 @@ d=1</bpmn:script>
<dc:Bounds x="160" y="330" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0xdpg5u_di" bpmnElement="Activity_0tchoum">
<dc:Bounds x="310" y="110" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0xlvrqx_di" bpmnElement="Activity_0femqz9">
<dc:Bounds x="310" y="0" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0vfd6nh_di" bpmnElement="Activity_0btwzqo">
<dc:Bounds x="310" y="220" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1ophz6v_di" bpmnElement="Activity_0ggu1e4">
<dc:Bounds x="310" y="330" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0v5xitk_di" bpmnElement="Flow_0v5xitk">
<di:waypoint x="-12" y="40" />
<di:waypoint x="45" y="40" />
@ -111,26 +147,42 @@ d=1</bpmn:script>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08yg9t5_di" bpmnElement="Flow_08yg9t5">
<di:waypoint x="260" y="40" />
<di:waypoint x="325" y="40" />
<di:waypoint x="310" y="40" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_03k3kx2_di" bpmnElement="Flow_03k3kx2">
<di:waypoint x="260" y="150" />
<di:waypoint x="350" y="150" />
<di:waypoint x="350" y="65" />
<di:waypoint x="310" y="150" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1pm1w0h_di" bpmnElement="Flow_1pm1w0h">
<di:waypoint x="260" y="260" />
<di:waypoint x="350" y="260" />
<di:waypoint x="350" y="65" />
<di:waypoint x="310" y="260" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0e2holy_di" bpmnElement="Flow_0e2holy">
<di:waypoint x="260" y="370" />
<di:waypoint x="350" y="370" />
<di:waypoint x="350" y="65" />
<di:waypoint x="310" y="370" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0kguhla_di" bpmnElement="Flow_0kguhla">
<di:waypoint x="375" y="40" />
<di:waypoint x="442" y="40" />
<di:waypoint x="505" y="40" />
<di:waypoint x="572" y="40" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_11fj5bn_di" bpmnElement="Flow_11fj5bn">
<di:waypoint x="410" y="40" />
<di:waypoint x="455" y="40" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_02nckfs_di" bpmnElement="Flow_02nckfs">
<di:waypoint x="410" y="150" />
<di:waypoint x="480" y="150" />
<di:waypoint x="480" y="65" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0s3m7td_di" bpmnElement="Flow_0s3m7td">
<di:waypoint x="410" y="260" />
<di:waypoint x="480" y="260" />
<di:waypoint x="480" y="65" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1nyxldz_di" bpmnElement="Flow_1nyxldz">
<di:waypoint x="410" y="370" />
<di:waypoint x="480" y="370" />
<di:waypoint x="480" y="65" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>