From 28ac9ef8728dce15f2c5600b5b668e8270ea9980 Mon Sep 17 00:00:00 2001 From: Dan Date: Sat, 18 Feb 2023 13:09:58 -0500 Subject: [PATCH] * Re-work message tests so I could wrap my simple head around what was happening - just needed an example that made sense to me. * Clear out complex get_message_instance_receive how that many-to-many works. * Create decent error messages when correlations fail * Move correlation checks into the MessageInstance class * The APIError could bomb out ugly if it hit a workflow exception with not Task Spec. --- .../bpmn/specs/events/event_definitions.py | 19 +- .../spiff/specs/events/event_definitions.py | 2 +- spiffworkflow-backend/pyproject.toml | 4 +- .../exceptions/api_error.py | 2 +- .../models/message_instance.py | 12 +- .../routes/messages_controller.py | 2 +- .../services/message_service.py | 59 +--- .../message_receiver.bpmn | 60 ++-- .../message_sender.bpmn | 121 ++++--- .../message_sender.bpmn | 16 +- .../unit/test_message_service.py | 306 +++++++++++------- .../unit/test_message_service_2.py | 127 -------- 12 files changed, 326 insertions(+), 404 deletions(-) delete mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py diff --git a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py index 3d1192de..aa7e0448 100644 --- a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py +++ b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py @@ -23,6 +23,7 @@ from calendar import monthrange from time import timezone as tzoffset from copy import deepcopy +from SpiffWorkflow.exceptions import SpiffWorkflowException, WorkflowException from SpiffWorkflow.task import TaskState LOCALTZ = timezone(timedelta(seconds=-1 * tzoffset)) @@ -191,7 +192,7 @@ class MessageEventDefinition(NamedEventDefinition): # However, there needs to be something to apply the correlations to in the # standard case and this is line with the way Spiff works otherwise event.payload = deepcopy(my_task.data) - correlations = self.get_correlations(my_task.workflow.script_engine, event.payload) + correlations = self.get_correlations(my_task, event.payload) my_task.workflow.correlations.update(correlations) self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations) @@ -205,13 +206,19 @@ class MessageEventDefinition(NamedEventDefinition): if payload is not None: my_task.set_data(**payload) - def get_correlations(self, script_engine, payload): + def get_correlations(self, task, payload): correlations = {} for property in self.correlation_properties: for key in property.correlation_keys: if key not in correlations: correlations[key] = {} - correlations[key][property.name] = script_engine._evaluate(property.expression, payload) + try: + correlations[key][property.name] = task.workflow.script_engine._evaluate(property.expression, payload) + except WorkflowException as we: + we.add_note(f"Failed to evaluate correlation key '{key}'" + f" invalid expression '{property.expression}'") + we.task_spec = task.task_spec + raise we return correlations @@ -351,7 +358,7 @@ class TimerEventDefinition(EventDefinition): return TimerEventDefinition.parse_iso_week(expression) else: return TimerEventDefinition.get_datetime(expression) - + @staticmethod def parse_iso_recurring_interval(expression): components = expression.upper().replace('--', '/').strip('R').split('/') @@ -510,7 +517,7 @@ class MultipleEventDefinition(EventDefinition): if event == other: return True return False - + def throw(self, my_task): # Mutiple events throw all associated events when they fire for event_definition in self.event_definitions: @@ -518,4 +525,4 @@ class MultipleEventDefinition(EventDefinition): event=event_definition, workflow=my_task.workflow, outer_workflow=my_task.workflow.outer_workflow - ) \ No newline at end of file + ) diff --git a/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py b/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py index c569fd72..d17e31be 100644 --- a/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py +++ b/SpiffWorkflow/SpiffWorkflow/spiff/specs/events/event_definitions.py @@ -14,7 +14,7 @@ class MessageEventDefinition(MessageEventDefinition): # we have to evaluate it again so we have to create a new event event = MessageEventDefinition(self.name, self.correlation_properties, self.expression, self.message_var) event.payload = my_task.workflow.script_engine.evaluate(my_task, self.expression) - correlations = self.get_correlations(my_task.workflow.script_engine, event.payload) + correlations = self.get_correlations(my_task, event.payload) my_task.workflow.correlations.update(correlations) self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations) diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml index cbf0b7ad..907c2206 100644 --- a/spiffworkflow-backend/pyproject.toml +++ b/spiffworkflow-backend/pyproject.toml @@ -27,8 +27,8 @@ flask-marshmallow = "*" flask-migrate = "*" flask-restful = "*" werkzeug = "*" -SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} -# SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" } +#SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} +SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" } sentry-sdk = "^1.10" sphinx-autoapi = "^2.0" flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"} diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py b/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py index 5fff05c2..99e7237d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/exceptions/api_error.py @@ -157,7 +157,7 @@ class ApiError(Exception): error_line=exp.error_line, task_trace=exp.task_trace, ) - elif isinstance(exp, WorkflowException): + elif isinstance(exp, WorkflowException) and exp.task_spec: return ApiError.from_task_spec(error_code, message, exp.task_spec) else: return ApiError("workflow_error", str(exp)) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py index 5f5b76ba..d0d24550 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py @@ -1,7 +1,7 @@ """Message_instance.""" import enum from dataclasses import dataclass -from typing import Any +from typing import Any, Self from typing import Optional from typing import TYPE_CHECKING @@ -68,7 +68,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): """Validate_status.""" return self.validate_enum_field(key, value, MessageStatuses) - def correlates(self, correlation_dictionary): + def correlates(self, other_message_instance: Self) -> bool: + if other_message_instance.message_model_id != self.message_model_id: + return False + correlation_dict = {} + for c in other_message_instance.message_correlations: + correlation_dict[c.name]=c.value + return self.correlates_with_dictionary(correlation_dict) + + def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool: """Returns true if the given dictionary matches the correlation names and values connected to this message instance""" for c in self.message_correlations: # Fixme: Maybe we should look at typing the correlations and not forcing them to strings? diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py index 3a114342..864a763c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py @@ -125,7 +125,7 @@ def message_send( # do any waiting message instances have matching correlations? matching_message = None for message_instance in message_instances: - if message_instance.correlates(body["payload"]): + if message_instance.correlates_with_dictionary(body["payload"]): matching_message = message_instance process_instance = None diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 1595bb35..8277f584 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -8,7 +8,7 @@ from sqlalchemy import select from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel -from spiffworkflow_backend.models.message_instance import MessageInstanceModel +from spiffworkflow_backend.models.message_instance import MessageInstanceModel, MessageStatuses from spiffworkflow_backend.models.message_triggerable_process_model import ( MessageTriggerableProcessModel, ) @@ -38,6 +38,7 @@ class MessageService: message_instances_receive = MessageInstanceModel.query.filter_by( message_type="receive", status="ready" ).all() + for message_instance_send in message_instances_send: # check again in case another background process picked up the message # while the previous one was running @@ -121,8 +122,7 @@ class MessageService: processor_receive.do_engine_steps(save=False) processor_receive.bpmn_process_instance.catch_bpmn_message( message_model_name, - message_payload, - correlations={}, + message_payload ) processor_receive.do_engine_steps(save=True) @@ -156,58 +156,19 @@ class MessageService: correlations={}, ) processor_receive.do_engine_steps(save=True) + message_instance_receive.status = MessageStatuses.completed.value + db.session.add(message_instance_receive) + db.session.commit() @staticmethod def get_message_instance_receive( message_instance_send: MessageInstanceModel, message_instances_receive: list[MessageInstanceModel], ) -> Optional[MessageInstanceModel]: - """Get_message_instance_receive.""" - - message_correlations_send = message_instance_send.message_correlations - - message_correlation_filter = [] - for message_correlation_send in message_correlations_send: - message_correlation_filter.append( - and_( - MessageCorrelationModel.name == message_correlation_send.name, - MessageCorrelationModel.value == message_correlation_send.value, - MessageCorrelationModel.message_correlation_property_id - == message_correlation_send.message_correlation_property_id, - ) - ) - - for message_instance_receive in message_instances_receive: - # sqlalchemy supports select / where statements like active record apparantly - # https://docs.sqlalchemy.org/en/14/core/tutorial.html#conjunctions - message_correlation_select = ( - select([db.func.count()]) - .select_from(MessageCorrelationModel) # type: ignore - .where( - and_( - MessageCorrelationModel.process_instance_id - == message_instance_receive.process_instance_id, - or_(*message_correlation_filter), - ) - ) - .join(message_correlation_message_instance_table) # type: ignore - .filter_by( - message_instance_id=message_instance_receive.id, - ) - ) - message_correlations_receive = db.session.execute( - message_correlation_select - ) - - # since the query matches on name, value, and message_instance_receive.id, if the counts - # message correlations found are the same, then this should be the relevant message - if ( - message_correlations_receive.scalar() == len(message_correlations_send) - and message_instance_receive.message_model_id - == message_instance_send.message_model_id - ): - return message_instance_receive - + """Returns the message instance that correlates to the send message, or None if nothing correlates.""" + for message_instance in message_instances_receive: + if message_instance.correlates(message_instance_send): + return message_instance return None @staticmethod diff --git a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn index 828f7d2e..53eadc41 100644 --- a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn +++ b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_receiver.bpmn @@ -1,42 +1,39 @@ - + - - message_correlation_property_topica - message_correlation_property_topicb + + customer_id + po_number - - - topica + + + customer_id - - the_payload.topica + + customer_id - - - topicb + + + po_number - - the_payload.topicb + + po_number - + - the_payload + invoice - + - { "the_payload": { -"topica": the_payload.topica, -"topicb": the_payload.topicb, -}} + invoice @@ -45,28 +42,21 @@ Flow_11r9uiw - + Flow_0fruoax Flow_11r9uiw Flow_0fruoax - + + - - - - - - - - @@ -79,6 +69,14 @@ + + + + + + + + diff --git a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn index 7bda31eb..299f49e5 100644 --- a/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn +++ b/spiffworkflow-backend/tests/data/message_send_one_conversation/message_sender.bpmn @@ -5,25 +5,31 @@ - - message_correlation_property_topica - message_correlation_property_topicb + + The messages sent here are about an Invoice that can be uniquely identified by the customer_id ("sartography") and a purchase order number (1001) + +It will fire a message connected to the invoice keys above, starting another process, which can communicate back to this specific process instance using the correct key. + + + + po_number + customer_id - - - topica + + + po_number - - the_payload.topica + + po_number - - - topicb + + + customer_id - - the_payload.topicb + + customer_id @@ -32,42 +38,45 @@ - + Flow_1qgz6p0 - + Flow_037vpjk Flow_1qgz6p0 - + - + the_topic = "first_conversation" - Flow_1ihr88m + Flow_02lw0q9 Flow_037vpjk - - + + + + + + + + Flow_10conab - Flow_1ihr88m - -timestamp = time.time() -the_topica = f"first_conversation_a_{timestamp}" -the_topicb = f"first_conversation_b_{timestamp}" -del time - + Flow_02lw0q9 + - + { -"topica": the_topica, -"topicb": the_topicb, +"customer_id": customer_id, +"po_number": po_number, +"amount": amount, +"description": description, } - + the_payload @@ -78,22 +87,6 @@ del time - - - - - - - - - - - - - - - - @@ -103,20 +96,44 @@ del time - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -128,9 +145,9 @@ del time - + - + \ No newline at end of file diff --git a/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn b/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn index 16051705..671f2fee 100644 --- a/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn +++ b/spiffworkflow-backend/tests/data/message_send_two_conversations/message_sender.bpmn @@ -19,18 +19,18 @@ - topica_one + topica_one - payload_var_one.topica_one + topic_one_a - topicb_one + topicb_one - payload_var_one.topicb + topic_one_b @@ -117,18 +117,18 @@ del time - topica_two + topica_two - topica_two + topic_two_a - topicb_two + topicb_two - topicb_two + topic_two_b diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py index 698cf6c8..cc050789 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py @@ -1,6 +1,10 @@ """Test_message_service.""" +import pytest from flask import Flask from flask.testing import FlaskClient + +from spiffworkflow_backend.exceptions.api_error import ApiError +from spiffworkflow_backend.routes.messages_controller import message_send from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec @@ -20,105 +24,182 @@ from spiffworkflow_backend.services.process_instance_service import ( class TestMessageService(BaseTest): """TestMessageService.""" - def test_can_send_message_to_waiting_message( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, + def test_message_from_api_into_running_process( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, ) -> None: - """Test_can_send_message_to_waiting_message.""" + """This example workflow will send a message called 'request_approval' and then wait for a response message + of 'approval_result'. This test assures that it will fire the message with the correct correlation properties + and will respond only to a message called "approval_result' that has the matching correlation properties, + as sent by an API Call""" + + self.payload = { + "customer_id": "Sartography", + "po_number": 1001, + "description": "We built a new feature for messages!", + "amount": "100.00" + } + + self.start_sender_process(client, with_super_admin_user) + self.assure_a_message_was_sent() + self.assure_there_is_a_process_waiting_on_a_message() + + # Make an API call to the service endpoint, but use the wrong po number + with pytest.raises(ApiError): + message_send("approval_result", {'payload': {'po_number': 5001}}) + + # Sound return an error when making an API call for right po number, wrong client + with pytest.raises(ApiError): + message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'jon'}}) + + # No error when calling with the correct parameters + response = message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'Sartography'}}) + + # There is no longer a waiting message + waiting_messages = MessageInstanceModel.query. \ + filter_by(message_type="receive"). \ + filter_by(status="ready"). \ + filter_by(process_instance_id=self.process_instance.id).all() + assert len(waiting_messages) == 0 + + # The process has completed + assert self.process_instance.status == 'complete' + + def test_single_conversation_between_two_processes( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, + ) -> None: + """Assure that communication between two processes works the same as making a call through the API, here + we have two process instances that are communicating with each other using one conversation about an + Invoice whose details are defined in the following message payload """ + self.payload = { + "customer_id": "Sartography", + "po_number": 1001, + "description": "We built a new feature for messages!", + "amount": "100.00" + } + + # Load up the definition for the receiving process (it has a message start event that should cause it to + # fire when a unique message comes through. + # Fire up the first process + second_process_model = load_test_spec( + "test_group/message_receive", + process_model_source_directory="message_send_one_conversation", + bpmn_file_name="message_receiver.bpmn" + ) + + # Now start the main process + self.start_sender_process(client, with_super_admin_user) + self.assure_a_message_was_sent() + + # This is typically called in a background cron process, so we will manually call it + # here in the tests + # The first time it is called, it will instantiate a new instance of the message_recieve process + MessageService.process_message_instances() + + # The sender process should still be waiting on a message to be returned to it ... + self.assure_there_is_a_process_waiting_on_a_message() + + # The second time we call ths process_message_isntances (again it would typically be running on cron) + # it will deliver the message that was sent from the receiver back to the original sender. + MessageService.process_message_instances() + + + # But there should be no send message waiting for delivery, because + # the message receiving process should pick it up instantly via + # it's start event. + waiting_messages = MessageInstanceModel.query. \ + filter_by(message_type="receive"). \ + filter_by(status="ready"). \ + filter_by(process_instance_id=self.process_instance.id).all() + assert len(waiting_messages) == 0 + + # The message sender process is complete + assert self.process_instance.status == 'complete' + + # The message receiver process is also complete + message_receiver_process = ProcessInstanceModel.query.filter_by(process_model_identifier = "test_group/message_receive").first() + assert message_receiver_process.status == 'complete' + + + + + + def start_sender_process(self, + client: FlaskClient, + with_super_admin_user: UserModel): process_group_id = "test_group" self.create_process_group( client, with_super_admin_user, process_group_id, process_group_id ) - load_test_spec( - "test_group/message_receiver", + process_model = load_test_spec( + "test_group/message", process_model_source_directory="message_send_one_conversation", - bpmn_file_name="message_receiver.bpmn", - ) - process_model_sender = load_test_spec( - "test_group/message_sender", - process_model_source_directory="message_send_one_conversation", - bpmn_file_name="message_sender.bpmn", + bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives ) - process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier( - process_model_sender.id, + self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( + process_model.id, with_super_admin_user, ) + processor_send_receive = ProcessInstanceProcessor(self.process_instance) + processor_send_receive.do_engine_steps(save=True) + task = processor_send_receive.get_all_user_tasks()[0] + human_task = self.process_instance.active_human_tasks[0] + spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier( + human_task.task_name, processor_send_receive.bpmn_process_instance + ) - processor_sender = ProcessInstanceProcessor(process_instance_sender) - processor_sender.do_engine_steps() - processor_sender.save() + ProcessInstanceService.complete_form_task( + processor_send_receive, + task, + self.payload, + with_super_admin_user, + human_task, + ) + processor_send_receive.save() + def assure_a_message_was_sent(self): + # There should be one new send message for the given process instance. + send_messages = MessageInstanceModel.query. \ + filter_by(message_type="send"). \ + filter_by(process_instance_id=self.process_instance.id).all() + assert len(send_messages) == 1 + send_message = send_messages[0] + + # The payload should match because of how it is written in the Send task. + assert send_message.payload == self.payload, "The send message should match up with the payload" + assert send_message.message_model.identifier == "request_approval" + assert send_message.status == "ready" + assert len(send_message.message_correlations) == 2 message_instance_result = MessageInstanceModel.query.all() - assert len(message_instance_result) == 2 - # ensure both message instances are for the same process instance - # it will be send_message and receive_message_response - assert ( - message_instance_result[0].process_instance_id - == message_instance_result[1].process_instance_id - ) + self.assure_correlation_properties_are_right(send_message) - message_instance_sender = message_instance_result[0] - assert message_instance_sender.process_instance_id == process_instance_sender.id - message_correlations = MessageCorrelationModel.query.all() - assert len(message_correlations) == 2 - assert message_correlations[0].process_instance_id == process_instance_sender.id - message_correlations_message_instances = ( - MessageCorrelationMessageInstanceModel.query.all() - ) - assert len(message_correlations_message_instances) == 4 - assert ( - message_correlations_message_instances[0].message_instance_id - == message_instance_sender.id - ) - assert ( - message_correlations_message_instances[1].message_instance_id - == message_instance_sender.id - ) - assert ( - message_correlations_message_instances[2].message_instance_id - == message_instance_result[1].id - ) - assert ( - message_correlations_message_instances[3].message_instance_id - == message_instance_result[1].id - ) + def assure_there_is_a_process_waiting_on_a_message(self): + # There should be one new send message for the given process instance. + waiting_messages = MessageInstanceModel.query. \ + filter_by(message_type="receive"). \ + filter_by(status="ready"). \ + filter_by(process_instance_id=self.process_instance.id).all() + assert len(waiting_messages) == 1 + waiting_message = waiting_messages[0] + self.assure_correlation_properties_are_right(waiting_message) - # process first message - MessageService.process_message_instances() - assert message_instance_sender.status == "completed" - - process_instance_result = ProcessInstanceModel.query.all() - - assert len(process_instance_result) == 2 - process_instance_receiver = process_instance_result[1] - - # just make sure it's a different process instance - assert process_instance_receiver.id != process_instance_sender.id - assert process_instance_receiver.status == "complete" - - message_instance_result = MessageInstanceModel.query.all() - assert len(message_instance_result) == 3 - message_instance_receiver = message_instance_result[1] - assert message_instance_receiver.id != message_instance_sender.id - assert message_instance_receiver.status == "ready" - - # process second message - MessageService.process_message_instances() - - message_instance_result = MessageInstanceModel.query.all() - assert len(message_instance_result) == 3 - for message_instance in message_instance_result: - assert message_instance.status == "completed" - - process_instance_result = ProcessInstanceModel.query.all() - assert len(process_instance_result) == 2 - for process_instance in process_instance_result: - assert process_instance.status == "complete" + def assure_correlation_properties_are_right(self, message): + # Correlation Properties should match up + po_curr = next(c for c in message.message_correlations if c.name == "po_number") + customer_curr = next(c for c in message.message_correlations if c.name == "customer_id") + assert po_curr is not None + assert customer_curr is not None + assert po_curr.value == '1001' + assert customer_curr.value == "Sartography" def test_can_send_message_to_multiple_process_models( self, @@ -153,55 +234,34 @@ class TestMessageService(BaseTest): process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier( process_model_sender.id, - user, - # process_group_identifier=process_model_sender.process_group_id, - ) + user) processor_sender = ProcessInstanceProcessor(process_instance_sender) processor_sender.do_engine_steps() processor_sender.save() - message_instance_result = MessageInstanceModel.query.all() - assert len(message_instance_result) == 3 - # ensure both message instances are for the same process instance - # it will be send_message and receive_message_response - assert ( - message_instance_result[0].process_instance_id - == message_instance_result[1].process_instance_id - ) + # At this point, the message_sender process has fired two different messages but those + # processes have not started, and it is now paused, waiting for to receive a message. so + # we should have two sends and a receive. + assert MessageInstanceModel.query.filter_by(process_instance_id = process_instance_sender.id).count() == 3 + assert MessageInstanceModel.query.count() == 3 # all messages are related to the instance + orig_send_messages = MessageInstanceModel.query.filter_by(message_type="send").all() + assert len(orig_send_messages) == 2 + assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1 - message_instance_sender = message_instance_result[0] - assert message_instance_sender.process_instance_id == process_instance_sender.id - message_correlations = MessageCorrelationModel.query.all() - assert len(message_correlations) == 4 - assert message_correlations[0].process_instance_id == process_instance_sender.id - message_correlations_message_instances = ( - MessageCorrelationMessageInstanceModel.query.all() - ) - assert len(message_correlations_message_instances) == 6 - assert ( - message_correlations_message_instances[0].message_instance_id - == message_instance_sender.id - ) - assert ( - message_correlations_message_instances[1].message_instance_id - == message_instance_sender.id - ) - assert ( - message_correlations_message_instances[2].message_instance_id - == message_instance_result[1].id - ) - assert ( - message_correlations_message_instances[3].message_instance_id - == message_instance_result[1].id - ) + message_instances = MessageInstanceModel.query.all() + # Each message instance should have two correlations + for mi in message_instances: + assert len(mi.message_correlations) == 2 - # process first message + # process message instances MessageService.process_message_instances() - assert message_instance_sender.status == "completed" + # Once complete the original send messages should be completed and two new instanges + # should now exist, one for each of the process instances ... + for osm in orig_send_messages: + assert osm.status == "completed" process_instance_result = ProcessInstanceModel.query.all() - assert len(process_instance_result) == 3 process_instance_receiver_one = ProcessInstanceModel.query.filter_by( process_model_identifier="test_group/message_receiver_one" @@ -241,9 +301,7 @@ class TestMessageService(BaseTest): ][0] assert message_instance_receiver_one is not None assert message_instance_receiver_two is not None - assert message_instance_receiver_one.id != message_instance_sender.id assert message_instance_receiver_one.status == "ready" - assert message_instance_receiver_two.id != message_instance_sender.id assert message_instance_receiver_two.status == "ready" # process second message diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py deleted file mode 100644 index ca9833ac..00000000 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service_2.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Test_message_service.""" -import pytest -from flask import Flask -from flask.testing import FlaskClient - -from spiffworkflow_backend.exceptions.api_error import ApiError -from spiffworkflow_backend.routes.messages_controller import message_send -from tests.spiffworkflow_backend.helpers.base_test import BaseTest -from tests.spiffworkflow_backend.helpers.test_data import load_test_spec - -from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel -from spiffworkflow_backend.models.message_instance import MessageInstanceModel -from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.models.user import UserModel -from spiffworkflow_backend.services.message_service import MessageService -from spiffworkflow_backend.services.process_instance_processor import ( - ProcessInstanceProcessor, -) -from spiffworkflow_backend.services.process_instance_service import ( - ProcessInstanceService, -) - - -class TestMessageService(BaseTest): - """TestMessageService.""" - - def test_message_sent( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - """This example workflow will send a message called 'request_approval' and then wait for a response messge - of 'approval_result. This test assures that it will fire the message with the correct correlation properties - and will respond only to a message called "approval_result' that has the matching correlation properties.""" - process_group_id = "test_group" - self.create_process_group( - client, with_super_admin_user, process_group_id, process_group_id - ) - - process_model = load_test_spec( - "test_group/message", - process_model_source_directory="message", - bpmn_file_name="message_send_receive.bpmn", - ) - - self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( - process_model.id, - with_super_admin_user, - ) - processor_send_receive = ProcessInstanceProcessor(self.process_instance) - processor_send_receive.do_engine_steps(save=True) - task = processor_send_receive.get_all_user_tasks()[0] - human_task = self.process_instance.active_human_tasks[0] - spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier( - human_task.task_name, processor_send_receive.bpmn_process_instance - ) - self.payload = { - "customer_id": "Sartography", - "po_number": 1001, - "description": "We build a new feature for messages!", - "amount": "100.00" - } - - ProcessInstanceService.complete_form_task( - processor_send_receive, - task, - self.payload, - with_super_admin_user, - human_task, - ) - processor_send_receive.save() - self.assure_a_message_was_sent() - self.assure_there_is_a_process_waiting_on_a_message() - - ## Should return an error when making an API call for the wrong po number - with pytest.raises(ApiError): - message_send("approval_result", {'payload': {'po_number' : 5001}}) - - ## Sound return an error when making an API call for right po number, wrong client - with pytest.raises(ApiError): - message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'jon'}}) - - ## No error when calling with the correct parameters - response = message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'Sartography'}}) - - ## There is no longer a waiting message - waiting_messages = MessageInstanceModel.query. \ - filter_by(message_type = "receive"). \ - filter_by(process_instance_id = self.process_instance.id).all() - assert len(waiting_messages) == 0 - - def assure_a_message_was_sent(self): - # There should be one new send message for the given process instance. - send_messages = MessageInstanceModel.query. \ - filter_by(message_type = "send"). \ - filter_by(process_instance_id = self.process_instance.id).all() - assert len(send_messages) == 1 - send_message = send_messages[0] - - # The payload should match because of how it is written in the Send task. - assert send_message.payload == self.payload, "The send message should match up with the payload" - assert send_message.message_model.identifier == "request_approval" - assert send_message.status == "ready" - assert len(send_message.message_correlations) == 2 - message_instance_result = MessageInstanceModel.query.all() - self.assure_correlation_properties_are_right(send_message) - - def assure_there_is_a_process_waiting_on_a_message(self): - # There should be one new send message for the given process instance. - waiting_messages = MessageInstanceModel.query. \ - filter_by(message_type = "receive"). \ - filter_by(process_instance_id = self.process_instance.id).all() - assert len(waiting_messages) == 1 - waiting_message = waiting_messages[0] - self.assure_correlation_properties_are_right(waiting_message) - - def assure_correlation_properties_are_right(self, message): - # Correlation Properties should match up - po_curr = next(c for c in message.message_correlations if c.name == "po_number") - customer_curr = next(c for c in message.message_correlations if c.name == "customer_id") - assert po_curr is not None - assert customer_curr is not None - assert po_curr.value == '1001' - assert customer_curr.value == "Sartography" -