support manual tasks in task_show, updated some bpmn classes to spiff classes, and do not rely on active task for task_show and only use spiff task w/ burnettk cullerton

This commit is contained in:
jasquat 2022-08-23 12:15:08 -04:00
parent f842a65317
commit e56dc8c82f
3 changed files with 34 additions and 271 deletions

View File

@ -7,8 +7,6 @@ import marshmallow
from marshmallow import Schema
from marshmallow_enum import EnumField # type: ignore
# from SpiffWorkflow.camunda.specs.UserTask import Form # type: ignore
class MultiInstanceType(enum.Enum):
"""MultiInstanceType."""

View File

@ -46,10 +46,6 @@ from spiffworkflow_backend.services.process_instance_service import (
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.spec_file_service import SpecFileService
# from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
# from SpiffWorkflow.camunda.serializer.task_spec_converters import UserTaskConverter # type: ignore
# from SpiffWorkflow.dmn.serializer.task_spec_converters import BusinessRuleTaskConverter # type: ignore
process_api_blueprint = Blueprint("process_api", __name__)
@ -596,69 +592,53 @@ def process_instance_task_list(
def task_show(process_instance_id: int, task_id: str) -> flask.wrappers.Response:
"""Task_show."""
principal = find_principal_or_raise()
process_instance = find_process_instance_by_id_or_raise(process_instance_id)
process_model = get_process_model(
process_instance.process_model_identifier,
process_instance.process_group_identifier,
)
active_task_assigned_to_me = ActiveTaskModel.query.filter_by(
process_instance_id=process_instance_id,
task_id=task_id,
assigned_principal_id=principal.id,
).first()
form_schema_file_name = ""
form_ui_schema_file_name = ""
task = None
if active_task_assigned_to_me:
form_schema_file_name = active_task_assigned_to_me.form_file_name
form_ui_schema_file_name = active_task_assigned_to_me.ui_form_file_name
active_task_assigned_to_me.process_model_identifier = (
process_instance.process_model_identifier
)
task = ActiveTaskModel.to_task(active_task_assigned_to_me)
else:
spiff_task = get_spiff_task_from_process_instance(task_id, process_instance)
extensions = spiff_task.task_spec.extensions
spiff_task = get_spiff_task_from_process_instance(task_id, process_instance)
extensions = spiff_task.task_spec.extensions
if "properties" in extensions:
properties = extensions["properties"]
if "formJsonSchemaFilename" in properties:
form_schema_file_name = properties["formJsonSchemaFilename"]
if "formUiSchemaFilename" in properties:
form_ui_schema_file_name = properties["formUiSchemaFilename"]
task = ProcessInstanceService.spiff_task_to_api_task(spiff_task)
task.data = spiff_task.data
if "properties" in extensions:
properties = extensions["properties"]
if "formJsonSchemaFilename" in properties:
form_schema_file_name = properties["formJsonSchemaFilename"]
if "formUiSchemaFilename" in properties:
form_ui_schema_file_name = properties["formUiSchemaFilename"]
task = ProcessInstanceService.spiff_task_to_api_task(spiff_task)
task.data = spiff_task.data
if form_schema_file_name is None:
raise (
ApiError(
code="missing_form_file",
message=f"Cannot find a form file for process_instance_id: {process_instance_id}, task_id: {task_id}",
status_code=500,
if task.type == "UserTask":
if not form_schema_file_name:
raise (
ApiError(
code="missing_form_file",
message=f"Cannot find a form file for process_instance_id: {process_instance_id}, task_id: {task_id}",
status_code=500,
)
)
)
form_contents = prepare_form_data(
form_schema_file_name,
task.data,
process_model,
)
if form_contents:
task.form_schema = form_contents
if form_ui_schema_file_name:
ui_form_contents = prepare_form_data(
form_ui_schema_file_name,
form_contents = prepare_form_data(
form_schema_file_name,
task.data,
process_model,
)
if ui_form_contents:
task.form_ui_schema = ui_form_contents
if form_contents:
task.form_schema = form_contents
if form_ui_schema_file_name:
ui_form_contents = prepare_form_data(
form_ui_schema_file_name,
task.data,
process_model,
)
if ui_form_contents:
task.form_ui_schema = ui_form_contents
return make_response(jsonify(task), 200)

View File

@ -10,13 +10,12 @@ from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from SpiffWorkflow.bpmn.specs.events import EndEvent # type: ignore
from SpiffWorkflow.bpmn.specs.events import StartEvent
from SpiffWorkflow.bpmn.specs.ManualTask import ManualTask # type: ignore
from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask # type: ignore
from SpiffWorkflow.bpmn.specs.UserTask import UserTask # type: ignore
from SpiffWorkflow.camunda.specs.UserTask import EnumFormField # type: ignore
from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask # type: ignore
from SpiffWorkflow.specs import CancelTask # type: ignore
from SpiffWorkflow.specs import StartTask
from SpiffWorkflow.spiff.specs.manual_task import ManualTask # type: ignore
from SpiffWorkflow.spiff.specs.user_task import UserTask # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
@ -443,218 +442,4 @@ class ProcessInstanceService:
properties=props,
)
# # Only process the form and documentation if requested.
# # The task should be in a completed or a ready state, and should
# # not be a previously completed MI Task.
# if add_docs_and_forms:
# task.data = spiff_task.data
# if (
# hasattr(spiff_task.task_spec, "form")
# and spiff_task.task_spec.form is not None
# ):
# task.form = spiff_task.task_spec.form
# for i, field in enumerate(task.form.fields):
# task.form.fields[i] = ProcessInstanceService.process_options(
# spiff_task, field
# )
# # If there is a default value, set it.
# # if field.id not in task.data and ProcessInstanceService.get_default_value(field, spiff_task) is not None:
# # task.data[field.id] = ProcessInstanceService.get_default_value(field, spiff_task)
# # task.documentation = ProcessInstanceService._process_documentation(spiff_task)
# task.documentation = (
# spiff_task.task_spec.documentation
# if hasattr(spiff_task.task_spec, "documentation")
# else None
# )
# All ready tasks should have a valid name, and this can be computed for
# some tasks, particularly multi-instance tasks that all have the same spec
# but need different labels.
# if spiff_task.state == TaskState.READY:
# task.properties = ProcessInstanceService._process_properties(spiff_task, props)
#
# task.title = ProcessInstanceService.__calculate_title(spiff_task)
# if task.properties and "clear_data" in task.properties:
# if task.form and task.properties["clear_data"] == "True":
# for i in range(len(task.form.fields)):
# task.data.pop(task.form.fields[i].id, None)
# # Pass help text through the Jinja parser
# if task.form and task.form.fields:
# for field in task.form.fields:
# if field.properties:
# for field_property in field.properties:
# if field_property.id == "help":
# jinja_text = JinjaService().get_content(
# field_property.value, task.data
# )
# field_property.value = jinja_text
return task
# @staticmethod
# def _process_properties(spiff_task, props):
# """Runs all the property values through the Jinja2 processor to inject data."""
# for k, v in props.items():
# try:
# props[k] = JinjaService.get_content(v, spiff_task.data)
# except jinja2.exceptions.TemplateError as ue:
# app.logger.error(
# f"Failed to process task property {str(ue)}", exc_info=True
# )
# return props
@staticmethod
def process_options(spiff_task: SpiffTask, field: EnumFormField) -> EnumFormField:
"""Process_options."""
if field.type != Task.FIELD_TYPE_ENUM:
return field
if hasattr(field, "options") and len(field.options) > 1:
return field
elif not (
field.has_property(Task.FIELD_PROP_VALUE_COLUMN)
or field.has_property(Task.FIELD_PROP_LABEL_COLUMN)
):
raise ApiError.from_task(
"invalid_enum",
f"For enumerations, you must include options, or a way to generate options from"
f" a spreadsheet or data set. Please set either a spreadsheet name or data name,"
f" along with the value and label columns to use from these sources. Valid params"
f" include: "
f"{Task.FIELD_PROP_SPREADSHEET_NAME}, "
f"{Task.FIELD_PROP_DATA_NAME}, "
f"{Task.FIELD_PROP_VALUE_COLUMN}, "
f"{Task.FIELD_PROP_LABEL_COLUMN}",
task=spiff_task,
)
if field.has_property(Task.FIELD_PROP_SPREADSHEET_NAME):
# lookup_model = LookupService.get_lookup_model(spiff_task, field)
# data = (
# db.session.query(LookupDataModel)
# .filter(LookupDataModel.lookup_file_model == lookup_model)
# .all()
# )
# for d in data:
# field.add_option(d.value, d.label)
...
elif field.has_property(Task.FIELD_PROP_DATA_NAME):
field.options = ProcessInstanceService.get_options_from_task_data(
spiff_task, field
)
return field
@staticmethod
def get_options_from_task_data(spiff_task: SpiffTask, field: EnumFormField) -> List:
"""Get_options_from_task_data."""
prop = field.get_property(Task.FIELD_PROP_DATA_NAME)
if prop not in spiff_task.data:
raise ApiError.from_task(
"invalid_enum",
f"For enumerations based on task data, task data must have "
f"a property called {prop}",
task=spiff_task,
)
# Get the enum options from the task data
data_model = spiff_task.data[prop]
value_column = field.get_property(Task.FIELD_PROP_VALUE_COLUMN)
label_column = field.get_property(Task.FIELD_PROP_LABEL_COLUMN)
items = data_model.items() if isinstance(data_model, dict) else data_model
options: List[Any] = []
for item in items:
if value_column not in item:
raise ApiError.from_task(
"invalid_enum",
f"The value column '{value_column}' does not exist for item {item}",
task=spiff_task,
)
if label_column not in item:
raise ApiError.from_task(
"invalid_enum",
f"The label column '{label_column}' does not exist for item {item}",
task=spiff_task,
)
# options.append(
# Box(
# {"id": item[value_column], "name": item[label_column], "data": item}
# )
# )
return options
# @staticmethod
# def _process_documentation(spiff_task):
# """Runs the given documentation string through the Jinja2 processor to inject data
# create loops, etc... - If a markdown file exists with the same name as the task id,
# it will use that file instead of the documentation."""
# documentation = (
# spiff_task.task_spec.documentation
# if hasattr(spiff_task.task_spec, "documentation")
# else ""
# )
#
# try:
# doc_file_name = spiff_task.task_spec.name + ".md"
#
# workflow_id = WorkflowService.workflow_id_from_spiff_task(spiff_task)
# workflow = (
# db.session.query(WorkflowModel)
# .filter(WorkflowModel.id == workflow_id)
# .first()
# )
# spec_service = WorkflowSpecService()
# data = SpecFileService.get_data(
# spec_service.get_spec(workflow.workflow_spec_id), doc_file_name
# )
# raw_doc = data.decode("utf-8")
# except ApiError:
# raw_doc = documentation
#
# if not raw_doc:
# return ""
#
# try:
# return JinjaService.get_content(raw_doc, spiff_task.data)
# except jinja2.exceptions.TemplateSyntaxError as tse:
# lines = tse.source.splitlines()
# error_line = ""
# if len(lines) >= tse.lineno - 1:
# error_line = tse.source.splitlines()[tse.lineno - 1]
# raise ApiError.from_task(
# code="template_error",
# message="Jinja Template Error: %s" % str(tse),
# task=spiff_task,
# line_number=tse.lineno,
# error_line=error_line,
# )
# except jinja2.exceptions.TemplateError as te:
# # Figure out the line number in the template that caused the error.
# cl, exc, tb = sys.exc_info()
# line_number = None
# error_line = None
# for frame_summary in traceback.extract_tb(tb):
# if frame_summary.filename == "<template>":
# line_number = frame_summary.lineno
# lines = documentation.splitlines()
# error_line = ""
# if len(lines) > line_number:
# error_line = lines[line_number - 1]
# raise ApiError.from_task(
# code="template_error",
# message="Jinja Template Error: %s" % str(te),
# task=spiff_task,
# line_number=line_number,
# error_line=error_line,
# )
# except TypeError as te:
# raise ApiError.from_task(
# code="template_error",
# message="Jinja Template Error: %s" % str(te),
# task=spiff_task,
# ) from te
# except Exception as e:
# # app.logger.error(str(e), exc_info=True)
# ...