diff --git a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py
index 3d1192de6..aa7e04488 100644
--- a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py
+++ b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py
@@ -23,6 +23,7 @@ from calendar import monthrange
from time import timezone as tzoffset
from copy import deepcopy
+from SpiffWorkflow.exceptions import SpiffWorkflowException, WorkflowException
from SpiffWorkflow.task import TaskState
LOCALTZ = timezone(timedelta(seconds=-1 * tzoffset))
@@ -191,7 +192,7 @@ class MessageEventDefinition(NamedEventDefinition):
# However, there needs to be something to apply the correlations to in the
# standard case and this is line with the way Spiff works otherwise
event.payload = deepcopy(my_task.data)
- correlations = self.get_correlations(my_task.workflow.script_engine, event.payload)
+ correlations = self.get_correlations(my_task, event.payload)
my_task.workflow.correlations.update(correlations)
self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations)
@@ -205,13 +206,19 @@ class MessageEventDefinition(NamedEventDefinition):
if payload is not None:
my_task.set_data(**payload)
- def get_correlations(self, script_engine, payload):
+ def get_correlations(self, task, payload):
correlations = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
if key not in correlations:
correlations[key] = {}
- correlations[key][property.name] = script_engine._evaluate(property.expression, payload)
+ try:
+ correlations[key][property.name] = task.workflow.script_engine._evaluate(property.expression, payload)
+ except WorkflowException as we:
+ we.add_note(f"Failed to evaluate correlation key '{key}'"
+ f" invalid expression '{property.expression}'")
+ we.task_spec = task.task_spec
+ raise we
return correlations
@@ -351,7 +358,7 @@ class TimerEventDefinition(EventDefinition):
return TimerEventDefinition.parse_iso_week(expression)
else:
return TimerEventDefinition.get_datetime(expression)
-
+
@staticmethod
def parse_iso_recurring_interval(expression):
components = expression.upper().replace('--', '/').strip('R').split('/')
@@ -510,7 +517,7 @@ class MultipleEventDefinition(EventDefinition):
if event == other:
return True
return False
-
+
def throw(self, my_task):
# Mutiple events throw all associated events when they fire
for event_definition in self.event_definitions:
@@ -518,4 +525,4 @@ class MultipleEventDefinition(EventDefinition):
event=event_definition,
workflow=my_task.workflow,
outer_workflow=my_task.workflow.outer_workflow
- )
\ No newline at end of file
+ )
diff --git a/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py b/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py
index c569fd728..d17e31be8 100644
--- a/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py
+++ b/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py
@@ -14,7 +14,7 @@ class MessageEventDefinition(MessageEventDefinition):
# we have to evaluate it again so we have to create a new event
event = MessageEventDefinition(self.name, self.correlation_properties, self.expression, self.message_var)
event.payload = my_task.workflow.script_engine.evaluate(my_task, self.expression)
- correlations = self.get_correlations(my_task.workflow.script_engine, event.payload)
+ correlations = self.get_correlations(my_task, event.payload)
my_task.workflow.correlations.update(correlations)
self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations)
diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml
index cbf0b7ade..907c2206e 100644
--- a/spiffworkflow-backend/pyproject.toml
+++ b/spiffworkflow-backend/pyproject.toml
@@ -27,8 +27,8 @@ flask-marshmallow = "*"
flask-migrate = "*"
flask-restful = "*"
werkzeug = "*"
-SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
-# SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
+#SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
+SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py b/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py
index 5fff05c2d..99e7237d4 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py
@@ -157,7 +157,7 @@ class ApiError(Exception):
error_line=exp.error_line,
task_trace=exp.task_trace,
)
- elif isinstance(exp, WorkflowException):
+ elif isinstance(exp, WorkflowException) and exp.task_spec:
return ApiError.from_task_spec(error_code, message, exp.task_spec)
else:
return ApiError("workflow_error", str(exp))
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py
index 5f5b76bad..d0d24550f 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py
@@ -1,7 +1,7 @@
"""Message_instance."""
import enum
from dataclasses import dataclass
-from typing import Any
+from typing import Any, Self
from typing import Optional
from typing import TYPE_CHECKING
@@ -68,7 +68,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
- def correlates(self, correlation_dictionary):
+ def correlates(self, other_message_instance: Self) -> bool:
+ if other_message_instance.message_model_id != self.message_model_id:
+ return False
+ correlation_dict = {}
+ for c in other_message_instance.message_correlations:
+ correlation_dict[c.name]=c.value
+ return self.correlates_with_dictionary(correlation_dict)
+
+ def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool:
"""Returns true if the given dictionary matches the correlation names and values connected to this message instance"""
for c in self.message_correlations:
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py
index 3a1143422..864a763c3 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py
@@ -125,7 +125,7 @@ def message_send(
# do any waiting message instances have matching correlations?
matching_message = None
for message_instance in message_instances:
- if message_instance.correlates(body["payload"]):
+ if message_instance.correlates_with_dictionary(body["payload"]):
matching_message = message_instance
process_instance = None
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
index 1595bb359..8277f5843 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py
@@ -8,7 +8,7 @@ from sqlalchemy import select
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
-from spiffworkflow_backend.models.message_instance import MessageInstanceModel
+from spiffworkflow_backend.models.message_instance import MessageInstanceModel, MessageStatuses
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
@@ -38,6 +38,7 @@ class MessageService:
message_instances_receive = MessageInstanceModel.query.filter_by(
message_type="receive", status="ready"
).all()
+
for message_instance_send in message_instances_send:
# check again in case another background process picked up the message
# while the previous one was running
@@ -121,8 +122,7 @@ class MessageService:
processor_receive.do_engine_steps(save=False)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
- message_payload,
- correlations={},
+ message_payload
)
processor_receive.do_engine_steps(save=True)
@@ -156,58 +156,19 @@ class MessageService:
correlations={},
)
processor_receive.do_engine_steps(save=True)
+ message_instance_receive.status = MessageStatuses.completed.value
+ db.session.add(message_instance_receive)
+ db.session.commit()
@staticmethod
def get_message_instance_receive(
message_instance_send: MessageInstanceModel,
message_instances_receive: list[MessageInstanceModel],
) -> Optional[MessageInstanceModel]:
- """Get_message_instance_receive."""
-
- message_correlations_send = message_instance_send.message_correlations
-
- message_correlation_filter = []
- for message_correlation_send in message_correlations_send:
- message_correlation_filter.append(
- and_(
- MessageCorrelationModel.name == message_correlation_send.name,
- MessageCorrelationModel.value == message_correlation_send.value,
- MessageCorrelationModel.message_correlation_property_id
- == message_correlation_send.message_correlation_property_id,
- )
- )
-
- for message_instance_receive in message_instances_receive:
- # sqlalchemy supports select / where statements like active record apparantly
- # https://docs.sqlalchemy.org/en/14/core/tutorial.html#conjunctions
- message_correlation_select = (
- select([db.func.count()])
- .select_from(MessageCorrelationModel) # type: ignore
- .where(
- and_(
- MessageCorrelationModel.process_instance_id
- == message_instance_receive.process_instance_id,
- or_(*message_correlation_filter),
- )
- )
- .join(message_correlation_message_instance_table) # type: ignore
- .filter_by(
- message_instance_id=message_instance_receive.id,
- )
- )
- message_correlations_receive = db.session.execute(
- message_correlation_select
- )
-
- # since the query matches on name, value, and message_instance_receive.id, if the counts
- # message correlations found are the same, then this should be the relevant message
- if (
- message_correlations_receive.scalar() == len(message_correlations_send)
- and message_instance_receive.message_model_id
- == message_instance_send.message_model_id
- ):
- return message_instance_receive
-
+ """Returns the message instance that correlates to the send message, or None if nothing correlates."""
+ for message_instance in message_instances_receive:
+ if message_instance.correlates(message_instance_send):
+ return message_instance
return None
@staticmethod
diff --git a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn
index 828f7d2ec..53eadc415 100644
--- a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn
+++ b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn
@@ -1,42 +1,39 @@
-
+
-
- message_correlation_property_topica
- message_correlation_property_topicb
+
+ customer_id
+ po_number
-
-
- topica
+
+
+ customer_id
-
- the_payload.topica
+
+ customer_id
-
-
- topicb
+
+
+ po_number
-
- the_payload.topicb
+
+ po_number
-
+
- the_payload
+ invoice
-
+
- { "the_payload": {
-"topica": the_payload.topica,
-"topicb": the_payload.topicb,
-}}
+ invoice
@@ -45,28 +42,21 @@
Flow_11r9uiw
-
+
Flow_0fruoax
Flow_11r9uiw
Flow_0fruoax
-
+
+
-
-
-
-
-
-
-
-
@@ -79,6 +69,14 @@
+
+
+
+
+
+
+
+
diff --git a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn
index 7bda31eb7..299f49e58 100644
--- a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn
+++ b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn
@@ -5,25 +5,31 @@
-
- message_correlation_property_topica
- message_correlation_property_topicb
+
+ The messages sent here are about an Invoice that can be uniquely identified by the customer_id ("sartography") and a purchase order number (1001)
+
+It will fire a message connected to the invoice keys above, starting another process, which can communicate back to this specific process instance using the correct key.
+
+
+
+ po_number
+ customer_id
-
-
- topica
+
+
+ po_number
-
- the_payload.topica
+
+ po_number
-
-
- topicb
+
+
+ customer_id
-
- the_payload.topicb
+
+ customer_id
@@ -32,42 +38,45 @@
-
+
Flow_1qgz6p0
-
+
Flow_037vpjk
Flow_1qgz6p0
-
+
-
+
the_topic = "first_conversation"
- Flow_1ihr88m
+ Flow_02lw0q9
Flow_037vpjk
-
-
+
+
+
+
+
+
+
+
Flow_10conab
- Flow_1ihr88m
-
-timestamp = time.time()
-the_topica = f"first_conversation_a_{timestamp}"
-the_topicb = f"first_conversation_b_{timestamp}"
-del time
-
+ Flow_02lw0q9
+
-
+
{
-"topica": the_topica,
-"topicb": the_topicb,
+"customer_id": customer_id,
+"po_number": po_number,
+"amount": amount,
+"description": description,
}
-
+
the_payload
@@ -78,22 +87,6 @@ del time
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -103,20 +96,44 @@ del time
-
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -128,9 +145,9 @@ del time
-
+
-
+
\ No newline at end of file
diff --git a/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn b/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn
index 160517055..671f2fee6 100644
--- a/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn
+++ b/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn
@@ -19,18 +19,18 @@
- topica_one
+ topica_one
- payload_var_one.topica_one
+ topic_one_a
- topicb_one
+ topicb_one
- payload_var_one.topicb
+ topic_one_b
@@ -117,18 +117,18 @@ del time
- topica_two
+ topica_two
- topica_two
+ topic_two_a
- topicb_two
+ topicb_two
- topicb_two
+ topic_two_b
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py
index 698cf6c8a..cc050789f 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py
@@ -1,6 +1,10 @@
"""Test_message_service."""
+import pytest
from flask import Flask
from flask.testing import FlaskClient
+
+from spiffworkflow_backend.exceptions.api_error import ApiError
+from spiffworkflow_backend.routes.messages_controller import message_send
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@@ -20,105 +24,182 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestMessageService(BaseTest):
"""TestMessageService."""
- def test_can_send_message_to_waiting_message(
- self,
- app: Flask,
- client: FlaskClient,
- with_db_and_bpmn_file_cleanup: None,
- with_super_admin_user: UserModel,
+ def test_message_from_api_into_running_process(
+ self,
+ app: Flask,
+ client: FlaskClient,
+ with_db_and_bpmn_file_cleanup: None,
+ with_super_admin_user: UserModel,
) -> None:
- """Test_can_send_message_to_waiting_message."""
+ """This example workflow will send a message called 'request_approval' and then wait for a response message
+ of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
+ and will respond only to a message called "approval_result' that has the matching correlation properties,
+ as sent by an API Call"""
+
+ self.payload = {
+ "customer_id": "Sartography",
+ "po_number": 1001,
+ "description": "We built a new feature for messages!",
+ "amount": "100.00"
+ }
+
+ self.start_sender_process(client, with_super_admin_user)
+ self.assure_a_message_was_sent()
+ self.assure_there_is_a_process_waiting_on_a_message()
+
+ # Make an API call to the service endpoint, but use the wrong po number
+ with pytest.raises(ApiError):
+ message_send("approval_result", {'payload': {'po_number': 5001}})
+
+ # Sound return an error when making an API call for right po number, wrong client
+ with pytest.raises(ApiError):
+ message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'jon'}})
+
+ # No error when calling with the correct parameters
+ response = message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'Sartography'}})
+
+ # There is no longer a waiting message
+ waiting_messages = MessageInstanceModel.query. \
+ filter_by(message_type="receive"). \
+ filter_by(status="ready"). \
+ filter_by(process_instance_id=self.process_instance.id).all()
+ assert len(waiting_messages) == 0
+
+ # The process has completed
+ assert self.process_instance.status == 'complete'
+
+ def test_single_conversation_between_two_processes(
+ self,
+ app: Flask,
+ client: FlaskClient,
+ with_db_and_bpmn_file_cleanup: None,
+ with_super_admin_user: UserModel,
+ ) -> None:
+ """Assure that communication between two processes works the same as making a call through the API, here
+ we have two process instances that are communicating with each other using one conversation about an
+ Invoice whose details are defined in the following message payload """
+ self.payload = {
+ "customer_id": "Sartography",
+ "po_number": 1001,
+ "description": "We built a new feature for messages!",
+ "amount": "100.00"
+ }
+
+ # Load up the definition for the receiving process (it has a message start event that should cause it to
+ # fire when a unique message comes through.
+ # Fire up the first process
+ second_process_model = load_test_spec(
+ "test_group/message_receive",
+ process_model_source_directory="message_send_one_conversation",
+ bpmn_file_name="message_receiver.bpmn"
+ )
+
+ # Now start the main process
+ self.start_sender_process(client, with_super_admin_user)
+ self.assure_a_message_was_sent()
+
+ # This is typically called in a background cron process, so we will manually call it
+ # here in the tests
+ # The first time it is called, it will instantiate a new instance of the message_recieve process
+ MessageService.process_message_instances()
+
+ # The sender process should still be waiting on a message to be returned to it ...
+ self.assure_there_is_a_process_waiting_on_a_message()
+
+ # The second time we call ths process_message_isntances (again it would typically be running on cron)
+ # it will deliver the message that was sent from the receiver back to the original sender.
+ MessageService.process_message_instances()
+
+
+ # But there should be no send message waiting for delivery, because
+ # the message receiving process should pick it up instantly via
+ # it's start event.
+ waiting_messages = MessageInstanceModel.query. \
+ filter_by(message_type="receive"). \
+ filter_by(status="ready"). \
+ filter_by(process_instance_id=self.process_instance.id).all()
+ assert len(waiting_messages) == 0
+
+ # The message sender process is complete
+ assert self.process_instance.status == 'complete'
+
+ # The message receiver process is also complete
+ message_receiver_process = ProcessInstanceModel.query.filter_by(process_model_identifier = "test_group/message_receive").first()
+ assert message_receiver_process.status == 'complete'
+
+
+
+
+
+ def start_sender_process(self,
+ client: FlaskClient,
+ with_super_admin_user: UserModel):
process_group_id = "test_group"
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
- load_test_spec(
- "test_group/message_receiver",
+ process_model = load_test_spec(
+ "test_group/message",
process_model_source_directory="message_send_one_conversation",
- bpmn_file_name="message_receiver.bpmn",
- )
- process_model_sender = load_test_spec(
- "test_group/message_sender",
- process_model_source_directory="message_send_one_conversation",
- bpmn_file_name="message_sender.bpmn",
+ bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
- process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
- process_model_sender.id,
+ self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
+ process_model.id,
with_super_admin_user,
)
+ processor_send_receive = ProcessInstanceProcessor(self.process_instance)
+ processor_send_receive.do_engine_steps(save=True)
+ task = processor_send_receive.get_all_user_tasks()[0]
+ human_task = self.process_instance.active_human_tasks[0]
+ spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier(
+ human_task.task_name, processor_send_receive.bpmn_process_instance
+ )
- processor_sender = ProcessInstanceProcessor(process_instance_sender)
- processor_sender.do_engine_steps()
- processor_sender.save()
+ ProcessInstanceService.complete_form_task(
+ processor_send_receive,
+ task,
+ self.payload,
+ with_super_admin_user,
+ human_task,
+ )
+ processor_send_receive.save()
+ def assure_a_message_was_sent(self):
+ # There should be one new send message for the given process instance.
+ send_messages = MessageInstanceModel.query. \
+ filter_by(message_type="send"). \
+ filter_by(process_instance_id=self.process_instance.id).all()
+ assert len(send_messages) == 1
+ send_message = send_messages[0]
+
+ # The payload should match because of how it is written in the Send task.
+ assert send_message.payload == self.payload, "The send message should match up with the payload"
+ assert send_message.message_model.identifier == "request_approval"
+ assert send_message.status == "ready"
+ assert len(send_message.message_correlations) == 2
message_instance_result = MessageInstanceModel.query.all()
- assert len(message_instance_result) == 2
- # ensure both message instances are for the same process instance
- # it will be send_message and receive_message_response
- assert (
- message_instance_result[0].process_instance_id
- == message_instance_result[1].process_instance_id
- )
+ self.assure_correlation_properties_are_right(send_message)
- message_instance_sender = message_instance_result[0]
- assert message_instance_sender.process_instance_id == process_instance_sender.id
- message_correlations = MessageCorrelationModel.query.all()
- assert len(message_correlations) == 2
- assert message_correlations[0].process_instance_id == process_instance_sender.id
- message_correlations_message_instances = (
- MessageCorrelationMessageInstanceModel.query.all()
- )
- assert len(message_correlations_message_instances) == 4
- assert (
- message_correlations_message_instances[0].message_instance_id
- == message_instance_sender.id
- )
- assert (
- message_correlations_message_instances[1].message_instance_id
- == message_instance_sender.id
- )
- assert (
- message_correlations_message_instances[2].message_instance_id
- == message_instance_result[1].id
- )
- assert (
- message_correlations_message_instances[3].message_instance_id
- == message_instance_result[1].id
- )
+ def assure_there_is_a_process_waiting_on_a_message(self):
+ # There should be one new send message for the given process instance.
+ waiting_messages = MessageInstanceModel.query. \
+ filter_by(message_type="receive"). \
+ filter_by(status="ready"). \
+ filter_by(process_instance_id=self.process_instance.id).all()
+ assert len(waiting_messages) == 1
+ waiting_message = waiting_messages[0]
+ self.assure_correlation_properties_are_right(waiting_message)
- # process first message
- MessageService.process_message_instances()
- assert message_instance_sender.status == "completed"
-
- process_instance_result = ProcessInstanceModel.query.all()
-
- assert len(process_instance_result) == 2
- process_instance_receiver = process_instance_result[1]
-
- # just make sure it's a different process instance
- assert process_instance_receiver.id != process_instance_sender.id
- assert process_instance_receiver.status == "complete"
-
- message_instance_result = MessageInstanceModel.query.all()
- assert len(message_instance_result) == 3
- message_instance_receiver = message_instance_result[1]
- assert message_instance_receiver.id != message_instance_sender.id
- assert message_instance_receiver.status == "ready"
-
- # process second message
- MessageService.process_message_instances()
-
- message_instance_result = MessageInstanceModel.query.all()
- assert len(message_instance_result) == 3
- for message_instance in message_instance_result:
- assert message_instance.status == "completed"
-
- process_instance_result = ProcessInstanceModel.query.all()
- assert len(process_instance_result) == 2
- for process_instance in process_instance_result:
- assert process_instance.status == "complete"
+ def assure_correlation_properties_are_right(self, message):
+ # Correlation Properties should match up
+ po_curr = next(c for c in message.message_correlations if c.name == "po_number")
+ customer_curr = next(c for c in message.message_correlations if c.name == "customer_id")
+ assert po_curr is not None
+ assert customer_curr is not None
+ assert po_curr.value == '1001'
+ assert customer_curr.value == "Sartography"
def test_can_send_message_to_multiple_process_models(
self,
@@ -153,55 +234,34 @@ class TestMessageService(BaseTest):
process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id,
- user,
- # process_group_identifier=process_model_sender.process_group_id,
- )
+ user)
processor_sender = ProcessInstanceProcessor(process_instance_sender)
processor_sender.do_engine_steps()
processor_sender.save()
- message_instance_result = MessageInstanceModel.query.all()
- assert len(message_instance_result) == 3
- # ensure both message instances are for the same process instance
- # it will be send_message and receive_message_response
- assert (
- message_instance_result[0].process_instance_id
- == message_instance_result[1].process_instance_id
- )
+ # At this point, the message_sender process has fired two different messages but those
+ # processes have not started, and it is now paused, waiting for to receive a message. so
+ # we should have two sends and a receive.
+ assert MessageInstanceModel.query.filter_by(process_instance_id = process_instance_sender.id).count() == 3
+ assert MessageInstanceModel.query.count() == 3 # all messages are related to the instance
+ orig_send_messages = MessageInstanceModel.query.filter_by(message_type="send").all()
+ assert len(orig_send_messages) == 2
+ assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1
- message_instance_sender = message_instance_result[0]
- assert message_instance_sender.process_instance_id == process_instance_sender.id
- message_correlations = MessageCorrelationModel.query.all()
- assert len(message_correlations) == 4
- assert message_correlations[0].process_instance_id == process_instance_sender.id
- message_correlations_message_instances = (
- MessageCorrelationMessageInstanceModel.query.all()
- )
- assert len(message_correlations_message_instances) == 6
- assert (
- message_correlations_message_instances[0].message_instance_id
- == message_instance_sender.id
- )
- assert (
- message_correlations_message_instances[1].message_instance_id
- == message_instance_sender.id
- )
- assert (
- message_correlations_message_instances[2].message_instance_id
- == message_instance_result[1].id
- )
- assert (
- message_correlations_message_instances[3].message_instance_id
- == message_instance_result[1].id
- )
+ message_instances = MessageInstanceModel.query.all()
+ # Each message instance should have two correlations
+ for mi in message_instances:
+ assert len(mi.message_correlations) == 2
- # process first message
+ # process message instances
MessageService.process_message_instances()
- assert message_instance_sender.status == "completed"
+ # Once complete the original send messages should be completed and two new instanges
+ # should now exist, one for each of the process instances ...
+ for osm in orig_send_messages:
+ assert osm.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
-
assert len(process_instance_result) == 3
process_instance_receiver_one = ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receiver_one"
@@ -241,9 +301,7 @@ class TestMessageService(BaseTest):
][0]
assert message_instance_receiver_one is not None
assert message_instance_receiver_two is not None
- assert message_instance_receiver_one.id != message_instance_sender.id
assert message_instance_receiver_one.status == "ready"
- assert message_instance_receiver_two.id != message_instance_sender.id
assert message_instance_receiver_two.status == "ready"
# process second message
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py
deleted file mode 100644
index ca9833ac3..000000000
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py
+++ /dev/null
@@ -1,127 +0,0 @@
-"""Test_message_service."""
-import pytest
-from flask import Flask
-from flask.testing import FlaskClient
-
-from spiffworkflow_backend.exceptions.api_error import ApiError
-from spiffworkflow_backend.routes.messages_controller import message_send
-from tests.spiffworkflow_backend.helpers.base_test import BaseTest
-from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
-
-from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
-from spiffworkflow_backend.models.message_instance import MessageInstanceModel
-from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
-from spiffworkflow_backend.models.user import UserModel
-from spiffworkflow_backend.services.message_service import MessageService
-from spiffworkflow_backend.services.process_instance_processor import (
- ProcessInstanceProcessor,
-)
-from spiffworkflow_backend.services.process_instance_service import (
- ProcessInstanceService,
-)
-
-
-class TestMessageService(BaseTest):
- """TestMessageService."""
-
- def test_message_sent(
- self,
- app: Flask,
- client: FlaskClient,
- with_db_and_bpmn_file_cleanup: None,
- with_super_admin_user: UserModel,
- ) -> None:
- """This example workflow will send a message called 'request_approval' and then wait for a response messge
- of 'approval_result. This test assures that it will fire the message with the correct correlation properties
- and will respond only to a message called "approval_result' that has the matching correlation properties."""
- process_group_id = "test_group"
- self.create_process_group(
- client, with_super_admin_user, process_group_id, process_group_id
- )
-
- process_model = load_test_spec(
- "test_group/message",
- process_model_source_directory="message",
- bpmn_file_name="message_send_receive.bpmn",
- )
-
- self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
- process_model.id,
- with_super_admin_user,
- )
- processor_send_receive = ProcessInstanceProcessor(self.process_instance)
- processor_send_receive.do_engine_steps(save=True)
- task = processor_send_receive.get_all_user_tasks()[0]
- human_task = self.process_instance.active_human_tasks[0]
- spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier(
- human_task.task_name, processor_send_receive.bpmn_process_instance
- )
- self.payload = {
- "customer_id": "Sartography",
- "po_number": 1001,
- "description": "We build a new feature for messages!",
- "amount": "100.00"
- }
-
- ProcessInstanceService.complete_form_task(
- processor_send_receive,
- task,
- self.payload,
- with_super_admin_user,
- human_task,
- )
- processor_send_receive.save()
- self.assure_a_message_was_sent()
- self.assure_there_is_a_process_waiting_on_a_message()
-
- ## Should return an error when making an API call for the wrong po number
- with pytest.raises(ApiError):
- message_send("approval_result", {'payload': {'po_number' : 5001}})
-
- ## Sound return an error when making an API call for right po number, wrong client
- with pytest.raises(ApiError):
- message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'jon'}})
-
- ## No error when calling with the correct parameters
- response = message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'Sartography'}})
-
- ## There is no longer a waiting message
- waiting_messages = MessageInstanceModel.query. \
- filter_by(message_type = "receive"). \
- filter_by(process_instance_id = self.process_instance.id).all()
- assert len(waiting_messages) == 0
-
- def assure_a_message_was_sent(self):
- # There should be one new send message for the given process instance.
- send_messages = MessageInstanceModel.query. \
- filter_by(message_type = "send"). \
- filter_by(process_instance_id = self.process_instance.id).all()
- assert len(send_messages) == 1
- send_message = send_messages[0]
-
- # The payload should match because of how it is written in the Send task.
- assert send_message.payload == self.payload, "The send message should match up with the payload"
- assert send_message.message_model.identifier == "request_approval"
- assert send_message.status == "ready"
- assert len(send_message.message_correlations) == 2
- message_instance_result = MessageInstanceModel.query.all()
- self.assure_correlation_properties_are_right(send_message)
-
- def assure_there_is_a_process_waiting_on_a_message(self):
- # There should be one new send message for the given process instance.
- waiting_messages = MessageInstanceModel.query. \
- filter_by(message_type = "receive"). \
- filter_by(process_instance_id = self.process_instance.id).all()
- assert len(waiting_messages) == 1
- waiting_message = waiting_messages[0]
- self.assure_correlation_properties_are_right(waiting_message)
-
- def assure_correlation_properties_are_right(self, message):
- # Correlation Properties should match up
- po_curr = next(c for c in message.message_correlations if c.name == "po_number")
- customer_curr = next(c for c in message.message_correlations if c.name == "customer_id")
- assert po_curr is not None
- assert customer_curr is not None
- assert po_curr.value == '1001'
- assert customer_curr.value == "Sartography"
-