added some framework stuff to run process model unit tests w/ burnettk

This commit is contained in:
jasquat 2023-05-16 17:24:22 -04:00
parent 256492aa55
commit c2083103e4
8 changed files with 313 additions and 55 deletions

View File

@ -49,13 +49,12 @@ class FileSystemService:
"""Id_string_to_relative_path."""
return id_string.replace("/", os.sep)
@staticmethod
def process_group_path(name: str) -> str:
"""Category_path."""
@classmethod
def full_path_from_id(cls, id: str) -> str:
return os.path.abspath(
os.path.join(
FileSystemService.root_path(),
FileSystemService.id_string_to_relative_path(name),
cls.root_path(),
cls.id_string_to_relative_path(id),
)
)
@ -65,36 +64,33 @@ class FileSystemService:
return os.path.join(FileSystemService.root_path(), relative_path)
@staticmethod
def process_model_relative_path(spec: ProcessModelInfo) -> str:
def process_model_relative_path(process_model: ProcessModelInfo) -> str:
"""Get the file path to a process model relative to SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR.
If the full path is /path/to/process-group-a/group-b/process-model-a, it will return:
process-group-a/group-b/process-model-a
"""
workflow_path = FileSystemService.workflow_path(spec)
workflow_path = FileSystemService.process_model_full_path(process_model)
return os.path.relpath(workflow_path, start=FileSystemService.root_path())
@staticmethod
def process_group_path_for_spec(spec: ProcessModelInfo) -> str:
"""Category_path_for_spec."""
def process_group_path_for_spec(process_model: ProcessModelInfo) -> str:
# os.path.split apparently returns 2 element tulple like: (first/path, last_item)
process_group_id, _ = os.path.split(spec.id_for_file_path())
return FileSystemService.process_group_path(process_group_id)
process_group_id, _ = os.path.split(process_model.id_for_file_path())
return FileSystemService.full_path_from_id(process_group_id)
@classmethod
def process_model_full_path(cls, process_model: ProcessModelInfo) -> str:
return cls.full_path_from_id(process_model.id)
@staticmethod
def workflow_path(spec: ProcessModelInfo) -> str:
"""Workflow_path."""
process_model_path = os.path.join(FileSystemService.root_path(), spec.id_for_file_path())
return process_model_path
@staticmethod
def full_path_to_process_model_file(spec: ProcessModelInfo) -> str:
def full_path_to_process_model_file(process_model: ProcessModelInfo) -> str:
"""Full_path_to_process_model_file."""
return os.path.join(FileSystemService.workflow_path(spec), spec.primary_file_name) # type: ignore
return os.path.join(FileSystemService.process_model_full_path(process_model), process_model.primary_file_name) # type: ignore
def next_display_order(self, spec: ProcessModelInfo) -> int:
def next_display_order(self, process_model: ProcessModelInfo) -> int:
"""Next_display_order."""
path = self.process_group_path_for_spec(spec)
path = self.process_group_path_for_spec(process_model)
if os.path.exists(path):
return len(next(os.walk(path))[1])
else:

View File

