diff --git a/src/spiffworkflow_backend/routes/tasks_controller.py b/src/spiffworkflow_backend/routes/tasks_controller.py index 086f7a45..72541ceb 100644 --- a/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/src/spiffworkflow_backend/routes/tasks_controller.py @@ -170,6 +170,25 @@ def task_list_for_my_groups( ) +def _munge_form_ui_schema_based_on_hidden_fields_in_task_data(task: Task) -> None: + if task.form_ui_schema is None: + task.form_ui_schema = {} + + if task.data and "form_ui_hidden_fields" in task.data: + hidden_fields = task.data["form_ui_hidden_fields"] + for hidden_field in hidden_fields: + hidden_field_parts = hidden_field.split(".") + relevant_depth_of_ui_schema = task.form_ui_schema + for ii, hidden_field_part in enumerate(hidden_field_parts): + if hidden_field_part not in relevant_depth_of_ui_schema: + relevant_depth_of_ui_schema[hidden_field_part] = {} + relevant_depth_of_ui_schema = relevant_depth_of_ui_schema[ + hidden_field_part + ] + if len(hidden_field_parts) == ii + 1: + relevant_depth_of_ui_schema["ui:widget"] = "hidden" + + def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response: """Task_show.""" process_instance = _find_process_instance_by_id_or_raise(process_instance_id) @@ -185,20 +204,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response process_instance.process_model_identifier, ) - human_task = HumanTaskModel.query.filter_by( - process_instance_id=process_instance_id, task_id=task_id - ).first() - if human_task is None: - raise ( - ApiError( - error_code="no_human_task", - message=( - f"Cannot find a task to complete for task id '{task_id}' and" - f" process instance {process_instance_id}." - ), - status_code=500, - ) - ) + _find_human_task_or_raise(process_instance_id, task_id) form_schema_file_name = "" form_ui_schema_file_name = "" @@ -274,22 +280,7 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response if ui_form_contents: task.form_ui_schema = ui_form_contents - if task.form_ui_schema is None: - task.form_ui_schema = {} - - if task.data and "form_ui_hidden_fields" in task.data: - hidden_fields = task.data["form_ui_hidden_fields"] - for hidden_field in hidden_fields: - hidden_field_parts = hidden_field.split(".") - relevant_depth_of_ui_schema = task.form_ui_schema - for ii, hidden_field_part in enumerate(hidden_field_parts): - if hidden_field_part not in relevant_depth_of_ui_schema: - relevant_depth_of_ui_schema[hidden_field_part] = {} - relevant_depth_of_ui_schema = relevant_depth_of_ui_schema[ - hidden_field_part - ] - if len(hidden_field_parts) == ii + 1: - relevant_depth_of_ui_schema["ui:widget"] = "hidden" + _munge_form_ui_schema_based_on_hidden_fields_in_task_data(task) if task.properties and task.data and "instructionsForEndUser" in task.properties: if task.properties["instructionsForEndUser"]: @@ -367,20 +358,11 @@ def task_submit_shared( if terminate_loop and spiff_task.is_looping(): spiff_task.terminate_loop() - human_task = HumanTaskModel.query.filter_by( - process_instance_id=process_instance_id, task_id=task_id, completed=False - ).first() - if human_task is None: - raise ( - ApiError( - error_code="no_human_task", - message=( - f"Cannot find a task to complete for task id '{task_id}' and" - f" process instance {process_instance_id}." - ), - status_code=500, - ) - ) + human_task = _find_human_task_or_raise( + process_instance_id=process_instance_id, + task_id=task_id, + only_tasks_that_can_be_completed=True, + ) with sentry_sdk.start_span(op="task", description="complete_form_task"): processor.lock_process_instance("Web") @@ -685,3 +667,32 @@ def _get_potential_owner_usernames(assigned_user: AliasedClass) -> Any: ).label("potential_owner_usernames") return potential_owner_usernames_from_group_concat_or_similar + + +def _find_human_task_or_raise( + process_instance_id: int, + task_id: str, + only_tasks_that_can_be_completed: bool = False, +) -> HumanTaskModel: + if only_tasks_that_can_be_completed: + human_task_query = HumanTaskModel.query.filter_by( + process_instance_id=process_instance_id, task_id=task_id, completed=False + ) + else: + human_task_query = HumanTaskModel.query.filter_by( + process_instance_id=process_instance_id, task_id=task_id + ) + + human_task: HumanTaskModel = human_task_query.first() + if human_task is None: + raise ( + ApiError( + error_code="no_human_task", + message=( + f"Cannot find a task to complete for task id '{task_id}' and" + f" process instance {process_instance_id}." + ), + status_code=500, + ) + ) + return human_task