work in progress -
* Link between message instance and correlations is now a link table and many-to-many relationships as recommended by SQLAlchemy * Use the correlation keys, not the process id when accepting api messages.
This commit is contained in:
parent
9f144f540e
commit
4942a728b8
13
conftest.py
13
conftest.py
|
@ -5,6 +5,8 @@ import shutil
|
|||
import pytest
|
||||
from flask.app import Flask
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from spiffworkflow_backend.models import message_correlation_message_instance
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
|
@ -46,13 +48,14 @@ def app() -> Flask:
|
|||
|
||||
@pytest.fixture()
|
||||
def with_db_and_bpmn_file_cleanup() -> None:
|
||||
"""Process_group_resource."""
|
||||
db.session.query(HumanTaskUserModel).delete()
|
||||
|
||||
for model in SpiffworkflowBaseDBModel._all_subclasses():
|
||||
db.session.query(model).delete()
|
||||
meta = db.metadata
|
||||
for table in reversed(meta.sorted_tables):
|
||||
print
|
||||
'Clear table %s' % table
|
||||
db.session.execute(table.delete())
|
||||
db.session.commit()
|
||||
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: ac40af4ddef3
|
||||
Revises: 63fc8d693b9f
|
||||
Create Date: 2023-02-16 14:54:14.533029
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import mysql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ac40af4ddef3'
|
||||
down_revision = '63fc8d693b9f'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance')
|
||||
op.drop_column('message_correlation_message_instance', 'id')
|
||||
op.create_primary_key('mcmi_pirmary_key','message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'])
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('message_correlation_message_instance', sa.Column('id', mysql.INTEGER(), autoincrement=False, nullable=False))
|
||||
op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False)
|
||||
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
|
||||
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
|
||||
# ### end Alembic commands ###
|
|
@ -28,6 +28,7 @@ groups:
|
|||
users:
|
||||
[
|
||||
admin@spiffworkflow.org,
|
||||
nelson@spiffworkflow.org
|
||||
]
|
||||
|
||||
permissions:
|
||||
|
|
|
@ -12,11 +12,6 @@ from spiffworkflow_backend.models.message_correlation_property import (
|
|||
)
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import ( # noqa: F401
|
||||
MessageCorrelationMessageInstanceModel,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessageCorrelationModel(SpiffworkflowBaseDBModel):
|
||||
|
@ -45,6 +40,4 @@ class MessageCorrelationModel(SpiffworkflowBaseDBModel):
|
|||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
|
||||
message_correlation_property = relationship("MessageCorrelationPropertyModel")
|
||||
message_correlations_message_instances = relationship(
|
||||
"MessageCorrelationMessageInstanceModel", cascade="delete"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,32 +1,13 @@
|
|||
"""Message_correlation_message_instance."""
|
||||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import ForeignKey
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessageCorrelationMessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||
"""MessageCorrelationMessageInstanceModel."""
|
||||
|
||||
__tablename__ = "message_correlation_message_instance"
|
||||
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint(
|
||||
"message_instance_id",
|
||||
"message_correlation_id",
|
||||
name="message_correlation_message_instance_unique",
|
||||
),
|
||||
)
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
message_instance_id = db.Column(
|
||||
ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
|
||||
)
|
||||
message_correlation_id = db.Column(
|
||||
ForeignKey(MessageCorrelationModel.id), nullable=False, index=True
|
||||
)
|
||||
message_correlation_message_instance_table \
|
||||
= db.Table('message_correlation_message_instance',
|
||||
db.Column('message_instance_id',
|
||||
ForeignKey('message_instance.id'), primary_key=True),
|
||||
db.Column('message_correlation_id',
|
||||
ForeignKey('message_correlation.id'),primary_key=True)
|
||||
)
|
||||
|
|
|
@ -13,13 +13,10 @@ from sqlalchemy.orm import validates
|
|||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import message_correlation_message_instance_table
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import ( # noqa: F401
|
||||
MessageCorrelationMessageInstanceModel,
|
||||
)
|
||||
|
||||
|
||||
class MessageTypes(enum.Enum):
|
||||
|
@ -38,6 +35,7 @@ class MessageStatuses(enum.Enum):
|
|||
failed = "failed"
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||
"""Messages from a process instance that are ready to send to a receiving task."""
|
||||
|
@ -47,10 +45,11 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
|||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
|
||||
message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False)
|
||||
message_model = relationship("MessageModel")
|
||||
message_correlations_message_instances = relationship(
|
||||
"MessageCorrelationMessageInstanceModel", cascade="delete"
|
||||
)
|
||||
message_model = db.relationship("MessageModel")
|
||||
message_correlations = db.relationship("MessageCorrelationModel",
|
||||
secondary=message_correlation_message_instance_table,
|
||||
backref="message_instances",
|
||||
cascade="all,delete")
|
||||
|
||||
message_type: str = db.Column(db.String(20), nullable=False)
|
||||
payload: str = db.Column(db.JSON)
|
||||
|
@ -59,8 +58,6 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
|||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
|
||||
message_correlations: Optional[dict] = None
|
||||
|
||||
@validates("message_type")
|
||||
def validate_message_type(self, key: str, value: Any) -> Any:
|
||||
"""Validate_message_type."""
|
||||
|
@ -71,6 +68,19 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
|||
"""Validate_status."""
|
||||
return self.validate_enum_field(key, value, MessageStatuses)
|
||||
|
||||
def correlates(self, correlation_dictionary):
|
||||
"""Returns true if the given dictionary matches the correlation names and values connected to this message instance"""
|
||||
for c in self.message_correlations:
|
||||
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
|
||||
if c.name in correlation_dictionary and str(correlation_dictionary[c.name]) == c.value:
|
||||
continue
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
corrs = {}
|
||||
|
||||
|
||||
|
||||
# This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class
|
||||
# so this may not be worth it or there may be a better way to do it
|
||||
|
|
|
@ -12,6 +12,7 @@ from flask.wrappers import Response
|
|||
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_correlation_property import MessageCorrelationPropertyModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
|
@ -110,79 +111,57 @@ def message_send(
|
|||
raise (
|
||||
ApiError(
|
||||
error_code="missing_payload",
|
||||
message="Body is missing payload.",
|
||||
message="Please include a 'payload' in the JSON body that contains the message contents.",
|
||||
status_code=400,
|
||||
)
|
||||
)
|
||||
|
||||
process_instance = None
|
||||
if "process_instance_id" in body:
|
||||
# to make sure we have a valid process_instance_id
|
||||
process_instance = _find_process_instance_by_id_or_raise(
|
||||
body["process_instance_id"]
|
||||
|
||||
# Is there a running instance that is waiting for this message?
|
||||
message_instances = MessageInstanceModel.query.filter_by(message_model_id=message_model.id).all()
|
||||
correlations = MessageCorrelationPropertyModel.query.filter_by(message_model_id=message_model.id).all()
|
||||
|
||||
# do any waiting message instances have matching correlations?
|
||||
matching_message = None
|
||||
for message_instance in message_instances:
|
||||
if message_instance.correlates(body["payload"]):
|
||||
matching_message = message_instance
|
||||
|
||||
process_instance = None
|
||||
if matching_message:
|
||||
process_instance = ProcessInstanceModel.query.filter_by(id = matching_message.process_instance_id).first()
|
||||
|
||||
if matching_message and process_instance and process_instance.status != ProcessInstanceStatus.waiting.value:
|
||||
ApiError(
|
||||
error_code="message_not_accepted",
|
||||
message=(
|
||||
f"The process that can accept message '{message_identifier}' with the given correlation keys"
|
||||
f" is not currently waiting for that message. It is currently in the a '{process_instance.status}' state."
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
if process_instance.status == ProcessInstanceStatus.suspended.value:
|
||||
raise ApiError(
|
||||
error_code="process_instance_is_suspended",
|
||||
message=(
|
||||
f"Process Instance '{process_instance.id}' is suspended and cannot"
|
||||
" accept messages.'"
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
if process_instance.status == ProcessInstanceStatus.terminated.value:
|
||||
raise ApiError(
|
||||
error_code="process_instance_is_terminated",
|
||||
message=(
|
||||
f"Process Instance '{process_instance.id}' is terminated and cannot"
|
||||
" accept messages.'"
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
message_instance = MessageInstanceModel.query.filter_by(
|
||||
process_instance_id=process_instance.id,
|
||||
message_model_id=message_model.id,
|
||||
message_type="receive",
|
||||
status="ready",
|
||||
).first()
|
||||
if message_instance is None:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="cannot_find_waiting_message",
|
||||
message=(
|
||||
"Could not find waiting message for identifier"
|
||||
f" {message_identifier} and process instance"
|
||||
f" {process_instance.id}"
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
)
|
||||
elif matching_message and process_instance:
|
||||
MessageService.process_message_receive(
|
||||
message_instance, message_model.name, body["payload"]
|
||||
)
|
||||
|
||||
else:
|
||||
# We don't have a process model waiting on this message, perhaps some process should be started?
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_model_id=message_model.id
|
||||
).first()
|
||||
)
|
||||
|
||||
if message_triggerable_process_model is None:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="cannot_start_message",
|
||||
message=(
|
||||
"Message with identifier cannot be start with message:"
|
||||
f" {message_identifier}"
|
||||
),
|
||||
f"No process instances correlate with the given message id of '{message_identifier}'. "
|
||||
f"And this message name is not currently associated with any process Start Event."),
|
||||
status_code=400,
|
||||
)
|
||||
)
|
||||
|
||||
process_instance = MessageService.process_message_triggerable_process_model(
|
||||
message_triggerable_process_model,
|
||||
message_model.name,
|
||||
|
|
|
@ -8,9 +8,6 @@ from sqlalchemy import select
|
|||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import (
|
||||
MessageCorrelationMessageInstanceModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
|
@ -166,11 +163,8 @@ class MessageService:
|
|||
message_instances_receive: list[MessageInstanceModel],
|
||||
) -> Optional[MessageInstanceModel]:
|
||||
"""Get_message_instance_receive."""
|
||||
message_correlations_send = (
|
||||
MessageCorrelationModel.query.join(MessageCorrelationMessageInstanceModel)
|
||||
.filter_by(message_instance_id=message_instance_send.id)
|
||||
.all()
|
||||
)
|
||||
|
||||
message_correlations_send = message_instance_send.message_correlations
|
||||
|
||||
message_correlation_filter = []
|
||||
for message_correlation_send in message_correlations_send:
|
||||
|
@ -196,7 +190,7 @@ class MessageService:
|
|||
or_(*message_correlation_filter),
|
||||
)
|
||||
)
|
||||
.join(MessageCorrelationMessageInstanceModel) # type: ignore
|
||||
.join(message_correlation_message_instance_table) # type: ignore
|
||||
.filter_by(
|
||||
message_instance_id=message_instance_receive.id,
|
||||
)
|
||||
|
|
|
@ -61,9 +61,6 @@ from spiffworkflow_backend.models.group import GroupModel
|
|||
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import (
|
||||
MessageCorrelationMessageInstanceModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_correlation_property import (
|
||||
MessageCorrelationPropertyModel,
|
||||
)
|
||||
|
@ -1389,7 +1386,7 @@ class ProcessInstanceProcessor:
|
|||
"message_correlation_property": (
|
||||
message_correlation_property
|
||||
),
|
||||
"name": message_correlation_key,
|
||||
"name": message_correlation_property_identifier,
|
||||
"value": message_correlation_property_value,
|
||||
}
|
||||
)
|
||||
|
@ -1399,27 +1396,19 @@ class ProcessInstanceProcessor:
|
|||
message_model_id=message_model.id,
|
||||
payload=bpmn_message.payload,
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
correlation_models = []
|
||||
for message_correlation in message_correlations:
|
||||
message_correlation = MessageCorrelationModel(
|
||||
correlation_models.append(MessageCorrelationModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_correlation_property_id=message_correlation[
|
||||
"message_correlation_property"
|
||||
].id,
|
||||
name=message_correlation["name"],
|
||||
value=message_correlation["value"],
|
||||
)
|
||||
db.session.add(message_correlation)
|
||||
db.session.commit()
|
||||
message_correlation_message_instance = (
|
||||
MessageCorrelationMessageInstanceModel(
|
||||
message_instance_id=message_instance.id,
|
||||
message_correlation_id=message_correlation.id,
|
||||
)
|
||||
)
|
||||
db.session.add(message_correlation_message_instance)
|
||||
))
|
||||
message_instance.message_correlations = correlation_models
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
def queue_waiting_receive_messages(self) -> None:
|
||||
|
@ -1465,31 +1454,28 @@ class ProcessInstanceProcessor:
|
|||
message_type="receive",
|
||||
message_model_id=message_model.id,
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
|
||||
for (
|
||||
spiff_correlation_property
|
||||
) in waiting_task.task_spec.event_definition.correlation_properties:
|
||||
# NOTE: we may have to cycle through keys here
|
||||
# not sure yet if it's valid for a property to be associated with multiple keys
|
||||
correlation_key_name = spiff_correlation_property.correlation_keys[0]
|
||||
message_correlation = (
|
||||
MessageCorrelationModel.query.filter_by(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
name=correlation_key_name,
|
||||
)
|
||||
.join(MessageCorrelationPropertyModel)
|
||||
.filter_by(identifier=spiff_correlation_property.name)
|
||||
.first()
|
||||
)
|
||||
message_correlation_message_instance = (
|
||||
MessageCorrelationMessageInstanceModel(
|
||||
message_instance_id=message_instance.id,
|
||||
message_correlation_id=message_correlation.id,
|
||||
)
|
||||
)
|
||||
db.session.add(message_correlation_message_instance)
|
||||
message_correlation = next((mc for mc in message_instance.message_correlations
|
||||
if mc.name == spiff_correlation_property.name), None)
|
||||
if not message_correlation:
|
||||
expression = spiff_correlation_property.expression
|
||||
correlation_value = ProcessInstanceProcessor._script_engine.evaluate(waiting_task, expression)
|
||||
correlation_name = spiff_correlation_property.name
|
||||
message_prop = MessageCorrelationPropertyModel.query.\
|
||||
filter_by(identifier=correlation_name).\
|
||||
filter_by(message_model_id=message_model.id).first()
|
||||
|
||||
message_correlation = MessageCorrelationModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_correlation_property_id=message_prop.id,
|
||||
name=correlation_name,
|
||||
value=correlation_value,
|
||||
)
|
||||
message_instance.message_correlations.append(message_correlation)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
def increment_spiff_step(self) -> None:
|
||||
|
|
|
@ -176,7 +176,7 @@ class SpecFileService(FileSystemService):
|
|||
file_type = FileSystemService.file_type(file_name)
|
||||
if file_type.value == FileType.bpmn.value:
|
||||
validator = BpmnValidator()
|
||||
parser = MyCustomParser(validator=validator)
|
||||
parser = MyCustomParser()
|
||||
try:
|
||||
parser.add_bpmn_xml(
|
||||
cls.get_etree_from_xml_bytes(binary_data), filename=file_name
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
|
||||
<bpmn:collaboration id="Collaboration_0oye1os">
|
||||
<bpmn:participant id="message_initiator" name="Message Initiator" processRef="message_send_process" />
|
||||
<bpmn:participant id="message-receiver" name="Message Receiver" />
|
||||
<bpmn:messageFlow id="message_send_flow" name="Message Send Flow" sourceRef="send_message" targetRef="message-receiver" />
|
||||
<bpmn:messageFlow id="message_response_flow" name="Message Response Flow" sourceRef="message-receiver" targetRef="receive_message_response" />
|
||||
<bpmn:textAnnotation id="TextAnnotation_0oxbpew">
|
||||
<bpmn:text>The messages sent here are about an Invoice that can be uniquely identified by the customer_id ("sartography") and a purchase order number (1001)
|
||||
|
||||
It will fire a message connected to the invoice keys above, starting another process, which can communicate back to this specific process instance using the correct key.</bpmn:text>
|
||||
</bpmn:textAnnotation>
|
||||
<bpmn:association id="Association_1d6q7zd" sourceRef="message_initiator" targetRef="TextAnnotation_0oxbpew" />
|
||||
<bpmn:correlationKey name="invoice">
|
||||
<bpmn:correlationPropertyRef>po_number</bpmn:correlationPropertyRef>
|
||||
<bpmn:correlationPropertyRef>customer_id</bpmn:correlationPropertyRef>
|
||||
</bpmn:correlationKey>
|
||||
</bpmn:collaboration>
|
||||
<bpmn:correlationProperty id="po_number" name="Purchase Order Number">
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
|
||||
<bpmn:formalExpression>po_number</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
|
||||
<bpmn:formalExpression>po_number</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:correlationProperty id="customer_id" name="Customer ID">
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
|
||||
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
|
||||
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
|
||||
<bpmn:startEvent id="StartEvent_1">
|
||||
<bpmn:outgoing>Flow_10conab</bpmn:outgoing>
|
||||
</bpmn:startEvent>
|
||||
<bpmn:sequenceFlow id="Flow_037vpjk" sourceRef="send_message" targetRef="receive_message_response" />
|
||||
<bpmn:sequenceFlow id="Flow_1qgz6p0" sourceRef="receive_message_response" targetRef="Event_0kndoyu" />
|
||||
<bpmn:sequenceFlow id="Flow_10conab" sourceRef="StartEvent_1" targetRef="invoice_form" />
|
||||
<bpmn:endEvent id="Event_0kndoyu">
|
||||
<bpmn:incoming>Flow_1qgz6p0</bpmn:incoming>
|
||||
</bpmn:endEvent>
|
||||
<bpmn:intermediateCatchEvent id="receive_message_response" name="Receive Approval Result">
|
||||
<bpmn:incoming>Flow_037vpjk</bpmn:incoming>
|
||||
<bpmn:outgoing>Flow_1qgz6p0</bpmn:outgoing>
|
||||
<bpmn:messageEventDefinition id="MessageEventDefinition_1l3n0zr" messageRef="approval_result" />
|
||||
</bpmn:intermediateCatchEvent>
|
||||
<bpmn:sendTask id="send_message" name="Request Approval" messageRef="request_approval">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:preScript>the_topic = "first_conversation" </spiffworkflow:preScript>
|
||||
</bpmn:extensionElements>
|
||||
<bpmn:incoming>Flow_02lw0q9</bpmn:incoming>
|
||||
<bpmn:outgoing>Flow_037vpjk</bpmn:outgoing>
|
||||
</bpmn:sendTask>
|
||||
<bpmn:sequenceFlow id="Flow_02lw0q9" sourceRef="invoice_form" targetRef="send_message" />
|
||||
<bpmn:userTask id="invoice_form" name="Create Invoice">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:properties>
|
||||
<spiffworkflow:property name="formJsonSchemaFilename" value="invoice_form.json" />
|
||||
<spiffworkflow:property name="formUiSchemaFilename" value="invoice_ui.json" />
|
||||
</spiffworkflow:properties>
|
||||
</bpmn:extensionElements>
|
||||
<bpmn:incoming>Flow_10conab</bpmn:incoming>
|
||||
<bpmn:outgoing>Flow_02lw0q9</bpmn:outgoing>
|
||||
</bpmn:userTask>
|
||||
</bpmn:process>
|
||||
<bpmn:message id="request_approval" name="Request Approval">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:messagePayload>{
|
||||
"customer_id": customer_id,
|
||||
"po_number": po_number,
|
||||
"amount": amount,
|
||||
"description": description,
|
||||
}</spiffworkflow:messagePayload>
|
||||
</bpmn:extensionElements>
|
||||
</bpmn:message>
|
||||
<bpmn:message id="approval_result" name="Approval Result">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:messageVariable>the_payload</spiffworkflow:messageVariable>
|
||||
</bpmn:extensionElements>
|
||||
</bpmn:message>
|
||||
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
|
||||
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_0oye1os">
|
||||
<bpmndi:BPMNShape id="Participant_0bjh770_di" bpmnElement="message_initiator" isHorizontal="true">
|
||||
<dc:Bounds x="120" y="52" width="600" height="338" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
|
||||
<dc:Bounds x="179" y="159" width="36" height="36" />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Event_0kndoyu_di" bpmnElement="Event_0kndoyu">
|
||||
<dc:Bounds x="622" y="159" width="36" height="36" />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Event_0yt48xb_di" bpmnElement="receive_message_response">
|
||||
<dc:Bounds x="532" y="159" width="36" height="36" />
|
||||
<bpmndi:BPMNLabel>
|
||||
<dc:Bounds x="508" y="129" width="86" height="27" />
|
||||
</bpmndi:BPMNLabel>
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Activity_0vm33bu_di" bpmnElement="send_message">
|
||||
<dc:Bounds x="390" y="137" width="100" height="80" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="Activity_0798vfz_di" bpmnElement="invoice_form">
|
||||
<dc:Bounds x="240" y="137" width="100" height="80" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNEdge id="Flow_037vpjk_di" bpmnElement="Flow_037vpjk">
|
||||
<di:waypoint x="490" y="177" />
|
||||
<di:waypoint x="532" y="177" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNEdge id="Flow_1qgz6p0_di" bpmnElement="Flow_1qgz6p0">
|
||||
<di:waypoint x="568" y="177" />
|
||||
<di:waypoint x="622" y="177" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNEdge id="Flow_10conab_di" bpmnElement="Flow_10conab">
|
||||
<di:waypoint x="215" y="177" />
|
||||
<di:waypoint x="240" y="177" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNEdge id="Flow_02lw0q9_di" bpmnElement="Flow_02lw0q9">
|
||||
<di:waypoint x="340" y="177" />
|
||||
<di:waypoint x="390" y="177" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNShape id="Participant_158b3ei_di" bpmnElement="message-receiver" isHorizontal="true">
|
||||
<dc:Bounds x="120" y="350" width="600" height="60" />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNShape id="TextAnnotation_0oxbpew_di" bpmnElement="TextAnnotation_0oxbpew">
|
||||
<dc:Bounds x="760" y="-30" width="226.98863220214844" height="155.9943084716797" />
|
||||
<bpmndi:BPMNLabel />
|
||||
</bpmndi:BPMNShape>
|
||||
<bpmndi:BPMNEdge id="Association_1d6q7zd_di" bpmnElement="Association_1d6q7zd">
|
||||
<di:waypoint x="699" y="52" />
|
||||
<di:waypoint x="760" y="15" />
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNEdge id="Flow_1ueajoz_di" bpmnElement="message_send_flow">
|
||||
<di:waypoint x="410" y="217" />
|
||||
<di:waypoint x="410" y="350" />
|
||||
<bpmndi:BPMNLabel>
|
||||
<dc:Bounds x="413" y="302" width="74" height="27" />
|
||||
</bpmndi:BPMNLabel>
|
||||
</bpmndi:BPMNEdge>
|
||||
<bpmndi:BPMNEdge id="Flow_1n96n67_di" bpmnElement="message_response_flow">
|
||||
<di:waypoint x="550" y="350" />
|
||||
<di:waypoint x="550" y="195" />
|
||||
<bpmndi:BPMNLabel>
|
||||
<dc:Bounds x="552" y="294" width="77" height="27" />
|
||||
</bpmndi:BPMNLabel>
|
||||
</bpmndi:BPMNEdge>
|
||||
</bpmndi:BPMNPlane>
|
||||
</bpmndi:BPMNDiagram>
|
||||
</bpmn:definitions>
|
|
@ -5,9 +5,6 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
|||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import (
|
||||
MessageCorrelationMessageInstanceModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
"""Test_message_service."""
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.routes.messages_controller import message_send
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.message_service import MessageService
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
)
|
||||
from spiffworkflow_backend.services.process_instance_service import (
|
||||
ProcessInstanceService,
|
||||
)
|
||||
|
||||
|
||||
class TestMessageService(BaseTest):
|
||||
"""TestMessageService."""
|
||||
|
||||
def test_message_sent(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""This example workflow will send a message called 'request_approval' and then wait for a response messge
|
||||
of 'approval_result. This test assures that it will fire the message with the correct correlation properties
|
||||
and will respond only to a message called "approval_result' that has the matching correlation properties."""
|
||||
process_group_id = "test_group"
|
||||
self.create_process_group(
|
||||
client, with_super_admin_user, process_group_id, process_group_id
|
||||
)
|
||||
|
||||
process_model = load_test_spec(
|
||||
"test_group/message",
|
||||
process_model_source_directory="message",
|
||||
bpmn_file_name="message_send_receive.bpmn",
|
||||
)
|
||||
|
||||
self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
||||
process_model.id,
|
||||
with_super_admin_user,
|
||||
)
|
||||
processor_send_receive = ProcessInstanceProcessor(self.process_instance)
|
||||
processor_send_receive.do_engine_steps(save=True)
|
||||
task = processor_send_receive.get_all_user_tasks()[0]
|
||||
human_task = self.process_instance.active_human_tasks[0]
|
||||
spiff_task = processor_send_receive.__class__.get_task_by_bpmn_identifier(
|
||||
human_task.task_name, processor_send_receive.bpmn_process_instance
|
||||
)
|
||||
self.payload = {
|
||||
"customer_id": "Sartography",
|
||||
"po_number": 1001,
|
||||
"description": "We build a new feature for messages!",
|
||||
"amount": "100.00"
|
||||
}
|
||||
|
||||
ProcessInstanceService.complete_form_task(
|
||||
processor_send_receive,
|
||||
task,
|
||||
self.payload,
|
||||
with_super_admin_user,
|
||||
human_task,
|
||||
)
|
||||
processor_send_receive.save()
|
||||
self.assure_a_message_was_sent()
|
||||
self.assure_there_is_a_process_waiting_on_a_message()
|
||||
|
||||
## Should return an error when making an API call for the wrong po number
|
||||
with pytest.raises(ApiError):
|
||||
message_send("approval_result", {'payload': {'po_number' : 5001}})
|
||||
|
||||
## Sound return an error when making an API call for right po number, wrong client
|
||||
with pytest.raises(ApiError):
|
||||
message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'jon'}})
|
||||
|
||||
## No error when calling with the correct parameters
|
||||
response = message_send("approval_result", {'payload': {'po_number' : 1001, 'customer_id': 'Sartography'}})
|
||||
|
||||
def assure_a_message_was_sent(self):
|
||||
# There should be one new send message for the given process instance.
|
||||
send_messages = MessageInstanceModel.query. \
|
||||
filter_by(message_type = "send"). \
|
||||
filter_by(process_instance_id = self.process_instance.id).all()
|
||||
assert len(send_messages) == 1
|
||||
send_message = send_messages[0]
|
||||
|
||||
# The payload should match because of how it is written in the Send task.
|
||||
assert send_message.payload == self.payload, "The send message should match up with the payload"
|
||||
assert send_message.message_model.identifier == "request_approval"
|
||||
assert send_message.status == "ready"
|
||||
assert len(send_message.message_correlations) == 2
|
||||
message_instance_result = MessageInstanceModel.query.all()
|
||||
self.assure_correlation_properties_are_right(send_message)
|
||||
|
||||
def assure_there_is_a_process_waiting_on_a_message(self):
|
||||
# There should be one new send message for the given process instance.
|
||||
waiting_messages = MessageInstanceModel.query. \
|
||||
filter_by(message_type = "receive"). \
|
||||
filter_by(process_instance_id = self.process_instance.id).all()
|
||||
assert len(waiting_messages) == 1
|
||||
waiting_message = waiting_messages[0]
|
||||
self.assure_correlation_properties_are_right(waiting_message)
|
||||
|
||||
|
||||
|
||||
def assure_correlation_properties_are_right(self, message):
|
||||
# Correlation Properties should match up
|
||||
po_curr = next(c for c in message.message_correlations if c.name == "po_number")
|
||||
customer_curr = next(c for c in message.message_correlations if c.name == "customer_id")
|
||||
assert po_curr is not None
|
||||
assert customer_curr is not None
|
||||
assert po_curr.value == '1001'
|
||||
assert customer_curr.value == "Sartography"
|
||||
|
Loading…
Reference in New Issue