run_pyl
This commit is contained in:
parent
fbe2237f1c
commit
7c74e216a2
|
@ -1,12 +1,8 @@
|
||||||
"""Message_correlation."""
|
"""Message_correlation."""
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from sqlalchemy import ForeignKey
|
|
||||||
from sqlalchemy.orm import relationship
|
|
||||||
|
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
|
@ -51,7 +51,8 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
message_type: str = db.Column(db.String(20), nullable=False)
|
message_type: str = db.Column(db.String(20), nullable=False)
|
||||||
# Only Send Messages have a payload
|
# Only Send Messages have a payload
|
||||||
payload: dict = db.Column(db.JSON)
|
payload: dict = db.Column(db.JSON)
|
||||||
correlation_keys: dict = db.Column(db.JSON) # The correlation keys of the process at the time the message was created.
|
# The correlation keys of the process at the time the message was created.
|
||||||
|
correlation_keys: dict = db.Column(db.JSON)
|
||||||
status: str = db.Column(db.String(20), nullable=False, default="ready")
|
status: str = db.Column(db.String(20), nullable=False, default="ready")
|
||||||
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
|
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
|
||||||
user = relationship("UserModel")
|
user = relationship("UserModel")
|
||||||
|
@ -75,16 +76,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
"""Validate_status."""
|
"""Validate_status."""
|
||||||
return self.validate_enum_field(key, value, MessageStatuses)
|
return self.validate_enum_field(key, value, MessageStatuses)
|
||||||
|
|
||||||
def correlates(
|
def correlates(self, other: Any, expression_engine: PythonScriptEngine) -> bool:
|
||||||
self, other: Any, expression_engine: PythonScriptEngine
|
"""Returns true if the this Message correlates with the given message.
|
||||||
) -> bool:
|
|
||||||
""" Returns true if the this Message correlates with the given message.
|
|
||||||
|
|
||||||
This must be a 'receive' message, and the other must be a 'send' or vice/versa. We evaluate
|
This must be a 'receive' message, and the other must be a 'send' or vice/versa.
|
||||||
the other messages payload and run our correlation's retrieval expressions against it, then
|
If both messages have identical correlation_keys, they are a match. Otherwise
|
||||||
compare it against our expected values (as stored in this messages' correlation_keys)
|
we check through this messages correlation properties and use the retrieval expressions
|
||||||
IF we don't have an expected value, we accept any non-error result from the retrieval
|
to extract the correlation keys from the send's payload, and verify that these
|
||||||
expression. """
|
match up with correlation keys on this message.
|
||||||
|
"""
|
||||||
if self.is_send() and other.is_receive():
|
if self.is_send() and other.is_receive():
|
||||||
# Flip the call.
|
# Flip the call.
|
||||||
return other.correlates(self, expression_engine) # type: ignore
|
return other.correlates(self, expression_engine) # type: ignore
|
||||||
|
@ -93,33 +93,44 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||||
return False
|
return False
|
||||||
if not self.is_receive():
|
if not self.is_receive():
|
||||||
return False
|
return False
|
||||||
if isinstance(self.correlation_keys, dict) and self.correlation_keys == other.correlation_keys:
|
if (
|
||||||
|
isinstance(self.correlation_keys, dict)
|
||||||
|
and self.correlation_keys == other.correlation_keys
|
||||||
|
):
|
||||||
# We know we have a match, and we can just return if we don't have to figure out the key
|
# We know we have a match, and we can just return if we don't have to figure out the key
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Loop over the receives' correlation keys - if any of the keys fully match, then we match.
|
# Loop over the receives' correlation keys - if any of the keys fully match, then we match.
|
||||||
for expected_values in self.correlation_keys.values():
|
for expected_values in self.correlation_keys.values():
|
||||||
if self.payload_matches_expected_values(other.payload, expected_values, expression_engine):
|
if self.payload_matches_expected_values(
|
||||||
|
other.payload, expected_values, expression_engine
|
||||||
|
):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def is_receive(self):
|
def is_receive(self) -> bool:
|
||||||
return self.message_type == MessageTypes.receive.value
|
return self.message_type == MessageTypes.receive.value
|
||||||
|
|
||||||
def is_send(self):
|
def is_send(self) -> bool:
|
||||||
return self.message_type == MessageTypes.send.value
|
return self.message_type == MessageTypes.send.value
|
||||||
|
|
||||||
def payload_matches_expected_values(
|
def payload_matches_expected_values(
|
||||||
self, payload: dict,
|
self,
|
||||||
expected_values: dict,
|
payload: dict,
|
||||||
expression_engine: PythonScriptEngine) -> bool:
|
expected_values: dict,
|
||||||
|
expression_engine: PythonScriptEngine,
|
||||||
|
) -> bool:
|
||||||
"""Compares the payload of a 'send' message against a single correlation key's expected values."""
|
"""Compares the payload of a 'send' message against a single correlation key's expected values."""
|
||||||
for correlation_key in self.correlation_rules:
|
for correlation_key in self.correlation_rules:
|
||||||
expected_value = expected_values.get(correlation_key.name, None)
|
expected_value = expected_values.get(correlation_key.name, None)
|
||||||
if expected_value is None: # This key is not required for this instance to match.
|
if (
|
||||||
|
expected_value is None
|
||||||
|
): # This key is not required for this instance to match.
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
result = expression_engine._evaluate(correlation_key.retrieval_expression, payload)
|
result = expression_engine._evaluate(
|
||||||
|
correlation_key.retrieval_expression, payload
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
# the failure of a payload evaluation may not mean that matches for these
|
# the failure of a payload evaluation may not mean that matches for these
|
||||||
# message instances can't happen with other messages. So don't error up.
|
# message instances can't happen with other messages. So don't error up.
|
||||||
|
|
|
@ -94,7 +94,7 @@ class ErrorHandlingService:
|
||||||
message_type="send",
|
message_type="send",
|
||||||
name=message_name,
|
name=message_name,
|
||||||
payload=message_payload,
|
payload=message_payload,
|
||||||
user_id=g.user.id
|
user_id=g.user.id,
|
||||||
)
|
)
|
||||||
db.session.add(message_instance)
|
db.session.add(message_instance)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
|
@ -62,7 +62,7 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||||
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
|
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
|
||||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||||
from spiffworkflow_backend.models.message_instance_correlation import (
|
from spiffworkflow_backend.models.message_instance_correlation import (
|
||||||
MessageInstanceCorrelationRuleModel
|
MessageInstanceCorrelationRuleModel,
|
||||||
)
|
)
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||||
|
@ -1352,7 +1352,7 @@ class ProcessInstanceProcessor:
|
||||||
message_type="send",
|
message_type="send",
|
||||||
name=bpmn_message.name,
|
name=bpmn_message.name,
|
||||||
payload=bpmn_message.payload,
|
payload=bpmn_message.payload,
|
||||||
correlation_keys=self.bpmn_process_instance.correlations
|
correlation_keys=self.bpmn_process_instance.correlations,
|
||||||
)
|
)
|
||||||
db.session.add(message_instance)
|
db.session.add(message_instance)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
@ -1382,7 +1382,7 @@ class ProcessInstanceProcessor:
|
||||||
user_id=self.process_instance_model.process_initiator_id,
|
user_id=self.process_instance_model.process_initiator_id,
|
||||||
message_type="receive",
|
message_type="receive",
|
||||||
name=event["name"],
|
name=event["name"],
|
||||||
correlation_keys=self.bpmn_process_instance.correlations
|
correlation_keys=self.bpmn_process_instance.correlations,
|
||||||
)
|
)
|
||||||
for correlation_property in event["value"]:
|
for correlation_property in event["value"]:
|
||||||
message_correlation = MessageInstanceCorrelationRuleModel(
|
message_correlation = MessageInstanceCorrelationRuleModel(
|
||||||
|
|
|
@ -8,7 +8,9 @@ from typing import Optional
|
||||||
from lxml import etree # type: ignore
|
from lxml import etree # type: ignore
|
||||||
from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnValidator # type: ignore
|
from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnValidator # type: ignore
|
||||||
|
|
||||||
from spiffworkflow_backend.models.correlation_property_cache import CorrelationPropertyCache
|
from spiffworkflow_backend.models.correlation_property_cache import (
|
||||||
|
CorrelationPropertyCache,
|
||||||
|
)
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.file import File
|
from spiffworkflow_backend.models.file import File
|
||||||
from spiffworkflow_backend.models.file import FileType
|
from spiffworkflow_backend.models.file import FileType
|
||||||
|
@ -375,29 +377,27 @@ class SpecFileService(FileSystemService):
|
||||||
def update_correlation_cache(ref: SpecReference) -> None:
|
def update_correlation_cache(ref: SpecReference) -> None:
|
||||||
"""Update_correlation_cache."""
|
"""Update_correlation_cache."""
|
||||||
for name in ref.correlations.keys():
|
for name in ref.correlations.keys():
|
||||||
correlation_property_retrieval_expressions = ref.correlations[
|
correlation_property_retrieval_expressions = ref.correlations[name][
|
||||||
name
|
"retrieval_expressions"
|
||||||
]["retrieval_expressions"]
|
]
|
||||||
|
|
||||||
for cpre in correlation_property_retrieval_expressions:
|
for cpre in correlation_property_retrieval_expressions:
|
||||||
message_name = ref.messages.get(cpre["messageRef"], None)
|
message_name = ref.messages.get(cpre["messageRef"], None)
|
||||||
retrieval_expression = cpre["expression"]
|
retrieval_expression = cpre["expression"]
|
||||||
process_model_id = ref.process_model_id
|
process_model_id = ref.process_model_id
|
||||||
|
|
||||||
existing = (
|
existing = CorrelationPropertyCache.query.filter_by(
|
||||||
CorrelationPropertyCache.query.filter_by(
|
name=name,
|
||||||
name=name,
|
message_name=message_name,
|
||||||
message_name=message_name,
|
process_model_id=process_model_id,
|
||||||
process_model_id=process_model_id,
|
retrieval_expression=retrieval_expression,
|
||||||
retrieval_expression=retrieval_expression
|
).first()
|
||||||
).first()
|
|
||||||
)
|
|
||||||
if existing is None:
|
if existing is None:
|
||||||
new_cache = CorrelationPropertyCache(
|
new_cache = CorrelationPropertyCache(
|
||||||
name=name,
|
name=name,
|
||||||
message_name=message_name,
|
message_name=message_name,
|
||||||
process_model_id=process_model_id,
|
process_model_id=process_model_id,
|
||||||
retrieval_expression=retrieval_expression
|
retrieval_expression=retrieval_expression,
|
||||||
)
|
)
|
||||||
db.session.add(new_cache)
|
db.session.add(new_cache)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
|
@ -105,7 +105,9 @@ class TestMessageService(BaseTest):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Now start the main process
|
# Now start the main process
|
||||||
self.start_sender_process(client, with_super_admin_user, "test_between_processes")
|
self.start_sender_process(
|
||||||
|
client, with_super_admin_user, "test_between_processes"
|
||||||
|
)
|
||||||
self.assure_a_message_was_sent()
|
self.assure_a_message_was_sent()
|
||||||
|
|
||||||
# This is typically called in a background cron process, so we will manually call it
|
# This is typically called in a background cron process, so we will manually call it
|
||||||
|
@ -145,7 +147,10 @@ class TestMessageService(BaseTest):
|
||||||
assert message_receiver_process.status == "complete"
|
assert message_receiver_process.status == "complete"
|
||||||
|
|
||||||
def start_sender_process(
|
def start_sender_process(
|
||||||
self, client: FlaskClient, with_super_admin_user: UserModel, group_name:str = "test_group"
|
self,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_super_admin_user: UserModel,
|
||||||
|
group_name: str = "test_group",
|
||||||
) -> None:
|
) -> None:
|
||||||
process_group_id = group_name
|
process_group_id = group_name
|
||||||
self.create_process_group(
|
self.create_process_group(
|
||||||
|
@ -186,7 +191,7 @@ class TestMessageService(BaseTest):
|
||||||
assert len(send_messages) == 1
|
assert len(send_messages) == 1
|
||||||
send_message = send_messages[0]
|
send_message = send_messages[0]
|
||||||
assert (
|
assert (
|
||||||
send_message.payload == self.payload
|
send_message.payload == self.payload
|
||||||
), "The send message should match up with the payload"
|
), "The send message should match up with the payload"
|
||||||
assert send_message.name == "Request Approval"
|
assert send_message.name == "Request Approval"
|
||||||
assert send_message.status == "ready"
|
assert send_message.status == "ready"
|
||||||
|
@ -208,7 +213,9 @@ class TestMessageService(BaseTest):
|
||||||
) -> None:
|
) -> None:
|
||||||
# Correlation Properties should match up
|
# Correlation Properties should match up
|
||||||
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
|
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
|
||||||
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
|
customer_curr = next(
|
||||||
|
c for c in message.correlation_rules if c.name == "customer_id"
|
||||||
|
)
|
||||||
assert po_curr is not None
|
assert po_curr is not None
|
||||||
assert customer_curr is not None
|
assert customer_curr is not None
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue