* Re-work message tests so I could wrap my simple head around what was happening - just needed an example that made sense to me.

* Clear out complex get_message_instance_receive how that many-to-many works.
* Create decent error messages when correlations fail
* Move correlation checks into the MessageInstance class
* The APIError could bomb out ugly if it hit a workflow exception with not Task Spec.
This commit is contained in:
Dan 2023-02-18 13:09:58 -05:00
parent 5f9ba556d1
commit 28ac9ef872
12 changed files with 326 additions and 404 deletions

View File

@ -23,6 +23,7 @@ from calendar import monthrange
from time import timezone as tzoffset
from copy import deepcopy
from SpiffWorkflow.exceptions import SpiffWorkflowException, WorkflowException
from SpiffWorkflow.task import TaskState
LOCALTZ = timezone(timedelta(seconds=-1 * tzoffset))
@ -191,7 +192,7 @@ class MessageEventDefinition(NamedEventDefinition):
# However, there needs to be something to apply the correlations to in the
# standard case and this is line with the way Spiff works otherwise
event.payload = deepcopy(my_task.data)
correlations = self.get_correlations(my_task.workflow.script_engine, event.payload)
correlations = self.get_correlations(my_task, event.payload)
my_task.workflow.correlations.update(correlations)
self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations)
@ -205,13 +206,19 @@ class MessageEventDefinition(NamedEventDefinition):
if payload is not None:
my_task.set_data(**payload)
def get_correlations(self, script_engine, payload):
def get_correlations(self, task, payload):
correlations = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
if key not in correlations:
correlations[key] = {}
correlations[key][property.name] = script_engine._evaluate(property.expression, payload)
try:
correlations[key][property.name] = task.workflow.script_engine._evaluate(property.expression, payload)
except WorkflowException as we:
we.add_note(f"Failed to evaluate correlation key '{key}'"
f" invalid expression '{property.expression}'")
we.task_spec = task.task_spec
raise we
return correlations
@ -351,7 +358,7 @@ class TimerEventDefinition(EventDefinition):
return TimerEventDefinition.parse_iso_week(expression)
else:
return TimerEventDefinition.get_datetime(expression)
@staticmethod
def parse_iso_recurring_interval(expression):
components = expression.upper().replace('--', '/').strip('R').split('/')
@ -510,7 +517,7 @@ class MultipleEventDefinition(EventDefinition):
if event == other:
return True
return False
def throw(self, my_task):
# Mutiple events throw all associated events when they fire
for event_definition in self.event_definitions:
@ -518,4 +525,4 @@ class MultipleEventDefinition(EventDefinition):
event=event_definition,
workflow=my_task.workflow,
outer_workflow=my_task.workflow.outer_workflow
)
)

View File

@ -14,7 +14,7 @@ class MessageEventDefinition(MessageEventDefinition):
# we have to evaluate it again so we have to create a new event
event = MessageEventDefinition(self.name, self.correlation_properties, self.expression, self.message_var)
event.payload = my_task.workflow.script_engine.evaluate(my_task, self.expression)
correlations = self.get_correlations(my_task.workflow.script_engine, event.payload)
correlations = self.get_correlations(my_task, event.payload)
my_task.workflow.correlations.update(correlations)
self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations)

View File

@ -27,8 +27,8 @@ flask-marshmallow = "*"
flask-migrate = "*"
flask-restful = "*"
werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
#SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}

View File

@ -157,7 +157,7 @@ class ApiError(Exception):
error_line=exp.error_line,
task_trace=exp.task_trace,
)
elif isinstance(exp, WorkflowException):
elif isinstance(exp, WorkflowException) and exp.task_spec:
return ApiError.from_task_spec(error_code, message, exp.task_spec)
else:
return ApiError("workflow_error", str(exp))

View File

