the primary test is passing now but subprocesses and call activities are probably broken w/ burnettk

This commit is contained in:
jasquat 2023-03-09 17:16:44 -05:00
parent 4ce715fec8
commit 7e44c90fbb
7 changed files with 266 additions and 67 deletions

View File

@ -17,10 +17,10 @@ per-file-ignores =
# THEN, test_hey.py will NOT be excluding D103
# asserts are ok in tests
spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103
spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103,D107
# prefer naming functions descriptively rather than forcing comments
spiffworkflow-backend/src/*:D100,D101,D102,D103
spiffworkflow-backend/src/*:D100,D101,D102,D103,D107
spiffworkflow-backend/bin/keycloak_test_server.py:B950,D
spiffworkflow-backend/conftest.py:S105

View File

@ -17,10 +17,10 @@ per-file-ignores =
# THEN, test_hey.py will NOT be excluding D103
# asserts are ok in tests
tests/*:S101,D100,D101,D102,D103
tests/*:S101,D100,D101,D102,D103,D107
# prefer naming functions descriptively rather than forcing comments
src/*:D100,D101,D102,D103
src/*:D100,D101,D102,D103,D107
bin/keycloak_test_server.py:B950,D
conftest.py:S105

View File

@ -93,6 +93,7 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.user_service import UserService
from spiffworkflow_backend.services.workflow_execution_service import (
execution_strategy_named,
@ -100,6 +101,9 @@ from spiffworkflow_backend.services.workflow_execution_service import (
from spiffworkflow_backend.services.workflow_execution_service import (
StepDetailLoggingDelegate,
)
from spiffworkflow_backend.services.workflow_execution_service import (
TaskModelSavingDelegate,
)
from spiffworkflow_backend.services.workflow_execution_service import (
WorkflowExecutionService,
)
@ -152,6 +156,10 @@ class SpiffStepDetailIsMissingError(Exception):
pass
class TaskNotFoundError(Exception):
pass
class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # type: ignore
def __init__(self, environment_globals: Dict[str, Any]):
"""BoxedTaskDataBasedScriptEngineEnvironment."""
@ -802,7 +810,7 @@ class ProcessInstanceProcessor:
if start_in_seconds is None:
start_in_seconds = time.time()
task_json = self.get_task_json_from_spiff_task(spiff_task)
task_json = self.get_task_dict_from_spiff_task(spiff_task)
return {
"process_instance_id": self.process_instance_model.id,
@ -1083,8 +1091,8 @@ class ProcessInstanceProcessor:
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
task = TaskModel.query.filter_by(guid=task_id).first()
if task is None:
task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
@ -1092,23 +1100,12 @@ class ProcessInstanceProcessor:
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task.state = TaskStateNames[state_int]
task.properties_json = task_properties
task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties
task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
task_data_hash = sha256(task_data_json).hexdigest()
if task.json_data_hash != task_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=task_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data)
task.json_data_hash = task_data_hash
db.session.add(task)
TaskService.update_task_data_on_task_model(task_model, task_data_dict)
db.session.add(task_model)
return bpmn_process
@ -1135,6 +1132,7 @@ class ProcessInstanceProcessor:
# FIXME: Update tasks in the did_complete_task instead to set the final info.
# We will need to somehow cache all tasks initially though before each task is run.
# Maybe always do this for first run - just need to know it's the first run.
if self.process_instance_model.bpmn_process_id is None:
subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict)
for subprocess_task_id, subprocess_properties in subprocesses.items():
@ -1691,8 +1689,13 @@ class ProcessInstanceProcessor:
step_delegate = StepDetailLoggingDelegate(
self.increment_spiff_step, spiff_step_details_mapping_builder
)
task_model_delegate = TaskModelSavingDelegate(
secondary_engine_step_delegate=step_delegate,
serializer=self._serializer,
process_instance=self.process_instance_model,
)
execution_strategy = execution_strategy_named(
execution_strategy_name, step_delegate
execution_strategy_name, task_model_delegate
)
execution_service = WorkflowExecutionService(
self.bpmn_process_instance,
@ -1871,7 +1874,7 @@ class ProcessInstanceProcessor:
)
return user_tasks # type: ignore
def get_task_json_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]:
def get_task_dict_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]:
default_registry = DefaultRegistry()
task_data = default_registry.convert(spiff_task.data)
python_env = default_registry.convert(
@ -1884,17 +1887,29 @@ class ProcessInstanceProcessor:
return task_json
def complete_task(
self, task: SpiffTask, human_task: HumanTaskModel, user: UserModel
self, spiff_task: SpiffTask, human_task: HumanTaskModel, user: UserModel
) -> None:
"""Complete_task."""
self.bpmn_process_instance.complete_task_from_id(task.id)
task_model = TaskModel.query.filter_by(guid=human_task.task_id).first()
if task_model is None:
raise TaskNotFoundError(
"Cannot find a task with guid"
f" {self.process_instance_model.id} and task_id is {human_task.task_id}"
)
task_model.start_in_seconds = time.time()
self.bpmn_process_instance.complete_task_from_id(spiff_task.id)
task_model.end_in_seconds = time.time()
human_task.completed_by_user_id = user.id
human_task.completed = True
db.session.add(human_task)
# FIXME: remove when we switch over to using tasks only
details_model = (
SpiffStepDetailsModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
task_id=str(task.id),
task_id=str(spiff_task.id),
task_state="READY",
)
.order_by(SpiffStepDetailsModel.id.desc()) # type: ignore
@ -1903,13 +1918,19 @@ class ProcessInstanceProcessor:
if details_model is None:
raise SpiffStepDetailIsMissingError(
"Cannot find a ready spiff_step_detail entry for process instance"
f" {self.process_instance_model.id} and task_id is {task.id}"
f" {self.process_instance_model.id} and task_id is {spiff_task.id}"
)
details_model.task_state = task.get_state_name()
details_model.task_state = spiff_task.get_state_name()
details_model.end_in_seconds = time.time()
details_model.task_json = self.get_task_json_from_spiff_task(task)
details_model.task_json = self.get_task_dict_from_spiff_task(spiff_task)
db.session.add(details_model)
# #######
TaskService.update_task_model_and_add_to_db_session(
task_model, spiff_task, self._serializer
)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save()

View File

@ -0,0 +1,49 @@
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.task import TaskStateNames # type: ignore
from SpiffWorkflow.task import Task as SpiffTask
from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.db import db
from hashlib import sha256
import json
class TaskService():
@classmethod
def update_task_data_on_task_model(cls, task_model: TaskModel, task_data_dict: dict) -> None:
task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
task_data_hash = sha256(task_data_json).hexdigest()
if task_model.json_data_hash != task_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=task_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data)
task_model.json_data_hash = task_data_hash
@classmethod
def update_task_model_and_add_to_db_session(cls, task_model: TaskModel, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer) -> None:
"""Updates properties_json and data on given task_model.
This will NOT update start_in_seconds or end_in_seconds.
"""
new_properties_json = serializer.task_to_dict(spiff_task)
task_model.properties_json = new_properties_json
task_model.state = TaskStateNames[new_properties_json['state']]
cls.update_task_data_on_task_model(task_model, spiff_task.data)
db.session.add(task_model)
@classmethod
def find_or_create_task_model_from_spiff_task(cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel) -> TaskModel:
spiff_task_guid = str(spiff_task.id)
task_model: TaskModel = TaskModel.query.filter_by(guid=spiff_task_guid).first()
# if task_model is None:
# task_model = TaskModel(guid=spiff_task_guid, bpmn_process_id=process_instance.bpmn_process_id)
# db.session.add(task_model)
# db.session.commit()
return task_model

View File

@ -2,7 +2,9 @@ import logging
import time
from typing import Callable
from typing import List
from typing import Optional
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
@ -16,18 +18,20 @@ from spiffworkflow_backend.models.message_instance_correlation import (
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.task_service import TaskService
class EngineStepDelegate:
"""Interface of sorts for a concrete engine step delegate."""
def will_complete_task(self, task: SpiffTask) -> None:
def will_complete_task(self, spiff_task: SpiffTask) -> None:
pass
def did_complete_task(self, task: SpiffTask) -> None:
def did_complete_task(self, spiff_task: SpiffTask) -> None:
pass
def save(self) -> None:
def save(self, commit: bool = False) -> None:
pass
@ -35,6 +39,54 @@ SpiffStepIncrementer = Callable[[], None]
SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict]
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.
It can also be given another EngineStepDelegate.
"""
def __init__(
self,
serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance
self.current_task_model: Optional[TaskModel] = None
self.serializer = serializer
def should_update_task_model(self) -> bool:
return self.process_instance.bpmn_process_id is not None
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model():
self.current_task_model = (
TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance
)
)
self.current_task_model.start_in_seconds = time.time()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time()
TaskService.update_task_model_and_add_to_db_session(
self.current_task_model, spiff_task, self.serializer
)
db.session.add(self.current_task_model)
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, _commit: bool = True) -> None:
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(commit=False)
db.session.commit()
class StepDetailLoggingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of logging spiff step details.
@ -65,27 +117,28 @@ class StepDetailLoggingDelegate(EngineStepDelegate):
"Transactional Subprocess",
}
def should_log(self, task: SpiffTask) -> bool:
def should_log(self, spiff_task: SpiffTask) -> bool:
return (
task.task_spec.spec_type in self.tasks_to_log
and not task.task_spec.name.endswith(".EndJoin")
spiff_task.task_spec.spec_type in self.tasks_to_log
and not spiff_task.task_spec.name.endswith(".EndJoin")
)
def will_complete_task(self, task: SpiffTask) -> None:
if self.should_log(task):
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_log(spiff_task):
self.current_task_start_in_seconds = time.time()
self.increment_spiff_step()
def did_complete_task(self, task: SpiffTask) -> None:
if self.should_log(task):
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_log(spiff_task):
self.step_details.append(
self.spiff_step_details_mapping(
task, self.current_task_start_in_seconds, time.time()
spiff_task, self.current_task_start_in_seconds, time.time()
)
)
def save(self) -> None:
def save(self, commit: bool = True) -> None:
db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details)
if commit:
db.session.commit()
@ -136,12 +189,12 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
]
)
while engine_steps:
for task in engine_steps:
if task.task_spec.spec_type == "Service Task":
for spiff_task in engine_steps:
if spiff_task.task_spec.spec_type == "Service Task":
return
self.delegate.will_complete_task(task)
task.complete()
self.delegate.did_complete_task(task)
self.delegate.will_complete_task(spiff_task)
spiff_task.complete()
self.delegate.did_complete_task(spiff_task)
engine_steps = list(
[

View File

@ -2,39 +2,52 @@
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_ManualTask" name="Manual Task" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1xlck7g</bpmn:outgoing>
<bpmn:outgoing>Flow_0stlaxe</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1xlck7g" sourceRef="StartEvent_1" targetRef="Activity_Hello" />
<bpmn:endEvent id="Event_0ia26nb">
<bpmn:endEvent id="end_event_of_manual_task_model">
<bpmn:incoming>Flow_0nnh2x9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0nnh2x9" sourceRef="Activity_Hello" targetRef="Event_0ia26nb" />
<bpmn:sequenceFlow id="Flow_0nnh2x9" sourceRef="Activity_Hello" targetRef="end_event_of_manual_task_model" />
<bpmn:manualTask id="Activity_Hello" name="Hello">
<bpmn:extensionElements>
<spiffworkflow:instructionsForEndUser>## Hello</spiffworkflow:instructionsForEndUser>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1xlck7g</bpmn:incoming>
<bpmn:incoming>Flow_1pmem7s</bpmn:incoming>
<bpmn:outgoing>Flow_0nnh2x9</bpmn:outgoing>
</bpmn:manualTask>
<bpmn:sequenceFlow id="Flow_0stlaxe" sourceRef="StartEvent_1" targetRef="the_script" />
<bpmn:sequenceFlow id="Flow_1pmem7s" sourceRef="the_script" targetRef="Activity_Hello" />
<bpmn:scriptTask id="the_script">
<bpmn:incoming>Flow_0stlaxe</bpmn:incoming>
<bpmn:outgoing>Flow_1pmem7s</bpmn:outgoing>
<bpmn:script>the_new_var = "HEY"</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_ManualTask">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0ia26nb_di" bpmnElement="Event_0ia26nb">
<dc:Bounds x="432" y="159" width="36" height="36" />
<bpmndi:BPMNShape id="Event_0ia26nb_di" bpmnElement="end_event_of_manual_task_model">
<dc:Bounds x="592" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1rcj16n_di" bpmnElement="Activity_Hello">
<dc:Bounds x="420" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1vokg57_di" bpmnElement="the_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1xlck7g_di" bpmnElement="Flow_1xlck7g">
<bpmndi:BPMNEdge id="Flow_0nnh2x9_di" bpmnElement="Flow_0nnh2x9">
<di:waypoint x="520" y="177" />
<di:waypoint x="592" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0stlaxe_di" bpmnElement="Flow_0stlaxe">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0nnh2x9_di" bpmnElement="Flow_0nnh2x9">
<bpmndi:BPMNEdge id="Flow_1pmem7s_di" bpmnElement="Flow_1pmem7s">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
<di:waypoint x="420" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>

View File

@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import (
@ -292,6 +293,68 @@ class TestProcessInstanceProcessor(BaseTest):
assert spiff_task is not None
assert spiff_task.state == TaskState.COMPLETED
def test_properly_saves_tasks_when_running(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
self.create_process_group(
client, with_super_admin_user, "test_group", "test_group"
)
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
assert finance_user_three.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
assert finance_group is not None
process_model = load_test_spec(
process_model_id="test_group/manual_task",
bpmn_file_name="manual_task.bpmn",
process_model_source_directory="manual_task",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
initial_human_task_id = process_instance.active_human_tasks[0].id
# save again to ensure we go attempt to process the human tasks again
processor.save()
assert len(process_instance.active_human_tasks) == 1
assert initial_human_task_id == process_instance.active_human_tasks[0].id
processor = ProcessInstanceProcessor(process_instance)
human_task_one = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task_one.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, initiator_user, human_task_one
)
process_instance_relookup = ProcessInstanceModel.query.filter_by(
id=process_instance.id
).first()
processor = ProcessInstanceProcessor(process_instance_relookup)
assert process_instance_relookup.status == "complete"
task = TaskModel.query.filter_by(guid=human_task_one.task_id).first()
assert task.state == "COMPLETED"
end_event_spiff_task = processor.__class__.get_task_by_bpmn_identifier(
"end_event_of_manual_task_model", processor.bpmn_process_instance
)
assert end_event_spiff_task
assert end_event_spiff_task.state == TaskState.COMPLETED
# # NOTE: also check the spiff task from the new processor
def test_does_not_recreate_human_tasks_on_multiple_saves(
self,
app: Flask,