add generate_report and tests
This commit is contained in:
parent
5047b9aa8b
commit
96f770426b
6
.flake8
6
.flake8
|
@ -21,7 +21,5 @@ per-file-ignores =
|
|||
src/spiffworkflow_backend/load_database_models.py:F401
|
||||
|
||||
# this file overwrites methods from the logging library so we can't change them
|
||||
src/spiffworkflow_backend/services/logging_service.py:N802
|
||||
|
||||
# ignore long comment line
|
||||
src/spiffworkflow_backend/services/logging_service.py:B950
|
||||
# and ignore long comment line
|
||||
src/spiffworkflow_backend/services/logging_service.py:N802,B950
|
||||
|
|
48
conftest.py
48
conftest.py
|
@ -1,13 +1,21 @@
|
|||
"""Conftest."""
|
||||
import os
|
||||
import shutil
|
||||
from typing import Iterator
|
||||
|
||||
import pytest
|
||||
from flask.app import Flask
|
||||
from flask_bpmn.models.db import db
|
||||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
|
||||
from spiffworkflow_backend.helpers.fixture_data import find_or_create_user
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
)
|
||||
from spiffworkflow_backend.services.process_instance_service import (
|
||||
ProcessInstanceService,
|
||||
)
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
|
||||
|
@ -41,7 +49,7 @@ def app() -> Flask:
|
|||
|
||||
|
||||
@pytest.fixture()
|
||||
def with_db_and_bpmn_file_cleanup() -> Iterator[None]:
|
||||
def with_db_and_bpmn_file_cleanup() -> None:
|
||||
"""Process_group_resource."""
|
||||
for model in SpiffworkflowBaseDBModel._all_subclasses():
|
||||
db.session.query(model).delete()
|
||||
|
@ -52,3 +60,39 @@ def with_db_and_bpmn_file_cleanup() -> Iterator[None]:
|
|||
process_model_service = ProcessModelService()
|
||||
if os.path.exists(process_model_service.root_path()):
|
||||
shutil.rmtree(process_model_service.root_path())
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def setup_process_instances_for_reports() -> list[ProcessInstanceModel]:
|
||||
"""Setup_process_instances_for_reports."""
|
||||
user = find_or_create_user()
|
||||
process_group_id = "runs_without_input"
|
||||
process_model_id = "sample"
|
||||
load_test_spec(process_group_id=process_group_id, process_model_id=process_model_id)
|
||||
process_instances = []
|
||||
for data in [kay(), ray(), jay()]:
|
||||
process_instance = ProcessInstanceService.create_process_instance(
|
||||
process_group_identifier=process_group_id,
|
||||
process_model_identifier=process_model_id,
|
||||
user=user,
|
||||
)
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
processor.slam_in_data(data)
|
||||
process_instances.append(process_instance)
|
||||
|
||||
return process_instances
|
||||
|
||||
|
||||
def kay() -> dict:
|
||||
"""Kay."""
|
||||
return {"name": "kay", "grade_level": 2, "test_score": 10}
|
||||
|
||||
|
||||
def ray() -> dict:
|
||||
"""Ray."""
|
||||
return {"name": "ray", "grade_level": 1, "test_score": 9}
|
||||
|
||||
|
||||
def jay() -> dict:
|
||||
"""Jay."""
|
||||
return {"name": "jay", "grade_level": 2, "test_score": 8}
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from __future__ import with_statement
|
||||
|
||||
import logging
|
||||
from logging.config import fileConfig
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from typing import TypedDict
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
|
@ -12,10 +14,41 @@ from sqlalchemy.orm import relationship
|
|||
from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
|
||||
ProcessEntityNotFoundError,
|
||||
)
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
)
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
|
||||
ReportMetadata = dict[str, Any]
|
||||
|
||||
|
||||
class ProcessInstanceReportResult(TypedDict):
|
||||
"""ProcessInstanceReportResult."""
|
||||
|
||||
report_metadata: ReportMetadata
|
||||
results: list[dict]
|
||||
|
||||
|
||||
# https://stackoverflow.com/a/56842689/6090676
|
||||
class Reversor:
|
||||
"""Reversor."""
|
||||
|
||||
def __init__(self, obj):
|
||||
"""__init__."""
|
||||
self.obj = obj
|
||||
|
||||
def __eq__(self, other):
|
||||
"""__eq__."""
|
||||
return other.obj == self.obj
|
||||
|
||||
def __lt__(self, other):
|
||||
"""__lt__."""
|
||||
return other.obj < self.obj
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessInstanceReportModel(SpiffworkflowBaseDBModel):
|
||||
"""ProcessInstanceReportModel."""
|
||||
|
@ -92,3 +125,96 @@ class ProcessInstanceReportModel(SpiffworkflowBaseDBModel):
|
|||
db.session.add(process_instance_report)
|
||||
db.session.commit()
|
||||
return process_instance_report
|
||||
|
||||
def with_substitutions(self, field_value: any, substitution_variables: dict) -> str:
|
||||
"""With_substitutions."""
|
||||
if substitution_variables is not None:
|
||||
for key, value in substitution_variables.items():
|
||||
if isinstance(value, str) or isinstance(value, int):
|
||||
field_value = str(field_value).replace(
|
||||
"{{" + key + "}}", str(value)
|
||||
)
|
||||
return field_value
|
||||
|
||||
# modeled after https://github.com/suyash248/sqlalchemy-json-querybuilder
|
||||
# just supports "equals" operator for now.
|
||||
# perhaps we will use the database instead of filtering in memory in the future and then we might use this lib directly.
|
||||
def passes_filter(
|
||||
self, process_instance_dict: dict, substitution_variables: dict
|
||||
) -> bool:
|
||||
"""Passes_filter."""
|
||||
process_instance_data = process_instance_dict["data"]
|
||||
if "filter_by" in self.report_metadata:
|
||||
for filter_by in self.report_metadata["filter_by"]:
|
||||
field_name = filter_by["field_name"]
|
||||
operator = filter_by["operator"]
|
||||
field_value = self.with_substitutions(
|
||||
filter_by["field_value"], substitution_variables
|
||||
)
|
||||
if operator == "equals":
|
||||
if str(process_instance_data.get(field_name)) != str(field_value):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def order_things(self, process_instance_dicts: list) -> list:
|
||||
"""Order_things."""
|
||||
order_by = self.report_metadata["order_by"]
|
||||
|
||||
def order_by_function_for_lambda(process_instance_dict: dict) -> str:
|
||||
"""Order_by_function_for_lambda."""
|
||||
comparison_values = []
|
||||
for order_by_item in order_by:
|
||||
if order_by_item.startswith("-"):
|
||||
# remove leading - from order_by_item
|
||||
order_by_item = order_by_item[1:]
|
||||
sort_value = process_instance_dict["data"].get(order_by_item)
|
||||
comparison_values.append(Reversor(sort_value))
|
||||
else:
|
||||
sort_value = process_instance_dict["data"].get(order_by_item)
|
||||
comparison_values.append(sort_value)
|
||||
return comparison_values
|
||||
|
||||
return sorted(process_instance_dicts, key=order_by_function_for_lambda)
|
||||
|
||||
def generate_report(
|
||||
self,
|
||||
process_instances: list[ProcessInstanceModel],
|
||||
substitution_variables: dict,
|
||||
) -> ProcessInstanceReportResult:
|
||||
"""Generate_report."""
|
||||
|
||||
def to_serialized(process_instance: ProcessInstanceModel) -> dict:
|
||||
"""To_serialized."""
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
process_instance.data = processor.get_data()
|
||||
return process_instance.serialized
|
||||
|
||||
process_instance_dicts = map(to_serialized, process_instances)
|
||||
results = []
|
||||
for process_instance_dict in process_instance_dicts:
|
||||
if self.passes_filter(process_instance_dict, substitution_variables):
|
||||
results.append(process_instance_dict)
|
||||
|
||||
if "order_by" in self.report_metadata:
|
||||
results = self.order_things(results)
|
||||
|
||||
if "columns" in self.report_metadata:
|
||||
column_keys_to_keep = [
|
||||
c["accessor"] for c in self.report_metadata["columns"]
|
||||
]
|
||||
|
||||
pruned_results = []
|
||||
for result in results:
|
||||
data = result["data"]
|
||||
dict_you_want = {
|
||||
your_key: data[your_key]
|
||||
for your_key in column_keys_to_keep
|
||||
if data.get(your_key)
|
||||
}
|
||||
pruned_results.append(dict_you_want)
|
||||
results = pruned_results
|
||||
|
||||
return ProcessInstanceReportResult(
|
||||
report_metadata=self.report_metadata, results=results
|
||||
)
|
||||
|
|
|
@ -30,6 +30,7 @@ from SpiffWorkflow.dmn.serializer import BusinessRuleTaskConverter # type: igno
|
|||
from SpiffWorkflow.exceptions import WorkflowTaskExecException # type: ignore
|
||||
from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore
|
||||
from SpiffWorkflow.specs import WorkflowSpec # type: ignore
|
||||
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
|
||||
|
||||
from spiffworkflow_backend.models.active_task import ActiveTaskModel
|
||||
from spiffworkflow_backend.models.file import File
|
||||
|
@ -313,6 +314,14 @@ class ProcessInstanceProcessor:
|
|||
] = validate_only
|
||||
return bpmn_process_instance
|
||||
|
||||
def slam_in_data(self, data: dict) -> None:
|
||||
"""Slam_in_data."""
|
||||
self.bpmn_process_instance.data = DeepMerge.merge(
|
||||
self.bpmn_process_instance.data, data
|
||||
)
|
||||
|
||||
self.save()
|
||||
|
||||
def save(self) -> None:
|
||||
"""Saves the current state of this processor to the database."""
|
||||
self.process_instance_model.bpmn_json = self.serialize()
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
"""Test Permissions."""
|
||||
from flask.app import Flask
|
||||
|
||||
from spiffworkflow_backend.helpers.fixture_data import find_or_create_user
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance_report import ProcessInstanceReportModel
|
||||
|
||||
# from tests.spiffworkflow_backend.helpers.test_data import find_or_create_process_group
|
||||
# from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
|
||||
# from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
|
||||
|
||||
def test_generate_report_with_filter_by(app: Flask, with_db_and_bpmn_file_cleanup, setup_process_instances_for_reports) -> None:
|
||||
"""Test_user_can_be_given_permission_to_administer_process_group."""
|
||||
process_instances = setup_process_instances_for_reports
|
||||
report_metadata = {"filter_by": [{"field_name": "grade_level", "operator": "equals", "field_value": 2}]}
|
||||
results = do_report_with_metadata_and_instances(report_metadata, process_instances)
|
||||
assert len(results) == 2
|
||||
names = get_names_from_results(results)
|
||||
assert names == ["kay", "jay"]
|
||||
|
||||
|
||||
def test_generate_report_with_filter_by_with_variable_substitution(app: Flask, with_db_and_bpmn_file_cleanup, setup_process_instances_for_reports) -> None:
|
||||
"""Test_user_can_be_given_permission_to_administer_process_group."""
|
||||
process_instances = setup_process_instances_for_reports
|
||||
report_metadata = {"filter_by": [{"field_name": "grade_level",
|
||||
"operator": "equals", "field_value": "{{grade_level}}"}]}
|
||||
results = do_report_with_metadata_and_instances(report_metadata, process_instances, {"grade_level": 1})
|
||||
assert len(results) == 1
|
||||
names = get_names_from_results(results)
|
||||
assert names == ["ray"]
|
||||
|
||||
|
||||
def test_generate_report_with_order_by_and_one_field(app: Flask, with_db_and_bpmn_file_cleanup, setup_process_instances_for_reports) -> None:
|
||||
"""Test_user_can_be_given_permission_to_administer_process_group."""
|
||||
process_instances = setup_process_instances_for_reports
|
||||
report_metadata = {"order_by": ["test_score"]}
|
||||
results = do_report_with_metadata_and_instances(report_metadata, process_instances)
|
||||
assert len(results) == 3
|
||||
names = get_names_from_results(results)
|
||||
assert names == ["jay", "ray", "kay"]
|
||||
|
||||
|
||||
def test_generate_report_with_order_by_and_two_fields(app: Flask, with_db_and_bpmn_file_cleanup, setup_process_instances_for_reports) -> None:
|
||||
"""Test_user_can_be_given_permission_to_administer_process_group."""
|
||||
process_instances = setup_process_instances_for_reports
|
||||
report_metadata = {"order_by": ["grade_level", "test_score"]}
|
||||
results = do_report_with_metadata_and_instances(report_metadata, process_instances)
|
||||
assert len(results) == 3
|
||||
names = get_names_from_results(results)
|
||||
assert names == ["ray", "jay", "kay"]
|
||||
|
||||
|
||||
def test_generate_report_with_order_by_desc(app: Flask, with_db_and_bpmn_file_cleanup, setup_process_instances_for_reports) -> None:
|
||||
"""Test_user_can_be_given_permission_to_administer_process_group."""
|
||||
process_instances = setup_process_instances_for_reports
|
||||
report_metadata = {"order_by": ["grade_level", "-test_score"]}
|
||||
results = do_report_with_metadata_and_instances(report_metadata, process_instances)
|
||||
assert len(results) == 3
|
||||
names = get_names_from_results(results)
|
||||
assert names == ["ray", "kay", "jay"]
|
||||
|
||||
|
||||
def test_generate_report_with_columns(app: Flask, with_db_and_bpmn_file_cleanup, setup_process_instances_for_reports) -> None:
|
||||
"""Test_user_can_be_given_permission_to_administer_process_group."""
|
||||
process_instances = setup_process_instances_for_reports
|
||||
report_metadata = {"columns": [{"Header": "Name", "accessor": "name"}], "order_by": [
|
||||
"test_score"], "filter_by": [{"field_name": "grade_level", "operator": "equals", "field_value": 1}]}
|
||||
results = do_report_with_metadata_and_instances(report_metadata, process_instances)
|
||||
assert len(results) == 1
|
||||
assert results == [{"name": "ray"}]
|
||||
|
||||
|
||||
def do_report_with_metadata_and_instances(report_metadata: dict, process_instances: list[ProcessInstanceModel], substitution_variables: dict = None):
|
||||
"""Do_report_with_metadata_and_instances."""
|
||||
process_instance_report = ProcessInstanceReportModel.create_with_attributes(
|
||||
identifier="sure",
|
||||
process_group_identifier=process_instances[0].process_group_identifier,
|
||||
process_model_identifier=process_instances[0].process_model_identifier,
|
||||
report_metadata=report_metadata,
|
||||
user=find_or_create_user(),
|
||||
)
|
||||
|
||||
return process_instance_report.generate_report(process_instances, substitution_variables)["results"]
|
||||
|
||||
|
||||
def get_names_from_results(results: list[dict]) -> list[str]:
|
||||
"""Get_names_from_results."""
|
||||
return [result["data"]["name"] for result in results]
|
Loading…
Reference in New Issue