find the top level process to find the task form when using subprocesses in called activities w/ burnettk danfunk
This commit is contained in:
parent
76348bf21f
commit
a6fbe6715d
2
.flake8
2
.flake8
|
@ -12,7 +12,7 @@ per-file-ignores =
|
|||
spiffworkflow-backend/tests/*:S101
|
||||
|
||||
# prefer naming functions descriptively rather than forcing comments
|
||||
spiffworkflow-backend/*:D103
|
||||
spiffworkflow-backend/*:D102,D103
|
||||
|
||||
spiffworkflow-backend/bin/keycloak_test_server.py:B950,D
|
||||
spiffworkflow-backend/conftest.py:S105
|
||||
|
|
|
@ -12,7 +12,7 @@ per-file-ignores =
|
|||
tests/*:S101
|
||||
|
||||
# prefer naming functions descriptively rather than forcing comments
|
||||
*:D103
|
||||
*:D102
|
||||
|
||||
bin/keycloak_test_server.py:B950,D
|
||||
conftest.py:S105
|
||||
|
|
|
@ -9,6 +9,8 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|||
def main(process_instance_id: str):
|
||||
"""Main."""
|
||||
os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "development"
|
||||
if os.environ.get("BPMN_SPEC_ABSOLUTE_DIR") is None:
|
||||
os.environ["BPMN_SPEC_ABSOLUTE_DIR"] = "hey"
|
||||
flask_env_key = "FLASK_SESSION_SECRET_KEY"
|
||||
os.environ[flask_env_key] = "whatevs"
|
||||
app = create_app()
|
||||
|
|
|
@ -214,12 +214,16 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
|
|||
task.process_model_identifier = process_model.id
|
||||
|
||||
process_model_with_form = process_model
|
||||
|
||||
refs = SpecFileService.get_references_for_process(process_model_with_form)
|
||||
all_processes = [i.identifier for i in refs]
|
||||
if task.process_identifier not in all_processes:
|
||||
top_process_name = processor.find_process_model_process_name_by_task_name(
|
||||
task.process_identifier
|
||||
)
|
||||
bpmn_file_full_path = (
|
||||
ProcessInstanceProcessor.bpmn_file_full_path_from_bpmn_process_identifier(
|
||||
task.process_identifier
|
||||
top_process_name
|
||||
)
|
||||
)
|
||||
relative_path = os.path.relpath(
|
||||
|
|
|
@ -627,18 +627,67 @@ class ProcessInstanceProcessor:
|
|||
db.session.add(pim)
|
||||
db.session.commit()
|
||||
|
||||
# FIXME: Better to move to SpiffWorkflow and traverse the outer_workflows on the spiff_task
|
||||
# We may need to add whether a subprocess is a call activity or a subprocess in order to do it properly
|
||||
def get_all_processes_with_task_name_list(self) -> dict[str, list[str]]:
|
||||
"""Gets the list of processes pointing to a list of task names.
|
||||
|
||||
This is useful for figuring out which process contain which task.
|
||||
|
||||
Rerturns: {process_name: [task_1, task_2, ...], ...}
|
||||
"""
|
||||
serialized_data = json.loads(self.serialize())
|
||||
processes: dict[str, list[str]] = {serialized_data["spec"]["name"]: []}
|
||||
for task_name, _task_spec in serialized_data["spec"]["task_specs"].items():
|
||||
processes[serialized_data["spec"]["name"]].append(task_name)
|
||||
if "subprocess_specs" in serialized_data:
|
||||
for subprocess_name, subprocess_details in serialized_data[
|
||||
"subprocess_specs"
|
||||
].items():
|
||||
processes[subprocess_name] = []
|
||||
if "task_specs" in subprocess_details:
|
||||
for task_name, _task_spec in subprocess_details[
|
||||
"task_specs"
|
||||
].items():
|
||||
processes[subprocess_name].append(task_name)
|
||||
return processes
|
||||
|
||||
def find_process_model_process_name_by_task_name(
|
||||
self, task_name: str, processes: Optional[dict[str, list[str]]] = None
|
||||
) -> str:
|
||||
"""Gets the top level process of a process model using the task name that the process contains.
|
||||
|
||||
For example, process_modelA has processA which has a call activity that calls processB which is inside of process_modelB.
|
||||
processB has subprocessA which has taskA. Using taskA this method should return processB and then that can be used with
|
||||
the spec reference cache to find process_modelB.
|
||||
"""
|
||||
process_name_to_return = task_name
|
||||
if processes is None:
|
||||
processes = self.get_all_processes_with_task_name_list()
|
||||
|
||||
for process_name, task_spec_names in processes.items():
|
||||
if task_name in task_spec_names:
|
||||
process_name_to_return = (
|
||||
self.find_process_model_process_name_by_task_name(
|
||||
process_name, processes
|
||||
)
|
||||
)
|
||||
return process_name_to_return
|
||||
|
||||
#################################################################
|
||||
|
||||
def get_all_task_specs(self) -> dict[str, dict]:
|
||||
"""This looks both at top level task_specs and subprocess_specs in the serialized data.
|
||||
|
||||
It returns a dict of all task specs based on the task name like it is in the serialized form.
|
||||
|
||||
NOTE: this may not fully work for tasks that are NOT call activities since their task_name may no be unique
|
||||
NOTE: this may not fully work for tasks that are NOT call activities since their task_name may not be unique
|
||||
but in our current use case we only care about the call activities here.
|
||||
"""
|
||||
serialized_data = json.loads(self.serialize())
|
||||
spiff_task_json = serialized_data["spec"]["task_specs"] or {}
|
||||
if "subprocess_specs" in serialized_data:
|
||||
for _subprocess_task_name, subprocess_details in serialized_data[
|
||||
for _subprocess_name, subprocess_details in serialized_data[
|
||||
"subprocess_specs"
|
||||
].items():
|
||||
if "task_specs" in subprocess_details:
|
||||
|
|
Loading…
Reference in New Issue