refactor some stuff in task_show to separate functions
This commit is contained in:
parent
408759d122
commit
53d99c49d1
|
@ -170,6 +170,25 @@ def task_list_for_my_groups(
|
|||
)
|
||||
|
||||
|
||||
def _munge_form_ui_schema_based_on_hidden_fields_in_task_data(task: Task) -> None:
|
||||
if task.form_ui_schema is None:
|
||||
task.form_ui_schema = {}
|
||||
|
||||
if task.data and "form_ui_hidden_fields" in task.data:
|
||||
hidden_fields = task.data["form_ui_hidden_fields"]
|
||||
for hidden_field in hidden_fields:
|
||||
hidden_field_parts = hidden_field.split(".")
|
||||
relevant_depth_of_ui_schema = task.form_ui_schema
|
||||
for ii, hidden_field_part in enumerate(hidden_field_parts):
|
||||
if hidden_field_part not in relevant_depth_of_ui_schema:
|
||||
relevant_depth_of_ui_schema[hidden_field_part] = {}
|
||||
relevant_depth_of_ui_schema = relevant_depth_of_ui_schema[
|
||||
hidden_field_part
|
||||
]
|
||||
if len(hidden_field_parts) == ii + 1:
|
||||
relevant_depth_of_ui_schema["ui:widget"] = "hidden"
|
||||
|
||||
|
||||
def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response:
|
||||
"""Task_show."""
|
||||
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
|
||||
|
@ -185,20 +204,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
|
|||
process_instance.process_model_identifier,
|
||||
)
|
||||
|
||||
human_task = HumanTaskModel.query.filter_by(
|
||||
process_instance_id=process_instance_id, task_id=task_id
|
||||
).first()
|
||||
if human_task is None:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="no_human_task",
|
||||
message=(
|
||||
f"Cannot find a task to complete for task id '{task_id}' and"
|
||||
f" process instance {process_instance_id}."
|
||||
),
|
||||
status_code=500,
|
||||
)
|
||||
)
|
||||
_find_human_task_or_raise(process_instance_id, task_id)
|
||||
|
||||
form_schema_file_name = ""
|
||||
form_ui_schema_file_name = ""
|
||||
|
@ -274,22 +280,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
|
|||
if ui_form_contents:
|
||||
task.form_ui_schema = ui_form_contents
|
||||
|
||||
if task.form_ui_schema is None:
|
||||
task.form_ui_schema = {}
|
||||
|
||||
if task.data and "form_ui_hidden_fields" in task.data:
|
||||
hidden_fields = task.data["form_ui_hidden_fields"]
|
||||
for hidden_field in hidden_fields:
|
||||
hidden_field_parts = hidden_field.split(".")
|
||||
relevant_depth_of_ui_schema = task.form_ui_schema
|
||||
for ii, hidden_field_part in enumerate(hidden_field_parts):
|
||||
if hidden_field_part not in relevant_depth_of_ui_schema:
|
||||
relevant_depth_of_ui_schema[hidden_field_part] = {}
|
||||
relevant_depth_of_ui_schema = relevant_depth_of_ui_schema[
|
||||
hidden_field_part
|
||||
]
|
||||
if len(hidden_field_parts) == ii + 1:
|
||||
relevant_depth_of_ui_schema["ui:widget"] = "hidden"
|
||||
_munge_form_ui_schema_based_on_hidden_fields_in_task_data(task)
|
||||
|
||||
if task.properties and task.data and "instructionsForEndUser" in task.properties:
|
||||
if task.properties["instructionsForEndUser"]:
|
||||
|
@ -367,19 +358,10 @@ def task_submit_shared(
|
|||
if terminate_loop and spiff_task.is_looping():
|
||||
spiff_task.terminate_loop()
|
||||
|
||||
human_task = HumanTaskModel.query.filter_by(
|
||||
process_instance_id=process_instance_id, task_id=task_id, completed=False
|
||||
).first()
|
||||
if human_task is None:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="no_human_task",
|
||||
message=(
|
||||
f"Cannot find a task to complete for task id '{task_id}' and"
|
||||
f" process instance {process_instance_id}."
|
||||
),
|
||||
status_code=500,
|
||||
)
|
||||
human_task = _find_human_task_or_raise(
|
||||
process_instance_id=process_instance_id,
|
||||
task_id=task_id,
|
||||
only_tasks_that_can_be_completed=True,
|
||||
)
|
||||
|
||||
with sentry_sdk.start_span(op="task", description="complete_form_task"):
|
||||
|
@ -685,3 +667,32 @@ def _get_potential_owner_usernames(assigned_user: AliasedClass) -> Any:
|
|||
).label("potential_owner_usernames")
|
||||
|
||||
return potential_owner_usernames_from_group_concat_or_similar
|
||||
|
||||
|
||||
def _find_human_task_or_raise(
|
||||
process_instance_id: int,
|
||||
task_id: str,
|
||||
only_tasks_that_can_be_completed: bool = False,
|
||||
) -> HumanTaskModel:
|
||||
if only_tasks_that_can_be_completed:
|
||||
human_task_query = HumanTaskModel.query.filter_by(
|
||||
process_instance_id=process_instance_id, task_id=task_id, completed=False
|
||||
)
|
||||
else:
|
||||
human_task_query = HumanTaskModel.query.filter_by(
|
||||
process_instance_id=process_instance_id, task_id=task_id
|
||||
)
|
||||
|
||||
human_task: HumanTaskModel = human_task_query.first()
|
||||
if human_task is None:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="no_human_task",
|
||||
message=(
|
||||
f"Cannot find a task to complete for task id '{task_id}' and"
|
||||
f" process instance {process_instance_id}."
|
||||
),
|
||||
status_code=500,
|
||||
)
|
||||
)
|
||||
return human_task
|
||||
|
|
Loading…
Reference in New Issue