This commit is contained in:
jasquat 2023-05-17 10:01:11 -04:00
parent 97e0951ddf
commit b4b1ef52c8
No known key found for this signature in database
5 changed files with 73 additions and 64 deletions

View File

@ -40,7 +40,8 @@ def setup_database_configs(app: Flask) -> None:
if pool_size is not None:
pool_size = int(pool_size)
else:
# this one doesn't come from app config and isn't documented in default.py because we don't want to give people the impression
# this one doesn't come from app config and isn't documented in default.py
# because we don't want to give people the impression
# that setting it in flask python configs will work. on the contrary, it's used by a bash
# script that starts the backend, so it can only be set in the environment.
threads_per_worker_config = os.environ.get("SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER")
@ -50,8 +51,9 @@ def setup_database_configs(app: Flask) -> None:
# this is a sqlalchemy default, if we don't have any better ideas
pool_size = 5
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {}
app.config['SQLALCHEMY_ENGINE_OPTIONS']['pool_size'] = pool_size
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {}
app.config["SQLALCHEMY_ENGINE_OPTIONS"]["pool_size"] = pool_size
def load_config_file(app: Flask, env_config_module: str) -> None:
"""Load_config_file."""

View File

@ -86,7 +86,9 @@ class FileSystemService:
@staticmethod
def full_path_to_process_model_file(process_model: ProcessModelInfo) -> str:
"""Full_path_to_process_model_file."""
return os.path.join(FileSystemService.process_model_full_path(process_model), process_model.primary_file_name) # type: ignore
return os.path.join(
FileSystemService.process_model_full_path(process_model), process_model.primary_file_name # type: ignore
)
def next_display_order(self, process_model: ProcessModelInfo) -> int:
"""Next_display_order."""

View File

