Adding a new test for error handing to assure this doesn't break in the future, and cleaning up the message call event.
Will also need to update the error handling BPMN process so it provides correlation keys. We should add a task that will alert you when you create a message object without setting correlation keys - as they are required per the specification.
This commit is contained in:
parent
4ce715fec8
commit
79a17ec829
|
@ -55,7 +55,7 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
|||
# The correlation keys of the process at the time the message was created.
|
||||
correlation_keys: dict = db.Column(db.JSON)
|
||||
status: str = db.Column(db.String(20), nullable=False, default="ready")
|
||||
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
|
||||
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True) # type: ignore
|
||||
user = relationship("UserModel")
|
||||
counterpart_id: int = db.Column(
|
||||
db.Integer
|
||||
|
|
|
@ -26,6 +26,8 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer
|
|||
class ErrorHandlingService:
|
||||
"""ErrorHandlingService."""
|
||||
|
||||
MESSAGE_NAME = "SystemErrorMessage"
|
||||
|
||||
@staticmethod
|
||||
def set_instance_status(instance_id: int, status: str) -> None:
|
||||
"""Set_instance_status."""
|
||||
|
@ -58,106 +60,42 @@ class ErrorHandlingService:
|
|||
ProcessInstanceStatus.error.value,
|
||||
)
|
||||
|
||||
# Second, call the System Notification Process
|
||||
# Note that this isn't the best way to do this.
|
||||
# The configs are all in the model.
|
||||
# Maybe we can move some of this to the notification process, or dmn tables.
|
||||
# Second, send a bpmn message out, but only if an exception notification address is provided
|
||||
# This will create a new Send Message with correlation keys on the recipients and the message
|
||||
# body.
|
||||
if len(process_model.exception_notification_addresses) > 0:
|
||||
try:
|
||||
self.handle_system_notification(_error, process_model)
|
||||
self.handle_system_notification(_error, process_model, _processor)
|
||||
except Exception as e:
|
||||
# hmm... what to do if a notification method fails. Probably log, at least
|
||||
current_app.logger.error(e)
|
||||
|
||||
@staticmethod
|
||||
def handle_system_notification(
|
||||
error: Union[ApiError, Exception], process_model: ProcessModelInfo
|
||||
error: Union[ApiError, Exception],
|
||||
process_model: ProcessModelInfo,
|
||||
_processor: ProcessInstanceProcessor
|
||||
) -> Response:
|
||||
"""Handle_system_notification."""
|
||||
recipients = process_model.exception_notification_addresses
|
||||
"""Send a BPMN Message - which may kick off a waiting process. """
|
||||
message_text = (
|
||||
f"There was an exception running process {process_model.id}.\nOriginal"
|
||||
f" Error:\n{error.__repr__()}"
|
||||
)
|
||||
message_payload = {"message_text": message_text, "recipients": recipients}
|
||||
message_name = current_app.config[
|
||||
"SPIFFWORKFLOW_BACKEND_SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID"
|
||||
]
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_name=message_name
|
||||
).first()
|
||||
)
|
||||
message_payload = {"message_text": message_text,
|
||||
"recipients": process_model.exception_notification_addresses
|
||||
}
|
||||
user_id = None
|
||||
if "user" in g:
|
||||
user_id = g.user.id
|
||||
else:
|
||||
user_id = _processor.process_instance_model.process_initiator_id
|
||||
|
||||
# Create the send message
|
||||
message_instance = MessageInstanceModel(
|
||||
message_type="send",
|
||||
name=message_name,
|
||||
name=ErrorHandlingService.MESSAGE_NAME,
|
||||
payload=message_payload,
|
||||
user_id=g.user.id,
|
||||
user_id=user_id,
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
process_instance = MessageService.start_process_with_message(
|
||||
message_triggerable_process_model, message_instance
|
||||
)
|
||||
|
||||
return Response(
|
||||
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
|
||||
status=200,
|
||||
mimetype="application/json",
|
||||
)
|
||||
|
||||
# @staticmethod
|
||||
# def handle_sentry_notification(_error: ApiError, _recipients: List) -> None:
|
||||
# """SentryHandler."""
|
||||
# ...
|
||||
#
|
||||
# @staticmethod
|
||||
# def handle_email_notification(
|
||||
# processor: ProcessInstanceProcessor,
|
||||
# error: Union[ApiError, Exception],
|
||||
# recipients: List,
|
||||
# ) -> None:
|
||||
# """EmailHandler."""
|
||||
# subject = "Unexpected error in app"
|
||||
# if isinstance(error, ApiError):
|
||||
# content = f"{error.message}"
|
||||
# else:
|
||||
# content = str(error)
|
||||
# content_html = content
|
||||
#
|
||||
# EmailService.add_email(
|
||||
# subject,
|
||||
# "sender@company.com",
|
||||
# recipients,
|
||||
# content,
|
||||
# content_html,
|
||||
# cc=None,
|
||||
# bcc=None,
|
||||
# reply_to=None,
|
||||
# attachment_files=None,
|
||||
# )
|
||||
#
|
||||
# @staticmethod
|
||||
# def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
|
||||
# """WakuHandler."""
|
||||
# # class WakuMessage:
|
||||
# # """WakuMessage."""
|
||||
# #
|
||||
# # payload: str
|
||||
# # contentTopic: str # Optional
|
||||
# # version: int # Optional
|
||||
# # timestamp: int # Optional
|
||||
|
||||
|
||||
class FailingService:
|
||||
"""FailingService."""
|
||||
|
||||
@staticmethod
|
||||
def fail_as_service() -> None:
|
||||
"""It fails."""
|
||||
raise ApiError(
|
||||
error_code="failing_service", message="This is my failing service"
|
||||
)
|
||||
MessageService.correlate_send_message(message_instance)
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
|
||||
<bpmn:collaboration id="Collaboration_SystemMessageNotification">
|
||||
<bpmn:participant id="Participant_MessageReceiver" name="Message Receiver" processRef="Process_MessageReceiverNotification" />
|
||||
<bpmn:participant id="Participant_MessageSender" name="Message Sender" />
|
||||
<bpmn:messageFlow id="Flow_1lktxcr" sourceRef="Participant_MessageSender" targetRef="StartEvent_1" />
|
||||
<bpmn:correlationKey name="error_details">
|
||||
<bpmn:correlationPropertyRef>message_text</bpmn:correlationPropertyRef>
|
||||
<bpmn:correlationPropertyRef>recipients</bpmn:correlationPropertyRef>
|
||||
</bpmn:correlationKey>
|
||||
</bpmn:collaboration>
|
||||
<bpmn:process id="Process_MessageReceiverNotification" name="Message Receiver" isExecutable="true">
|
||||
<bpmn:sequenceFlow id="Flow_1wwg6l1" sourceRef="StartEvent_1" targetRef="Activity_1twstnr" />
|
||||
<bpmn:startEvent id="StartEvent_1">
|
||||
<bpmn:outgoing>Flow_1wwg6l1</bpmn:outgoing>
|
||||
<bpmn:messageEventDefinition id="MessageEventDefinition_1kqg8ba" messageRef="Message_SystemMessageNotification" />
|
||||
</bpmn:startEvent>
|
||||
<bpmn:endEvent id="Event_1rn093f">
|
||||
<bpmn:incoming>Flow_1hpekd5</bpmn:incoming>
|
||||
</bpmn:endEvent>
|
||||
<bpmn:sequenceFlow id="Flow_1hpekd5" sourceRef="Activity_1twstnr" targetRef="Event_1rn093f" />
|
||||
<bpmn:scriptTask id="Activity_1twstnr" name="Simple Script Task">
|
||||
<bpmn:incoming>Flow_1wwg6l1</bpmn:incoming>
|
||||
<bpmn:outgoing>Flow_1hpekd5</bpmn:outgoing>
|
||||
<bpmn:script>x = 1</bpmn:script>
|
||||
</bpmn:scriptTask>
|
||||
</bpmn:process>
|
||||
<bpmn:message id="Message_SystemMessageNotification" name="SystemErrorMessage">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:messageVariable>system_message</spiffworkflow:messageVariable>
|
||||
</bpmn:extensionElements>
|
||||
</bpmn:message>
|
||||
<bpmn:correlationProperty id="message_text" name="message_text">
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="Message_SystemMessageNotification">
|
||||
<bpmn:formalExpression>message_text</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:correlationProperty id="recipients" name="recipients">
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="Message_SystemMessageNotification">
|
||||
<bpmn:formalExpression>recipients</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
|
||||
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_SystemMessageNotification">
|
||||
<bpmndi:BPMNShape id="Participant_0hdwpzk_di" bpmnElement="Participant_MessageReceiver" isHorizontal="true">
|
||||
<dc:Bounds x="120" y="80" width="570" height="180" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Event_1rpzksh_di" bpmnElement="StartEvent_1">
|
||||
<dc:Bounds x="179" y="159" width="36" height="36" />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Event_1rn093f_di" bpmnElement="Event_1rn093f">
|
||||
<dc:Bounds x="522" y="159" width="36" height="36" />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Activity_13sbt91_di" bpmnElement="Activity_1twstnr">
|
||||
<dc:Bounds x="400" y="137" width="100" height="80" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNEdge id="Flow_1wwg6l1_di" bpmnElement="Flow_1wwg6l1">
|
||||
<di:waypoint x="215" y="177" />
|
||||
<di:waypoint x="400" y="177" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNEdge id="Flow_1hpekd5_di" bpmnElement="Flow_1hpekd5">
|
||||
<di:waypoint x="500" y="177" />
|
||||
<di:waypoint x="522" y="177" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNShape id="Participant_04vc6oc_di" bpmnElement="Participant_MessageSender" isHorizontal="true">
|
||||
<dc:Bounds x="120" y="-40" width="390" height="60" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNEdge id="Flow_1lktxcr_di" bpmnElement="Flow_1lktxcr">
|
||||
<di:waypoint x="197" y="20" />
|
||||
<di:waypoint x="197" y="159" />
|
||||
</bpmndi:BPMNEdge>
|
||||
</bpmndi:BPMNPlane>
|
||||
</bpmndi:BPMNDiagram>
|
||||
</bpmn:definitions>
|
|
@ -0,0 +1,87 @@
|
|||
import pytest
|
||||
from flask import Flask
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from conftest import with_super_admin_user
|
||||
from spiffworkflow_backend import db
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
|
||||
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
||||
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
|
||||
|
||||
class TestErrorHandlingService(BaseTest):
|
||||
"""Error Handling does some crazy stuff man, like even firing off
|
||||
messages in case a BPMN Task is waiting for that message. """
|
||||
|
||||
def run_process_model_and_handle_error(self, process_model: ProcessModelInfo, user: UserModel) -> Exception:
|
||||
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
||||
process_model.id, user)
|
||||
pip = ProcessInstanceProcessor(process_instance)
|
||||
with pytest.raises(ApiError) as e:
|
||||
pip.do_engine_steps(save=True)
|
||||
ErrorHandlingService().handle_error(pip, e)
|
||||
return process_instance
|
||||
|
||||
|
||||
def test_handle_error_suspends_or_faults_process(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
""" Process Model in DB marked as suspended when error occurs"""
|
||||
process_model = load_test_spec(
|
||||
"test_group/error_suspend",
|
||||
process_model_source_directory="error",
|
||||
bpmn_file_name="error.bpmn", # Slightly misnamed, it sends and receives
|
||||
)
|
||||
|
||||
# Process instance should be marked as errored by default.
|
||||
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
|
||||
assert(ProcessInstanceStatus.error.value == process_instance.status)
|
||||
|
||||
# If process model should be suspended on error, then that is what should happen.
|
||||
process_model.fault_or_suspend_on_exception = "suspend"
|
||||
ProcessModelService.save_process_model(process_model)
|
||||
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
|
||||
assert(ProcessInstanceStatus.suspended.value == process_instance.status)
|
||||
|
||||
def test_error_sends_bpmn_message(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel):
|
||||
""" Process Model in DB marked as suspended when error occurs"""
|
||||
process_model = load_test_spec(
|
||||
"test_group/error_send_message_bpmn",
|
||||
process_model_source_directory="error",
|
||||
bpmn_file_name="error.bpmn", # Slightly misnamed, it sends and receives
|
||||
)
|
||||
""" Process Model that will listen for errors sent."""
|
||||
process_model_error_handler = load_test_spec(
|
||||
"test_group/admin_tools/error_handler",
|
||||
process_model_source_directory="error",
|
||||
bpmn_file_name="error_handler.bpmn", # Slightly misnamed, it sends and receives
|
||||
)
|
||||
process_model.exception_notification_addresses=["dan@ILoveToReadErrorsInMyEmails.com"]
|
||||
ProcessModelService.save_process_model(process_model)
|
||||
# kick off the process and assure it got marked as an error.
|
||||
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
|
||||
assert(ProcessInstanceStatus.error.value == process_instance.status)
|
||||
|
||||
# Both send and receive messages should be generated, matched
|
||||
# and considered complete.
|
||||
messages = db.session.query(MessageInstanceModel).all()
|
||||
assert(2 == len(messages))
|
||||
assert("completed", messages[0].status)
|
||||
assert("completed", messages[1].status)
|
Loading…
Reference in New Issue