some pyl stuff. tests are failing w/ burnettk

This commit is contained in:
jasquat 2023-04-21 16:32:29 -04:00
parent f000f47794
commit e45adff2e9
10 changed files with 57 additions and 179 deletions

View File

@ -1,7 +1,5 @@
"""APIs for dealing with process groups, process models, and process instances."""
import json
from spiffworkflow_backend.services.authorization_service import UserDoesNotHaveAccessToTaskError
from spiffworkflow_backend.services.authorization_service import HumanTaskNotFoundError
import os
import uuid
from sys import exc_info
@ -45,7 +43,6 @@ from spiffworkflow_backend.models.process_instance import (
)
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.process_api_blueprint import (
@ -56,6 +53,8 @@ from spiffworkflow_backend.routes.process_api_blueprint import (
)
from spiffworkflow_backend.routes.process_api_blueprint import _get_process_model
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import HumanTaskNotFoundError
from spiffworkflow_backend.services.authorization_service import UserDoesNotHaveAccessToTaskError
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -267,7 +266,6 @@ def manual_complete_task(
def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappers.Response:
"""Task_show."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
if process_instance.status == ProcessInstanceStatus.suspended.value:
@ -286,12 +284,11 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
form_schema_file_name = ""
form_ui_schema_file_name = ""
processor = ProcessInstanceProcessor(process_instance)
spiff_task = _get_spiff_task_from_process_instance(task_guid, process_instance, processor=processor)
extensions = spiff_task.task_spec.extensions
task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id)
# extensions = task_model.properties_json['extensions'] if 'extensions' in task_model.properties_json else {}
task_definition = task_model.task_definition
extensions = (
task_definition.properties_json["extensions"] if "extensions" in task_definition.properties_json else {}
)
if "properties" in extensions:
properties = extensions["properties"]
@ -303,7 +300,7 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
can_complete = False
try:
AuthorizationService.assert_user_can_complete_task(
process_instance.id, task_model.task_definition.bpmn_identifier, g.user
process_instance.id, task_definition.bpmn_identifier, g.user
)
can_complete = True
except HumanTaskNotFoundError:
@ -311,24 +308,10 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
except UserDoesNotHaveAccessToTaskError:
can_complete = False
task = ProcessInstanceService.spiff_task_to_api_task(processor, spiff_task)
task.data = spiff_task.data
task.process_model_display_name = process_model.display_name
task.process_model_identifier = process_model.id
# task.data
# task.form_schema
# task.form_ui_schema
# task.id
# task.process_model_display_name
# task.process_model_identifier
# task.state
# task.type
task_model.data = task_model.get_data()
task_model.process_model_display_name = process_model.display_name
task_model.process_model_identifier = process_model.id
task_model.type = task_model.task_definition.typename
task_model.type = task_definition.typename
task_model.can_complete = can_complete
task_process_identifier = task_model.bpmn_process.bpmn_process_definition.bpmn_identifier
@ -345,7 +328,7 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
process_model_relative_path = os.path.dirname(relative_path)
process_model_with_form = ProcessModelService.get_process_model_from_relative_path(process_model_relative_path)
if task_model.task_definition.typename == "UserTask":
if task_definition.typename == "UserTask":
if not form_schema_file_name:
raise (
ApiError(
@ -386,7 +369,7 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
def _render_instructions_for_end_user(task_model: TaskModel) -> str:
"""Assure any instructions for end user are processed for jinja syntax."""
extensions = task_model.properties_json['extensions'] if 'extensions' in task_model.properties_json else {}
extensions = task_model.properties_json["extensions"] if "extensions" in task_model.properties_json else {}
if extensions and "instructionsForEndUser" in extensions:
if extensions["instructionsForEndUser"]:
try:
@ -458,7 +441,7 @@ def interstitial(process_instance_id: int) -> Response:
return Response(
stream_with_context(_interstitial_stream(process_instance_id)),
mimetype="text/event-stream",
headers={'X-Accel-Buffering': 'no'}
headers={"X-Accel-Buffering": "no"},
)
@ -762,8 +745,8 @@ def _update_form_schema_with_task_data_as_needed(in_dict: dict, task_model: Task
if task_data_var not in task_model.data:
message = (
"Error building form. Attempting to create a selection list with options from variable"
f" '{task_data_var}' but it doesn't exist in the Task Data."
"Error building form. Attempting to create a selection list with options from"
f" variable '{task_data_var}' but it doesn't exist in the Task Data."
)
raise ApiError(
error_code="missing_task_data_var",

View File

@ -17,7 +17,6 @@ from flask import current_app
from flask import g
from flask import request
from flask import scaffold
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from sqlalchemy import or_
from sqlalchemy import text
@ -417,7 +416,6 @@ class AuthorizationService:
task_bpmn_identifier: str,
user: UserModel,
) -> bool:
"""Assert_user_can_complete_spiff_task."""
human_task = HumanTaskModel.query.filter_by(
task_name=task_bpmn_identifier,
process_instance_id=process_instance_id,

View File

@ -858,114 +858,6 @@ class ProcessInstanceProcessor:
db.session.add(pim)
db.session.commit()
# FIXME: Better to move to SpiffWorkflow and traverse the outer_workflows on the spiff_task
# We may need to add whether a subprocess is a call activity or a subprocess in order to do it properly
def get_all_processes_with_task_name_list(self) -> dict[str, list[str]]:
"""Gets the list of processes pointing to a list of task names.
This is useful for figuring out which process contain which task.
Rerturns: {process_name: [task_1, task_2, ...], ...}
"""
bpmn_definition_dict = self.full_bpmn_process_dict
processes: dict[str, list[str]] = {bpmn_definition_dict["spec"]["name"]: []}
for task_name, _task_spec in bpmn_definition_dict["spec"]["task_specs"].items():
processes[bpmn_definition_dict["spec"]["name"]].append(task_name)
if "subprocess_specs" in bpmn_definition_dict:
for subprocess_name, subprocess_details in bpmn_definition_dict["subprocess_specs"].items():
processes[subprocess_name] = []
if "task_specs" in subprocess_details:
for task_name, _task_spec in subprocess_details["task_specs"].items():
processes[subprocess_name].append(task_name)
return processes
def find_process_model_process_name_by_task_name(
self, task_name: str, processes: Optional[dict[str, list[str]]] = None
) -> str:
"""Gets the top level process of a process model using the task name that the process contains.
For example, process_modelA has processA which has a call activity that calls processB which is inside of process_modelB.
processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with
the spec reference cache to find process_modelB.
"""
process_name_to_return = task_name
if processes is None:
processes = self.get_all_processes_with_task_name_list()
for process_name, task_spec_names in processes.items():
if task_name in task_spec_names:
process_name_to_return = self.find_process_model_process_name_by_task_name(process_name, processes)
return process_name_to_return
#################################################################
def get_all_task_specs(self) -> dict[str, dict]:
"""This looks both at top level task_specs and subprocess_specs in the serialized data.
It returns a dict of all task specs based on the task name like it is in the serialized form.
NOTE: this may not fully work for tasks that are NOT call activities since their task_name may not be unique
but in our current use case we only care about the call activities here.
"""
bpmn_definition_dict = self.full_bpmn_process_dict
spiff_task_json = bpmn_definition_dict["spec"]["task_specs"] or {}
if "subprocess_specs" in bpmn_definition_dict:
for _subprocess_name, subprocess_details in bpmn_definition_dict["subprocess_specs"].items():
if "task_specs" in subprocess_details:
spiff_task_json = spiff_task_json | subprocess_details["task_specs"]
return spiff_task_json
def get_subprocesses_by_child_task_ids(self) -> Tuple[dict, dict]:
"""Get all subprocess ids based on the child task ids.
This is useful when trying to link the child task of a call activity back to
the call activity that called it to get the appropriate data. For example, if you
have a call activity "Log" that you call twice within the same process, the Hammer log file
activity within the Log process will get called twice. They will potentially have different
task data. We want to be able to differentiate those two activities.
subprocess structure in the json:
"subprocesses": { [subprocess_task_id]: "tasks" : { [task_id]: [bpmn_task_details] }}
Also note that subprocess_task_id might in fact be a call activity, because spiff treats
call activities like subprocesses in terms of the serialization.
"""
process_instance_data_dict = self.full_bpmn_process_dict
spiff_task_json = self.get_all_task_specs()
subprocesses_by_child_task_ids = {}
task_typename_by_task_id = {}
if "subprocesses" in process_instance_data_dict:
for subprocess_id, subprocess_details in process_instance_data_dict["subprocesses"].items():
for task_id, task_details in subprocess_details["tasks"].items():
subprocesses_by_child_task_ids[task_id] = subprocess_id
task_name = task_details["task_spec"]
if task_name in spiff_task_json:
task_typename_by_task_id[task_id] = spiff_task_json[task_name]["typename"]
return (subprocesses_by_child_task_ids, task_typename_by_task_id)
def get_highest_level_calling_subprocesses_by_child_task_ids(
self, subprocesses_by_child_task_ids: dict, task_typename_by_task_id: dict
) -> dict:
"""Ensure task ids point to the top level subprocess id.
This is done by checking if a subprocess is also a task until the subprocess is no longer a task or a Call Activity.
"""
for task_id, subprocess_id in subprocesses_by_child_task_ids.items():
if subprocess_id in subprocesses_by_child_task_ids:
current_subprocess_id_for_task = subprocesses_by_child_task_ids[task_id]
if current_subprocess_id_for_task in task_typename_by_task_id:
# a call activity is like the top-level subprocess since it is the calling subprocess
# according to spiff and the top-level calling subprocess is really what we care about
if task_typename_by_task_id[current_subprocess_id_for_task] == "CallActivity":
continue
subprocesses_by_child_task_ids[task_id] = subprocesses_by_child_task_ids[subprocess_id]
self.get_highest_level_calling_subprocesses_by_child_task_ids(
subprocesses_by_child_task_ids, task_typename_by_task_id
)
return subprocesses_by_child_task_ids
def _store_bpmn_process_definition(
self,
process_bpmn_properties: dict,

View File

@ -496,7 +496,9 @@ class TaskService:
processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with
the spec reference cache to find process_modelB.
"""
(bpmn_processes, _task_models) = TaskService.task_models_of_parent_bpmn_processes(task_model, stop_on_first_call_activity=True)
(bpmn_processes, _task_models) = TaskService.task_models_of_parent_bpmn_processes(
task_model, stop_on_first_call_activity=True
)
return bpmn_processes[0]
@classmethod
@ -537,9 +539,11 @@ class TaskService:
if bpmn_process.guid is not None:
parent_task_model = TaskModel.query.filter_by(guid=bpmn_process.guid).first()
task_models.append(parent_task_model)
if not stop_on_first_call_activity or parent_task_model.task_definition.typename != 'CallActivity':
if not stop_on_first_call_activity or parent_task_model.task_definition.typename != "CallActivity":
if parent_task_model is not None:
b, t = cls.task_models_of_parent_bpmn_processes(parent_task_model, stop_on_first_call_activity=stop_on_first_call_activity)
b, t = cls.task_models_of_parent_bpmn_processes(
parent_task_model, stop_on_first_call_activity=stop_on_first_call_activity
)
return (b + bpmn_processes, t + task_models)
return (bpmn_processes, task_models)

View File

@ -8,4 +8,4 @@
"metadata_extraction_paths": null,
"primary_file_name": "call_activity_nested.bpmn",
"primary_process_id": "Level1"
}
}

