* WIP - threading with dan

* WIP - cleanup with dan

* Fixing a minor bug and adding a comment and taking the win, even though it was all actually done by @jbirddog with help from @essweine

* adding a couple of simple tests

* * Threaded execution of multiple ready engine tasks is now the default behavior for all execution strategies by default (the skip-one strategry does not do it)
* Assure that tasks in the db match those in the spiffworkflow bpmn on save (remove all predicted and pruned tasks)

with @jbirddog, @jlantz, @burnettk

* run_pyl

---------

Co-authored-by: Jon Herron <jon.herron@yahoo.com>
Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
Dan Funk 2023-09-07 10:12:56 -04:00 committed by GitHub
parent c747c4be56
commit 80ad92a0c3
7 changed files with 433 additions and 97 deletions

View File

@ -139,7 +139,7 @@ def _run_extension(
process_instance, script_engine=CustomBpmnScriptEngine(use_restricted_script_engine=False)
)
if body and "extension_input" in body:
processor.do_engine_steps(save=False, execution_strategy_name="one_at_a_time")
processor.do_engine_steps(save=False, execution_strategy_name="run_current_ready_tasks")
next_task = processor.next_task()
next_task.update_data(body["extension_input"])
processor.do_engine_steps(save=False, execution_strategy_name="greedy")

View File

