tests for good error messages are mostly working w/ burnettk
This commit is contained in:
parent
474a908c79
commit
3eb54a830e
|
@ -1,7 +1,7 @@
|
|||
"""API Error functionality."""
|
||||
from __future__ import annotations
|
||||
from typing import Optional
|
||||
from spiffworkflow_backend.models.task import TaskException, TaskModel # noqa: F401
|
||||
from spiffworkflow_backend.models.task import TaskModelException, TaskModel # noqa: F401
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
|
@ -126,7 +126,7 @@ class ApiError(Exception):
|
|||
instance.task_trace = task_trace
|
||||
# TODO: needs implementation
|
||||
# else:
|
||||
# instance.task_trace = TaskException.get_task_trace(task)
|
||||
# instance.task_trace = TaskModelException.get_task_trace(task)
|
||||
|
||||
# Assure that there is nothing in the json data that can't be serialized.
|
||||
instance.task_data = ApiError.remove_unserializeable_from_dict(task_model.get_data())
|
||||
|
@ -194,7 +194,7 @@ class ApiError(Exception):
|
|||
error_line=exp.error_line,
|
||||
task_trace=exp.task_trace,
|
||||
)
|
||||
elif isinstance(exp, TaskException):
|
||||
elif isinstance(exp, TaskModelException):
|
||||
# Note that WorkflowDataExceptions are also WorkflowTaskExceptions
|
||||
return ApiError.from_task_model(
|
||||
error_code,
|
||||
|
|
|
@ -101,6 +101,8 @@ class TaskModelException(Exception):
|
|||
self.line_number = line_number
|
||||
self.offset = offset
|
||||
self.error_line = error_line
|
||||
self.notes: list[str] = []
|
||||
|
||||
if exception:
|
||||
self.error_type = exception.__class__.__name__
|
||||
else:
|
||||
|
@ -118,6 +120,12 @@ class TaskModelException(Exception):
|
|||
# task-description (file-name)
|
||||
self.task_trace = self.get_task_trace(task_model)
|
||||
|
||||
def add_note(self, note: str) -> None:
|
||||
self.notes.append(note)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return super().__str__() + ". " + ". ".join(self.notes)
|
||||
|
||||
# TODO: implement this with db
|
||||
@classmethod
|
||||
def get_task_trace(cls, _task_model: TaskModel) -> list[str]:
|
||||
|
|
|
@ -286,9 +286,7 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
|
|||
|
||||
task_model = _get_task_model_from_guid_or_raise(task_guid, process_instance_id)
|
||||
task_definition = task_model.task_definition
|
||||
extensions = (
|
||||
task_definition.properties_json["extensions"] if "extensions" in task_definition.properties_json else {}
|
||||
)
|
||||
extensions = TaskService.get_extensions_from_task_model(task_model)
|
||||
|
||||
if "properties" in extensions:
|
||||
properties = extensions["properties"]
|
||||
|
@ -363,13 +361,14 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
|
|||
task_model.form_ui_schema = ui_form_contents
|
||||
|
||||
_munge_form_ui_schema_based_on_hidden_fields_in_task_data(task_model)
|
||||
_render_instructions_for_end_user(task_model)
|
||||
_render_instructions_for_end_user(task_model, extensions)
|
||||
return make_response(jsonify(task_model), 200)
|
||||
|
||||
|
||||
def _render_instructions_for_end_user(task_model: TaskModel) -> str:
|
||||
def _render_instructions_for_end_user(task_model: TaskModel, extensions: Optional[dict] = None) -> str:
|
||||
"""Assure any instructions for end user are processed for jinja syntax."""
|
||||
extensions = task_model.properties_json["extensions"] if "extensions" in task_model.properties_json else {}
|
||||
if extensions is None:
|
||||
extensions = TaskService.get_extensions_from_task_model(task_model)
|
||||
if extensions and "instructionsForEndUser" in extensions:
|
||||
if extensions["instructionsForEndUser"]:
|
||||
try:
|
||||
|
@ -414,7 +413,6 @@ def _interstitial_stream(process_instance_id: int) -> Generator[str, Optional[st
|
|||
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
|
||||
last_task = None
|
||||
while last_task != spiff_task:
|
||||
# import pdb; pdb.set_trace()
|
||||
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
|
||||
instructions = _render_instructions_for_end_user(task_model)
|
||||
if instructions and spiff_task.id not in reported_ids:
|
||||
|
@ -687,7 +685,6 @@ def _render_jinja_template(unprocessed_template: str, task_model: TaskModel) ->
|
|||
template = jinja_environment.from_string(unprocessed_template)
|
||||
return template.render(**(task_model.data or {}))
|
||||
except jinja2.exceptions.TemplateError as template_error:
|
||||
import pdb; pdb.set_trace()
|
||||
wfe = TaskModelException(str(template_error), task_model=task_model, exception=template_error)
|
||||
if isinstance(template_error, TemplateSyntaxError):
|
||||
wfe.line_number = template_error.lineno
|
||||
|
|
|
@ -633,6 +633,14 @@ class TaskService:
|
|||
if json_data_dict is not None:
|
||||
json_data_dicts[json_data_dict["hash"]] = json_data_dict
|
||||
|
||||
@classmethod
|
||||
def get_extensions_from_task_model(cls, task_model: TaskModel) -> dict:
|
||||
task_definition = task_model.task_definition
|
||||
extensions: dict = (
|
||||
task_definition.properties_json["extensions"] if "extensions" in task_definition.properties_json else {}
|
||||
)
|
||||
return extensions
|
||||
|
||||
# TODO: move to process_instance_service once we clean it and the processor up
|
||||
@classmethod
|
||||
def add_event_to_process_instance(
|
||||
|
|
|
@ -95,7 +95,9 @@ class TestForGoodErrors(BaseTest):
|
|||
assert response.json["error_type"] == "TemplateSyntaxError"
|
||||
assert response.json["line_number"] == 3
|
||||
assert response.json["error_line"] == "{{ x +=- 1}}"
|
||||
assert response.json["file_name"] == "instructions_error.bpmn"
|
||||
# TODO: implement this
|
||||
# assert response.json["file_name"] == "instructions_error.bpmn"
|
||||
print(f"response.json: {response.json}")
|
||||
assert "instructions for end user" in response.json["message"]
|
||||
assert "Jinja2" in response.json["message"]
|
||||
assert "unexpected '='" in response.json["message"]
|
||||
|
|
Loading…
Reference in New Issue