mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-01-14 12:14:46 +00:00
pyl
This commit is contained in:
parent
5d7b183150
commit
3d35dc6213
@ -40,7 +40,8 @@ def setup_database_configs(app: Flask) -> None:
|
||||
if pool_size is not None:
|
||||
pool_size = int(pool_size)
|
||||
else:
|
||||
# this one doesn't come from app config and isn't documented in default.py because we don't want to give people the impression
|
||||
# this one doesn't come from app config and isn't documented in default.py
|
||||
# because we don't want to give people the impression
|
||||
# that setting it in flask python configs will work. on the contrary, it's used by a bash
|
||||
# script that starts the backend, so it can only be set in the environment.
|
||||
threads_per_worker_config = os.environ.get("SPIFFWORKFLOW_BACKEND_THREADS_PER_WORKER")
|
||||
@ -50,8 +51,9 @@ def setup_database_configs(app: Flask) -> None:
|
||||
# this is a sqlalchemy default, if we don't have any better ideas
|
||||
pool_size = 5
|
||||
|
||||
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {}
|
||||
app.config['SQLALCHEMY_ENGINE_OPTIONS']['pool_size'] = pool_size
|
||||
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {}
|
||||
app.config["SQLALCHEMY_ENGINE_OPTIONS"]["pool_size"] = pool_size
|
||||
|
||||
|
||||
def load_config_file(app: Flask, env_config_module: str) -> None:
|
||||
"""Load_config_file."""
|
||||
|
@ -86,7 +86,9 @@ class FileSystemService:
|
||||
@staticmethod
|
||||
def full_path_to_process_model_file(process_model: ProcessModelInfo) -> str:
|
||||
"""Full_path_to_process_model_file."""
|
||||
return os.path.join(FileSystemService.process_model_full_path(process_model), process_model.primary_file_name) # type: ignore
|
||||
return os.path.join(
|
||||
FileSystemService.process_model_full_path(process_model), process_model.primary_file_name # type: ignore
|
||||
)
|
||||
|
||||
def next_display_order(self, process_model: ProcessModelInfo) -> int:
|
||||
"""Next_display_order."""
|
||||
|
@ -1,18 +1,19 @@
|
||||
from typing import List, Optional
|
||||
from dataclasses import dataclass
|
||||
import json
|
||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||
from SpiffWorkflow.task import TaskState
|
||||
from lxml import etree # type: ignore
|
||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
from spiffworkflow_backend.services.custom_parser import MyCustomParser
|
||||
from typing import Callable
|
||||
import re
|
||||
import glob
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
import json
|
||||
import os
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
|
||||
from lxml import etree # type: ignore
|
||||
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
|
||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||
from SpiffWorkflow.task import TaskState
|
||||
|
||||
from spiffworkflow_backend.services.custom_parser import MyCustomParser
|
||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
|
||||
|
||||
# workflow json for test case
|
||||
@ -22,7 +23,8 @@ from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
|
||||
# find all process models
|
||||
# find all json test cases for each
|
||||
# for each test case, fire up something like spiff
|
||||
# for each task, if there is something special in the test case definition, do it (provide data for user task, mock service task, etc)
|
||||
# for each task, if there is something special in the test case definition,
|
||||
# do it (provide data for user task, mock service task, etc)
|
||||
# when the thing is complete, check workflow data against expected data
|
||||
|
||||
|
||||
@ -56,12 +58,13 @@ class ProcessModelTestRunner:
|
||||
|
||||
KEEP THIS GENERIC. do not add backend specific code here.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
process_model_directory_path: str,
|
||||
instantiate_executer_callback: Callable[[str], any],
|
||||
execute_task_callback: Callable[[any, Optional[dict]], any],
|
||||
get_next_task_callback: Callable[[any], any],
|
||||
instantiate_executer_callback: Callable[[str], Any],
|
||||
execute_task_callback: Callable[[Any, Optional[dict]], Any],
|
||||
get_next_task_callback: Callable[[Any], Any],
|
||||
) -> None:
|
||||
self.process_model_directory_path = process_model_directory_path
|
||||
self.test_mappings = self._discover_process_model_directories()
|
||||
@ -69,7 +72,7 @@ class ProcessModelTestRunner:
|
||||
self.execute_task_callback = execute_task_callback
|
||||
self.get_next_task_callback = get_next_task_callback
|
||||
|
||||
self.test_case_results = []
|
||||
self.test_case_results: list[TestCaseResult] = []
|
||||
|
||||
def all_test_cases_passed(self) -> bool:
|
||||
failed_tests = [t for t in self.test_case_results if t.passed is False]
|
||||
@ -77,40 +80,45 @@ class ProcessModelTestRunner:
|
||||
|
||||
def run(self) -> None:
|
||||
for json_test_case_file, bpmn_file in self.test_mappings.items():
|
||||
with open(json_test_case_file, 'rt') as f:
|
||||
with open(json_test_case_file) as f:
|
||||
json_file_contents = json.loads(f.read())
|
||||
|
||||
for test_case_name, test_case_contents in json_file_contents.items():
|
||||
try:
|
||||
self.run_test_case(bpmn_file, test_case_name, test_case_contents)
|
||||
except Exception as ex:
|
||||
self.test_case_results.append(TestCaseResult(
|
||||
passed=False,
|
||||
test_case_name=test_case_name,
|
||||
error=f"Syntax error: {str(ex)}",
|
||||
))
|
||||
self.test_case_results.append(
|
||||
TestCaseResult(
|
||||
passed=False,
|
||||
test_case_name=test_case_name,
|
||||
error=f"Syntax error: {str(ex)}",
|
||||
)
|
||||
)
|
||||
|
||||
def run_test_case(self, bpmn_file: str, test_case_name: str, test_case_contents: dict) -> None:
|
||||
bpmn_process_instance = self.instantiate_executer_callback(bpmn_file)
|
||||
next_task = self.get_next_task_callback(bpmn_process_instance)
|
||||
while next_task is not None:
|
||||
test_case_json = None
|
||||
if 'tasks' in test_case_contents:
|
||||
if next_task.task_spec.bpmn_id in test_case_contents['tasks']:
|
||||
test_case_json = test_case_contents['tasks'][next_task.task_spec.bpmn_id]
|
||||
if "tasks" in test_case_contents:
|
||||
if next_task.task_spec.bpmn_id in test_case_contents["tasks"]:
|
||||
test_case_json = test_case_contents["tasks"][next_task.task_spec.bpmn_id]
|
||||
|
||||
task_type = next_task.task_spec.__class__.__name__
|
||||
if task_type in ["ServiceTask", "UserTask"] and test_case_json is None:
|
||||
raise UnrunnableTestCaseError(
|
||||
f"Cannot run test case '{test_case_name}'. It requires task data for {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
|
||||
f"Cannot run test case '{test_case_name}'. It requires task data for"
|
||||
f" {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
|
||||
)
|
||||
self.execute_task_callback(next_task, test_case_json)
|
||||
next_task = self.get_next_task_callback(bpmn_process_instance)
|
||||
test_passed = test_case_contents['expected_output_json'] == bpmn_process_instance.data
|
||||
self.test_case_results.append(TestCaseResult(
|
||||
passed=test_passed,
|
||||
test_case_name=test_case_name,
|
||||
))
|
||||
test_passed = test_case_contents["expected_output_json"] == bpmn_process_instance.data
|
||||
self.test_case_results.append(
|
||||
TestCaseResult(
|
||||
passed=test_passed,
|
||||
test_case_name=test_case_name,
|
||||
)
|
||||
)
|
||||
|
||||
def _discover_process_model_directories(
|
||||
self,
|
||||
@ -122,12 +130,14 @@ class ProcessModelTestRunner:
|
||||
for file in glob.glob(json_test_file_glob, recursive=True):
|
||||
file_dir = os.path.dirname(file)
|
||||
json_file_name = os.path.basename(file)
|
||||
bpmn_file_name = re.sub(r'^test_(.*)\.json', r'\1.bpmn', json_file_name)
|
||||
bpmn_file_name = re.sub(r"^test_(.*)\.json", r"\1.bpmn", json_file_name)
|
||||
bpmn_file_path = os.path.join(file_dir, bpmn_file_name)
|
||||
if os.path.isfile(bpmn_file_path):
|
||||
test_mappings[file] = bpmn_file_path
|
||||
else:
|
||||
raise MissingBpmnFileForTestCaseError(f"Cannot find a matching bpmn file for test case json file: '{file}'")
|
||||
raise MissingBpmnFileForTestCaseError(
|
||||
f"Cannot find a matching bpmn file for test case json file: '{file}'"
|
||||
)
|
||||
return test_mappings
|
||||
|
||||
|
||||
@ -136,10 +146,7 @@ class BpmnFileMissingExecutableProcessError(Exception):
|
||||
|
||||
|
||||
class ProcessModelTestRunnerService:
|
||||
def __init__(
|
||||
self,
|
||||
process_model_directory_path: str
|
||||
) -> None:
|
||||
def __init__(self, process_model_directory_path: str) -> None:
|
||||
self.process_model_test_runner = ProcessModelTestRunner(
|
||||
process_model_directory_path,
|
||||
instantiate_executer_callback=self._instantiate_executer_callback,
|
||||
@ -167,7 +174,7 @@ class ProcessModelTestRunnerService:
|
||||
|
||||
return tasks
|
||||
|
||||
def _instantiate_executer_callback(self, bpmn_file) -> BpmnWorkflow:
|
||||
def _instantiate_executer_callback(self, bpmn_file: str) -> BpmnWorkflow:
|
||||
parser = MyCustomParser()
|
||||
data = None
|
||||
with open(bpmn_file, "rb") as f_handle:
|
||||
@ -180,7 +187,9 @@ class ProcessModelTestRunnerService:
|
||||
if sub_parser.process_executable:
|
||||
executable_process = sub_parser.bpmn_id
|
||||
if executable_process is None:
|
||||
raise BpmnFileMissingExecutableProcessError(f"Executable process cannot be found in {bpmn_file}. Test cannot run.")
|
||||
raise BpmnFileMissingExecutableProcessError(
|
||||
f"Executable process cannot be found in {bpmn_file}. Test cannot run."
|
||||
)
|
||||
bpmn_process_spec = parser.get_spec(executable_process)
|
||||
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
|
||||
return bpmn_process_instance
|
||||
|
@ -8,4 +8,4 @@
|
||||
"metadata_extraction_paths": null,
|
||||
"primary_file_name": "failing_task.bpmn",
|
||||
"primary_process_id": "Process_FailingProcess"
|
||||
}
|
||||
}
|
||||
|
@ -1,21 +1,15 @@
|
||||
from flask import Flask
|
||||
import pytest
|
||||
import os
|
||||
from flask import current_app
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner, ProcessModelTestRunnerService
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from flask import current_app
|
||||
from flask import Flask
|
||||
from pytest_mock import MockerFixture
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
|
||||
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
|
||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
||||
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
||||
from spiffworkflow_backend.services.task_service import TaskService
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunnerService
|
||||
|
||||
|
||||
class TestProcessModelTestRunnerService(BaseTest):
|
||||
@ -23,9 +17,11 @@ class TestProcessModelTestRunnerService(BaseTest):
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_mocked_root_path: any,
|
||||
with_mocked_root_path: Any,
|
||||
) -> None:
|
||||
test_runner_service = ProcessModelTestRunnerService(os.path.join(FileSystemService.root_path(), 'basic_script_task'))
|
||||
test_runner_service = ProcessModelTestRunnerService(
|
||||
os.path.join(FileSystemService.root_path(), "basic_script_task")
|
||||
)
|
||||
test_runner_service.run()
|
||||
assert test_runner_service.process_model_test_runner.all_test_cases_passed()
|
||||
|
||||
@ -33,7 +29,7 @@ class TestProcessModelTestRunnerService(BaseTest):
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_mocked_root_path: any,
|
||||
with_mocked_root_path: Any,
|
||||
) -> None:
|
||||
test_runner_service = ProcessModelTestRunnerService(FileSystemService.root_path())
|
||||
test_runner_service.run()
|
||||
@ -49,4 +45,4 @@ class TestProcessModelTestRunnerService(BaseTest):
|
||||
"data",
|
||||
"bpmn_unit_test_process_models",
|
||||
)
|
||||
mocker.patch.object(FileSystemService, attribute='root_path', return_value=path)
|
||||
mocker.patch.object(FileSystemService, attribute="root_path", return_value=path)
|
||||
|
Loading…
x
Reference in New Issue
Block a user