@ -446,6 +446,7 @@ def _interstitial_stream(
reported_ids = [] # A list of all the ids reported by this endpoint so far.
tasks = get_reportable_tasks()
while True:
has_ready_tasks = False
for spiff_task in tasks:
# ignore the instructions if they are on the EndEvent for the top level process
if not TaskService.is_main_process_end_event(spiff_task):
@ -465,30 +466,33 @@ def _interstitial_stream(
yield _render_data("task", task)
reported_ids.append(spiff_task.id)
if spiff_task.state == TaskState.READY:
# do not do any processing if the instance is not currently active
if process_instance.status not in ProcessInstanceModel.active_statuses():
yield _render_data("unrunnable_instance", process_instance)
return
if execute_tasks:
try:
# run_until_user_message does not run tasks with instructions to use one_at_a_time
# to force it to run the task.
processor.do_engine_steps(execution_strategy_name="one_at_a_time")
processor.do_engine_steps(execution_strategy_name="run_until_user_message")
processor.save() # Fixme - maybe find a way not to do this on every loop?
processor.refresh_waiting_tasks()
has_ready_tasks = True
except WorkflowTaskException as wfe:
api_error = ApiError.from_workflow_exception(
"engine_steps_error", "Failed to complete an automated task.", exp=wfe
)
yield _render_data("error", api_error)
ErrorHandlingService.handle_error(process_instance, wfe)
return
# return if process instance is now complete and let the frontend redirect to show page
if process_instance.status not in ProcessInstanceModel.active_statuses():
yield _render_data("unrunnable_instance", process_instance)
if has_ready_tasks:
# do not do any processing if the instance is not currently active
if process_instance.status not in ProcessInstanceModel.active_statuses():
yield _render_data("unrunnable_instance", process_instance)
return
if execute_tasks:
try:
# run_until_user_message does not run tasks with instructions so run readys first
# to force it to run the task.
processor.do_engine_steps(execution_strategy_name="run_current_ready_tasks")
processor.do_engine_steps(execution_strategy_name="run_until_user_message")
processor.save() # Fixme - maybe find a way not to do this on every loop?
processor.refresh_waiting_tasks()
except WorkflowTaskException as wfe:
api_error = ApiError.from_workflow_exception(
"engine_steps_error", "Failed to complete an automated task.", exp=wfe
)
yield _render_data("error", api_error)
ErrorHandlingService.handle_error(process_instance, wfe)
return
# return if process instance is now complete and let the frontend redirect to show page
if process_instance.status not in ProcessInstanceModel.active_statuses():
yield _render_data("unrunnable_instance", process_instance)
return
# path used by the interstitial page while executing tasks - ie the background processor is not executing them
ready_engine_task_count = _get_ready_engine_step_count(processor.bpmn_process_instance)

View File

@ -1005,6 +1005,15 @@ class ProcessInstanceProcessor:
db.session.add(self.process_instance_model)
db.session.commit()
known_task_ids = [str(t.id) for t in self.bpmn_process_instance.get_tasks()]
TaskModel.query.filter(TaskModel.process_instance_id == self.process_instance_model.id).filter(
TaskModel.guid.notin_(known_task_ids) # type: ignore
).delete()
HumanTaskModel.query.filter(HumanTaskModel.process_instance_id == self.process_instance_model.id).filter(
HumanTaskModel.task_id.notin_(known_task_ids) # type: ignore
).delete()
db.session.commit()
human_tasks = HumanTaskModel.query.filter_by(
process_instance_id=self.process_instance_model.id, completed=False
).all()
@ -1124,7 +1133,7 @@ class ProcessInstanceProcessor:
f"Manually executing Task {spiff_task.task_spec.name} of process"
f" instance {self.process_instance_model.id}"
)
self.do_engine_steps(save=True, execution_strategy_name="one_at_a_time")
self.do_engine_steps(save=True, execution_strategy_name="run_current_ready_tasks")
else:
current_app.logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
task_model_delegate = TaskModelSavingDelegate(

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import concurrent.futures
import copy
import time
from abc import abstractmethod
@ -7,6 +8,9 @@ from collections.abc import Callable
from typing import Any
from uuid import UUID
import flask.app
from flask import current_app
from flask import g
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition # type: ignore
@ -84,9 +88,77 @@ class ExecutionStrategy:
self.subprocess_spec_loader = subprocess_spec_loader
self.options = options
@abstractmethod
def should_break_before(self, tasks: list[SpiffTask]) -> bool:
return False
def should_break_after(self, tasks: list[SpiffTask]) -> bool:
return False
def _run(
self,
spiff_task: SpiffTask,
app: flask.app.Flask,
process_instance_id: Any | None,
process_model_identifier: Any | None,
user: Any | None,
) -> SpiffTask:
with app.app_context():
app.config["THREAD_LOCAL_DATA"].process_instance_id = process_instance_id
app.config["THREAD_LOCAL_DATA"].process_model_identifier = process_model_identifier
g.user = user
spiff_task.run()
return spiff_task
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass
# Note
while True:
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
if self.should_break_before(engine_steps):
break
num_steps = len(engine_steps)
if num_steps == 0:
break
elif num_steps == 1:
spiff_task = engine_steps[0]
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
elif num_steps > 1:
# This only works because of the GIL, and the fact that we are not actually executing
# code in parallel, we are just waiting for I/O in parallel. So it can run a ton of
# service tasks at once - many api calls, and then get those responses back without
# waiting for each individual task to complete.
futures = []
process_instance_id = None
process_model_identifier = None
if hasattr(current_app.config["THREAD_LOCAL_DATA"], "process_instance_id"):
process_instance_id = current_app.config["THREAD_LOCAL_DATA"].process_instance_id
process_model_identifier = current_app.config["THREAD_LOCAL_DATA"].process_model_identifier
user = None
if hasattr(g, "user"):
user = g.user
with concurrent.futures.ThreadPoolExecutor() as executor:
for spiff_task in engine_steps:
self.delegate.will_complete_task(spiff_task)
futures.append(
executor.submit(
self._run,
spiff_task,
current_app._get_current_object(),
process_instance_id,
process_model_identifier,
user,
)
)
for future in concurrent.futures.as_completed(futures):
spiff_task = future.result()
self.delegate.did_complete_task(spiff_task)
if self.should_break_after(engine_steps):
break
self.delegate.after_engine_steps(bpmn_process_instance)
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.on_exception(bpmn_process_instance)
@ -99,7 +171,9 @@ class ExecutionStrategy:
if len(tasks) > 0:
self.subprocess_spec_loader()
tasks = [tasks[0]]
# TODO: verify the other execution strategies work still and delete to make this work like the name
# tasks = [tasks[0]]
return tasks
@ -261,47 +335,11 @@ class TaskModelSavingDelegate(EngineStepDelegate):
class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine steps without stopping."""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance
self.run_until_user_input_required(exit_at)
self.delegate.after_engine_steps(bpmn_process_instance)
def run_until_user_input_required(self, exit_at: None = None) -> None:
"""Keeps running tasks until there are no non-human READY tasks.
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance)
while engine_steps:
for spiff_task in engine_steps:
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
self.bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
"""For illustration purposes, not currently integrated.
Would allow the `run` from the UI to execute until a service task then
return (to an interstitial page). The background processor would then take over.
"""
This is what the base class does by default.
"""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
while engine_steps:
for spiff_task in engine_steps:
if spiff_task.task_spec.description == "Service Task":
return
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
self.delegate.after_engine_steps(bpmn_process_instance)
pass
class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy):
@ -311,40 +349,22 @@ class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy):
but will stop if it hits instructions after the first task.
"""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
should_continue = True
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
while engine_steps and should_continue:
for task in engine_steps:
if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get(
"instructionsForEndUser", None
):
should_continue = False
break
self.delegate.will_complete_task(task)
task.run()
self.delegate.did_complete_task(task)
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
self.delegate.after_engine_steps(bpmn_process_instance)
def should_break_before(self, tasks: list[SpiffTask]) -> bool:
for task in tasks:
if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get("instructionsForEndUser", None):
return True
return False
class OneAtATimeExecutionStrategy(ExecutionStrategy):
class RunCurrentReadyTasksExecutionStrategy(ExecutionStrategy):
"""When you want to run only one engine step at a time."""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
spiff_task = engine_steps[0]
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
self.delegate.after_engine_steps(bpmn_process_instance)
def should_break_after(self, tasks: list[SpiffTask]) -> bool:
return True
class SkipOneExecutionStrategy(ExecutionStrategy):
"""When you want to to skip over the next task, rather than execute it."""
"""When you want to skip over the next task, rather than execute it."""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
spiff_task = None
@ -366,13 +386,12 @@ def execution_strategy_named(
) -> ExecutionStrategy:
cls = {
"greedy": GreedyExecutionStrategy,
"run_until_service_task": RunUntilServiceTaskExecutionStrategy,
"run_until_user_message": RunUntilUserTaskOrMessageExecutionStrategy,
"one_at_a_time": OneAtATimeExecutionStrategy,
"run_current_ready_tasks": RunCurrentReadyTasksExecutionStrategy,
"skip_one": SkipOneExecutionStrategy,
}[name]
return cls(delegate, spec_loader) # type: ignore
return cls(delegate, spec_loader)
ProcessInstanceCompleter = Callable[[BpmnWorkflow], None]
@ -409,7 +428,6 @@ class WorkflowExecutionService:
"The current thread has not obtained a lock for this process"
f" instance ({self.process_instance_model.id})."
)
try:
self.bpmn_process_instance.refresh_waiting_tasks()

