Merge branch 'main' into feature/waku-fault-message

This commit is contained in:
mike cullerton 2023-01-13 12:11:40 -05:00
commit f071832e03
2 changed files with 49 additions and 11 deletions

View File

@ -556,9 +556,11 @@ def process_instance_task_list(
else:
spiff_tasks = processor.get_all_user_tasks()
subprocesses_by_child_task_ids = processor.get_subprocesses_by_child_task_ids()
processor.get_highest_level_subprocesses_by_child_task_ids(
subprocesses_by_child_task_ids
subprocesses_by_child_task_ids, task_typename_by_task_id = (
processor.get_subprocesses_by_child_task_ids()
)
processor.get_highest_level_calling_subprocesses_by_child_task_ids(
subprocesses_by_child_task_ids, task_typename_by_task_id
)
tasks = []

View File

@ -625,7 +625,25 @@ class ProcessInstanceProcessor:
db.session.add(pim)
db.session.commit()
def get_subprocesses_by_child_task_ids(self) -> dict:
def get_all_task_specs(self) -> dict[str, dict]:
"""This looks both at top level task_specs and subprocess_specs in the serialized data.
It returns a dict of all task specs based on the task name like it is in the serialized form.
NOTE: this may not fully work for tasks that are NOT call activities since their task_name may no be unique
but in our current use case we only care about the call activities here.
"""
serialized_data = json.loads(self.serialize())
spiff_task_json = serialized_data["spec"]["task_specs"] or {}
if "subprocess_specs" in serialized_data:
for _subprocess_task_name, subprocess_details in serialized_data[
"subprocess_specs"
].items():
if "task_specs" in subprocess_details:
spiff_task_json = spiff_task_json | subprocess_details["task_specs"]
return spiff_task_json
def get_subprocesses_by_child_task_ids(self) -> Tuple[dict, dict]:
"""Get all subprocess ids based on the child task ids.
This is useful when trying to link the child task of a call activity back to
@ -641,27 +659,45 @@ class ProcessInstanceProcessor:
call activities like subprocesses in terms of the serialization.
"""
bpmn_json = json.loads(self.serialize())
spiff_task_json = self.get_all_task_specs()
subprocesses_by_child_task_ids = {}
task_typename_by_task_id = {}
if "subprocesses" in bpmn_json:
for subprocess_id, subprocess_details in bpmn_json["subprocesses"].items():
for task_id in subprocess_details["tasks"]:
for task_id, task_details in subprocess_details["tasks"].items():
subprocesses_by_child_task_ids[task_id] = subprocess_id
return subprocesses_by_child_task_ids
task_name = task_details["task_spec"]
if task_name in spiff_task_json:
task_typename_by_task_id[task_id] = spiff_task_json[task_name][
"typename"
]
return (subprocesses_by_child_task_ids, task_typename_by_task_id)
def get_highest_level_subprocesses_by_child_task_ids(
self, subprocesses_by_child_task_ids: dict
def get_highest_level_calling_subprocesses_by_child_task_ids(
self, subprocesses_by_child_task_ids: dict, task_typename_by_task_id: dict
) -> dict:
"""Ensure task ids point to the top level subprocess id.
This is done by checking if a subprocess is also a task until the subprocess is no longer a task.
This is done by checking if a subprocess is also a task until the subprocess is no longer a task or a Call Activity.
"""
for task_id, subprocess_id in subprocesses_by_child_task_ids.items():
if subprocess_id in subprocesses_by_child_task_ids:
current_subprocess_id_for_task = subprocesses_by_child_task_ids[task_id]
if current_subprocess_id_for_task in task_typename_by_task_id:
# a call activity is like the top-level subprocess since it is the calling subprocess
# according to spiff and the top-level calling subprocess is really what we care about
if (
task_typename_by_task_id[current_subprocess_id_for_task]
== "CallActivity"
):
continue
subprocesses_by_child_task_ids[task_id] = (
subprocesses_by_child_task_ids[subprocess_id]
)
self.get_highest_level_subprocesses_by_child_task_ids(
subprocesses_by_child_task_ids
self.get_highest_level_calling_subprocesses_by_child_task_ids(
subprocesses_by_child_task_ids, task_typename_by_task_id
)
return subprocesses_by_child_task_ids