@ -1,7 +1,7 @@
"""Message_instance."""
import enum
from dataclasses import dataclass
from typing import Any
from typing import Any, Self
from typing import Optional
from typing import TYPE_CHECKING
@ -68,7 +68,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
def correlates(self, correlation_dictionary):
def correlates(self, other_message_instance: Self) -> bool:
if other_message_instance.message_model_id != self.message_model_id:
return False
correlation_dict = {}
for c in other_message_instance.message_correlations:
correlation_dict[c.name]=c.value
return self.correlates_with_dictionary(correlation_dict)
def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool:
"""Returns true if the given dictionary matches the correlation names and values connected to this message instance"""
for c in self.message_correlations:
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?

View File

@ -125,7 +125,7 @@ def message_send(
# do any waiting message instances have matching correlations?
matching_message = None
for message_instance in message_instances:
if message_instance.correlates(body["payload"]):
if message_instance.correlates_with_dictionary(body["payload"]):
matching_message = message_instance
process_instance = None

View File

@ -8,7 +8,7 @@ from sqlalchemy import select
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel, MessageStatuses
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
@ -38,6 +38,7 @@ class MessageService:
message_instances_receive = MessageInstanceModel.query.filter_by(
message_type="receive", status="ready"
).all()
for message_instance_send in message_instances_send:
# check again in case another background process picked up the message
# while the previous one was running
@ -121,8 +122,7 @@ class MessageService:
processor_receive.do_engine_steps(save=False)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload,
correlations={},
message_payload
)
processor_receive.do_engine_steps(save=True)
@ -156,58 +156,19 @@ class MessageService:
correlations={},
)
processor_receive.do_engine_steps(save=True)
message_instance_receive.status = MessageStatuses.completed.value
db.session.add(message_instance_receive)
db.session.commit()
@staticmethod
def get_message_instance_receive(
message_instance_send: MessageInstanceModel,
message_instances_receive: list[MessageInstanceModel],
) -> Optional[MessageInstanceModel]:
"""Get_message_instance_receive."""
message_correlations_send = message_instance_send.message_correlations
message_correlation_filter = []
for message_correlation_send in message_correlations_send:
message_correlation_filter.append(
and_(
MessageCorrelationModel.name == message_correlation_send.name,
MessageCorrelationModel.value == message_correlation_send.value,
MessageCorrelationModel.message_correlation_property_id
== message_correlation_send.message_correlation_property_id,
)
)
for message_instance_receive in message_instances_receive:
# sqlalchemy supports select / where statements like active record apparantly
# https://docs.sqlalchemy.org/en/14/core/tutorial.html#conjunctions
message_correlation_select = (
select([db.func.count()])
.select_from(MessageCorrelationModel) # type: ignore
.where(
and_(
MessageCorrelationModel.process_instance_id
== message_instance_receive.process_instance_id,
or_(*message_correlation_filter),
)
)
.join(message_correlation_message_instance_table) # type: ignore
.filter_by(
message_instance_id=message_instance_receive.id,
)
)
message_correlations_receive = db.session.execute(
message_correlation_select
)
# since the query matches on name, value, and message_instance_receive.id, if the counts
# message correlations found are the same, then this should be the relevant message
if (
message_correlations_receive.scalar() == len(message_correlations_send)
and message_instance_receive.message_model_id
== message_instance_send.message_model_id
):
return message_instance_receive
"""Returns the message instance that correlates to the send message, or None if nothing correlates."""
for message_instance in message_instances_receive:
if message_instance.correlates(message_instance_send):
return message_instance
return None
@staticmethod

View File

@ -1,42 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:collaboration id="Collaboration_0oye1os">
<bpmn:participant id="message_receiver" name="Message Receiver" processRef="message_receiver_process" />
<bpmn:participant id="message_receiver" name="Message Receiver (invoice approver)" processRef="message_receiver_process" />
<bpmn:participant id="message_sender" name="Message Sender" />
<bpmn:messageFlow id="message_send_flow" name="Message Send Flow" sourceRef="message_sender" targetRef="receive_message" />
<bpmn:messageFlow id="Flow_0ds946g" sourceRef="send_message_response" targetRef="message_sender" />
<bpmn:correlationKey name="message_correlation_key">
<bpmn:correlationPropertyRef>message_correlation_property_topica</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>message_correlation_property_topicb</bpmn:correlationPropertyRef>
<bpmn:correlationKey name="invoice">
<bpmn:correlationPropertyRef>customer_id</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>po_number</bpmn:correlationPropertyRef>
</bpmn:correlationKey>
</bpmn:collaboration>
<bpmn:correlationProperty id="message_correlation_property_topica" name="Message Correlation Property TopicA">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topica</bpmn:messagePath>
<bpmn:correlationProperty id="customer_id" name="Customer Id">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topica</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="message_correlation_property_topicb" name="Message Correlation Property TopicB">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topicb</bpmn:messagePath>
<bpmn:correlationProperty id="po_number" name="Purchase Order Number">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topicb</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:message id="message_send" name="Message Send">
<bpmn:message id="request_approval" name="Request Approval">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>the_payload</spiffworkflow:messageVariable>
<spiffworkflow:messageVariable>invoice</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="message_response" name="Message Response">
<bpmn:message id="approval_result" name="Approval Result">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{ "the_payload": {
"topica": the_payload.topica,
"topicb": the_payload.topicb,
}}</spiffworkflow:messagePayload>
<spiffworkflow:messagePayload>invoice</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:process id="message_receiver_process" name="Message Receiver Process" isExecutable="true">
@ -45,28 +42,21 @@
<bpmn:endEvent id="Event_0q5otqd">
<bpmn:incoming>Flow_11r9uiw</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sendTask id="send_message_response" name="Send Message Reponse" messageRef="message_response">
<bpmn:sendTask id="send_message_response" name="Send Message Reponse" messageRef="approval_result">
<bpmn:incoming>Flow_0fruoax</bpmn:incoming>
<bpmn:outgoing>Flow_11r9uiw</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:startEvent id="receive_message" name="Receive Message">
<bpmn:outgoing>Flow_0fruoax</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_08u7ksn" messageRef="message_send" />
<bpmn:messageEventDefinition id="MessageEventDefinition_08u7ksn" messageRef="request_approval" />
</bpmn:startEvent>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_0oye1os">
<bpmndi:BPMNShape id="Participant_0mr0gg1_di" bpmnElement="message_receiver" isHorizontal="true">
<dc:Bounds x="120" y="350" width="480" height="230" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_11r9uiw_di" bpmnElement="Flow_11r9uiw">
<di:waypoint x="480" y="480" />
<di:waypoint x="512" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0fruoax_di" bpmnElement="Flow_0fruoax">
<di:waypoint x="208" y="480" />
<di:waypoint x="380" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Event_0q5otqd_di" bpmnElement="Event_0q5otqd">
<dc:Bounds x="512" y="462" width="36" height="36" />
</bpmndi:BPMNShape>
@ -79,6 +69,14 @@
<dc:Bounds x="149" y="505" width="88" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0fruoax_di" bpmnElement="Flow_0fruoax">
<di:waypoint x="208" y="480" />
<di:waypoint x="380" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_11r9uiw_di" bpmnElement="Flow_11r9uiw">
<di:waypoint x="480" y="480" />
<di:waypoint x="512" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Participant_0xvqrmk_di" bpmnElement="message_sender" isHorizontal="true">
<dc:Bounds x="130" y="250" width="360" height="60" />
<bpmndi:BPMNLabel />

View File