View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_b32b5ju" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_13c96f7</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_13c96f7" sourceRef="StartEvent_1" targetRef="Activity_0r556tq" />
<bpmn:endEvent id="Event_0enwpgk">
<bpmn:incoming>Flow_0pe3a6u</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0pe3a6u" sourceRef="Activity_1xzxbmc" targetRef="Event_0enwpgk" />
<bpmn:scriptTask id="Activity_1xzxbmc" name="multi">
<bpmn:incoming>Flow_043nok3</bpmn:incoming>
<bpmn:outgoing>Flow_0pe3a6u</bpmn:outgoing>
<bpmn:multiInstanceLoopCharacteristics>
<bpmn:loopDataInputRef>letters</bpmn:loopDataInputRef>
<bpmn:loopDataOutputRef>upper_letters</bpmn:loopDataOutputRef>
<bpmn:inputDataItem id="letter" name="letter" />
<bpmn:outputDataItem id="upper_letter" name="upper_letter" />
</bpmn:multiInstanceLoopCharacteristics>
<bpmn:script>upper_letter = letter.upper()
time.sleep(0.1)</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_043nok3" sourceRef="Activity_0r556tq" targetRef="Activity_1xzxbmc" />
<bpmn:scriptTask id="Activity_0r556tq" name="set letters">
<bpmn:incoming>Flow_13c96f7</bpmn:incoming>
<bpmn:outgoing>Flow_043nok3</bpmn:outgoing>
<bpmn:script>letters = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_b32b5ju">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="-138" y="62" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0enwpgk_di" bpmnElement="Event_0enwpgk">
<dc:Bounds x="252" y="62" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_11mwjni_di" bpmnElement="Activity_1xzxbmc">
<dc:Bounds x="100" y="40" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_03vd1bk_di" bpmnElement="Activity_0r556tq">
<dc:Bounds x="-70" y="40" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_13c96f7_di" bpmnElement="Flow_13c96f7">
<di:waypoint x="-102" y="80" />
<di:waypoint x="-70" y="80" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0pe3a6u_di" bpmnElement="Flow_0pe3a6u">
<di:waypoint x="200" y="80" />
<di:waypoint x="252" y="80" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_043nok3_di" bpmnElement="Flow_043nok3">
<di:waypoint x="30" y="80" />
<di:waypoint x="100" y="80" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_pvfr8r8" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0v5xitk</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0v5xitk" sourceRef="StartEvent_1" targetRef="Gateway_0to34re" />
<bpmn:parallelGateway id="Gateway_0to34re">
<bpmn:incoming>Flow_0v5xitk</bpmn:incoming>
<bpmn:outgoing>Flow_1lhzzw7</bpmn:outgoing>
<bpmn:outgoing>Flow_1gjknxb</bpmn:outgoing>
<bpmn:outgoing>Flow_0cj2tia</bpmn:outgoing>
<bpmn:outgoing>Flow_0pjsxm2</bpmn:outgoing>
</bpmn:parallelGateway>
<bpmn:sequenceFlow id="Flow_1lhzzw7" sourceRef="Gateway_0to34re" targetRef="Activity_1srrljd" />
<bpmn:sequenceFlow id="Flow_1gjknxb" sourceRef="Gateway_0to34re" targetRef="Activity_0mj64s2" />
<bpmn:sequenceFlow id="Flow_0cj2tia" sourceRef="Gateway_0to34re" targetRef="Activity_1dlqr5f" />
<bpmn:sequenceFlow id="Flow_0pjsxm2" sourceRef="Gateway_0to34re" targetRef="Activity_1pndt6s" />
<bpmn:sequenceFlow id="Flow_08yg9t5" sourceRef="Activity_1srrljd" targetRef="Gateway_0ee2g9g" />
<bpmn:parallelGateway id="Gateway_0ee2g9g">
<bpmn:incoming>Flow_08yg9t5</bpmn:incoming>
<bpmn:incoming>Flow_03k3kx2</bpmn:incoming>
<bpmn:incoming>Flow_1pm1w0h</bpmn:incoming>
<bpmn:incoming>Flow_0e2holy</bpmn:incoming>
<bpmn:outgoing>Flow_0kguhla</bpmn:outgoing>
</bpmn:parallelGateway>
<bpmn:sequenceFlow id="Flow_03k3kx2" sourceRef="Activity_0mj64s2" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_1pm1w0h" sourceRef="Activity_1dlqr5f" targetRef="Gateway_0ee2g9g" />
<bpmn:sequenceFlow id="Flow_0e2holy" sourceRef="Activity_1pndt6s" targetRef="Gateway_0ee2g9g" />
<bpmn:endEvent id="Event_0g4hezy">
<bpmn:incoming>Flow_0kguhla</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0kguhla" sourceRef="Gateway_0ee2g9g" targetRef="Event_0g4hezy" />
<bpmn:scriptTask id="Activity_1srrljd" name="ThreadTaskA">
<bpmn:incoming>Flow_1lhzzw7</bpmn:incoming>
<bpmn:outgoing>Flow_08yg9t5</bpmn:outgoing>
<bpmn:script>time.sleep(0.1)
a=1</bpmn:script>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_0mj64s2" name="ThreadTaskB">
<bpmn:incoming>Flow_1gjknxb</bpmn:incoming>
<bpmn:outgoing>Flow_03k3kx2</bpmn:outgoing>
<bpmn:script>time.sleep(0.1)
b=1</bpmn:script>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_1dlqr5f" name="ThreadTaskC">
<bpmn:incoming>Flow_0cj2tia</bpmn:incoming>
<bpmn:outgoing>Flow_1pm1w0h</bpmn:outgoing>
<bpmn:script>time.sleep(0.1)
c=1</bpmn:script>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_1pndt6s" name="ThreadTaskD">
<bpmn:incoming>Flow_0pjsxm2</bpmn:incoming>
<bpmn:outgoing>Flow_0e2holy</bpmn:outgoing>
<bpmn:script>time.sleep(0.1)
d=1</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_pvfr8r8">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="-48" y="22" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0kvew6a_di" bpmnElement="Gateway_0to34re">
<dc:Bounds x="45" y="15" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0wvflyk_di" bpmnElement="Gateway_0ee2g9g">
<dc:Bounds x="325" y="15" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0g4hezy_di" bpmnElement="Event_0g4hezy">
<dc:Bounds x="442" y="22" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1cr4mii_di" bpmnElement="Activity_1srrljd">
<dc:Bounds x="160" y="0" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_061rddp_di" bpmnElement="Activity_0mj64s2">
<dc:Bounds x="160" y="110" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1tntvlg_di" bpmnElement="Activity_1dlqr5f">
<dc:Bounds x="160" y="220" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1y62zlm_di" bpmnElement="Activity_1pndt6s">
<dc:Bounds x="160" y="330" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0v5xitk_di" bpmnElement="Flow_0v5xitk">
<di:waypoint x="-12" y="40" />
<di:waypoint x="45" y="40" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1lhzzw7_di" bpmnElement="Flow_1lhzzw7">
<di:waypoint x="95" y="40" />
<di:waypoint x="160" y="40" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1gjknxb_di" bpmnElement="Flow_1gjknxb">
<di:waypoint x="70" y="65" />
<di:waypoint x="70" y="150" />
<di:waypoint x="160" y="150" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0cj2tia_di" bpmnElement="Flow_0cj2tia">
<di:waypoint x="70" y="65" />
<di:waypoint x="70" y="260" />
<di:waypoint x="160" y="260" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0pjsxm2_di" bpmnElement="Flow_0pjsxm2">
<di:waypoint x="70" y="65" />
<di:waypoint x="70" y="370" />
<di:waypoint x="160" y="370" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08yg9t5_di" bpmnElement="Flow_08yg9t5">
<di:waypoint x="260" y="40" />
<di:waypoint x="325" y="40" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_03k3kx2_di" bpmnElement="Flow_03k3kx2">
<di:waypoint x="260" y="150" />
<di:waypoint x="350" y="150" />
<di:waypoint x="350" y="65" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1pm1w0h_di" bpmnElement="Flow_1pm1w0h">
<di:waypoint x="260" y="260" />
<di:waypoint x="350" y="260" />
<di:waypoint x="350" y="65" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0e2holy_di" bpmnElement="Flow_0e2holy">
<di:waypoint x="260" y="370" />
<di:waypoint x="350" y="370" />
<di:waypoint x="350" y="65" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0kguhla_di" bpmnElement="Flow_0kguhla">
<di:waypoint x="375" y="40" />
<di:waypoint x="442" y="40" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,107 @@
from flask import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
class TestThreadedExecution(BaseTest):
def test_four_parallel_script_tasks_that_wait_one_tenth_second(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/threads_with_script_timers",
process_model_source_directory="threads_with_script_timers",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
self.assert_same_start_times(process_instance, "ThreadTask", 4)
assert processor.bpmn_process_instance.is_completed()
assert processor.bpmn_process_instance.last_task.data == {"a": 1, "b": 1, "c": 1, "d": 1}
def test_multi_instance_can_run_in_parallel(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/threads_multi_instance",
process_model_source_directory="threads_multi_instance",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
self.assert_same_start_times(process_instance, "multi", 26)
assert processor.bpmn_process_instance.is_completed()
upper_letters = processor.bpmn_process_instance.last_task.data["upper_letters"]
# Note that Sort is required here, because the threaded execution will complete the list out of order.
upper_letters.sort()
assert upper_letters == [
"A",
"B",
"C",
"D",
"E",
"F",
"G",
"H",
"I",
"J",
"K",
"L",
"M",
"N",
"O",
"P",
"Q",
"R",
"S",
"T",
"U",
"V",
"W",
"X",
"Y",
"Z",
]
def assert_same_start_times(
self, process_instance: ProcessInstanceModel, task_name_starts_with: str, expected_size: int
) -> None:
# The start_time recorded for each task should be nearly identical.
bpmn_process_id = process_instance.bpmn_process_id
task_models = TaskModel.query.filter_by(bpmn_process_id=bpmn_process_id).all()
script_tasks = list(
filter(
lambda tm: tm.task_definition.bpmn_name is not None
and tm.task_definition.bpmn_name.startswith(task_name_starts_with),
task_models,
)
)
assert len(script_tasks) == expected_size
last_task = None
for task_model in script_tasks:
if last_task is None:
last_task = task_model
continue
# Even through the script tasks sleep for .1 second, the difference in start times
# should be less than 0.001 seconds - they should all start at the same time.
assert (last_task.start_in_seconds - task_model.start_in_seconds) < 0.001 # type: ignore