Merge pull request #119 from sartography/feature/dynamic-select-fields

dynamic select fields
This commit is contained in:
Kevin Burnett 2022-10-07 17:37:31 +00:00 committed by GitHub
commit 8efdf42318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 225 additions and 58 deletions

View File

@ -7,6 +7,7 @@ import uuid
from typing import Any
from typing import Dict
from typing import Optional
from typing import TypedDict
from typing import Union
import connexion # type: ignore
@ -69,6 +70,22 @@ from spiffworkflow_backend.services.service_task_service import ServiceTaskServi
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.user_service import UserService
class TaskDataSelectOption(TypedDict):
"""TaskDataSelectOption."""
value: str
label: str
class ReactJsonSchemaSelectOption(TypedDict):
"""ReactJsonSchemaSelectOption."""
type: str
title: str
enum: list[str]
process_api_blueprint = Blueprint("process_api", __name__)
@ -918,9 +935,14 @@ def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response
task.data,
process_model_with_form,
)
# form_contents is a str
form_dict = json.loads(form_contents)
if task.data:
_update_form_schema_with_task_data_as_needed(form_dict, task.data)
if form_contents:
task.form_schema = form_contents
task.form_schema = form_dict
if form_ui_schema_file_name:
ui_form_contents = prepare_form_data(
@ -1005,9 +1027,13 @@ def script_unit_test_create(
process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]
) -> flask.wrappers.Response:
"""Script_unit_test_run."""
bpmn_task_identifier = get_required_parameter_or_raise("bpmn_task_identifier", body)
input_json = get_required_parameter_or_raise("input_json", body)
expected_output_json = get_required_parameter_or_raise("expected_output_json", body)
bpmn_task_identifier = _get_required_parameter_or_raise(
"bpmn_task_identifier", body
)
input_json = _get_required_parameter_or_raise("input_json", body)
expected_output_json = _get_required_parameter_or_raise(
"expected_output_json", body
)
process_model = get_process_model(process_model_id, process_group_id)
file = SpecFileService.get_files(process_model, process_model.primary_file_name)[0]
@ -1094,9 +1120,11 @@ def script_unit_test_run(
# FIXME: We should probably clear this somewhere else but this works
current_app.config["THREAD_LOCAL_DATA"].process_instance_id = None
python_script = get_required_parameter_or_raise("python_script", body)
input_json = get_required_parameter_or_raise("input_json", body)
expected_output_json = get_required_parameter_or_raise("expected_output_json", body)
python_script = _get_required_parameter_or_raise("python_script", body)
input_json = _get_required_parameter_or_raise("input_json", body)
expected_output_json = _get_required_parameter_or_raise(
"expected_output_json", body
)
result = ScriptUnitTestRunner.run_with_script_and_pre_post_contexts(
python_script, input_json, expected_output_json
@ -1316,7 +1344,7 @@ def delete_allowed_process_path(allowed_process_path_id: int) -> Response:
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -> Any:
def _get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -> Any:
"""Get_required_parameter_or_raise."""
return_value = None
if parameter in post_body:
@ -1332,3 +1360,61 @@ def get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -
)
return return_value
# originally from: https://bitcoden.com/answers/python-nested-dictionary-update-value-where-any-nested-key-matches
def _update_form_schema_with_task_data_as_needed(
in_dict: dict, task_data: dict
) -> None:
"""Update_nested."""
for k, value in in_dict.items():
if "anyOf" == k:
# value will look like the array on the right of "anyOf": ["options_from_task_data_var:awesome_options"]
if isinstance(value, list):
if len(value) == 1:
first_element_in_value_list = value[0]
if isinstance(first_element_in_value_list, str):
if first_element_in_value_list.startswith(
"options_from_task_data_var:"
):
task_data_var = first_element_in_value_list.replace(
"options_from_task_data_var:", ""
)
if task_data_var not in task_data:
raise (
ApiError(
code="missing_task_data_var",
message=f"Task data is missing variable: {task_data_var}",
status_code=500,
)
)
select_options_from_task_data = task_data.get(task_data_var)
if isinstance(select_options_from_task_data, list):
if all(
"value" in d and "label" in d
for d in select_options_from_task_data
):
def map_function(
task_data_select_option: TaskDataSelectOption,
) -> ReactJsonSchemaSelectOption:
"""Map_function."""
return {
"type": "string",
"enum": [task_data_select_option["value"]],
"title": task_data_select_option["label"],
}
options_for_react_json_schema_form = list(
map(map_function, select_options_from_task_data)
)
in_dict[k] = options_for_react_json_schema_form
elif isinstance(value, dict):
_update_form_schema_with_task_data_as_needed(value, task_data)
elif isinstance(value, list):
for o in value:
if isinstance(o, dict):
_update_form_schema_with_task_data_as_needed(o, task_data)

View File

@ -0,0 +1,18 @@
{
"definitions": {
"Color": {
"title": "Color",
"type": "string",
"anyOf": ["options_from_task_data_var:awesome_color_options"]
}
},
"title": "Select Color",
"type": "object",
"required": ["selectedColor"],
"properties": {
"selectedColor": {
"$ref": "#/definitions/Color",
"title": "Select color"
}
}
}

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Proccess_0e253c6" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1my9ag5</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1my9ag5" sourceRef="StartEvent_1" targetRef="Activity_1qtnye8" />
<bpmn:sequenceFlow id="Flow_0b04rbg" sourceRef="Activity_1qtnye8" targetRef="Activity_1gqykqt" />
<bpmn:endEvent id="Event_0pchbgr">
<bpmn:incoming>Flow_13mlau2</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_13mlau2" sourceRef="Activity_1gqykqt" targetRef="Event_0pchbgr" />
<bpmn:scriptTask id="Activity_1qtnye8" name="set color options" scriptFormat="python">
<bpmn:incoming>Flow_1my9ag5</bpmn:incoming>
<bpmn:outgoing>Flow_0b04rbg</bpmn:outgoing>
<bpmn:script>awesome_color_options = [{"value": "blue", "label": "Blue"}, {"value": "green", "label": "Green"}]</bpmn:script>
</bpmn:scriptTask>
<bpmn:userTask id="Activity_1gqykqt" name="ask user for color">
<bpmn:extensionElements>
<spiffworkflow:properties>
<spiffworkflow:property name="formJsonSchemaFilename" value="color_question.json" />
</spiffworkflow:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_0b04rbg</bpmn:incoming>
<bpmn:outgoing>Flow_13mlau2</bpmn:outgoing>
</bpmn:userTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Proccess_0e253c6">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0pchbgr_di" bpmnElement="Event_0pchbgr">
<dc:Bounds x="592" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1x1c7bj_di" bpmnElement="Activity_1qtnye8">
<dc:Bounds x="270" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0w50892_di" bpmnElement="Activity_1gqykqt">
<dc:Bounds x="430" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1my9ag5_di" bpmnElement="Flow_1my9ag5">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0b04rbg_di" bpmnElement="Flow_0b04rbg">
<di:waypoint x="370" y="177" />
<di:waypoint x="430" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_13mlau2_di" bpmnElement="Flow_13mlau2">
<di:waypoint x="530" y="177" />
<di:waypoint x="592" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,17 @@
{
"description": "Dynamic Enum Select Fields",
"display_name": "Dynamic Enum Select Fields",
"display_order": 10,
"exception_notification_addresses": [],
"fault_or_suspend_on_exception": "fault",
"files": [],
"id": "dynamic-enum-select-fields",
"is_master_spec": false,
"is_review": false,
"libraries": [],
"library": false,
"primary_file_name": "dynamic_enums_ask_for_color.bpmn",
"primary_process_id": "Proccess_0e253c6",
"process_group_id": "category_number_one",
"standalone": false
}

View File

@ -1,50 +0,0 @@
"""Test Api Blueprint."""
# TODO: possibly get this test working again
# import json
# from typing import Union
#
# from flask.testing import FlaskClient
# from flask_bpmn.models.db import db
#
# from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
#
#
# def test_user_can_be_created_and_deleted(client: FlaskClient) -> None:
# """Test_user_can_be_created_and_deleted."""
# process_instance = ProcessInstanceModel.query.filter().first()
# if process_instance is not None:
# db.session.delete(process_instance)
# db.session.commit()
#
# last_response = None
# tasks = [
# {"task_identifier": "1", "answer": {"Product Name": "G", "Quantity": "2"}},
# {"task_identifier": "1", "answer": {"Sleeve Type": "Short"}},
# {"task_identifier": "1", "answer": {"Continue shopping?": "N"}},
# {"task_identifier": "1", "answer": {"Shipping Method": "Overnight"}},
# {"task_identifier": "1", "answer": {"Shipping Address": "Somewhere"}},
# {"task_identifier": "1", "answer": {"Place Order": "Y"}},
# {"task_identifier": "1", "answer": {"Card Number": "MY_CARD"}},
# {"task_identifier": "2", "answer": {"Was the customer charged?": "Y"}},
# {"task_identifier": "1", "answer": {"Was the product available?": "Y"}},
# {"task_identifier": "1", "answer": {"Was the order shipped?": "Y"}},
# ]
# for task in tasks:
# run_task(client, task, last_response)
#
# process_instance = ProcessInstanceModel.query.filter().first()
# if process_instance is not None:
# db.session.delete(process_instance)
# db.session.commit()
#
#
# def run_task(
# client: FlaskClient, request_body: dict, last_response: Union[None, str]
# ) -> None:
# """Run_task."""
# response = client.post(
# "/run_process",
# content_type="application/json",
# data=json.dumps(request_body),
# )
# assert response.status_code == 200

View File

@ -14,6 +14,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
ProcessEntityNotFoundError,
)
from spiffworkflow_backend.models.active_task import ActiveTaskModel
from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.models.process_group import ProcessGroupSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@ -956,6 +957,44 @@ class TestProcessApi(BaseTest):
assert task_event.user_id == user.id
# TODO: When user tasks work, we need to add some more assertions for action, task_state, etc.
def test_task_show(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_process_instance_run_user_task."""
process_group_id = "my_process_group"
process_model_id = "dynamic_enum_select_fields"
user = self.find_or_create_user()
headers = self.logged_in_headers(user)
response = self.create_process_instance(
client, process_group_id, process_model_id, headers
)
assert response.json is not None
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run",
headers=self.logged_in_headers(user),
)
assert response.json is not None
active_tasks = (
db.session.query(ActiveTaskModel)
.filter(ActiveTaskModel.process_instance_id == process_instance_id)
.all()
)
assert len(active_tasks) == 1
active_task = active_tasks[0]
response = client.get(
f"/v1.0/tasks/{process_instance_id}/{active_task.task_id}",
headers=self.logged_in_headers(user),
)
assert response.json is not None
assert (
response.json["form_schema"]["definitions"]["Color"]["anyOf"][1]["title"]
== "Green"
)
def test_process_instance_list_with_default_list(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None: