* BPMN Process needed a cascade delete so that deleting a subprocess would remove all the subprocesses within that subprocess

* Trying to remove the duplicate code that might be causing bugs in the manual_task_complete
* Adding a test to show that a gateway can be successfully completed manually one step at a time.
This commit is contained in:
danfunk 2023-05-12 06:38:34 -04:00
parent 39962bf6c6
commit 1db065fb65
5 changed files with 191 additions and 78 deletions

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship, backref
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
@ -38,6 +38,8 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel):
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
tasks = relationship("TaskModel", back_populates="bpmn_process", cascade="delete") # type: ignore tasks = relationship("TaskModel", back_populates="bpmn_process", cascade="delete") # type: ignore
child_processes = relationship("BpmnProcessModel", foreign_keys=[direct_parent_process_id],
cascade="all") # type: ignore
# FIXME: find out how to set this but it'd be cool # FIXME: find out how to set this but it'd be cool
start_in_seconds: float = db.Column(db.DECIMAL(17, 6)) start_in_seconds: float = db.Column(db.DECIMAL(17, 6))

View File

@ -24,7 +24,7 @@ from uuid import UUID
import dateparser import dateparser
import pytz import pytz
from flask import current_app from flask import current_app, g
from lxml import etree # type: ignore from lxml import etree # type: ignore
from lxml.etree import XMLSyntaxError # type: ignore from lxml.etree import XMLSyntaxError # type: ignore
from RestrictedPython import safe_globals # type: ignore from RestrictedPython import safe_globals # type: ignore
@ -415,9 +415,9 @@ class ProcessInstanceProcessor:
tld.process_instance_id = process_instance_model.id tld.process_instance_id = process_instance_model.id
# we want this to be the fully qualified path to the process model including all group subcomponents # we want this to be the fully qualified path to the process model including all group subcomponents
current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = ( current_app.config[
f"{process_instance_model.process_model_identifier}" "THREAD_LOCAL_DATA"
) ].process_model_identifier = f"{process_instance_model.process_model_identifier}"
self.process_instance_model = process_instance_model self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService() self.process_model_service = ProcessModelService()
@ -577,9 +577,9 @@ class ProcessInstanceProcessor:
bpmn_subprocess_definition.bpmn_identifier bpmn_subprocess_definition.bpmn_identifier
] = bpmn_process_definition_dict ] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {} spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = ( bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.bpmn_identifier bpmn_subprocess_definition.id
) ] = bpmn_subprocess_definition.bpmn_identifier
task_definitions = TaskDefinitionModel.query.filter( task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore
@ -1116,76 +1116,35 @@ class ProcessInstanceProcessor:
event_type = ProcessInstanceEventType.task_skipped.value event_type = ProcessInstanceEventType.task_skipped.value
start_time = time.time() start_time = time.time()
if execute: if spiff_task.task_spec.manual:
# Executing or not executing a human task results in the same state.
current_app.logger.info(
f"Manually skipping Human Task {spiff_task.task_spec.name} of process"
f" instance {self.process_instance_model.id}"
)
human_task = HumanTaskModel.query.filter_by(task_id=task_id).first()
self.complete_task(spiff_task, human_task=human_task, user=g.user)
elif execute:
current_app.logger.info( current_app.logger.info(
f"Manually executing Task {spiff_task.task_spec.name} of process" f"Manually executing Task {spiff_task.task_spec.name} of process"
f" instance {self.process_instance_model.id}" f" instance {self.process_instance_model.id}"
) )
# Executing a sub-workflow manually will restart its subprocess and allow stepping through it self.do_engine_steps(save=True, execution_strategy_name="one_at_a_time")
if isinstance(spiff_task.task_spec, SubWorkflowTask):
subprocess = self.bpmn_process_instance.get_subprocess(spiff_task)
# We have to get to the actual start event
for spiff_task in self.bpmn_process_instance.get_tasks(workflow=subprocess):
spiff_task.run()
if spiff_task.task_spec.__class__.__name__ == "StartEvent":
break
else: else:
spiff_task.run() current_app.logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
event_type = ProcessInstanceEventType.task_executed_manually.value self.do_engine_steps(save=True, execution_strategy_name="skip_one")
else:
spiff_logger = logging.getLogger("spiff")
spiff_logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
spiff_task.complete()
spiff_task.workflow.last_task = spiff_task
end_in_seconds = time.time()
if spiff_task.task_spec.__class__.__name__ == "EndEvent":
for task in self.bpmn_process_instance.get_tasks(TaskState.DEFINITE_MASK, workflow=spiff_task.workflow):
task.complete()
# A subworkflow task will become ready when its workflow is complete. Engine steps would normally
# then complete it, but we have to do it ourselves here.
for task in self.bpmn_process_instance.get_tasks(TaskState.READY):
if isinstance(task.task_spec, SubWorkflowTask):
task.complete()
spiff_tasks = self.bpmn_process_instance.get_tasks()
task_service = TaskService( task_service = TaskService(
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
serializer=self._serializer, serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, [], start_time)
spiff_tasks_updated = {}
for task in self.bpmn_process_instance.get_tasks():
if task.last_state_change > start_time:
spiff_tasks_updated[task.id] = task
for updated_spiff_task in spiff_tasks_updated.values():
(
bpmn_process,
task_model,
) = task_service.find_or_create_task_model_from_spiff_task(
updated_spiff_task,
)
bpmn_process_to_use = bpmn_process or task_model.bpmn_process
bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process(
bpmn_process_to_use, updated_spiff_task.workflow.data
)
db.session.add(bpmn_process_to_use)
task_service.update_task_model(task_model, updated_spiff_task)
if bpmn_process_json_data is not None:
task_service.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data
# spiff_task should be the main task we are completing and only it should get the timestamps
if task_model.guid == str(spiff_task.id):
task_model.start_in_seconds = start_in_seconds
task_model.end_in_seconds = end_in_seconds
task_service.task_models[task_model.guid] = task_model
task_service.save_objects_to_database()
ProcessInstanceTmpService.add_event_to_process_instance( ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model, event_type, task_guid=task_id self.process_instance_model, event_type, task_guid=task_id
) )
self.save() self.save()
# Saving the workflow seems to reset the status # Saving the workflow seems to reset the status
self.suspend() self.suspend()
@ -1200,18 +1159,15 @@ class ProcessInstanceProcessor:
process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
) )
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
initial_spiff_tasks = processor.bpmn_process_instance.get_tasks() deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
spiff_tasks = processor.bpmn_process_instance.get_tasks()
processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
current_spiff_tasks = processor.bpmn_process_instance.get_tasks()
deleted_spiff_tasks = [t for t in initial_spiff_tasks if t not in current_spiff_tasks]
task_service = TaskService( task_service = TaskService(
process_instance=processor.process_instance_model, process_instance=processor.process_instance_model,
serializer=processor._serializer, serializer=processor._serializer,
bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings,
) )
task_service.update_all_tasks_from_spiff_tasks(current_spiff_tasks, deleted_spiff_tasks, start_time) task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time)
# Save the process # Save the process
processor.save() processor.save()
@ -1611,7 +1567,9 @@ class ProcessInstanceProcessor:
"""Complete_task.""" """Complete_task."""
task_model = TaskModel.query.filter_by(guid=human_task.task_id).first() task_model = TaskModel.query.filter_by(guid=human_task.task_id).first()
if task_model is None: if task_model is None:
raise TaskNotFoundError(f"Cannot find a task with guid {human_task.task_id}") raise TaskNotFoundError(
f"Cannot find a task with guid {self.process_instance_model.id} and task_id is {human_task.task_id}"
)
task_model.start_in_seconds = time.time() task_model.start_in_seconds = time.time()
self.bpmn_process_instance.run_task_from_id(spiff_task.id) self.bpmn_process_instance.run_task_from_id(spiff_task.id)
@ -1719,22 +1677,19 @@ class ProcessInstanceProcessor:
def terminate(self) -> None: def terminate(self) -> None:
start_time = time.time() start_time = time.time()
initial_spiff_tasks = self.bpmn_process_instance.get_tasks() deleted_tasks = self.bpmn_process_instance.cancel() or []
spiff_tasks = self.bpmn_process_instance.get_tasks()
self.bpmn_process_instance.cancel()
current_spiff_tasks = self.bpmn_process_instance.get_tasks()
deleted_spiff_tasks = [t for t in initial_spiff_tasks if t not in current_spiff_tasks]
task_service = TaskService( task_service = TaskService(
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
serializer=self._serializer, serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
task_service.update_all_tasks_from_spiff_tasks(current_spiff_tasks, deleted_spiff_tasks, start_time) task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time)
# we may want to move this to task_service.update_all_tasks_from_spiff_tasks but not sure it's always good to it. # we may want to move this to task_service.update_all_tasks_from_spiff_tasks but not sure it's always good to it.
# for cancelled tasks, spiff only returns tasks that were cancelled, not the ones that were deleted so we have to find them # for cancelled tasks, spiff only returns tasks that were cancelled, not the ones that were deleted so we have to find them
spiff_task_guids = [str(st.id) for st in current_spiff_tasks] spiff_task_guids = [str(st.id) for st in spiff_tasks]
tasks_no_longer_in_spiff = TaskModel.query.filter( tasks_no_longer_in_spiff = TaskModel.query.filter(
and_( and_(
TaskModel.process_instance_id == self.process_instance_model.id, TaskModel.process_instance_id == self.process_instance_model.id,

View File

@ -339,6 +339,17 @@ class OneAtATimeExecutionStrategy(ExecutionStrategy):
self.delegate.did_complete_task(spiff_task) self.delegate.did_complete_task(spiff_task)
self.delegate.after_engine_steps(bpmn_process_instance) self.delegate.after_engine_steps(bpmn_process_instance)
class SkipOneExecutionStrategy(ExecutionStrategy):
"""When you want to to skip over the next task, rather than execute it."""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
spiff_task = engine_steps[0]
self.delegate.will_complete_task(spiff_task)
spiff_task.complete()
self.delegate.did_complete_task(spiff_task)
self.delegate.after_engine_steps(bpmn_process_instance)
def execution_strategy_named( def execution_strategy_named(
name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader
@ -348,6 +359,7 @@ def execution_strategy_named(
"run_until_service_task": RunUntilServiceTaskExecutionStrategy, "run_until_service_task": RunUntilServiceTaskExecutionStrategy,
"run_until_user_message": RunUntilUserTaskOrMessageExecutionStrategy, "run_until_user_message": RunUntilUserTaskOrMessageExecutionStrategy,
"one_at_a_time": OneAtATimeExecutionStrategy, "one_at_a_time": OneAtATimeExecutionStrategy,
"skip_one": SkipOneExecutionStrategy,
}[name] }[name]
return cls(delegate, spec_loader) # type: ignore return cls(delegate, spec_loader) # type: ignore

View File

@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_4bd2k7a" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1eng1ar</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1eng1ar" sourceRef="StartEvent_1" targetRef="step_1" />
<bpmn:exclusiveGateway id="Gateway_Open">
<bpmn:incoming>Flow_0qv971t</bpmn:incoming>
<bpmn:outgoing>Flow_085anct</bpmn:outgoing>
<bpmn:outgoing>Flow_18ufaed</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="Flow_0qv971t" sourceRef="step_1" targetRef="Gateway_Open" />
<bpmn:sequenceFlow id="Flow_085anct" name="x != 1" sourceRef="Gateway_Open" targetRef="step_2_a">
<bpmn:conditionExpression>x != 1</bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="Flow_18ufaed" name="x = 1" sourceRef="Gateway_Open" targetRef="step_2_b">
<bpmn:conditionExpression>x == 1</bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:exclusiveGateway id="Gateway_Close">
<bpmn:incoming>Flow_1lm2vap</bpmn:incoming>
<bpmn:incoming>Flow_1wo0x2b</bpmn:incoming>
<bpmn:outgoing>Flow_0rh1j2k</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="Flow_1lm2vap" sourceRef="step_2_a" targetRef="Gateway_Close" />
<bpmn:sequenceFlow id="Flow_1wo0x2b" sourceRef="step_2_b" targetRef="Gateway_Close" />
<bpmn:endEvent id="Event_1qvuct2">
<bpmn:incoming>Flow_0rh1j2k</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0rh1j2k" sourceRef="Gateway_Close" targetRef="Event_1qvuct2" />
<bpmn:manualTask id="step_1" name="Step 1">
<bpmn:extensionElements>
<spiffworkflow:preScript>x = 1</spiffworkflow:preScript>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1eng1ar</bpmn:incoming>
<bpmn:outgoing>Flow_0qv971t</bpmn:outgoing>
</bpmn:manualTask>
<bpmn:manualTask id="step_2_a" name="Step 2 A">
<bpmn:incoming>Flow_085anct</bpmn:incoming>
<bpmn:outgoing>Flow_1lm2vap</bpmn:outgoing>
</bpmn:manualTask>
<bpmn:manualTask id="step_2_b" name="Step 2 B">
<bpmn:incoming>Flow_18ufaed</bpmn:incoming>
<bpmn:outgoing>Flow_1wo0x2b</bpmn:outgoing>
</bpmn:manualTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_4bd2k7a">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_1or8vns_di" bpmnElement="Gateway_Open" isMarkerVisible="true">
<dc:Bounds x="425" y="152" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_1yp31fc_di" bpmnElement="Gateway_Close" isMarkerVisible="true">
<dc:Bounds x="685" y="152" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1qvuct2_di" bpmnElement="Event_1qvuct2">
<dc:Bounds x="792" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0te0key_di" bpmnElement="step_1">
<dc:Bounds x="270" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0egutyn_di" bpmnElement="step_2_a">
<dc:Bounds x="530" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1ab5han_di" bpmnElement="step_2_b">
<dc:Bounds x="530" y="250" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1eng1ar_di" bpmnElement="Flow_1eng1ar">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0qv971t_di" bpmnElement="Flow_0qv971t">
<di:waypoint x="370" y="177" />
<di:waypoint x="425" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_085anct_di" bpmnElement="Flow_085anct">
<di:waypoint x="475" y="177" />
<di:waypoint x="530" y="177" />
<bpmndi:BPMNLabel>
<dc:Bounds x="489" y="159" width="28" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_18ufaed_di" bpmnElement="Flow_18ufaed">
<di:waypoint x="450" y="202" />
<di:waypoint x="450" y="290" />
<di:waypoint x="530" y="290" />
<bpmndi:BPMNLabel>
<dc:Bounds x="453" y="243" width="25" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1lm2vap_di" bpmnElement="Flow_1lm2vap">
<di:waypoint x="630" y="177" />
<di:waypoint x="685" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1wo0x2b_di" bpmnElement="Flow_1wo0x2b">
<di:waypoint x="630" y="290" />
<di:waypoint x="710" y="290" />
<di:waypoint x="710" y="202" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0rh1j2k_di" bpmnElement="Flow_0rh1j2k">
<di:waypoint x="735" y="177" />
<di:waypoint x="792" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -485,6 +485,39 @@ class TestProcessInstanceProcessor(BaseTest):
"stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)" "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)"
) )
def test_step_through_gateway(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/step_through_gateway",
process_model_source_directory="step_through_gateway",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
processor.manual_complete_task(str(human_task_one.task_id), execute=True)
processor.save()
processor = ProcessInstanceProcessor(process_instance)
assert processor.get_task_by_bpmn_identifier('step_1', processor.bpmn_process_instance).state == TaskState.COMPLETED
assert processor.get_task_by_bpmn_identifier('Gateway_Open', processor.bpmn_process_instance).state == TaskState.READY
gateway_task = processor.bpmn_process_instance.get_tasks(TaskState.READY)[0]
processor.manual_complete_task(str(gateway_task.id), execute=True)
processor.save()
processor = ProcessInstanceProcessor(process_instance)
assert processor.get_task_by_bpmn_identifier('Gateway_Open', processor.bpmn_process_instance).state == TaskState.COMPLETED
print(processor)
def test_properly_saves_tasks_when_running( def test_properly_saves_tasks_when_running(
self, self,
app: Flask, app: Flask,