cr-connect-workflow/crc/api/common.py

45 lines
1.9 KiB
Python

from crc import ma, app
class ApiError(Exception):
def __init__(self, code, message, status_code=400,
file_name="", task_id="", task_name="", tag=""):
self.status_code = status_code
self.code = code # a short consistent string describing the error.
self.message = message # A detailed message that provides more information.
self.task_id = task_id or "" # OPTIONAL: The id of the task in the BPMN Diagram.
self.task_name = task_name or "" # OPTIONAL: The name of the task in the BPMN Diagram.
self.file_name = file_name or "" # OPTIONAL: The file that caused the error.
self.tag = tag or "" # OPTIONAL: The XML Tag that caused the issue.
Exception.__init__(self, self.message)
@classmethod
def from_task(cls, code, message, task, status_code=400):
"""Constructs an API Error with details pulled from the current task."""
instance = cls(code, message, status_code=status_code)
instance.task_id = task.task_spec.name or ""
instance.task_name = task.task_spec.description or ""
instance.file_name = task.workflow.spec.file or ""
return instance
@classmethod
def from_task_spec(cls, code, message, task_spec, status_code=400):
"""Constructs an API Error with details pulled from the current task."""
instance = cls(code, message, status_code=status_code)
instance.task_id = task_spec.name or ""
instance.task_name = task_spec.description or ""
if task_spec._wf_spec:
instance.file_name = task_spec._wf_spec.file
return instance
class ApiErrorSchema(ma.Schema):
class Meta:
fields = ("code", "message", "workflow_name", "file_name", "task_name", "task_id")
@app.errorhandler(ApiError)
def handle_invalid_usage(error):
response = ApiErrorSchema().dump(error)
return response, error.status_code