This commit is contained in:
Dan 2023-02-20 12:34:42 -05:00
parent 5171e53240
commit 7b16625cff
16 changed files with 202 additions and 175 deletions

View File

@ -5,13 +5,9 @@ import shutil
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.models import message_correlation_message_instance
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
@ -48,14 +44,12 @@ def app() -> Flask:
@pytest.fixture()
def with_db_and_bpmn_file_cleanup() -> None:
"""Do it cleanly!"""
meta = db.metadata
for table in reversed(meta.sorted_tables):
print
'Clear table %s' % table
db.session.execute(table.delete())
db.session.commit()
try:
yield
finally:

View File

@ -94,7 +94,6 @@ def create_app() -> flask.app.Flask:
app.config["CONNEXION_APP"] = connexion_app
app.config["SESSION_TYPE"] = "filesystem"
setup_config(app)
db.init_app(app)
migrate.init_app(app, db)

View File

@ -2,9 +2,7 @@
import re
from os import environ
FLASK_SESSION_SECRET_KEY = environ.get(
"FLASK_SESSION_SECRET_KEY"
)
FLASK_SESSION_SECRET_KEY = environ.get("FLASK_SESSION_SECRET_KEY")
SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR = environ.get(
"SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR"

View File

@ -1,6 +1,5 @@
"""Message_correlation."""
from dataclasses import dataclass
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
@ -40,4 +39,3 @@ class MessageCorrelationModel(SpiffworkflowBaseDBModel):
created_at_in_seconds: int = db.Column(db.Integer)
message_correlation_property = relationship("MessageCorrelationPropertyModel")

View File

@ -1,13 +1,14 @@
"""Message_correlation_message_instance."""
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
message_correlation_message_instance_table \
= db.Table('message_correlation_message_instance',
db.Column('message_instance_id',
ForeignKey('message_instance.id'), primary_key=True),
db.Column('message_correlation_id',
ForeignKey('message_correlation.id'),primary_key=True),
)
from spiffworkflow_backend.models.db import db
message_correlation_message_instance_table = db.Table(
"message_correlation_message_instance",
db.Column(
"message_instance_id", ForeignKey("message_instance.id"), primary_key=True
),
db.Column(
"message_correlation_id", ForeignKey("message_correlation.id"), primary_key=True
),
)

View File

@ -23,5 +23,4 @@ class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel):
message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_model = db.relationship("MessageModel",
backref="correlation_properties")
message_model = db.relationship("MessageModel", backref="correlation_properties")

View File

