refactor some stuff in task_show to separate functions

This commit is contained in:
burnettk 2023-02-03 16:17:36 -05:00
parent 11a9e740af
commit e441dc35a1
1 changed files with 55 additions and 44 deletions

View File

@ -170,6 +170,25 @@ def task_list_for_my_groups(
) )
def _munge_form_ui_schema_based_on_hidden_fields_in_task_data(task: Task) -> None:
if task.form_ui_schema is None:
task.form_ui_schema = {}
if task.data and "form_ui_hidden_fields" in task.data:
hidden_fields = task.data["form_ui_hidden_fields"]
for hidden_field in hidden_fields:
hidden_field_parts = hidden_field.split(".")
relevant_depth_of_ui_schema = task.form_ui_schema
for ii, hidden_field_part in enumerate(hidden_field_parts):
if hidden_field_part not in relevant_depth_of_ui_schema:
relevant_depth_of_ui_schema[hidden_field_part] = {}
relevant_depth_of_ui_schema = relevant_depth_of_ui_schema[
hidden_field_part
]
if len(hidden_field_parts) == ii + 1:
relevant_depth_of_ui_schema["ui:widget"] = "hidden"
def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response: def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response:
"""Task_show.""" """Task_show."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
@ -185,20 +204,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
process_instance.process_model_identifier, process_instance.process_model_identifier,
) )
human_task = HumanTaskModel.query.filter_by( _find_human_task_or_raise(process_instance_id, task_id)
process_instance_id=process_instance_id, task_id=task_id
).first()
if human_task is None:
raise (
ApiError(
error_code="no_human_task",
message=(
f"Cannot find a task to complete for task id '{task_id}' and"
f" process instance {process_instance_id}."
),
status_code=500,
)
)
form_schema_file_name = "" form_schema_file_name = ""
form_ui_schema_file_name = "" form_ui_schema_file_name = ""
@ -274,22 +280,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
if ui_form_contents: if ui_form_contents:
task.form_ui_schema = ui_form_contents task.form_ui_schema = ui_form_contents
if task.form_ui_schema is None: _munge_form_ui_schema_based_on_hidden_fields_in_task_data(task)
task.form_ui_schema = {}
if task.data and "form_ui_hidden_fields" in task.data:
hidden_fields = task.data["form_ui_hidden_fields"]
for hidden_field in hidden_fields:
hidden_field_parts = hidden_field.split(".")
relevant_depth_of_ui_schema = task.form_ui_schema
for ii, hidden_field_part in enumerate(hidden_field_parts):
if hidden_field_part not in relevant_depth_of_ui_schema:
relevant_depth_of_ui_schema[hidden_field_part] = {}
relevant_depth_of_ui_schema = relevant_depth_of_ui_schema[
hidden_field_part
]
if len(hidden_field_parts) == ii + 1:
relevant_depth_of_ui_schema["ui:widget"] = "hidden"
if task.properties and task.data and "instructionsForEndUser" in task.properties: if task.properties and task.data and "instructionsForEndUser" in task.properties:
if task.properties["instructionsForEndUser"]: if task.properties["instructionsForEndUser"]:
@ -367,19 +358,10 @@ def task_submit_shared(
if terminate_loop and spiff_task.is_looping(): if terminate_loop and spiff_task.is_looping():
spiff_task.terminate_loop() spiff_task.terminate_loop()
human_task = HumanTaskModel.query.filter_by( human_task = _find_human_task_or_raise(
process_instance_id=process_instance_id, task_id=task_id, completed=False process_instance_id=process_instance_id,
).first() task_id=task_id,
if human_task is None: only_tasks_that_can_be_completed=True,
raise (
ApiError(
error_code="no_human_task",
message=(
f"Cannot find a task to complete for task id '{task_id}' and"
f" process instance {process_instance_id}."
),
status_code=500,
)
) )
with sentry_sdk.start_span(op="task", description="complete_form_task"): with sentry_sdk.start_span(op="task", description="complete_form_task"):
@ -685,3 +667,32 @@ def _get_potential_owner_usernames(assigned_user: AliasedClass) -> Any:
).label("potential_owner_usernames") ).label("potential_owner_usernames")
return potential_owner_usernames_from_group_concat_or_similar return potential_owner_usernames_from_group_concat_or_similar
def _find_human_task_or_raise(
process_instance_id: int,
task_id: str,
only_tasks_that_can_be_completed: bool = False,
) -> HumanTaskModel:
if only_tasks_that_can_be_completed:
human_task_query = HumanTaskModel.query.filter_by(
process_instance_id=process_instance_id, task_id=task_id, completed=False
)
else:
human_task_query = HumanTaskModel.query.filter_by(
process_instance_id=process_instance_id, task_id=task_id
)
human_task: HumanTaskModel = human_task_query.first()
if human_task is None:
raise (
ApiError(
error_code="no_human_task",
message=(
f"Cannot find a task to complete for task id '{task_id}' and"
f" process instance {process_instance_id}."
),
status_code=500,
)
)
return human_task