@ -5,25 +5,31 @@
<bpmn:participant id="message-receiver" name="Message Receiver" />
<bpmn:messageFlow id="message_send_flow" name="Message Send Flow" sourceRef="send_message" targetRef="message-receiver" />
<bpmn:messageFlow id="message_response_flow" name="Message Response Flow" sourceRef="message-receiver" targetRef="receive_message_response" />
<bpmn:correlationKey name="message_correlation_key">
<bpmn:correlationPropertyRef>message_correlation_property_topica</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>message_correlation_property_topicb</bpmn:correlationPropertyRef>
<bpmn:textAnnotation id="TextAnnotation_0oxbpew">
<bpmn:text>The messages sent here are about an Invoice that can be uniquely identified by the customer_id ("sartography") and a purchase order number (1001)
It will fire a message connected to the invoice keys above, starting another process, which can communicate back to this specific process instance using the correct key.</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1d6q7zd" sourceRef="message_initiator" targetRef="TextAnnotation_0oxbpew" />
<bpmn:correlationKey name="invoice">
<bpmn:correlationPropertyRef>po_number</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>customer_id</bpmn:correlationPropertyRef>
</bpmn:correlationKey>
</bpmn:collaboration>
<bpmn:correlationProperty id="message_correlation_property_topica" name="Message Correlation Property TopicA">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topica</bpmn:messagePath>
<bpmn:correlationProperty id="po_number" name="Purchase Order Number">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topica</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="message_correlation_property_topicb" name="Message Correlation Property TopicB">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topicb</bpmn:messagePath>
<bpmn:correlationProperty id="customer_id" name="Customer ID">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topicb</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
@ -32,42 +38,45 @@
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_037vpjk" sourceRef="send_message" targetRef="receive_message_response" />
<bpmn:sequenceFlow id="Flow_1qgz6p0" sourceRef="receive_message_response" targetRef="Event_0kndoyu" />
<bpmn:sequenceFlow id="Flow_10conab" sourceRef="StartEvent_1" targetRef="set_topic" />
<bpmn:sequenceFlow id="Flow_10conab" sourceRef="StartEvent_1" targetRef="invoice_form" />
<bpmn:endEvent id="Event_0kndoyu">
<bpmn:incoming>Flow_1qgz6p0</bpmn:incoming>
</bpmn:endEvent>
<bpmn:intermediateCatchEvent id="receive_message_response" name="Receive Message Response">
<bpmn:intermediateCatchEvent id="receive_message_response" name="Receive Approval Result">
<bpmn:incoming>Flow_037vpjk</bpmn:incoming>
<bpmn:outgoing>Flow_1qgz6p0</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1l3n0zr" messageRef="message_response" />
<bpmn:messageEventDefinition id="MessageEventDefinition_1l3n0zr" messageRef="approval_result" />
</bpmn:intermediateCatchEvent>
<bpmn:sendTask id="send_message" name="Send Message" messageRef="message_send">
<bpmn:sendTask id="send_message" name="Request Approval" messageRef="request_approval">
<bpmn:extensionElements>
<spiffworkflow:preScript>the_topic = "first_conversation" </spiffworkflow:preScript>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1ihr88m</bpmn:incoming>
<bpmn:incoming>Flow_02lw0q9</bpmn:incoming>
<bpmn:outgoing>Flow_037vpjk</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_1ihr88m" sourceRef="set_topic" targetRef="send_message" />
<bpmn:scriptTask id="set_topic" name="Set Topic" scriptFormat="python">
<bpmn:sequenceFlow id="Flow_02lw0q9" sourceRef="invoice_form" targetRef="send_message" />
<bpmn:userTask id="invoice_form" name="Create Invoice">
<bpmn:extensionElements>
<spiffworkflow:properties>
<spiffworkflow:property name="formJsonSchemaFilename" value="invoice_form.json" />
<spiffworkflow:property name="formUiSchemaFilename" value="invoice_ui.json" />
</spiffworkflow:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_10conab</bpmn:incoming>
<bpmn:outgoing>Flow_1ihr88m</bpmn:outgoing>
<bpmn:script>
timestamp = time.time()
the_topica = f"first_conversation_a_{timestamp}"
the_topicb = f"first_conversation_b_{timestamp}"
del time</bpmn:script>
</bpmn:scriptTask>
<bpmn:outgoing>Flow_02lw0q9</bpmn:outgoing>
</bpmn:userTask>
</bpmn:process>
<bpmn:message id="message_send" name="Message Send">
<bpmn:message id="request_approval" name="Request Approval">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{
"topica": the_topica,
"topicb": the_topicb,
"customer_id": customer_id,
"po_number": po_number,
"amount": amount,
"description": description,
}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="message_response" name="Message Response">
<bpmn:message id="approval_result" name="Approval Result">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>the_payload</spiffworkflow:messageVariable>
</bpmn:extensionElements>
@ -78,22 +87,6 @@ del time</bpmn:script>
<dc:Bounds x="120" y="52" width="600" height="338" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1ihr88m_di" bpmnElement="Flow_1ihr88m">
<di:waypoint x="350" y="177" />
<di:waypoint x="390" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_10conab_di" bpmnElement="Flow_10conab">
<di:waypoint x="215" y="177" />
<di:waypoint x="250" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1qgz6p0_di" bpmnElement="Flow_1qgz6p0">
<di:waypoint x="568" y="177" />
<di:waypoint x="622" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_037vpjk_di" bpmnElement="Flow_037vpjk">
<di:waypoint x="490" y="177" />
<di:waypoint x="532" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
@ -103,20 +96,44 @@ del time</bpmn:script>
<bpmndi:BPMNShape id="Event_0yt48xb_di" bpmnElement="receive_message_response">
<dc:Bounds x="532" y="159" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="507" y="129" width="88" height="27" />
<dc:Bounds x="508" y="129" width="86" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0vm33bu_di" bpmnElement="send_message">
<dc:Bounds x="390" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1t3nq1h_di" bpmnElement="set_topic">
<dc:Bounds x="250" y="137" width="100" height="80" />
<bpmndi:BPMNShape id="Activity_0798vfz_di" bpmnElement="invoice_form">
<dc:Bounds x="240" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_037vpjk_di" bpmnElement="Flow_037vpjk">
<di:waypoint x="490" y="177" />
<di:waypoint x="532" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1qgz6p0_di" bpmnElement="Flow_1qgz6p0">
<di:waypoint x="568" y="177" />
<di:waypoint x="622" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_10conab_di" bpmnElement="Flow_10conab">
<di:waypoint x="215" y="177" />
<di:waypoint x="240" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_02lw0q9_di" bpmnElement="Flow_02lw0q9">
<di:waypoint x="340" y="177" />
<di:waypoint x="390" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Participant_158b3ei_di" bpmnElement="message-receiver" isHorizontal="true">
<dc:Bounds x="120" y="350" width="600" height="60" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_0oxbpew_di" bpmnElement="TextAnnotation_0oxbpew">
<dc:Bounds x="760" y="-30" width="226.98863220214844" height="155.9943084716797" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Association_1d6q7zd_di" bpmnElement="Association_1d6q7zd">
<di:waypoint x="699" y="52" />
<di:waypoint x="760" y="15" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ueajoz_di" bpmnElement="message_send_flow">
<di:waypoint x="410" y="217" />
<di:waypoint x="410" y="350" />
@ -128,9 +145,9 @@ del time</bpmn:script>
<di:waypoint x="550" y="350" />
<di:waypoint x="550" y="195" />
<bpmndi:BPMNLabel>
<dc:Bounds x="552" y="294" width="76" height="27" />
<dc:Bounds x="552" y="294" width="77" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
</bpmn:definitions>

