Merge remote-tracking branch 'origin/main' into feature/save_tasks_one_at_a_time
This commit is contained in:
commit
eeadd9296f
|
@ -55,7 +55,7 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
# The correlation keys of the process at the time the message was created.
|
# The correlation keys of the process at the time the message was created.
|
||||||
correlation_keys: dict = db.Column(db.JSON)
|
correlation_keys: dict = db.Column(db.JSON)
|
||||||
status: str = db.Column(db.String(20), nullable=False, default="ready")
|
status: str = db.Column(db.String(20), nullable=False, default="ready")
|
||||||
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
|
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True) # type: ignore
|
||||||
user = relationship("UserModel")
|
user = relationship("UserModel")
|
||||||
counterpart_id: int = db.Column(
|
counterpart_id: int = db.Column(
|
||||||
db.Integer
|
db.Integer
|
||||||
|
|
|
@ -1,19 +1,13 @@
|
||||||
"""Error_handling_service."""
|
"""Error_handling_service."""
|
||||||
import json
|
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from flask import g
|
from flask import g
|
||||||
from flask.wrappers import Response
|
|
||||||
|
|
||||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
|
||||||
MessageTriggerableProcessModel,
|
|
||||||
)
|
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
|
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||||
from spiffworkflow_backend.services.message_service import MessageService
|
from spiffworkflow_backend.services.message_service import MessageService
|
||||||
|
@ -26,6 +20,8 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer
|
||||||
class ErrorHandlingService:
|
class ErrorHandlingService:
|
||||||
"""ErrorHandlingService."""
|
"""ErrorHandlingService."""
|
||||||
|
|
||||||
|
MESSAGE_NAME = "SystemErrorMessage"
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def set_instance_status(instance_id: int, status: str) -> None:
|
def set_instance_status(instance_id: int, status: str) -> None:
|
||||||
"""Set_instance_status."""
|
"""Set_instance_status."""
|
||||||
|
@ -58,106 +54,43 @@ class ErrorHandlingService:
|
||||||
ProcessInstanceStatus.error.value,
|
ProcessInstanceStatus.error.value,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Second, call the System Notification Process
|
# Second, send a bpmn message out, but only if an exception notification address is provided
|
||||||
# Note that this isn't the best way to do this.
|
# This will create a new Send Message with correlation keys on the recipients and the message
|
||||||
# The configs are all in the model.
|
# body.
|
||||||
# Maybe we can move some of this to the notification process, or dmn tables.
|
|
||||||
if len(process_model.exception_notification_addresses) > 0:
|
if len(process_model.exception_notification_addresses) > 0:
|
||||||
try:
|
try:
|
||||||
self.handle_system_notification(_error, process_model)
|
self.handle_system_notification(_error, process_model, _processor)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# hmm... what to do if a notification method fails. Probably log, at least
|
# hmm... what to do if a notification method fails. Probably log, at least
|
||||||
current_app.logger.error(e)
|
current_app.logger.error(e)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handle_system_notification(
|
def handle_system_notification(
|
||||||
error: Union[ApiError, Exception], process_model: ProcessModelInfo
|
error: Union[ApiError, Exception],
|
||||||
) -> Response:
|
process_model: ProcessModelInfo,
|
||||||
"""Handle_system_notification."""
|
_processor: ProcessInstanceProcessor,
|
||||||
recipients = process_model.exception_notification_addresses
|
) -> None:
|
||||||
|
"""Send a BPMN Message - which may kick off a waiting process."""
|
||||||
message_text = (
|
message_text = (
|
||||||
f"There was an exception running process {process_model.id}.\nOriginal"
|
f"There was an exception running process {process_model.id}.\nOriginal"
|
||||||
f" Error:\n{error.__repr__()}"
|
f" Error:\n{error.__repr__()}"
|
||||||
)
|
)
|
||||||
message_payload = {"message_text": message_text, "recipients": recipients}
|
message_payload = {
|
||||||
message_name = current_app.config[
|
"message_text": message_text,
|
||||||
"SPIFFWORKFLOW_BACKEND_SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID"
|
"recipients": process_model.exception_notification_addresses,
|
||||||
]
|
}
|
||||||
message_triggerable_process_model = (
|
user_id = None
|
||||||
MessageTriggerableProcessModel.query.filter_by(
|
if "user" in g:
|
||||||
message_name=message_name
|
user_id = g.user.id
|
||||||
).first()
|
else:
|
||||||
)
|
user_id = _processor.process_instance_model.process_initiator_id
|
||||||
|
|
||||||
# Create the send message
|
|
||||||
message_instance = MessageInstanceModel(
|
message_instance = MessageInstanceModel(
|
||||||
message_type="send",
|
message_type="send",
|
||||||
name=message_name,
|
name=ErrorHandlingService.MESSAGE_NAME,
|
||||||
payload=message_payload,
|
payload=message_payload,
|
||||||
user_id=g.user.id,
|
user_id=user_id,
|
||||||
)
|
)
|
||||||
db.session.add(message_instance)
|
db.session.add(message_instance)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
MessageService.correlate_send_message(message_instance)
|
||||||
process_instance = MessageService.start_process_with_message(
|
|
||||||
message_triggerable_process_model, message_instance
|
|
||||||
)
|
|
||||||
|
|
||||||
return Response(
|
|
||||||
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
|
|
||||||
status=200,
|
|
||||||
mimetype="application/json",
|
|
||||||
)
|
|
||||||
|
|
||||||
# @staticmethod
|
|
||||||
# def handle_sentry_notification(_error: ApiError, _recipients: List) -> None:
|
|
||||||
# """SentryHandler."""
|
|
||||||
# ...
|
|
||||||
#
|
|
||||||
# @staticmethod
|
|
||||||
# def handle_email_notification(
|
|
||||||
# processor: ProcessInstanceProcessor,
|
|
||||||
# error: Union[ApiError, Exception],
|
|
||||||
# recipients: List,
|
|
||||||
# ) -> None:
|
|
||||||
# """EmailHandler."""
|
|
||||||
# subject = "Unexpected error in app"
|
|
||||||
# if isinstance(error, ApiError):
|
|
||||||
# content = f"{error.message}"
|
|
||||||
# else:
|
|
||||||
# content = str(error)
|
|
||||||
# content_html = content
|
|
||||||
#
|
|
||||||
# EmailService.add_email(
|
|
||||||
# subject,
|
|
||||||
# "sender@company.com",
|
|
||||||
# recipients,
|
|
||||||
# content,
|
|
||||||
# content_html,
|
|
||||||
# cc=None,
|
|
||||||
# bcc=None,
|
|
||||||
# reply_to=None,
|
|
||||||
# attachment_files=None,
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# @staticmethod
|
|
||||||
# def handle_waku_notification(_error: ApiError, _recipients: List) -> Any:
|
|
||||||
# """WakuHandler."""
|
|
||||||
# # class WakuMessage:
|
|
||||||
# # """WakuMessage."""
|
|
||||||
# #
|
|
||||||
# # payload: str
|
|
||||||
# # contentTopic: str # Optional
|
|
||||||
# # version: int # Optional
|
|
||||||
# # timestamp: int # Optional
|
|
||||||
|
|
||||||
|
|
||||||
class FailingService:
|
|
||||||
"""FailingService."""
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def fail_as_service() -> None:
|
|
||||||
"""It fails."""
|
|
||||||
raise ApiError(
|
|
||||||
error_code="failing_service", message="This is my failing service"
|
|
||||||
)
|
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
|
||||||
|
<bpmn:collaboration id="Collaboration_SystemMessageNotification">
|
||||||
|
<bpmn:participant id="Participant_MessageReceiver" name="Message Receiver" processRef="Process_MessageReceiverNotification" />
|
||||||
|
<bpmn:participant id="Participant_MessageSender" name="Message Sender" />
|
||||||
|
<bpmn:messageFlow id="Flow_1lktxcr" sourceRef="Participant_MessageSender" targetRef="StartEvent_1" />
|
||||||
|
<bpmn:correlationKey name="error_details">
|
||||||
|
<bpmn:correlationPropertyRef>message_text</bpmn:correlationPropertyRef>
|
||||||
|
<bpmn:correlationPropertyRef>recipients</bpmn:correlationPropertyRef>
|
||||||
|
</bpmn:correlationKey>
|
||||||
|
</bpmn:collaboration>
|
||||||
|
<bpmn:process id="Process_MessageReceiverNotification" name="Message Receiver" isExecutable="true">
|
||||||
|
<bpmn:sequenceFlow id="Flow_1wwg6l1" sourceRef="StartEvent_1" targetRef="Activity_1twstnr" />
|
||||||
|
<bpmn:startEvent id="StartEvent_1">
|
||||||
|
<bpmn:outgoing>Flow_1wwg6l1</bpmn:outgoing>
|
||||||
|
<bpmn:messageEventDefinition id="MessageEventDefinition_1kqg8ba" messageRef="Message_SystemMessageNotification" />
|
||||||
|
</bpmn:startEvent>
|
||||||
|
<bpmn:endEvent id="Event_1rn093f">
|
||||||
|
<bpmn:incoming>Flow_1hpekd5</bpmn:incoming>
|
||||||
|
</bpmn:endEvent>
|
||||||
|
<bpmn:sequenceFlow id="Flow_1hpekd5" sourceRef="Activity_1twstnr" targetRef="Event_1rn093f" />
|
||||||
|
<bpmn:scriptTask id="Activity_1twstnr" name="Simple Script Task">
|
||||||
|
<bpmn:incoming>Flow_1wwg6l1</bpmn:incoming>
|
||||||
|
<bpmn:outgoing>Flow_1hpekd5</bpmn:outgoing>
|
||||||
|
<bpmn:script>x = 1</bpmn:script>
|
||||||
|
</bpmn:scriptTask>
|
||||||
|
</bpmn:process>
|
||||||
|
<bpmn:message id="Message_SystemMessageNotification" name="SystemErrorMessage">
|
||||||
|
<bpmn:extensionElements>
|
||||||
|
<spiffworkflow:messageVariable>system_message</spiffworkflow:messageVariable>
|
||||||
|
</bpmn:extensionElements>
|
||||||
|
</bpmn:message>
|
||||||
|
<bpmn:correlationProperty id="message_text" name="message_text">
|
||||||
|
<bpmn:correlationPropertyRetrievalExpression messageRef="Message_SystemMessageNotification">
|
||||||
|
<bpmn:formalExpression>message_text</bpmn:formalExpression>
|
||||||
|
</bpmn:correlationPropertyRetrievalExpression>
|
||||||
|
</bpmn:correlationProperty>
|
||||||
|
<bpmn:correlationProperty id="recipients" name="recipients">
|
||||||
|
<bpmn:correlationPropertyRetrievalExpression messageRef="Message_SystemMessageNotification">
|
||||||
|
<bpmn:formalExpression>recipients</bpmn:formalExpression>
|
||||||
|
</bpmn:correlationPropertyRetrievalExpression>
|
||||||
|
</bpmn:correlationProperty>
|
||||||
|
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
|
||||||
|
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_SystemMessageNotification">
|
||||||
|
<bpmndi:BPMNShape id="Participant_0hdwpzk_di" bpmnElement="Participant_MessageReceiver" isHorizontal="true">
|
||||||
|
<dc:Bounds x="120" y="80" width="570" height="180" />
|
||||||
|
<bpmndi:BPMNLabel />
|
||||||
|
</bpmndi:BPMNShape>
|
||||||
|
<bpmndi:BPMNShape id="Event_1rpzksh_di" bpmnElement="StartEvent_1">
|
||||||
|
<dc:Bounds x="179" y="159" width="36" height="36" />
|
||||||
|
</bpmndi:BPMNShape>
|
||||||
|
<bpmndi:BPMNShape id="Event_1rn093f_di" bpmnElement="Event_1rn093f">
|
||||||
|
<dc:Bounds x="522" y="159" width="36" height="36" />
|
||||||
|
</bpmndi:BPMNShape>
|
||||||
|
<bpmndi:BPMNShape id="Activity_13sbt91_di" bpmnElement="Activity_1twstnr">
|
||||||
|
<dc:Bounds x="400" y="137" width="100" height="80" />
|
||||||
|
<bpmndi:BPMNLabel />
|
||||||
|
</bpmndi:BPMNShape>
|
||||||
|
<bpmndi:BPMNEdge id="Flow_1wwg6l1_di" bpmnElement="Flow_1wwg6l1">
|
||||||
|
<di:waypoint x="215" y="177" />
|
||||||
|
<di:waypoint x="400" y="177" />
|
||||||
|
</bpmndi:BPMNEdge>
|
||||||
|
<bpmndi:BPMNEdge id="Flow_1hpekd5_di" bpmnElement="Flow_1hpekd5">
|
||||||
|
<di:waypoint x="500" y="177" />
|
||||||
|
<di:waypoint x="522" y="177" />
|
||||||
|
</bpmndi:BPMNEdge>
|
||||||
|
<bpmndi:BPMNShape id="Participant_04vc6oc_di" bpmnElement="Participant_MessageSender" isHorizontal="true">
|
||||||
|
<dc:Bounds x="120" y="-40" width="390" height="60" />
|
||||||
|
<bpmndi:BPMNLabel />
|
||||||
|
</bpmndi:BPMNShape>
|
||||||
|
<bpmndi:BPMNEdge id="Flow_1lktxcr_di" bpmnElement="Flow_1lktxcr">
|
||||||
|
<di:waypoint x="197" y="20" />
|
||||||
|
<di:waypoint x="197" y="159" />
|
||||||
|
</bpmndi:BPMNEdge>
|
||||||
|
</bpmndi:BPMNPlane>
|
||||||
|
</bpmndi:BPMNDiagram>
|
||||||
|
</bpmn:definitions>
|
|
@ -0,0 +1,104 @@
|
||||||
|
import pytest
|
||||||
|
from flask import Flask
|
||||||
|
from flask.testing import FlaskClient
|
||||||
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||||
|
|
||||||
|
from spiffworkflow_backend import db
|
||||||
|
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||||
|
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||||
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||||
|
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||||
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
|
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
|
||||||
|
from spiffworkflow_backend.services.process_instance_processor import (
|
||||||
|
ProcessInstanceProcessor,
|
||||||
|
)
|
||||||
|
from spiffworkflow_backend.services.process_instance_service import (
|
||||||
|
ProcessInstanceService,
|
||||||
|
)
|
||||||
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||||
|
|
||||||
|
|
||||||
|
class TestErrorHandlingService(BaseTest):
|
||||||
|
"""Error Handling does some crazy stuff man.
|
||||||
|
|
||||||
|
Like it can fire off BPMN messages in case a BPMN Task is waiting for that message.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def run_process_model_and_handle_error(
|
||||||
|
self, process_model: ProcessModelInfo, user: UserModel
|
||||||
|
) -> ProcessInstanceModel:
|
||||||
|
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
||||||
|
process_model.id, user
|
||||||
|
)
|
||||||
|
pip = ProcessInstanceProcessor(process_instance)
|
||||||
|
with pytest.raises(ApiError) as e:
|
||||||
|
pip.do_engine_steps(save=True)
|
||||||
|
ErrorHandlingService().handle_error(pip, e.value)
|
||||||
|
return process_instance
|
||||||
|
|
||||||
|
def test_handle_error_suspends_or_faults_process(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
with_super_admin_user: UserModel,
|
||||||
|
) -> None:
|
||||||
|
"""Process Model in DB marked as suspended when error occurs."""
|
||||||
|
process_model = load_test_spec(
|
||||||
|
"test_group/error_suspend",
|
||||||
|
process_model_source_directory="error",
|
||||||
|
bpmn_file_name="error.bpmn", # Slightly misnamed, it sends and receives
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process instance should be marked as errored by default.
|
||||||
|
process_instance = self.run_process_model_and_handle_error(
|
||||||
|
process_model, with_super_admin_user
|
||||||
|
)
|
||||||
|
assert ProcessInstanceStatus.error.value == process_instance.status
|
||||||
|
|
||||||
|
# If process model should be suspended on error, then that is what should happen.
|
||||||
|
process_model.fault_or_suspend_on_exception = "suspend"
|
||||||
|
ProcessModelService.save_process_model(process_model)
|
||||||
|
process_instance = self.run_process_model_and_handle_error(
|
||||||
|
process_model, with_super_admin_user
|
||||||
|
)
|
||||||
|
assert ProcessInstanceStatus.suspended.value == process_instance.status
|
||||||
|
|
||||||
|
def test_error_sends_bpmn_message(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
with_super_admin_user: UserModel,
|
||||||
|
) -> None:
|
||||||
|
"""Real BPMN Messages should get generated and processes should fire off and complete."""
|
||||||
|
process_model = load_test_spec(
|
||||||
|
"test_group/error_send_message_bpmn",
|
||||||
|
process_model_source_directory="error",
|
||||||
|
bpmn_file_name="error.bpmn", # Slightly misnamed, it sends and receives
|
||||||
|
)
|
||||||
|
""" Process Model that will listen for errors sent."""
|
||||||
|
load_test_spec(
|
||||||
|
"test_group/admin_tools/error_handler",
|
||||||
|
process_model_source_directory="error",
|
||||||
|
bpmn_file_name="error_handler.bpmn", # Slightly misnamed, it sends and receives
|
||||||
|
)
|
||||||
|
process_model.exception_notification_addresses = [
|
||||||
|
"dan@ILoveToReadErrorsInMyEmails.com"
|
||||||
|
]
|
||||||
|
ProcessModelService.save_process_model(process_model)
|
||||||
|
# kick off the process and assure it got marked as an error.
|
||||||
|
process_instance = self.run_process_model_and_handle_error(
|
||||||
|
process_model, with_super_admin_user
|
||||||
|
)
|
||||||
|
assert ProcessInstanceStatus.error.value == process_instance.status
|
||||||
|
|
||||||
|
# Both send and receive messages should be generated, matched
|
||||||
|
# and considered complete.
|
||||||
|
messages = db.session.query(MessageInstanceModel).all()
|
||||||
|
assert 2 == len(messages)
|
||||||
|
assert "completed" == messages[0].status
|
||||||
|
assert "completed" == messages[1].status
|
Loading…
Reference in New Issue