@ -60,12 +60,7 @@ class ProcessModelService(FileSystemService):
def is_process_group_identifier(cls, process_group_identifier: str) -> bool:
"""Is_process_group_identifier."""
if os.path.exists(FileSystemService.root_path()):
process_group_path = os.path.abspath(
os.path.join(
FileSystemService.root_path(),
FileSystemService.id_string_to_relative_path(process_group_identifier),
)
)
process_group_path = FileSystemService.full_path_from_id(process_group_identifier)
return cls.is_process_group(process_group_path)
return False
@ -82,12 +77,7 @@ class ProcessModelService(FileSystemService):
def is_process_model_identifier(cls, process_model_identifier: str) -> bool:
"""Is_process_model_identifier."""
if os.path.exists(FileSystemService.root_path()):
process_model_path = os.path.abspath(
os.path.join(
FileSystemService.root_path(),
FileSystemService.id_string_to_relative_path(process_model_identifier),
)
)
process_model_path = FileSystemService.full_path_from_id(process_model_identifier)
return cls.is_process_model(process_model_path)
return False
@ -149,13 +139,13 @@ class ProcessModelService(FileSystemService):
f"We cannot delete the model `{process_model_id}`, there are existing instances that depend on it."
)
process_model = self.get_process_model(process_model_id)
path = self.workflow_path(process_model)
path = self.process_model_full_path(process_model)
shutil.rmtree(path)
def process_model_move(self, original_process_model_id: str, new_location: str) -> ProcessModelInfo:
"""Process_model_move."""
process_model = self.get_process_model(original_process_model_id)
original_model_path = self.workflow_path(process_model)
original_model_path = self.process_model_full_path(process_model)
_, model_id = os.path.split(original_model_path)
new_relative_path = os.path.join(new_location, model_id)
new_model_path = os.path.abspath(os.path.join(FileSystemService.root_path(), new_relative_path))
@ -314,12 +304,7 @@ class ProcessModelService(FileSystemService):
def get_process_group(cls, process_group_id: str, find_direct_nested_items: bool = True) -> ProcessGroup:
"""Look for a given process_group, and return it."""
if os.path.exists(FileSystemService.root_path()):
process_group_path = os.path.abspath(
os.path.join(
FileSystemService.root_path(),
FileSystemService.id_string_to_relative_path(process_group_id),
)
)
process_group_path = FileSystemService.full_path_from_id(process_group_id)
if cls.is_process_group(process_group_path):
return cls.find_or_create_process_group(
process_group_path,
@ -336,7 +321,7 @@ class ProcessModelService(FileSystemService):
@classmethod
def update_process_group(cls, process_group: ProcessGroup) -> ProcessGroup:
"""Update_process_group."""
cat_path = cls.process_group_path(process_group.id)
cat_path = cls.full_path_from_id(process_group.id)
os.makedirs(cat_path, exist_ok=True)
json_path = os.path.join(cat_path, cls.PROCESS_GROUP_JSON_FILE)
serialized_process_group = process_group.serialized
@ -348,7 +333,7 @@ class ProcessModelService(FileSystemService):
def process_group_move(self, original_process_group_id: str, new_location: str) -> ProcessGroup:
"""Process_group_move."""
original_group_path = self.process_group_path(original_process_group_id)
original_group_path = self.full_path_from_id(original_process_group_id)
_, original_group_id = os.path.split(original_group_path)
new_root = os.path.join(FileSystemService.root_path(), new_location)
new_group_path = os.path.abspath(os.path.join(FileSystemService.root_path(), new_root, original_group_id))
@ -370,7 +355,7 @@ class ProcessModelService(FileSystemService):
def process_group_delete(self, process_group_id: str) -> None:
"""Delete_process_group."""
problem_models = []
path = self.process_group_path(process_group_id)
path = self.full_path_from_id(process_group_id)
if os.path.exists(path):
nested_models = self.__get_all_nested_models(path)
for process_model in nested_models:

View File

@ -0,0 +1,180 @@
from typing import List, Optional
from dataclasses import dataclass
import json
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from lxml import etree # type: ignore
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.custom_parser import MyCustomParser
from typing import Callable
import re
import glob
from spiffworkflow_backend.models.process_model import ProcessModelInfo
import os
from spiffworkflow_backend.services.file_system_service import FileSystemService
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
# workflow json for test case
# 1. default action is load xml from disk and use spiff like normal and get back workflow json
# 2. do stuff from disk cache
# find all process models
# find all json test cases for each
# for each test case, fire up something like spiff
# for each task, if there is something special in the test case definition, do it (provide data for user task, mock service task, etc)
# when the thing is complete, check workflow data against expected data
class UnrunnableTestCaseError(Exception):
pass
@dataclass
class TestCaseResult:
passed: bool
test_case_name: str
error: Optional[str] = None
# input:
# json_file:
# {
# [TEST_CASE_NAME]: {
# "tasks": {
# [BPMN_TASK_IDENTIIFER]: [DATA]
# },
# "expected_output_json": [DATA]
# }
# }
class ProcessModelTestRunner:
"""Generic test runner code. May move into own library at some point.
KEEP THIS GENERIC. do not add backend specific code here.
"""
def __init__(
self,
process_model_directory_path: str,
instantiate_executer_callback: Callable[[str], any],
execute_task_callback: Callable[[any, Optional[dict]], any],
get_next_task_callback: Callable[[any], any],
) -> None:
self.process_model_directory_path = process_model_directory_path
self.test_mappings = self._discover_process_model_directories()
self.instantiate_executer_callback = instantiate_executer_callback
self.execute_task_callback = execute_task_callback
self.get_next_task_callback = get_next_task_callback
self.test_case_results = []
def all_test_cases_passed(self) -> bool:
failed_tests = [t for t in self.test_case_results if t.passed is False]
return len(failed_tests) < 1
def run(self) -> None:
for json_test_case_file, bpmn_file in self.test_mappings.items():
with open(json_test_case_file, 'rt') as f:
json_file_contents = json.loads(f.read())
for test_case_name, test_case_contents in json_file_contents.items():
try:
self.run_test_case(bpmn_file, test_case_name, test_case_contents)
except Exception as ex:
self.test_case_results.append(TestCaseResult(
passed=False,
test_case_name=test_case_name,
error=f"Syntax error: {str(ex)}",
))
def run_test_case(self, bpmn_file: str, test_case_name: str, test_case_contents: dict) -> None:
bpmn_process_instance = self.instantiate_executer_callback(bpmn_file)
next_task = self.get_next_task_callback(bpmn_process_instance)
while next_task is not None:
test_case_json = None
if 'tasks' in test_case_contents:
if next_task.task_spec.bpmn_id in test_case_contents['tasks']:
test_case_json = test_case_contents['tasks'][next_task.task_spec.bpmn_id]
task_type = next_task.task_spec.__class__.__name__
if task_type in ["ServiceTask", "UserTask"] and test_case_json is None:
raise UnrunnableTestCaseError(
f"Cannot run test case '{test_case_name}'. It requires task data for {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
)
self.execute_task_callback(next_task, test_case_json)
next_task = self.get_next_task_callback(bpmn_process_instance)
test_passed = test_case_contents['expected_output_json'] == bpmn_process_instance.data
self.test_case_results.append(TestCaseResult(
passed=test_passed,
test_case_name=test_case_name,
))
def _discover_process_model_directories(
self,
) -> dict[str, str]:
test_mappings = {}
json_test_file_glob = os.path.join(self.process_model_directory_path, "**", "test_*.json")
for file in glob.glob(json_test_file_glob, recursive=True):
file_dir = os.path.dirname(file)
json_file_name = os.path.basename(file)
bpmn_file_name = re.sub(r'^test_(.*)\.json', r'\1.bpmn', json_file_name)
bpmn_file_path = os.path.join(file_dir, bpmn_file_name)
if os.path.isfile(bpmn_file_path):
test_mappings[file] = bpmn_file_path
return test_mappings
class BpmnFileMissingExecutableProcessError(Exception):
pass
class ProcessModelTestRunnerService:
def __init__(
self,
process_model_directory_path: str
) -> None:
self.process_model_test_runner = ProcessModelTestRunner(
process_model_directory_path,
instantiate_executer_callback=self._instantiate_executer_callback,
execute_task_callback=self._execute_task_callback,
get_next_task_callback=self._get_next_task_callback,
)
def run(self) -> None:
self.process_model_test_runner.run()
def _execute_task_callback(self, spiff_task: SpiffTask, _test_case_json: Optional[dict]) -> None:
spiff_task.run()
def _get_next_task_callback(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
engine_steps = self._get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
return engine_steps[0]
return None
def _get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual])
if len(tasks) > 0:
tasks = [tasks[0]]
return tasks
def _instantiate_executer_callback(self, bpmn_file) -> BpmnWorkflow:
parser = MyCustomParser()
data = None
with open(bpmn_file, "rb") as f_handle:
data = f_handle.read()
bpmn: etree.Element = SpecFileService.get_etree_from_xml_bytes(data)
parser.add_bpmn_xml(bpmn, filename=os.path.basename(bpmn_file))
sub_parsers = list(parser.process_parsers.values())
executable_process = None
for sub_parser in sub_parsers:
if sub_parser.process_executable:
executable_process = sub_parser.bpmn_id
if executable_process is None:
raise BpmnFileMissingExecutableProcessError(f"Executable process cannot be found in {bpmn_file}. Test cannot run.")
bpmn_process_spec = parser.get_spec(executable_process)
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
return bpmn_process_instance