View File

@ -19,18 +19,18 @@
</bpmn:collaboration>
<bpmn:correlationProperty id="mcp_topica_one" name="MCP TopicA One">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_one">
<bpmn:messagePath>topica_one</bpmn:messagePath>
<bpmn:formalExpression>topica_one</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_one">
<bpmn:messagePath>payload_var_one.topica_one</bpmn:messagePath>
<bpmn:formalExpression>topic_one_a</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="mcp_topicb_one" name="MCP TopicB_one">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_one">
<bpmn:messagePath>topicb_one</bpmn:messagePath>
<bpmn:formalExpression>topicb_one</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_one">
<bpmn:messagePath>payload_var_one.topicb</bpmn:messagePath>
<bpmn:formalExpression>topic_one_b</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
@ -117,18 +117,18 @@ del time</bpmn:script>
</bpmn:message>
<bpmn:correlationProperty id="mcp_topica_two" name="MCP Topica Two">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
<bpmn:messagePath>topica_two</bpmn:messagePath>
<bpmn:formalExpression>topica_two</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
<bpmn:messagePath>topica_two</bpmn:messagePath>
<bpmn:formalExpression>topic_two_a</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="mcp_topicb_two" name="MCP Topicb Two">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
<bpmn:messagePath>topicb_two</bpmn:messagePath>
<bpmn:formalExpression>topicb_two</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
<bpmn:messagePath>topicb_two</bpmn:messagePath>
<bpmn:formalExpression>topic_two_b</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">