@ -1,18 +1,19 @@
from typing import List, Optional
from dataclasses import dataclass
import glob
import json
import os
import re
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import Optional
from lxml import etree # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from lxml import etree # type: ignore
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.custom_parser import MyCustomParser
from typing import Callable
import re
import glob
from spiffworkflow_backend.models.process_model import ProcessModelInfo
import os
from spiffworkflow_backend.services.file_system_service import FileSystemService
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from spiffworkflow_backend.services.spec_file_service import SpecFileService
# workflow json for test case
@ -22,7 +23,8 @@ from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
# find all process models
# find all json test cases for each
# for each test case, fire up something like spiff
# for each task, if there is something special in the test case definition, do it (provide data for user task, mock service task, etc)
# for each task, if there is something special in the test case definition,
# do it (provide data for user task, mock service task, etc)
# when the thing is complete, check workflow data against expected data
@ -56,12 +58,13 @@ class ProcessModelTestRunner:
KEEP THIS GENERIC. do not add backend specific code here.
"""
def __init__(
self,
process_model_directory_path: str,
instantiate_executer_callback: Callable[[str], any],
execute_task_callback: Callable[[any, Optional[dict]], any],
get_next_task_callback: Callable[[any], any],
instantiate_executer_callback: Callable[[str], Any],
execute_task_callback: Callable[[Any, Optional[dict]], Any],
get_next_task_callback: Callable[[Any], Any],
) -> None:
self.process_model_directory_path = process_model_directory_path
self.test_mappings = self._discover_process_model_directories()
@ -69,7 +72,7 @@ class ProcessModelTestRunner:
self.execute_task_callback = execute_task_callback
self.get_next_task_callback = get_next_task_callback
self.test_case_results = []
self.test_case_results: list[TestCaseResult] = []
def all_test_cases_passed(self) -> bool:
failed_tests = [t for t in self.test_case_results if t.passed is False]
@ -77,40 +80,45 @@ class ProcessModelTestRunner:
def run(self) -> None:
for json_test_case_file, bpmn_file in self.test_mappings.items():
with open(json_test_case_file, 'rt') as f:
with open(json_test_case_file) as f:
json_file_contents = json.loads(f.read())
for test_case_name, test_case_contents in json_file_contents.items():
try:
self.run_test_case(bpmn_file, test_case_name, test_case_contents)
except Exception as ex:
self.test_case_results.append(TestCaseResult(
self.test_case_results.append(
TestCaseResult(
passed=False,
test_case_name=test_case_name,
error=f"Syntax error: {str(ex)}",
))
)
)
def run_test_case(self, bpmn_file: str, test_case_name: str, test_case_contents: dict) -> None:
bpmn_process_instance = self.instantiate_executer_callback(bpmn_file)
next_task = self.get_next_task_callback(bpmn_process_instance)
while next_task is not None:
test_case_json = None
if 'tasks' in test_case_contents:
if next_task.task_spec.bpmn_id in test_case_contents['tasks']:
test_case_json = test_case_contents['tasks'][next_task.task_spec.bpmn_id]
if "tasks" in test_case_contents:
if next_task.task_spec.bpmn_id in test_case_contents["tasks"]:
test_case_json = test_case_contents["tasks"][next_task.task_spec.bpmn_id]
task_type = next_task.task_spec.__class__.__name__
if task_type in ["ServiceTask", "UserTask"] and test_case_json is None:
raise UnrunnableTestCaseError(
f"Cannot run test case '{test_case_name}'. It requires task data for {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
f"Cannot run test case '{test_case_name}'. It requires task data for"
f" {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
)
self.execute_task_callback(next_task, test_case_json)
next_task = self.get_next_task_callback(bpmn_process_instance)
test_passed = test_case_contents['expected_output_json'] == bpmn_process_instance.data
self.test_case_results.append(TestCaseResult(
test_passed = test_case_contents["expected_output_json"] == bpmn_process_instance.data
self.test_case_results.append(
TestCaseResult(
passed=test_passed,
test_case_name=test_case_name,
))
)
)
def _discover_process_model_directories(
self,
@ -122,12 +130,14 @@ class ProcessModelTestRunner:
for file in glob.glob(json_test_file_glob, recursive=True):
file_dir = os.path.dirname(file)
json_file_name = os.path.basename(file)
bpmn_file_name = re.sub(r'^test_(.*)\.json', r'\1.bpmn', json_file_name)
bpmn_file_name = re.sub(r"^test_(.*)\.json", r"\1.bpmn", json_file_name)
bpmn_file_path = os.path.join(file_dir, bpmn_file_name)
if os.path.isfile(bpmn_file_path):
test_mappings[file] = bpmn_file_path
else:
raise MissingBpmnFileForTestCaseError(f"Cannot find a matching bpmn file for test case json file: '{file}'")
raise MissingBpmnFileForTestCaseError(
f"Cannot find a matching bpmn file for test case json file: '{file}'"
)
return test_mappings
@ -136,10 +146,7 @@ class BpmnFileMissingExecutableProcessError(Exception):
class ProcessModelTestRunnerService:
def __init__(
self,
process_model_directory_path: str
) -> None:
def __init__(self, process_model_directory_path: str) -> None:
self.process_model_test_runner = ProcessModelTestRunner(
process_model_directory_path,
instantiate_executer_callback=self._instantiate_executer_callback,
@ -167,7 +174,7 @@ class ProcessModelTestRunnerService:
return tasks
def _instantiate_executer_callback(self, bpmn_file) -> BpmnWorkflow:
def _instantiate_executer_callback(self, bpmn_file: str) -> BpmnWorkflow:
parser = MyCustomParser()
data = None
with open(bpmn_file, "rb") as f_handle:
@ -180,7 +187,9 @@ class ProcessModelTestRunnerService:
if sub_parser.process_executable:
executable_process = sub_parser.bpmn_id
if executable_process is None:
raise BpmnFileMissingExecutableProcessError(f"Executable process cannot be found in {bpmn_file}. Test cannot run.")
raise BpmnFileMissingExecutableProcessError(
f"Executable process cannot be found in {bpmn_file}. Test cannot run."
)
bpmn_process_spec = parser.get_spec(executable_process)
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
return bpmn_process_instance

View File

@ -1,21 +1,15 @@
from flask import Flask
import pytest
import os
from flask import current_app
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner, ProcessModelTestRunnerService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from pytest_mock import MockerFixture
from typing import Any
import pytest
from flask import current_app
from flask import Flask
from pytest_mock import MockerFixture
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunnerService
class TestProcessModelTestRunnerService(BaseTest):
@ -23,9 +17,11 @@ class TestProcessModelTestRunnerService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: any,
with_mocked_root_path: Any,
) -> None:
test_runner_service = ProcessModelTestRunnerService(os.path.join(FileSystemService.root_path(), 'basic_script_task'))
test_runner_service = ProcessModelTestRunnerService(
os.path.join(FileSystemService.root_path(), "basic_script_task")
)
test_runner_service.run()
assert test_runner_service.process_model_test_runner.all_test_cases_passed()
@ -33,7 +29,7 @@ class TestProcessModelTestRunnerService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: any,
with_mocked_root_path: Any,
) -> None:
test_runner_service = ProcessModelTestRunnerService(FileSystemService.root_path())
test_runner_service.run()
@ -49,4 +45,4 @@ class TestProcessModelTestRunnerService(BaseTest):
"data",
"bpmn_unit_test_process_models",
)
mocker.patch.object(FileSystemService, attribute='root_path', return_value=path)
mocker.patch.object(FileSystemService, attribute="root_path", return_value=path)