This commit is contained in:
Dan 2023-02-24 15:02:03 -05:00
parent d9ab0fde32
commit c0bbba6f85
6 changed files with 58 additions and 44 deletions

View File

@ -1,12 +1,8 @@
"""Message_correlation."""
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@dataclass

View File

@ -51,7 +51,8 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
message_type: str = db.Column(db.String(20), nullable=False)
# Only Send Messages have a payload
payload: dict = db.Column(db.JSON)
correlation_keys: dict = db.Column(db.JSON) # The correlation keys of the process at the time the message was created.
# The correlation keys of the process at the time the message was created.
correlation_keys: dict = db.Column(db.JSON)
status: str = db.Column(db.String(20), nullable=False, default="ready")
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
user = relationship("UserModel")
@ -75,16 +76,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
def correlates(
self, other: Any, expression_engine: PythonScriptEngine
) -> bool:
""" Returns true if the this Message correlates with the given message.
def correlates(self, other: Any, expression_engine: PythonScriptEngine) -> bool:
"""Returns true if the this Message correlates with the given message.
This must be a 'receive' message, and the other must be a 'send' or vice/versa. We evaluate
the other messages payload and run our correlation's retrieval expressions against it, then
compare it against our expected values (as stored in this messages' correlation_keys)
IF we don't have an expected value, we accept any non-error result from the retrieval
expression. """
This must be a 'receive' message, and the other must be a 'send' or vice/versa.
If both messages have identical correlation_keys, they are a match. Otherwise
we check through this messages correlation properties and use the retrieval expressions
to extract the correlation keys from the send's payload, and verify that these
match up with correlation keys on this message.
"""
if self.is_send() and other.is_receive():
# Flip the call.
return other.correlates(self, expression_engine) # type: ignore
@ -93,33 +93,44 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
return False
if not self.is_receive():
return False
if isinstance(self.correlation_keys, dict) and self.correlation_keys == other.correlation_keys:
if (
isinstance(self.correlation_keys, dict)
and self.correlation_keys == other.correlation_keys
):
# We know we have a match, and we can just return if we don't have to figure out the key
return True
# Loop over the receives' correlation keys - if any of the keys fully match, then we match.
for expected_values in self.correlation_keys.values():
if self.payload_matches_expected_values(other.payload, expected_values, expression_engine):
if self.payload_matches_expected_values(
other.payload, expected_values, expression_engine
):
return True
return False
def is_receive(self):
def is_receive(self) -> bool:
return self.message_type == MessageTypes.receive.value
def is_send(self):
def is_send(self) -> bool:
return self.message_type == MessageTypes.send.value
def payload_matches_expected_values(
self, payload: dict,
self,
payload: dict,
expected_values: dict,
expression_engine: PythonScriptEngine) -> bool:
expression_engine: PythonScriptEngine,
) -> bool:
"""Compares the payload of a 'send' message against a single correlation key's expected values."""
for correlation_key in self.correlation_rules:
expected_value = expected_values.get(correlation_key.name, None)
if expected_value is None: # This key is not required for this instance to match.
if (
expected_value is None
): # This key is not required for this instance to match.
continue
try:
result = expression_engine._evaluate(correlation_key.retrieval_expression, payload)
result = expression_engine._evaluate(
correlation_key.retrieval_expression, payload
)
except Exception:
# the failure of a payload evaluation may not mean that matches for these
# message instances can't happen with other messages. So don't error up.

View File

@ -94,7 +94,7 @@ class ErrorHandlingService:
message_type="send",
name=message_name,
payload=message_payload,
user_id=g.user.id
user_id=g.user.id,
)
db.session.add(message_instance)
db.session.commit()

View File

@ -62,7 +62,7 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel
MessageInstanceCorrelationRuleModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
@ -1352,7 +1352,7 @@ class ProcessInstanceProcessor:
message_type="send",
name=bpmn_message.name,
payload=bpmn_message.payload,
correlation_keys=self.bpmn_process_instance.correlations
correlation_keys=self.bpmn_process_instance.correlations,
)
db.session.add(message_instance)
db.session.commit()
@ -1382,7 +1382,7 @@ class ProcessInstanceProcessor:
user_id=self.process_instance_model.process_initiator_id,
message_type="receive",
name=event["name"],
correlation_keys=self.bpmn_process_instance.correlations
correlation_keys=self.bpmn_process_instance.correlations,
)
for correlation_property in event["value"]:
message_correlation = MessageInstanceCorrelationRuleModel(

View File

@ -8,7 +8,9 @@ from typing import Optional
from lxml import etree # type: ignore
from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnValidator # type: ignore
from spiffworkflow_backend.models.correlation_property_cache import CorrelationPropertyCache
from spiffworkflow_backend.models.correlation_property_cache import (
CorrelationPropertyCache,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
@ -375,29 +377,27 @@ class SpecFileService(FileSystemService):
def update_correlation_cache(ref: SpecReference) -> None:
"""Update_correlation_cache."""
for name in ref.correlations.keys():
correlation_property_retrieval_expressions = ref.correlations[
name
]["retrieval_expressions"]
correlation_property_retrieval_expressions = ref.correlations[name][
"retrieval_expressions"
]
for cpre in correlation_property_retrieval_expressions:
message_name = ref.messages.get(cpre["messageRef"], None)
retrieval_expression = cpre["expression"]
process_model_id = ref.process_model_id
existing = (
CorrelationPropertyCache.query.filter_by(
existing = CorrelationPropertyCache.query.filter_by(
name=name,
message_name=message_name,
process_model_id=process_model_id,
retrieval_expression=retrieval_expression
retrieval_expression=retrieval_expression,
).first()
)
if existing is None:
new_cache = CorrelationPropertyCache(
name=name,
message_name=message_name,
process_model_id=process_model_id,
retrieval_expression=retrieval_expression
retrieval_expression=retrieval_expression,
)
db.session.add(new_cache)
db.session.commit()

View File

@ -105,7 +105,9 @@ class TestMessageService(BaseTest):
)
# Now start the main process
self.start_sender_process(client, with_super_admin_user, "test_between_processes")
self.start_sender_process(
client, with_super_admin_user, "test_between_processes"
)
self.assure_a_message_was_sent()
# This is typically called in a background cron process, so we will manually call it
@ -145,7 +147,10 @@ class TestMessageService(BaseTest):
assert message_receiver_process.status == "complete"
def start_sender_process(
self, client: FlaskClient, with_super_admin_user: UserModel, group_name:str = "test_group"
self,
client: FlaskClient,
with_super_admin_user: UserModel,
group_name: str = "test_group",
) -> None:
process_group_id = group_name
self.create_process_group(
@ -208,7 +213,9 @@ class TestMessageService(BaseTest):
) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
customer_curr = next(
c for c in message.correlation_rules if c.name == "customer_id"
)
assert po_curr is not None
assert customer_curr is not None