Merge branch 'main' of github.com:sartography/spiff-arena
This commit is contained in:
commit
a976674eec
|
@ -77,3 +77,7 @@ SPIFF_DATABASE_TYPE = environ.get(
|
|||
SPIFFWORKFLOW_BACKEND_DATABASE_URI = environ.get(
|
||||
"SPIFFWORKFLOW_BACKEND_DATABASE_URI", default=None
|
||||
)
|
||||
SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID = environ.get(
|
||||
"SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID",
|
||||
default="Message_SystemMessageNotification",
|
||||
)
|
||||
|
|
|
@ -56,6 +56,8 @@ def process_model_create(
|
|||
"primary_process_id",
|
||||
"description",
|
||||
"metadata_extraction_paths",
|
||||
"fault_or_suspend_on_exception",
|
||||
"exception_notification_addresses",
|
||||
]
|
||||
body_filtered = {
|
||||
include_item: body[include_item]
|
||||
|
@ -108,6 +110,8 @@ def process_model_update(
|
|||
"primary_process_id",
|
||||
"description",
|
||||
"metadata_extraction_paths",
|
||||
"fault_or_suspend_on_exception",
|
||||
"exception_notification_addresses",
|
||||
]
|
||||
body_filtered = {
|
||||
include_item: body[include_item]
|
||||
|
|
|
@ -1,15 +1,22 @@
|
|||
"""Error_handling_service."""
|
||||
from typing import Any
|
||||
from typing import List
|
||||
import json
|
||||
from typing import Union
|
||||
|
||||
from flask import current_app
|
||||
from flask import g
|
||||
from flask.wrappers import Response
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from flask_bpmn.models.db import db
|
||||
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.services.email_service import EmailService
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.services.message_service import MessageService
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
)
|
||||
|
@ -38,6 +45,7 @@ class ErrorHandlingService:
|
|||
process_model = ProcessModelService.get_process_model(
|
||||
_processor.process_model_identifier
|
||||
)
|
||||
# First, suspend or fault the instance
|
||||
if process_model.fault_or_suspend_on_exception == "suspend":
|
||||
self.set_instance_status(
|
||||
_processor.process_instance_model.id,
|
||||
|
@ -50,57 +58,93 @@ class ErrorHandlingService:
|
|||
ProcessInstanceStatus.error.value,
|
||||
)
|
||||
|
||||
# Second, call the System Notification Process
|
||||
# Note that this isn't the best way to do this.
|
||||
# The configs are all in the model.
|
||||
# Maybe we can move some of this to the notification process, or dmn tables.
|
||||
if len(process_model.exception_notification_addresses) > 0:
|
||||
try:
|
||||
# some notification method (waku?)
|
||||
self.handle_email_notification(
|
||||
_processor, _error, process_model.exception_notification_addresses
|
||||
)
|
||||
self.handle_system_notification(_error, process_model)
|
||||
except Exception as e:
|
||||
# hmm... what to do if a notification method fails. Probably log, at least
|
||||
current_app.logger.error(e)
|
||||
|
||||
@staticmethod
|
||||
def hanle_sentry_notification(_error: ApiError, _recipients: List) -> None:
|
||||
"""SentryHandler."""
|
||||
...
|
||||
|
||||
@staticmethod
|
||||
def handle_email_notification(
|
||||
processor: ProcessInstanceProcessor,
|
||||
error: Union[ApiError, Exception],
|
||||
recipients: List,
|
||||
) -> None:
|
||||
"""EmailHandler."""
|
||||
subject = "Unexpected error in app"
|
||||
if isinstance(error, ApiError):
|
||||
content = f"{error.message}"
|
||||
else:
|
||||
content = str(error)
|
||||
content_html = content
|
||||
|
||||
EmailService.add_email(
|
||||
subject,
|
||||
"sender@company.com",
|
||||
recipients,
|
||||
content,
|
||||
content_html,
|
||||
cc=None,
|
||||
bcc=None,
|
||||
reply_to=None,
|
||||
attachment_files=None,
|
||||
def handle_system_notification(
|
||||
error: Union[ApiError, Exception], process_model: ProcessModelInfo
|
||||
) -> Response:
|
||||
"""Handle_system_notification."""
|
||||
recipients = process_model.exception_notification_addresses
|
||||
message_text = (
|
||||
f"There was an exception running process {process_model.id}.\nOriginal"
|
||||
f" Error:\n{error.__repr__()}"
|
||||
)
|
||||
message_payload = {"message_text": message_text, "recipients": recipients}
|
||||
message_identifier = current_app.config[
|
||||
"SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID"
|
||||
]
|
||||
message_model = MessageModel.query.filter_by(
|
||||
identifier=message_identifier
|
||||
).first()
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_model_id=message_model.id
|
||||
).first()
|
||||
)
|
||||
process_instance = MessageService.process_message_triggerable_process_model(
|
||||
message_triggerable_process_model,
|
||||
message_identifier,
|
||||
message_payload,
|
||||
g.user,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
|
||||
"""WakuHandler."""
|
||||
# class WakuMessage:
|
||||
# """WakuMessage."""
|
||||
#
|
||||
# payload: str
|
||||
# contentTopic: str # Optional
|
||||
# version: int # Optional
|
||||
# timestamp: int # Optional
|
||||
return Response(
|
||||
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
|
||||
status=200,
|
||||
mimetype="application/json",
|
||||
)
|
||||
|
||||
# @staticmethod
|
||||
# def handle_sentry_notification(_error: ApiError, _recipients: List) -> None:
|
||||
# """SentryHandler."""
|
||||
# ...
|
||||
#
|
||||
# @staticmethod
|
||||
# def handle_email_notification(
|
||||
# processor: ProcessInstanceProcessor,
|
||||
# error: Union[ApiError, Exception],
|
||||
# recipients: List,
|
||||
# ) -> None:
|
||||
# """EmailHandler."""
|
||||
# subject = "Unexpected error in app"
|
||||
# if isinstance(error, ApiError):
|
||||
# content = f"{error.message}"
|
||||
# else:
|
||||
# content = str(error)
|
||||
# content_html = content
|
||||
#
|
||||
# EmailService.add_email(
|
||||
# subject,
|
||||
# "sender@company.com",
|
||||
# recipients,
|
||||
# content,
|
||||
# content_html,
|
||||
# cc=None,
|
||||
# bcc=None,
|
||||
# reply_to=None,
|
||||
# attachment_files=None,
|
||||
# )
|
||||
#
|
||||
# @staticmethod
|
||||
# def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
|
||||
# """WakuHandler."""
|
||||
# # class WakuMessage:
|
||||
# # """WakuMessage."""
|
||||
# #
|
||||
# # payload: str
|
||||
# # contentTopic: str # Optional
|
||||
# # version: int # Optional
|
||||
# # timestamp: int # Optional
|
||||
|
||||
|
||||
class FailingService:
|
||||
|
|
|
@ -1307,8 +1307,34 @@ class ProcessInstanceProcessor:
|
|||
except WorkflowTaskExecException as we:
|
||||
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
|
||||
|
||||
def user_defined_task_data(self, task_data: dict) -> dict:
|
||||
"""UserDefinedTaskData."""
|
||||
return {k: v for k, v in task_data.items() if k != "current_user"}
|
||||
|
||||
def check_task_data_size(self) -> None:
|
||||
"""CheckTaskDataSize."""
|
||||
tasks_to_check = self.bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
|
||||
task_data = [self.user_defined_task_data(task.data) for task in tasks_to_check]
|
||||
task_data_to_check = list(filter(len, task_data))
|
||||
|
||||
try:
|
||||
task_data_len = len(json.dumps(task_data_to_check))
|
||||
except Exception:
|
||||
task_data_len = 0
|
||||
|
||||
task_data_limit = 1024**2
|
||||
|
||||
if task_data_len > task_data_limit:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="task_data_size_exceeded",
|
||||
message=f"Maximum task data size of {task_data_limit} exceeded.",
|
||||
)
|
||||
)
|
||||
|
||||
def serialize(self) -> str:
|
||||
"""Serialize."""
|
||||
self.check_task_data_size()
|
||||
return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore
|
||||
|
||||
def next_user_tasks(self) -> list[SpiffTask]:
|
||||
|
|
|
@ -2160,59 +2160,10 @@ class TestProcessApi(BaseTest):
|
|||
assert process is not None
|
||||
assert process.status == "suspended"
|
||||
|
||||
def test_error_handler_with_email(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_error_handler."""
|
||||
process_group_id = "data"
|
||||
process_model_id = "error"
|
||||
bpmn_file_name = "error.bpmn"
|
||||
bpmn_file_location = "error"
|
||||
process_model_identifier = self.create_group_and_model_with_bpmn(
|
||||
client,
|
||||
with_super_admin_user,
|
||||
process_group_id=process_group_id,
|
||||
process_model_id=process_model_id,
|
||||
bpmn_file_name=bpmn_file_name,
|
||||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
process_instance_id = self.setup_testing_instance(
|
||||
client, process_model_identifier, with_super_admin_user
|
||||
)
|
||||
|
||||
process_model = ProcessModelService.get_process_model(process_model_identifier)
|
||||
ProcessModelService.update_process_model(
|
||||
process_model,
|
||||
{"exception_notification_addresses": ["with_super_admin_user@example.com"]},
|
||||
)
|
||||
|
||||
mail = app.config["MAIL_APP"]
|
||||
with mail.record_messages() as outbox:
|
||||
response = client.post(
|
||||
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert len(outbox) == 1
|
||||
message = outbox[0]
|
||||
assert message.subject == "Unexpected error in app"
|
||||
assert (
|
||||
message.body == 'TypeError:can only concatenate str (not "int") to str'
|
||||
)
|
||||
assert message.recipients == process_model.exception_notification_addresses
|
||||
|
||||
process = (
|
||||
db.session.query(ProcessInstanceModel)
|
||||
.filter(ProcessInstanceModel.id == process_instance_id)
|
||||
.first()
|
||||
)
|
||||
assert process is not None
|
||||
assert process.status == "error"
|
||||
def test_error_handler_system_notification(self) -> None:
|
||||
"""Test_error_handler_system_notification."""
|
||||
# TODO: make sure the system notification process is run on exceptions
|
||||
...
|
||||
|
||||
def test_task_data_is_set_even_if_process_instance_errors(
|
||||
self,
|
||||
|
|
|
@ -8,6 +8,8 @@ import {
|
|||
TextInput,
|
||||
Grid,
|
||||
Column,
|
||||
Select,
|
||||
SelectItem,
|
||||
// @ts-ignore
|
||||
} from '@carbon/react';
|
||||
// @ts-ignore
|
||||
|
@ -76,6 +78,9 @@ export default function ProcessModelForm({
|
|||
display_name: processModel.display_name,
|
||||
description: processModel.description,
|
||||
metadata_extraction_paths: processModel.metadata_extraction_paths,
|
||||
fault_or_suspend_on_exception: processModel.fault_or_suspend_on_exception,
|
||||
exception_notification_addresses:
|
||||
processModel.exception_notification_addresses,
|
||||
};
|
||||
if (mode === 'new') {
|
||||
Object.assign(postBody, {
|
||||
|
@ -173,6 +178,69 @@ export default function ProcessModelForm({
|
|||
updateProcessModel({ metadata_extraction_paths: cep });
|
||||
};
|
||||
|
||||
const notificationAddressForm = (
|
||||
index: number,
|
||||
notificationAddress: string
|
||||
) => {
|
||||
return (
|
||||
<Grid>
|
||||
<Column md={3} lg={7} sm={1}>
|
||||
<TextInput
|
||||
id={`process-model-notification-address-key-${index}`}
|
||||
labelText="Address"
|
||||
value={notificationAddress}
|
||||
onChange={(event: any) => {
|
||||
const notificationAddresses: string[] =
|
||||
processModel.exception_notification_addresses || [];
|
||||
notificationAddresses[index] = event.target.value;
|
||||
updateProcessModel({
|
||||
exception_notification_addresses: notificationAddresses,
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</Column>
|
||||
<Column md={1} lg={1} sm={1}>
|
||||
<Button
|
||||
kind="ghost"
|
||||
renderIcon={TrashCan}
|
||||
iconDescription="Remove Address"
|
||||
hasIconOnly
|
||||
size="lg"
|
||||
className="with-extra-top-margin"
|
||||
onClick={() => {
|
||||
const notificationAddresses: string[] =
|
||||
processModel.exception_notification_addresses || [];
|
||||
notificationAddresses.splice(index, 1);
|
||||
updateProcessModel({
|
||||
exception_notification_addresses: notificationAddresses,
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</Column>
|
||||
</Grid>
|
||||
);
|
||||
};
|
||||
|
||||
const notificationAddressFormArea = () => {
|
||||
if (processModel.exception_notification_addresses) {
|
||||
return processModel.exception_notification_addresses.map(
|
||||
(notificationAddress: string, index: number) => {
|
||||
return notificationAddressForm(index, notificationAddress);
|
||||
}
|
||||
);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
const addBlankNotificationAddress = () => {
|
||||
const notificationAddresses: string[] =
|
||||
processModel.exception_notification_addresses || [];
|
||||
notificationAddresses.push('');
|
||||
updateProcessModel({
|
||||
exception_notification_addresses: notificationAddresses,
|
||||
});
|
||||
};
|
||||
|
||||
const onDisplayNameChanged = (newDisplayName: any) => {
|
||||
setDisplayNameInvalid(false);
|
||||
const updateDict = { display_name: newDisplayName };
|
||||
|
@ -182,6 +250,11 @@ export default function ProcessModelForm({
|
|||
updateProcessModel(updateDict);
|
||||
};
|
||||
|
||||
const onNotificationTypeChanged = (newNotificationType: string) => {
|
||||
const updateDict = { fault_or_suspend_on_exception: newNotificationType };
|
||||
updateProcessModel(updateDict);
|
||||
};
|
||||
|
||||
const formElements = () => {
|
||||
const textInputs = [
|
||||
<TextInput
|
||||
|
@ -230,6 +303,49 @@ export default function ProcessModelForm({
|
|||
/>
|
||||
);
|
||||
|
||||
textInputs.push(
|
||||
<Select
|
||||
id="notification-type"
|
||||
defaultValue="fault"
|
||||
labelText="Notification Type"
|
||||
onChange={(event: any) => {
|
||||
onNotificationTypeChanged(event.target.value);
|
||||
}}
|
||||
>
|
||||
<SelectItem value="fault" text="Fault" />
|
||||
<SelectItem value="suspend" text="Suspend" />
|
||||
</Select>
|
||||
);
|
||||
textInputs.push(<h2>Notification Addresses</h2>);
|
||||
textInputs.push(
|
||||
<Grid>
|
||||
<Column md={8} lg={16} sm={4}>
|
||||
<p className="data-table-description">
|
||||
You can provide one or more addresses to notify if this model fails.
|
||||
</p>
|
||||
</Column>
|
||||
</Grid>
|
||||
);
|
||||
textInputs.push(<>{notificationAddressFormArea()}</>);
|
||||
textInputs.push(
|
||||
<Grid>
|
||||
<Column md={4} lg={8} sm={2}>
|
||||
<Button
|
||||
data-qa="add-notification-address-button"
|
||||
renderIcon={AddAlt}
|
||||
className="button-white-background"
|
||||
kind=""
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
addBlankNotificationAddress();
|
||||
}}
|
||||
>
|
||||
Add Notification Address
|
||||
</Button>
|
||||
</Column>
|
||||
</Grid>
|
||||
);
|
||||
|
||||
textInputs.push(<h2>Metadata Extractions</h2>);
|
||||
textInputs.push(
|
||||
<Grid>
|
||||
|
|
|
@ -151,6 +151,8 @@ export interface ProcessModel {
|
|||
files: ProcessFile[];
|
||||
parent_groups?: ProcessGroupLite[];
|
||||
metadata_extraction_paths?: MetadataExtractionPath[];
|
||||
fault_or_suspend_on_exception?: string;
|
||||
exception_notification_addresses?: string[];
|
||||
}
|
||||
|
||||
export interface ProcessGroup {
|
||||
|
|
Loading…
Reference in New Issue