added TaskModelException to replace WorkflowTaskException to create exceptions with db w/ burnettk

This commit is contained in:
jasquat 2023-04-24 10:52:30 -04:00
parent 3b48f9e4eb
commit e247c074ae
No known key found for this signature in database
3 changed files with 114 additions and 17 deletions

View File

@ -1,5 +1,7 @@
"""API Error functionality.""" """API Error functionality."""
from __future__ import annotations from __future__ import annotations
from typing import Optional
from spiffworkflow_backend.models.task import TaskException, TaskModel # noqa: F401
import json import json
from dataclasses import dataclass from dataclasses import dataclass
@ -36,18 +38,18 @@ class ApiError(Exception):
error_code: str error_code: str
message: str message: str
error_line: str = "" error_line: Optional[str] = ""
error_type: str = "" error_type: Optional[str] = ""
file_name: str = "" file_name: Optional[str] = ""
line_number: int = 0 line_number: Optional[int] = 0
offset: int = 0 offset: Optional[int] = 0
sentry_link: str | None = None sentry_link: Optional[str] = None
status_code: int = 400 status_code: Optional[int] = 400
tag: str = "" tag: Optional[str] = ""
task_data: dict | str | None = field(default_factory=dict) task_data: Optional[dict | str] = field(default_factory=dict)
task_id: str = "" task_id: Optional[str] = ""
task_name: str = "" task_name: Optional[str] = ""
task_trace: list | None = field(default_factory=list) task_trace: Optional[list] = field(default_factory=list)
def __str__(self) -> str: def __str__(self) -> str:
"""Instructions to print instance as a string.""" """Instructions to print instance as a string."""
@ -96,6 +98,41 @@ class ApiError(Exception):
return instance return instance
@classmethod
def from_task_model(
cls,
error_code: str,
message: str,
task_model: TaskModel,
status_code: Optional[int] = 400,
line_number: Optional[int] = 0,
offset: Optional[int] = 0,
error_type: Optional[str] = "",
error_line: Optional[str] = "",
task_trace: Optional[list] = None,
) -> ApiError:
"""Constructs an API Error with details pulled from the current task model."""
instance = cls(error_code, message, status_code=status_code)
task_definition = task_model.task_definition
instance.task_id = task_definition.bpmn_identifier
instance.task_name = task_definition.bpmn_name or ""
# TODO: find a way to get a file from task model
# instance.file_name = task.workflow.spec.file or ""
instance.line_number = line_number
instance.offset = offset
instance.error_type = error_type
instance.error_line = error_line
if task_trace:
instance.task_trace = task_trace
# TODO: needs implementation
# else:
# instance.task_trace = TaskException.get_task_trace(task)
# Assure that there is nothing in the json data that can't be serialized.
instance.task_data = ApiError.remove_unserializeable_from_dict(task_model.get_data())
return instance
@staticmethod @staticmethod
def remove_unserializeable_from_dict(my_dict: dict) -> dict: def remove_unserializeable_from_dict(my_dict: dict) -> dict:
"""Removes unserializeable from dict.""" """Removes unserializeable from dict."""
@ -157,6 +194,18 @@ class ApiError(Exception):
error_line=exp.error_line, error_line=exp.error_line,
task_trace=exp.task_trace, task_trace=exp.task_trace,
) )
elif isinstance(exp, TaskException):
# Note that WorkflowDataExceptions are also WorkflowTaskExceptions
return ApiError.from_task_model(
error_code,
message + ". " + str(exp),
exp.task_model,
line_number=exp.line_number,
offset=exp.offset,
error_type=exp.error_type,
error_line=exp.error_line,
task_trace=exp.task_trace,
)
elif isinstance(exp, WorkflowException) and exp.task_spec: elif isinstance(exp, WorkflowException) and exp.task_spec:
msg = message + ". " + str(exp) msg = message + ". " + str(exp)
return ApiError.from_task_spec(error_code, msg, exp.task_spec) return ApiError.from_task_spec(error_code, msg, exp.task_spec)

View File

