diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
index 9027347c8..80cfb9191 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py
@@ -77,3 +77,7 @@ SPIFF_DATABASE_TYPE = environ.get(
SPIFFWORKFLOW_BACKEND_DATABASE_URI = environ.get(
"SPIFFWORKFLOW_BACKEND_DATABASE_URI", default=None
)
+SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID = environ.get(
+ "SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID",
+ default="Message_SystemMessageNotification",
+)
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py
index 2703a311f..8a6e63066 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py
@@ -56,6 +56,8 @@ def process_model_create(
"primary_process_id",
"description",
"metadata_extraction_paths",
+ "fault_or_suspend_on_exception",
+ "exception_notification_addresses",
]
body_filtered = {
include_item: body[include_item]
@@ -108,6 +110,8 @@ def process_model_update(
"primary_process_id",
"description",
"metadata_extraction_paths",
+ "fault_or_suspend_on_exception",
+ "exception_notification_addresses",
]
body_filtered = {
include_item: body[include_item]
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py
index ffdfd26aa..51f402fac 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py
@@ -1,15 +1,22 @@
"""Error_handling_service."""
-from typing import Any
-from typing import List
+import json
from typing import Union
from flask import current_app
+from flask import g
+from flask.wrappers import Response
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
+from spiffworkflow_backend.models.message_model import MessageModel
+from spiffworkflow_backend.models.message_triggerable_process_model import (
+ MessageTriggerableProcessModel,
+)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
+from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
-from spiffworkflow_backend.services.email_service import EmailService
+from spiffworkflow_backend.models.process_model import ProcessModelInfo
+from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@@ -38,6 +45,7 @@ class ErrorHandlingService:
process_model = ProcessModelService.get_process_model(
_processor.process_model_identifier
)
+ # First, suspend or fault the instance
if process_model.fault_or_suspend_on_exception == "suspend":
self.set_instance_status(
_processor.process_instance_model.id,
@@ -50,57 +58,93 @@ class ErrorHandlingService:
ProcessInstanceStatus.error.value,
)
+ # Second, call the System Notification Process
+ # Note that this isn't the best way to do this.
+ # The configs are all in the model.
+ # Maybe we can move some of this to the notification process, or dmn tables.
if len(process_model.exception_notification_addresses) > 0:
try:
- # some notification method (waku?)
- self.handle_email_notification(
- _processor, _error, process_model.exception_notification_addresses
- )
+ self.handle_system_notification(_error, process_model)
except Exception as e:
# hmm... what to do if a notification method fails. Probably log, at least
current_app.logger.error(e)
@staticmethod
- def hanle_sentry_notification(_error: ApiError, _recipients: List) -> None:
- """SentryHandler."""
- ...
-
- @staticmethod
- def handle_email_notification(
- processor: ProcessInstanceProcessor,
- error: Union[ApiError, Exception],
- recipients: List,
- ) -> None:
- """EmailHandler."""
- subject = "Unexpected error in app"
- if isinstance(error, ApiError):
- content = f"{error.message}"
- else:
- content = str(error)
- content_html = content
-
- EmailService.add_email(
- subject,
- "sender@company.com",
- recipients,
- content,
- content_html,
- cc=None,
- bcc=None,
- reply_to=None,
- attachment_files=None,
+ def handle_system_notification(
+ error: Union[ApiError, Exception], process_model: ProcessModelInfo
+ ) -> Response:
+ """Handle_system_notification."""
+ recipients = process_model.exception_notification_addresses
+ message_text = (
+ f"There was an exception running process {process_model.id}.\nOriginal"
+ f" Error:\n{error.__repr__()}"
+ )
+ message_payload = {"message_text": message_text, "recipients": recipients}
+ message_identifier = current_app.config[
+ "SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID"
+ ]
+ message_model = MessageModel.query.filter_by(
+ identifier=message_identifier
+ ).first()
+ message_triggerable_process_model = (
+ MessageTriggerableProcessModel.query.filter_by(
+ message_model_id=message_model.id
+ ).first()
+ )
+ process_instance = MessageService.process_message_triggerable_process_model(
+ message_triggerable_process_model,
+ message_identifier,
+ message_payload,
+ g.user,
)
- @staticmethod
- def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
- """WakuHandler."""
- # class WakuMessage:
- # """WakuMessage."""
- #
- # payload: str
- # contentTopic: str # Optional
- # version: int # Optional
- # timestamp: int # Optional
+ return Response(
+ json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
+ status=200,
+ mimetype="application/json",
+ )
+
+ # @staticmethod
+ # def handle_sentry_notification(_error: ApiError, _recipients: List) -> None:
+ # """SentryHandler."""
+ # ...
+ #
+ # @staticmethod
+ # def handle_email_notification(
+ # processor: ProcessInstanceProcessor,
+ # error: Union[ApiError, Exception],
+ # recipients: List,
+ # ) -> None:
+ # """EmailHandler."""
+ # subject = "Unexpected error in app"
+ # if isinstance(error, ApiError):
+ # content = f"{error.message}"
+ # else:
+ # content = str(error)
+ # content_html = content
+ #
+ # EmailService.add_email(
+ # subject,
+ # "sender@company.com",
+ # recipients,
+ # content,
+ # content_html,
+ # cc=None,
+ # bcc=None,
+ # reply_to=None,
+ # attachment_files=None,
+ # )
+ #
+ # @staticmethod
+ # def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
+ # """WakuHandler."""
+ # # class WakuMessage:
+ # # """WakuMessage."""
+ # #
+ # # payload: str
+ # # contentTopic: str # Optional
+ # # version: int # Optional
+ # # timestamp: int # Optional
class FailingService:
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 2c77f0bbb..510c66fda 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -1307,8 +1307,34 @@ class ProcessInstanceProcessor:
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
+ def user_defined_task_data(self, task_data: dict) -> dict:
+ """UserDefinedTaskData."""
+ return {k: v for k, v in task_data.items() if k != "current_user"}
+
+ def check_task_data_size(self) -> None:
+ """CheckTaskDataSize."""
+ tasks_to_check = self.bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
+ task_data = [self.user_defined_task_data(task.data) for task in tasks_to_check]
+ task_data_to_check = list(filter(len, task_data))
+
+ try:
+ task_data_len = len(json.dumps(task_data_to_check))
+ except Exception:
+ task_data_len = 0
+
+ task_data_limit = 1024**2
+
+ if task_data_len > task_data_limit:
+ raise (
+ ApiError(
+ error_code="task_data_size_exceeded",
+ message=f"Maximum task data size of {task_data_limit} exceeded.",
+ )
+ )
+
def serialize(self) -> str:
"""Serialize."""
+ self.check_task_data_size()
return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore
def next_user_tasks(self) -> list[SpiffTask]:
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py
index 83ca22b79..d146d0b22 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py
@@ -2160,59 +2160,10 @@ class TestProcessApi(BaseTest):
assert process is not None
assert process.status == "suspended"
- def test_error_handler_with_email(
- self,
- app: Flask,
- client: FlaskClient,
- with_db_and_bpmn_file_cleanup: None,
- with_super_admin_user: UserModel,
- ) -> None:
- """Test_error_handler."""
- process_group_id = "data"
- process_model_id = "error"
- bpmn_file_name = "error.bpmn"
- bpmn_file_location = "error"
- process_model_identifier = self.create_group_and_model_with_bpmn(
- client,
- with_super_admin_user,
- process_group_id=process_group_id,
- process_model_id=process_model_id,
- bpmn_file_name=bpmn_file_name,
- bpmn_file_location=bpmn_file_location,
- )
-
- process_instance_id = self.setup_testing_instance(
- client, process_model_identifier, with_super_admin_user
- )
-
- process_model = ProcessModelService.get_process_model(process_model_identifier)
- ProcessModelService.update_process_model(
- process_model,
- {"exception_notification_addresses": ["with_super_admin_user@example.com"]},
- )
-
- mail = app.config["MAIL_APP"]
- with mail.record_messages() as outbox:
- response = client.post(
- f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
- headers=self.logged_in_headers(with_super_admin_user),
- )
- assert response.status_code == 400
- assert len(outbox) == 1
- message = outbox[0]
- assert message.subject == "Unexpected error in app"
- assert (
- message.body == 'TypeError:can only concatenate str (not "int") to str'
- )
- assert message.recipients == process_model.exception_notification_addresses
-
- process = (
- db.session.query(ProcessInstanceModel)
- .filter(ProcessInstanceModel.id == process_instance_id)
- .first()
- )
- assert process is not None
- assert process.status == "error"
+ def test_error_handler_system_notification(self) -> None:
+ """Test_error_handler_system_notification."""
+ # TODO: make sure the system notification process is run on exceptions
+ ...
def test_task_data_is_set_even_if_process_instance_errors(
self,
diff --git a/spiffworkflow-frontend/src/components/ProcessModelForm.tsx b/spiffworkflow-frontend/src/components/ProcessModelForm.tsx
index 7cfd4d616..999356952 100644
--- a/spiffworkflow-frontend/src/components/ProcessModelForm.tsx
+++ b/spiffworkflow-frontend/src/components/ProcessModelForm.tsx
@@ -8,6 +8,8 @@ import {
TextInput,
Grid,
Column,
+ Select,
+ SelectItem,
// @ts-ignore
} from '@carbon/react';
// @ts-ignore
@@ -76,6 +78,9 @@ export default function ProcessModelForm({
display_name: processModel.display_name,
description: processModel.description,
metadata_extraction_paths: processModel.metadata_extraction_paths,
+ fault_or_suspend_on_exception: processModel.fault_or_suspend_on_exception,
+ exception_notification_addresses:
+ processModel.exception_notification_addresses,
};
if (mode === 'new') {
Object.assign(postBody, {
@@ -173,6 +178,69 @@ export default function ProcessModelForm({
updateProcessModel({ metadata_extraction_paths: cep });
};
+ const notificationAddressForm = (
+ index: number,
+ notificationAddress: string
+ ) => {
+ return (
+
+ You can provide one or more addresses to notify if this model fails. +
+