@ -1,24 +1,23 @@
"""Message_instance."""
import enum
from dataclasses import dataclass
from typing import Any, Self
from typing import Any
from typing import Optional
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey
from sqlalchemy.event import listens_for
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import validates
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_correlation_message_instance import message_correlation_message_instance_table
from spiffworkflow_backend.models.message_correlation_message_instance import (
message_correlation_message_instance_table,
)
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
class MessageTypes(enum.Enum):
"""MessageTypes."""
@ -35,7 +34,6 @@ class MessageStatuses(enum.Enum):
failed = "failed"
@dataclass
class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Messages from a process instance that are ready to send to a receiving task."""
@ -46,10 +44,12 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False)
message_model = db.relationship("MessageModel")
message_correlations = db.relationship("MessageCorrelationModel",
secondary=message_correlation_message_instance_table,
backref="message_instances",
cascade="all,delete")
message_correlations = db.relationship(
"MessageCorrelationModel",
secondary=message_correlation_message_instance_table,
backref="message_instances",
cascade="all,delete",
)
message_type: str = db.Column(db.String(20), nullable=False)
payload: str = db.Column(db.JSON)
@ -68,20 +68,21 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
def correlation_dictionary(self):
def correlation_dictionary(self) -> dict:
correlation_dict = {}
for c in self.message_correlations:
correlation_dict[c.name]=c.value
correlation_dict[c.name] = c.value
return correlation_dict
def correlates(self, other_message_instance: Self) -> bool:
def correlates(self, other_message_instance: Any) -> bool:
if other_message_instance.message_model_id != self.message_model_id:
return False
return self.correlates_with_dictionary(other_message_instance.correlation_dictionary())
return self.correlates_with_dictionary(
other_message_instance.correlation_dictionary()
)
def correlates_with_dictionary(self, dict: dict) -> bool:
"""Returns true if the given dictionary matches the correlation
names and values connected to this message instance"""
"""Returns true if the given dictionary matches the correlation names and values connected to this instance."""
for c in self.message_correlations:
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
if c.name in dict and str(dict[c.name]) == c.value:
@ -90,9 +91,6 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
return False
return True
corrs = {}
# This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class
# so this may not be worth it or there may be a better way to do it

View File

@ -1,4 +1,6 @@
"""Message_model."""
from typing import Any
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@ -13,8 +15,8 @@ class MessageModel(SpiffworkflowBaseDBModel):
name = db.Column(db.String(50), unique=True, index=True)
# correlation_properties is a backref and defined in the MessageCorrelationProperties class.
def get_correlation_property(self, identifier):
def get_correlation_property(self, identifier: str) -> Any | None:
for corr_prop in self.correlation_properties:
if corr_prop.identifier == identifier:
return corr_prop;
return corr_prop
return None

View File

@ -11,8 +11,6 @@ from flask import make_response
from flask.wrappers import Response
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_correlation_property import MessageCorrelationPropertyModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.message_triggerable_process_model import (
@ -21,9 +19,6 @@ from spiffworkflow_backend.models.message_triggerable_process_model import (
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.routes.process_api_blueprint import (
_find_process_instance_by_id_or_raise,
)
from spiffworkflow_backend.services.message_service import MessageService
@ -91,7 +86,10 @@ def message_send(
raise (
ApiError(
error_code="missing_payload",
message="Please include a 'payload' in the JSON body that contains the message contents.",
message=(
"Please include a 'payload' in the JSON body that contains the"
" message contents."
),
status_code=400,
)
)
@ -99,7 +97,9 @@ def message_send(
process_instance = None
# Is there a running instance that is waiting for this message?
message_instances = MessageInstanceModel.query.filter_by(message_model_id=message_model.id).all()
message_instances = MessageInstanceModel.query.filter_by(
message_model_id=message_model.id
).all()
# do any waiting message instances have matching correlations?
matching_message = None
@ -109,14 +109,21 @@ def message_send(
process_instance = None
if matching_message:
process_instance = ProcessInstanceModel.query.filter_by(id = matching_message.process_instance_id).first()
process_instance = ProcessInstanceModel.query.filter_by(
id=matching_message.process_instance_id
).first()
if matching_message and process_instance and process_instance.status != ProcessInstanceStatus.waiting.value:
if (
matching_message
and process_instance
and process_instance.status != ProcessInstanceStatus.waiting.value
):
raise ApiError(
error_code="message_not_accepted",
message=(
f"The process that can accept message '{message_identifier}' with the given correlation keys"
f" is not currently waiting for that message. It is currently in the a '{process_instance.status}' state."
f"The process that can accept message '{message_identifier}' with the"
" given correlation keys is not currently waiting for that message. "
f" It is currently in the a '{process_instance.status}' state."
),
status_code=400,
)
@ -136,8 +143,10 @@ def message_send(
ApiError(
error_code="cannot_start_message",
message=(
f"No process instances correlate with the given message id of '{message_identifier}'. "
f"And this message name is not currently associated with any process Start Event."),
"No process instances correlate with the given message id of"
f" '{message_identifier}'. And this message name is not"
" currently associated with any process Start Event."
),
status_code=400,
)
)

View File

@ -2,13 +2,9 @@
from typing import Any
from typing import Optional
from sqlalchemy import and_
from sqlalchemy import or_
from sqlalchemy import select
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel, MessageStatuses
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageStatuses
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
@ -121,8 +117,7 @@ class MessageService:
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.do_engine_steps(save=False)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload
message_model_name, message_payload
)
processor_receive.do_engine_steps(save=True)
@ -149,7 +144,6 @@ class MessageService:
)
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,

View File

@ -1340,7 +1340,6 @@ class ProcessInstanceProcessor:
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
# only message sends are in get_bpmn_messages
@ -1351,7 +1350,6 @@ class ProcessInstanceProcessor:
f"Invalid message name: {bpmn_message.name}.",
)
if not bpmn_message.correlations:
raise ApiError(
"message_correlations_missing",
@ -1362,8 +1360,10 @@ class ProcessInstanceProcessor:
)
message_correlations = []
for (name, value) in bpmn_message.correlations.items():
message_correlation_property = message_model.get_correlation_property(name)
for name, value in bpmn_message.correlations.items():
message_correlation_property = message_model.get_correlation_property(
name
)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
@ -1372,17 +1372,20 @@ class ProcessInstanceProcessor:
f" identifier:{name}"
),
)
message_correlations.append(MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation_property.id,
name=name,
value=value))
message_correlations.append(
MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation_property.id,
name=name,
value=value,
)
)
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
message_type="send",
message_model_id=message_model.id,
payload=bpmn_message.payload,
message_correlations=message_correlations
message_correlations=message_correlations,
)
db.session.add(message_instance)
db.session.commit()
@ -1434,15 +1437,29 @@ class ProcessInstanceProcessor:
for (
spiff_correlation_property
) in waiting_task.task_spec.event_definition.correlation_properties:
message_correlation = next((mc for mc in message_instance.message_correlations
if mc.name == spiff_correlation_property.name), None)
message_correlation = next(
(
mc
for mc in message_instance.message_correlations
if mc.name == spiff_correlation_property.name
),
None,
)
if not message_correlation:
expression = spiff_correlation_property.expression
correlation_value = ProcessInstanceProcessor._script_engine.evaluate(waiting_task, expression)
correlation_value = (
ProcessInstanceProcessor._script_engine.evaluate(
waiting_task, expression
)
)
correlation_name = spiff_correlation_property.name
message_prop = MessageCorrelationPropertyModel.query.\
filter_by(identifier=correlation_name).\
filter_by(message_model_id=message_model.id).first()
message_prop = (
MessageCorrelationPropertyModel.query.filter_by(
identifier=correlation_name
)
.filter_by(message_model_id=message_model.id)
.first()
)
message_correlation = MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,

View File

@ -175,7 +175,7 @@ class SpecFileService(FileSystemService):
"""Validate_bpmn_xml."""
file_type = FileSystemService.file_type(file_name)
if file_type.value == FileType.bpmn.value:
validator = BpmnValidator()
BpmnValidator()
parser = MyCustomParser()
try:
parser.add_bpmn_xml(

View File

@ -1352,7 +1352,7 @@ class TestProcessApi(BaseTest):
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
"description": "But seriously.",
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
@ -1401,7 +1401,7 @@ class TestProcessApi(BaseTest):
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "Ya!, a-ok bud!"
"description": "Ya!, a-ok bud!",
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
@ -1417,14 +1417,13 @@ class TestProcessApi(BaseTest):
)
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
process_instance = ProcessInstanceModel.query.filter_by(
id=process_instance_id
).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor,
@ -1484,7 +1483,7 @@ class TestProcessApi(BaseTest):
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
"description": "But seriously.",
}
response = self.create_process_instance_from_process_model_id_with_api(
@ -1495,7 +1494,6 @@ class TestProcessApi(BaseTest):
assert response.json is not None
process_instance_id = response.json["id"]
process_instance = ProcessInstanceModel.query.filter_by(
id=process_instance_id
).first()
@ -1503,9 +1501,6 @@ class TestProcessApi(BaseTest):
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor,
@ -1517,7 +1512,7 @@ class TestProcessApi(BaseTest):
processor.save()
processor.suspend()
payload['description'] = "Message To Suspended"
payload["description"] = "Message To Suspended"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1531,14 +1526,12 @@ class TestProcessApi(BaseTest):
assert response.json["error_code"] == "message_not_accepted"
processor.resume()
payload['description'] = "Message To Resumed"
payload["description"] = "Message To Resumed"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps(
{"payload": payload}
),
data=json.dumps({"payload": payload}),
)
assert response.status_code == 200
json_data = response.json
@ -2325,7 +2318,7 @@ class TestProcessApi(BaseTest):
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
"description": "But seriously.",
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",

View File

@ -2,16 +2,14 @@
import pytest
from flask import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.routes.messages_controller import message_send
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.messages_controller import message_send
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -25,22 +23,24 @@ class TestMessageService(BaseTest):
"""TestMessageService."""
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called "approval_result' that has the matching correlation properties,
as sent by an API Call"""
"""Test sending a message to a running process via the API.
This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called 'approval_result' that has the matching correlation properties,
as sent by an API Call.
"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00"
"amount": "100.00",
}
self.start_sender_process(client, with_super_admin_user)
@ -49,49 +49,59 @@ class TestMessageService(BaseTest):
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("approval_result", {'payload': {'po_number': 5001}})
message_send("approval_result", {"payload": {"po_number": 5001}})
# Sound return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'jon'}})
message_send(
"approval_result",
{"payload": {"po_number": 1001, "customer_id": "jon"}},
)
# No error when calling with the correct parameters
response = message_send("approval_result", {'payload': {'po_number': 1001, 'customer_id': 'Sartography'}})
message_send(
"approval_result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
)
# There is no longer a waiting message
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type="receive"). \
filter_by(status="ready"). \
filter_by(process_instance_id=self.process_instance.id).all()
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert self.process_instance.status == 'complete'
assert self.process_instance.status == "complete"
def test_single_conversation_between_two_processes(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
self,
app: Flask,
client: FlaskClient,
with_super_admin_user: UserModel,
) -> None:
"""Assure that communication between two processes works the same as making a call through the API, here
"""Test messages between two different running processes using a single conversation.
Assure that communication between two processes works the same as making a call through the API, here
we have two process instances that are communicating with each other using one conversation about an
Invoice whose details are defined in the following message payload """
Invoice whose details are defined in the following message payload
"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00"
"amount": "100.00",
}
# Load up the definition for the receiving process (it has a message start event that should cause it to
# fire when a unique message comes through.
# Fire up the first process
second_process_model = load_test_spec(
load_test_spec(
"test_group/message_receive",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver.bpmn"
bpmn_file_name="message_receiver.bpmn",
)
# Now start the main process
@ -110,30 +120,29 @@ class TestMessageService(BaseTest):
# it will deliver the message that was sent from the receiver back to the original sender.
MessageService.process_message_instances()
# But there should be no send message waiting for delivery, because
# the message receiving process should pick it up instantly via
# it's start event.
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type="receive"). \
filter_by(status="ready"). \
filter_by(process_instance_id=self.process_instance.id).all()
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The message sender process is complete
assert self.process_instance.status == 'complete'
assert self.process_instance.status == "complete"
# The message receiver process is also complete
message_receiver_process = ProcessInstanceModel.query.filter_by(process_model_identifier = "test_group/message_receive").first()
assert message_receiver_process.status == 'complete'
message_receiver_process = ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receive"
).first()
assert message_receiver_process.status == "complete"
def start_sender_process(self,
client: FlaskClient,
with_super_admin_user: UserModel):
def start_sender_process(
self, client: FlaskClient, with_super_admin_user: UserModel
) -> None:
process_group_id = "test_group"
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
@ -153,9 +162,6 @@ class TestMessageService(BaseTest):
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = self.process_instance.active_human_tasks[0]
spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor_send_receive.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor_send_receive,
@ -166,39 +172,49 @@ class TestMessageService(BaseTest):
)
processor_send_receive.save()
def assure_a_message_was_sent(self):
def assure_a_message_was_sent(self) -> None:
# There should be one new send message for the given process instance.
send_messages = MessageInstanceModel.query. \
filter_by(message_type="send"). \
filter_by(process_instance_id=self.process_instance.id).all()
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
# The payload should match because of how it is written in the Send task.
assert send_message.payload == self.payload, "The send message should match up with the payload"
assert (
send_message.payload == self.payload
), "The send message should match up with the payload"
assert send_message.message_model.identifier == "request_approval"
assert send_message.status == "ready"
assert len(send_message.message_correlations) == 2
message_instance_result = MessageInstanceModel.query.all()
MessageInstanceModel.query.all()
self.assure_correlation_properties_are_right(send_message)
def assure_there_is_a_process_waiting_on_a_message(self):
def assure_there_is_a_process_waiting_on_a_message(self) -> None:
# There should be one new send message for the given process instance.
waiting_messages = MessageInstanceModel.query. \
filter_by(message_type="receive"). \
filter_by(status="ready"). \
filter_by(process_instance_id=self.process_instance.id).all()
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message):
def assure_correlation_properties_are_right(
self, message: MessageInstanceModel
) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.message_correlations if c.name == "po_number")
customer_curr = next(c for c in message.message_correlations if c.name == "customer_id")
customer_curr = next(
c for c in message.message_correlations if c.name == "customer_id"
)
assert po_curr is not None
assert customer_curr is not None
assert po_curr.value == '1001'
assert po_curr.value == "1001"
assert customer_curr.value == "Sartography"
def test_can_send_message_to_multiple_process_models(
@ -209,7 +225,7 @@ class TestMessageService(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_can_send_message_to_multiple_process_models."""
process_group_id = "test_group"
process_group_id = "test_group_multi"
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
@ -233,8 +249,8 @@ class TestMessageService(BaseTest):
user = self.find_or_create_user()
process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id,
user)
process_model_sender.id, user
)
processor_sender = ProcessInstanceProcessor(process_instance_sender)
processor_sender.do_engine_steps()
@ -243,9 +259,18 @@ class TestMessageService(BaseTest):
# At this point, the message_sender process has fired two different messages but those
# processes have not started, and it is now paused, waiting for to receive a message. so
# we should have two sends and a receive.
assert MessageInstanceModel.query.filter_by(process_instance_id = process_instance_sender.id).count() == 3
assert MessageInstanceModel.query.count() == 3 # all messages are related to the instance
orig_send_messages = MessageInstanceModel.query.filter_by(message_type="send").all()
assert (
MessageInstanceModel.query.filter_by(
process_instance_id=process_instance_sender.id
).count()
== 3
)
assert (
MessageInstanceModel.query.count() == 3
) # all messages are related to the instance
orig_send_messages = MessageInstanceModel.query.filter_by(
message_type="send"
).all()
assert len(orig_send_messages) == 2
assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1