View File

@ -1,6 +1,10 @@
"""Test_message_service."""
import pytest
from flask import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.routes.messages_controller import message_send
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -20,105 +24,182 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestMessageService(BaseTest):
"""TestMessageService."""
def test_can_send_message_to_waiting_message(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_send_message_to_waiting_message."""
"""This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called "approval_result' that has the matching correlation properties,
as sent by an API Call"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00"
}
self.start_sender_process(client, with_super_admin_user)
self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message()
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("approval_result", {'payload': {'po_number': 5001}})
# Sound return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'jon'}})
# No error when calling with the correct parameters
response = message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'Sartography'}})
# There is no longer a waiting message
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type="receive"). \
filter_by(status="ready"). \
filter_by(process_instance_id=self.process_instance.id).all()
assert len(waiting_messages) == 0
# The process has completed
assert self.process_instance.status == 'complete'
def test_single_conversation_between_two_processes(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Assure that communication between two processes works the same as making a call through the API, here
we have two process instances that are communicating with each other using one conversation about an
Invoice whose details are defined in the following message payload """
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00"
}
# Load up the definition for the receiving process (it has a message start event that should cause it to
# fire when a unique message comes through.
# Fire up the first process
second_process_model = load_test_spec(
"test_group/message_receive",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver.bpmn"
)
# Now start the main process
self.start_sender_process(client, with_super_admin_user)
self.assure_a_message_was_sent()
# This is typically called in a background cron process, so we will manually call it
# here in the tests
# The first time it is called, it will instantiate a new instance of the message_recieve process
MessageService.process_message_instances()
# The sender process should still be waiting on a message to be returned to it ...
self.assure_there_is_a_process_waiting_on_a_message()
# The second time we call ths process_message_isntances (again it would typically be running on cron)
# it will deliver the message that was sent from the receiver back to the original sender.
MessageService.process_message_instances()
# But there should be no send message waiting for delivery, because
# the message receiving process should pick it up instantly via
# it's start event.
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type="receive"). \
filter_by(status="ready"). \
filter_by(process_instance_id=self.process_instance.id).all()
assert len(waiting_messages) == 0
# The message sender process is complete
assert self.process_instance.status == 'complete'
# The message receiver process is also complete
message_receiver_process = ProcessInstanceModel.query.filter_by(process_model_identifier = "test_group/message_receive").first()
assert message_receiver_process.status == 'complete'
def start_sender_process(self,
client: FlaskClient,
with_super_admin_user: UserModel):
process_group_id = "test_group"
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
load_test_spec(
"test_group/message_receiver",
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver.bpmn",
)
process_model_sender = load_test_spec(
"test_group/message_sender",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id,
self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id,
with_super_admin_user,
)
processor_send_receive = ProcessInstanceProcessor(self.process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = self.process_instance.active_human_tasks[0]
spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor_send_receive.bpmn_process_instance
)
processor_sender = ProcessInstanceProcessor(process_instance_sender)
processor_sender.do_engine_steps()
processor_sender.save()
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
self.payload,
with_super_admin_user,
human_task,
)
processor_send_receive.save()
def assure_a_message_was_sent(self):
# There should be one new send message for the given process instance.
send_messages = MessageInstanceModel.query. \
filter_by(message_type="send"). \
filter_by(process_instance_id=self.process_instance.id).all()
assert len(send_messages) == 1
send_message = send_messages[0]
# The payload should match because of how it is written in the Send task.
assert send_message.payload == self.payload, "The send message should match up with the payload"
assert send_message.message_model.identifier == "request_approval"
assert send_message.status == "ready"
assert len(send_message.message_correlations) == 2
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 2
# ensure both message instances are for the same process instance
# it will be send_message and receive_message_response
assert (
message_instance_result[0].process_instance_id
== message_instance_result[1].process_instance_id
)
self.assure_correlation_properties_are_right(send_message)
message_instance_sender = message_instance_result[0]
assert message_instance_sender.process_instance_id == process_instance_sender.id
message_correlations = MessageCorrelationModel.query.all()
assert len(message_correlations) == 2
assert message_correlations[0].process_instance_id == process_instance_sender.id
message_correlations_message_instances = (
MessageCorrelationMessageInstanceModel.query.all()
)
assert len(message_correlations_message_instances) == 4
assert (
message_correlations_message_instances[0].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[1].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[2].message_instance_id
== message_instance_result[1].id
)
assert (
message_correlations_message_instances[3].message_instance_id
== message_instance_result[1].id
)
def assure_there_is_a_process_waiting_on_a_message(self):
# There should be one new send message for the given process instance.
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type="receive"). \
filter_by(status="ready"). \
filter_by(process_instance_id=self.process_instance.id).all()
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
# process first message
MessageService.process_message_instances()
assert message_instance_sender.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
assert len(process_instance_result) == 2
process_instance_receiver = process_instance_result[1]
# just make sure it's a different process instance
assert process_instance_receiver.id != process_instance_sender.id
assert process_instance_receiver.status == "complete"
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 3
message_instance_receiver = message_instance_result[1]
assert message_instance_receiver.id != message_instance_sender.id
assert message_instance_receiver.status == "ready"
# process second message
MessageService.process_message_instances()
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 3
for message_instance in message_instance_result:
assert message_instance.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
assert len(process_instance_result) == 2
for process_instance in process_instance_result:
assert process_instance.status == "complete"
def assure_correlation_properties_are_right(self, message):
# Correlation Properties should match up
po_curr = next(c for c in message.message_correlations if c.name == "po_number")
customer_curr = next(c for c in message.message_correlations if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None
assert po_curr.value == '1001'
assert customer_curr.value == "Sartography"
def test_can_send_message_to_multiple_process_models(
self,
@ -153,55 +234,34 @@ class TestMessageService(BaseTest):
process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id,
user,
# process_group_identifier=process_model_sender.process_group_id,
)
user)
processor_sender = ProcessInstanceProcessor(process_instance_sender)
processor_sender.do_engine_steps()
processor_sender.save()
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 3
# ensure both message instances are for the same process instance
# it will be send_message and receive_message_response
assert (
message_instance_result[0].process_instance_id
== message_instance_result[1].process_instance_id
)
# At this point, the message_sender process has fired two different messages but those
# processes have not started, and it is now paused, waiting for to receive a message. so
# we should have two sends and a receive.
assert MessageInstanceModel.query.filter_by(process_instance_id = process_instance_sender.id).count() == 3
assert MessageInstanceModel.query.count() == 3 # all messages are related to the instance
orig_send_messages = MessageInstanceModel.query.filter_by(message_type="send").all()
assert len(orig_send_messages) == 2
assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1
message_instance_sender = message_instance_result[0]
assert message_instance_sender.process_instance_id == process_instance_sender.id
message_correlations = MessageCorrelationModel.query.all()
assert len(message_correlations) == 4
assert message_correlations[0].process_instance_id == process_instance_sender.id
message_correlations_message_instances = (
MessageCorrelationMessageInstanceModel.query.all()
)
assert len(message_correlations_message_instances) == 6
assert (
message_correlations_message_instances[0].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[1].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[2].message_instance_id
== message_instance_result[1].id
)
assert (
message_correlations_message_instances[3].message_instance_id
== message_instance_result[1].id
)
message_instances = MessageInstanceModel.query.all()
# Each message instance should have two correlations
for mi in message_instances:
assert len(mi.message_correlations) == 2
# process first message
# process message instances
MessageService.process_message_instances()
assert message_instance_sender.status == "completed"
# Once complete the original send messages should be completed and two new instanges
# should now exist, one for each of the process instances ...
for osm in orig_send_messages:
assert osm.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
assert len(process_instance_result) == 3
process_instance_receiver_one = ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receiver_one"
@ -241,9 +301,7 @@ class TestMessageService(BaseTest):
][0]
assert message_instance_receiver_one is not None
assert message_instance_receiver_two is not None
assert message_instance_receiver_one.id != message_instance_sender.id
assert message_instance_receiver_one.status == "ready"
assert message_instance_receiver_two.id != message_instance_sender.id
assert message_instance_receiver_two.status == "ready"
# process second message

View File

@ -1,127 +0,0 @@
"""Test_message_service."""
import pytest
from flask import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.routes.messages_controller import message_send
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
class TestMessageService(BaseTest):
"""TestMessageService."""
def test_message_sent(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""This example workflow will send a message called 'request_approval' and then wait for a response messge
of 'approval_result. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called "approval_result' that has the matching correlation properties."""
process_group_id = "test_group"
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message",
bpmn_file_name="message_send_receive.bpmn",
)
self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id,
with_super_admin_user,
)
processor_send_receive = ProcessInstanceProcessor(self.process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = self.process_instance.active_human_tasks[0]
spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor_send_receive.bpmn_process_instance
)
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We build a new feature for messages!",
"amount": "100.00"
}
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
self.payload,
with_super_admin_user,
human_task,
)
processor_send_receive.save()
self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message()
## Should return an error when making an API call for the wrong po number
with pytest.raises(ApiError):
message_send("approval_result", {'payload': {'po_number' : 5001}})
## Sound return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'jon'}})
## No error when calling with the correct parameters
response = message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'Sartography'}})
## There is no longer a waiting message
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type = "receive"). \
filter_by(process_instance_id = self.process_instance.id).all()
assert len(waiting_messages) == 0
def assure_a_message_was_sent(self):
# There should be one new send message for the given process instance.
send_messages = MessageInstanceModel.query. \
filter_by(message_type = "send"). \
filter_by(process_instance_id = self.process_instance.id).all()
assert len(send_messages) == 1
send_message = send_messages[0]
# The payload should match because of how it is written in the Send task.
assert send_message.payload == self.payload, "The send message should match up with the payload"
assert send_message.message_model.identifier == "request_approval"
assert send_message.status == "ready"
assert len(send_message.message_correlations) == 2
message_instance_result = MessageInstanceModel.query.all()
self.assure_correlation_properties_are_right(send_message)
def assure_there_is_a_process_waiting_on_a_message(self):
# There should be one new send message for the given process instance.
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type = "receive"). \
filter_by(process_instance_id = self.process_instance.id).all()
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message):
# Correlation Properties should match up
po_curr = next(c for c in message.message_correlations if c.name == "po_number")
customer_curr = next(c for c in message.message_correlations if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None
assert po_curr.value == '1001'
assert customer_curr.value == "Sartography"