View File

@ -2,7 +2,6 @@
import re
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -12,7 +11,6 @@ from spiffworkflow_backend.models.process_instance_metadata import (
)
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)

View File

@ -1,10 +1,8 @@
"""Test_process_model_service."""
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService

View File

@ -1,19 +1,16 @@
from flask import Flask
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from flask.testing import FlaskClient
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.task_service import TaskService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.task_service import TaskService
class TestTaskService(BaseTest):
def test_can_get_full_bpmn_process_path(
self,
app: Flask,
@ -42,22 +39,22 @@ class TestTaskService(BaseTest):
assert process_instance.status == "complete"
bpmn_process_level_2b = (
BpmnProcessModel.query
.join(BpmnProcessDefinitionModel)
.filter(BpmnProcessDefinitionModel.bpmn_identifier == 'Level2b').first()
BpmnProcessModel.query.join(BpmnProcessDefinitionModel)
.filter(BpmnProcessDefinitionModel.bpmn_identifier == "Level2b")
.first()
)
assert bpmn_process_level_2b is not None
full_bpnmn_process_path = TaskService.full_bpmn_process_path(bpmn_process_level_2b)
assert full_bpnmn_process_path == ['Level1', 'Level2', 'Level2b']
assert full_bpnmn_process_path == ["Level1", "Level2", "Level2b"]
bpmn_process_level_3 = (
BpmnProcessModel.query
.join(BpmnProcessDefinitionModel)
.filter(BpmnProcessDefinitionModel.bpmn_identifier == 'Level3').first()
BpmnProcessModel.query.join(BpmnProcessDefinitionModel)
.filter(BpmnProcessDefinitionModel.bpmn_identifier == "Level3")
.first()
)
assert bpmn_process_level_3 is not None
full_bpnmn_process_path = TaskService.full_bpmn_process_path(bpmn_process_level_3)
assert full_bpnmn_process_path == ['Level1', 'Level2', 'Level3']
assert full_bpnmn_process_path == ["Level1", "Level2", "Level3"]
def test_task_models_of_parent_bpmn_processes_stop_on_first_call_activity(
self,
@ -88,25 +85,31 @@ class TestTaskService(BaseTest):
task_model_level_2b = (
TaskModel.query.join(TaskDefinitionModel)
.filter(TaskDefinitionModel.bpmn_identifier == 'level_2b_subprocess_script_task').first()
.filter(TaskDefinitionModel.bpmn_identifier == "level_2b_subprocess_script_task")
.first()
)
assert task_model_level_2b is not None
(bpmn_processes, task_models) = TaskService.task_models_of_parent_bpmn_processes(task_model_level_2b, stop_on_first_call_activity=True)
(bpmn_processes, task_models) = TaskService.task_models_of_parent_bpmn_processes(
task_model_level_2b, stop_on_first_call_activity=True
)
assert len(bpmn_processes) == 2
assert len(task_models) == 2
assert bpmn_processes[0].bpmn_process_definition.bpmn_identifier == 'Level2b'
assert task_models[0].task_definition.bpmn_identifier == 'level2b_second_call'
assert bpmn_processes[0].bpmn_process_definition.bpmn_identifier == "Level2b"
assert task_models[0].task_definition.bpmn_identifier == "level2b_second_call"
task_model_level_3 = (
TaskModel.query.join(TaskDefinitionModel)
.filter(TaskDefinitionModel.bpmn_identifier == 'level_3_script_task').first()
.filter(TaskDefinitionModel.bpmn_identifier == "level_3_script_task")
.first()
)
assert task_model_level_3 is not None
(bpmn_processes, task_models) = TaskService.task_models_of_parent_bpmn_processes(task_model_level_3, stop_on_first_call_activity=True)
(bpmn_processes, task_models) = TaskService.task_models_of_parent_bpmn_processes(
task_model_level_3, stop_on_first_call_activity=True
)
assert len(bpmn_processes) == 1
assert len(task_models) == 1
assert bpmn_processes[0].bpmn_process_definition.bpmn_identifier == 'Level3'
assert task_models[0].task_definition.bpmn_identifier == 'level3'
assert bpmn_processes[0].bpmn_process_definition.bpmn_identifier == "Level3"
assert task_models[0].task_definition.bpmn_identifier == "level3"
def test_bpmn_process_for_called_activity_or_top_level_process(
self,
@ -137,17 +140,19 @@ class TestTaskService(BaseTest):
task_model_level_2b = (
TaskModel.query.join(TaskDefinitionModel)
.filter(TaskDefinitionModel.bpmn_identifier == 'level_2b_subprocess_script_task').first()
.filter(TaskDefinitionModel.bpmn_identifier == "level_2b_subprocess_script_task")
.first()
)
assert task_model_level_2b is not None
bpmn_process = TaskService.bpmn_process_for_called_activity_or_top_level_process(task_model_level_2b)
assert bpmn_process is not None
assert bpmn_process.bpmn_process_definition.bpmn_identifier == 'Level2b'
assert bpmn_process.bpmn_process_definition.bpmn_identifier == "Level2b"
task_model_level_3 = (
TaskModel.query.join(TaskDefinitionModel)
.filter(TaskDefinitionModel.bpmn_identifier == 'level_3_script_task').first()
.filter(TaskDefinitionModel.bpmn_identifier == "level_3_script_task")
.first()
)
assert task_model_level_3 is not None
bpmn_process = TaskService.bpmn_process_for_called_activity_or_top_level_process(task_model_level_3)
assert bpmn_process.bpmn_process_definition.bpmn_identifier == 'Level3'
assert bpmn_process.bpmn_process_definition.bpmn_identifier == "Level3"

View File

@ -21,7 +21,7 @@ export default function ProcessInterstitial() {
return ['User Task', 'Manual Task'];
}, []);
const processInstanceShowPageBaseUrl = `/admin/process-instances/for-me/${params.process_model_id}`;
const processInstanceShowPageBaseUrl = `/admin/process-instances/for-me/${params.modified_process_model_identifier}`;
useEffect(() => {
fetchEventSource(

View File

@ -6,7 +6,7 @@ export default function ProcessRoutes() {
return (
<Routes>
<Route
path=":process_model_id/:process_instance_id/interstitial"
path=":modified_process_model_identifier/:process_instance_id/interstitial"
element={<ProcessInterstitial />}
/>
</Routes>