View File

@ -221,37 +221,37 @@ class SpecFileService(FileSystemService):
return spec_file_data
@staticmethod
def full_file_path(spec: ProcessModelInfo, file_name: str) -> str:
def full_file_path(process_model: ProcessModelInfo, file_name: str) -> str:
"""File_path."""
return os.path.abspath(os.path.join(SpecFileService.workflow_path(spec), file_name))
return os.path.abspath(os.path.join(SpecFileService.process_model_full_path(process_model), file_name))
@staticmethod
def last_modified(spec: ProcessModelInfo, file_name: str) -> datetime:
def last_modified(process_model: ProcessModelInfo, file_name: str) -> datetime:
"""Last_modified."""
full_file_path = SpecFileService.full_file_path(spec, file_name)
full_file_path = SpecFileService.full_file_path(process_model, file_name)
return FileSystemService._last_modified(full_file_path)
@staticmethod
def timestamp(spec: ProcessModelInfo, file_name: str) -> float:
def timestamp(process_model: ProcessModelInfo, file_name: str) -> float:
"""Timestamp."""
full_file_path = SpecFileService.full_file_path(spec, file_name)
full_file_path = SpecFileService.full_file_path(process_model, file_name)
return FileSystemService._timestamp(full_file_path)
@staticmethod
def delete_file(spec: ProcessModelInfo, file_name: str) -> None:
def delete_file(process_model: ProcessModelInfo, file_name: str) -> None:
"""Delete_file."""
# Fixme: Remember to remove the lookup files when the spec file is removed.
# Fixme: Remember to remove the lookup files when the process_model file is removed.
# lookup_files = session.query(LookupFileModel).filter_by(file_model_id=file_id).all()
# for lf in lookup_files:
# session.query(LookupDataModel).filter_by(lookup_file_model_id=lf.id).delete()
# session.query(LookupFileModel).filter_by(id=lf.id).delete()
full_file_path = SpecFileService.full_file_path(spec, file_name)
full_file_path = SpecFileService.full_file_path(process_model, file_name)
os.remove(full_file_path)
@staticmethod
def delete_all_files(spec: ProcessModelInfo) -> None:
def delete_all_files(process_model: ProcessModelInfo) -> None:
"""Delete_all_files."""
dir_path = SpecFileService.workflow_path(spec)
dir_path = SpecFileService.process_model_full_path(process_model)
if os.path.exists(dir_path):
shutil.rmtree(dir_path)

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_Script_Task" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0qfycuk</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0qfycuk" sourceRef="StartEvent_1" targetRef="Activity_1qdbp6x" />
<bpmn:endEvent id="Event_1kumwb5">
<bpmn:incoming>Flow_1auiekw</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1auiekw" sourceRef="Activity_1qdbp6x" targetRef="Event_1kumwb5" />
<bpmn:scriptTask id="Activity_1qdbp6x" name="Script">
<bpmn:incoming>Flow_0qfycuk</bpmn:incoming>
<bpmn:outgoing>Flow_1auiekw</bpmn:outgoing>
<bpmn:script>a = 1</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_Script_Task">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1kumwb5_di" bpmnElement="Event_1kumwb5">
<dc:Bounds x="432" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0ii0b3p_di" bpmnElement="Activity_1qdbp6x">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0qfycuk_di" bpmnElement="Flow_0qfycuk">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1auiekw_di" bpmnElement="Flow_1auiekw">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,11 @@
{
"description": "",
"display_name": "Script Task",
"display_order": 0,
"exception_notification_addresses": [],
"fault_or_suspend_on_exception": "fault",
"files": [],
"metadata_extraction_paths": null,
"primary_file_name": "Script.bpmn",
"primary_process_id": "Process_Script_Task"
}

View File

@ -0,0 +1,5 @@
{
"test_case_one": {
"expected_output_json": { "a": 1 }
}
}

View File

@ -0,0 +1,42 @@
from flask import Flask
import pytest
import os
from flask import current_app
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner, ProcessModelTestRunnerService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from pytest_mock import MockerFixture
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.task_service import TaskService
class TestProcessModelTestRunnerService(BaseTest):
def test_can_test_a_simple_process_model(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: any,
) -> None:
test_runner_service = ProcessModelTestRunnerService(os.path.join(FileSystemService.root_path(), 'basic_script_task'))
test_runner_service.run()
assert test_runner_service.process_model_test_runner.all_test_cases_passed()
@pytest.fixture()
def with_mocked_root_path(self, mocker: MockerFixture) -> None:
path = os.path.join(
current_app.instance_path,
"..",
"..",
"tests",
"data",
"bpmn_unit_test_process_models",
)
mocker.patch.object(FileSystemService, attribute='root_path', return_value=path)