@ -1,5 +1,6 @@
"""Task.""" """Task."""
import enum import enum
from SpiffWorkflow.exceptions import WorkflowException # type: ignore
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from typing import Optional from typing import Optional
@ -87,6 +88,52 @@ class TaskModel(SpiffworkflowBaseDBModel):
return JsonDataModel.find_data_dict_by_hash(self.json_data_hash) return JsonDataModel.find_data_dict_by_hash(self.json_data_hash)
class TaskModelException(Exception):
"""Copied from SpiffWorkflow.exceptions.WorkflowTaskException.
Reimplements the exception from SpiffWorkflow to not require a spiff_task.
"""
def __init__(self, error_msg: str, task_model: TaskModel, exception: Optional[Exception]=None,
line_number: Optional[int]=None, offset: Optional[int]=None, error_line: Optional[str]=None):
self.task_model = task_model
self.line_number = line_number
self.offset = offset
self.error_line = error_line
if exception:
self.error_type = exception.__class__.__name__
else:
self.error_type = "unknown"
if isinstance(exception, SyntaxError) and not line_number:
self.line_number = exception.lineno
self.offset = exception.offset
elif isinstance(exception, NameError):
self.add_note(WorkflowException.did_you_mean_from_name_error(exception, list(task_model.get_data().keys())))
# If encountered in a sub-workflow, this traces back up the stack,
# so we can tell how we got to this particular task, no matter how
# deeply nested in sub-workflows it is. Takes the form of:
# task-description (file-name)
self.task_trace = self.get_task_trace(task_model)
# TODO: implement this with db
@classmethod
def get_task_trace(cls, _task_model: TaskModel) -> list[str]:
return []
# task_bpmn_name = task_model.task_definition.bpmn_name
#
# task_trace = [f"{task.task_spec.description} ({task.workflow.spec.file})"]
# workflow = task.workflow
# while workflow != workflow.outer_workflow:
# caller = workflow.name
# workflow = workflow.outer_workflow
# task_trace.append(f"{workflow.spec.task_specs[caller].description} ({workflow.spec.file})")
# return task_trace
class Task: class Task:
"""Task.""" """Task."""

View File

@ -43,7 +43,7 @@ from spiffworkflow_backend.models.process_instance import (
) )
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.process_model import ProcessModelInfo from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.task import TaskModelException, TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.process_api_blueprint import ( from spiffworkflow_backend.routes.process_api_blueprint import (
_find_principal_or_raise, _find_principal_or_raise,
@ -376,7 +376,7 @@ def _render_instructions_for_end_user(task_model: TaskModel) -> str:
instructions = _render_jinja_template(extensions["instructionsForEndUser"], task_model) instructions = _render_jinja_template(extensions["instructionsForEndUser"], task_model)
extensions["instructionsForEndUser"] = instructions extensions["instructionsForEndUser"] = instructions
return instructions return instructions
except WorkflowTaskException as wfe: except TaskModelException as wfe:
wfe.add_note("Failed to render instructions for end user.") wfe.add_note("Failed to render instructions for end user.")
raise ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) from wfe raise ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) from wfe
return "" return ""
@ -673,7 +673,7 @@ def _prepare_form_data(form_file: str, task_model: TaskModel, process_model: Pro
status_code=400, status_code=400,
) )
) from exception ) from exception
except WorkflowTaskException as wfe: except TaskModelException as wfe:
wfe.add_note(f"Error in Json Form File '{form_file}'") wfe.add_note(f"Error in Json Form File '{form_file}'")
api_error = ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) api_error = ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe)
api_error.file_name = form_file api_error.file_name = form_file
@ -687,7 +687,8 @@ def _render_jinja_template(unprocessed_template: str, task_model: TaskModel) ->
template = jinja_environment.from_string(unprocessed_template) template = jinja_environment.from_string(unprocessed_template)
return template.render(**(task_model.data or {})) return template.render(**(task_model.data or {}))
except jinja2.exceptions.TemplateError as template_error: except jinja2.exceptions.TemplateError as template_error:
wfe = WorkflowTaskException(str(template_error), task=task_model, exception=template_error) import pdb; pdb.set_trace()
wfe = TaskModelException(str(template_error), task_model=task_model, exception=template_error)
if isinstance(template_error, TemplateSyntaxError): if isinstance(template_error, TemplateSyntaxError):
wfe.line_number = template_error.lineno wfe.line_number = template_error.lineno
wfe.error_line = template_error.source.split("\n")[template_error.lineno - 1] wfe.error_line = template_error.source.split("\n")[template_error.lineno - 1]
@ -695,7 +696,7 @@ def _render_jinja_template(unprocessed_template: str, task_model: TaskModel) ->
raise wfe from template_error raise wfe from template_error
except Exception as error: except Exception as error:
_type, _value, tb = exc_info() _type, _value, tb = exc_info()
wfe = WorkflowTaskException(str(error), task=task_model, exception=error) wfe = TaskModelException(str(error), task_model=task_model, exception=error)
while tb: while tb:
if tb.tb_frame.f_code.co_filename == "<template>": if tb.tb_frame.f_code.co_filename == "<template>":
wfe.line_number = tb.tb_lineno wfe.line_number = tb.tb_lineno