# SpiffWorkflow:

1) Type Safe checking on correlation properties (no more str())
2) A running workflows Correlations are once again at the key level.

# Backend
1) Both send and receive messages can have correlation_keys - and we compare these to each other to quickly assure a match (if they both exist - otherwise we fall back to comparing the properties on the receive to the sending messages payload)
2) Cleaned up the migrations to just one file
This commit is contained in:
Dan 2023-02-24 14:53:22 -05:00
parent 0f02baf13e
commit d9ab0fde32
15 changed files with 144 additions and 231 deletions

View File

@ -165,7 +165,6 @@ class CorrelationProperty:
self.name = name # This is the property name
self.retrieval_expression = retrieval_expression # This is how it's generated
self.correlation_keys = correlation_keys # These are the keys it's used by
self.expected_value = expected_value # This is the (optional) expected value
class MessageEventDefinition(NamedEventDefinition):
"""The default message event."""
@ -206,34 +205,23 @@ class MessageEventDefinition(NamedEventDefinition):
if payload is not None:
my_task.set_data(**payload)
def get_correlation_keys(self, task, payload):
def get_correlations(self, task, payload):
correlation_keys = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
if key not in correlation_keys:
correlation_keys[key] = {}
correlation_keys[key][property.name] = task.workflow.script_engine._evaluate(property.expression, payload)
try:
correlation_keys[key][property.name] = task.workflow.script_engine._evaluate(property.retrieval_expression, payload)
except WorkflowException as we:
we.add_note(
f"Failed to evaluate correlation property '{property.name}'"
f" invalid expression '{property.retrieval_expression}'")
we.task_spec = task.task_spec
raise we
return correlation_keys
def get_correlations(self, task, payload):
correlations = {}
for property in self.correlation_properties:
try:
correlations[property.name] = task.workflow.script_engine._evaluate(property.retrieval_expression, payload)
except WorkflowException as we:
we.add_note(f"Failed to evaluate correlation property '{property.name}'"
f" invalid expression '{property.retrieval_expression}'")
we.task_spec = task.task_spec
raise we
return correlations
def get_awaiting_correlations(self, task):
for prop in self.correlation_properties:
if prop.name in task.workflow.correlations:
prop.expected_value = task.workflow.correlations.get(prop.name)
else:
prop.expected_value = None
return self.correlation_properties
class NoneEventDefinition(EventDefinition):
"""

View File

@ -36,7 +36,7 @@ class CatchingEvent(Simple, BpmnSpecMixin):
def catches(self, my_task, event_definition, correlations=None):
if self.event_definition == event_definition:
return all([ str(correlations.get(key)) == str(my_task.workflow.correlations.get(key)) for key in correlations ])
return all([correlations.get(key) == my_task.workflow.correlations.get(key) for key in correlations ])
else:
return False

View File

@ -179,8 +179,7 @@ class BpmnWorkflow(Workflow):
if isinstance(event_definition, TimerEventDefinition):
value = event_definition.timer_value(task)
elif isinstance(event_definition, MessageEventDefinition):
value = event_definition.get_awaiting_correlations(task)
value = event_definition.correlation_properties
events.append({
'event_type': event_definition.event_type,
'name': event_definition.name if isinstance(event_definition, NamedEventDefinition) else None,

View File

@ -56,7 +56,6 @@ class CollaborationTest(BpmnWorkflowTestCase):
self.assertEqual(1, len(events))
self.assertEqual("Message", events[0]['event_type'])
self.assertEqual("Love Letter Response", events[0]['name'])
self.assertEqual('Peggy', events[0]['value'][0].expected_value)
self.assertEqual(['lover'], events[0]['value'][0].correlation_keys)
self.assertEqual('from_name', events[0]['value'][0].retrieval_expression)
self.assertEqual('lover_name', events[0]['value'][0].name)

View File

@ -1,35 +0,0 @@
"""empty message
Revision ID: 4d47598d7181
Revises: 4ab08bf12666
Create Date: 2023-02-23 16:12:11.603900
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '4d47598d7181'
down_revision = '4ab08bf12666'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('correlation_property_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('message_name', sa.String(length=50), nullable=False),
sa.Column('process_model_id', sa.String(length=255), nullable=False),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('correlation_property_cache')
# ### end Alembic commands ###

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 4ab08bf12666
Revises: b581ff2351ad
Create Date: 2023-02-23 07:25:54.135765
Revision ID: 9f0b1662a8af
Revises: 63fc8d693b9f
Create Date: 2023-02-24 14:30:05.970959
"""
from alembic import op
@ -10,19 +10,26 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '4ab08bf12666'
down_revision = 'b581ff2351ad'
revision = '9f0b1662a8af'
down_revision = '63fc8d693b9f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('message_instance_correlation',
op.create_table('correlation_property_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('message_name', sa.String(length=50), nullable=False),
sa.Column('process_model_id', sa.String(length=255), nullable=False),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('message_instance_correlation_rule',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('message_instance_id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('expected_value', sa.String(length=255), nullable=True),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
@ -30,26 +37,29 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique')
)
op.create_index(op.f('ix_message_instance_correlation_expected_value'), 'message_instance_correlation', ['expected_value'], unique=False)
op.create_index(op.f('ix_message_instance_correlation_message_instance_id'), 'message_instance_correlation', ['message_instance_id'], unique=False)
op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property')
op.drop_index('message_correlation_property_unique', table_name='message_correlation_property')
op.drop_constraint('message_correlation_ibfk_1', 'message_correlation', type_='foreignkey')
op.drop_table('message_correlation_property')
op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation')
op.create_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), 'message_instance_correlation_rule', ['message_instance_id'], unique=False)
op.drop_index('ix_message_model_identifier', table_name='message_model')
op.drop_index('ix_message_model_name', table_name='message_model')
op.drop_constraint('message_correlation_property_ibfk_1', 'message_correlation_property', type_='foreignkey')
op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey')
op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey')
op.drop_table('message_model')
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey')
op.drop_index('ix_message_correlation_name', table_name='message_correlation')
op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation')
op.drop_index('ix_message_correlation_value', table_name='message_correlation')
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey')
op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey')
# op.drop_index('message_instance_id_name_unique', table_name='message_correlation')
# op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation')
op.drop_table('message_correlation')
op.drop_index('ix_message_model_identifier', table_name='message_model')
op.drop_index('ix_message_model_name', table_name='message_model')
op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey')
op.drop_table('message_model')
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
# op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance')
op.drop_table('message_correlation_message_instance')
op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property')
op.drop_index('message_correlation_property_unique', table_name='message_correlation_property')
op.drop_table('message_correlation_property')
op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True))
op.add_column('message_instance', sa.Column('correlation_keys', sa.JSON(), nullable=True))
op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False))
op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True))
op.alter_column('message_instance', 'process_instance_id',
@ -66,9 +76,8 @@ def upgrade():
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', 'message_model', ['message_model_id'], ['id'])
op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False)
op.drop_column('message_triggerable_process_model', 'name')
op.drop_column('message_triggerable_process_model', 'message_name')
op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.drop_constraint(None, 'message_instance', type_='foreignkey')
op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id'])
@ -77,28 +86,36 @@ def downgrade():
nullable=False)
op.drop_column('message_instance', 'counterpart_id')
op.drop_column('message_instance', 'user_id')
op.drop_column('message_instance', 'correlation_keys')
op.drop_column('message_instance', 'name')
op.create_table('message_correlation_message_instance',
sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'),
sa.PrimaryKeyConstraint('message_instance_id', 'message_correlation_id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_table('message_model',
op.create_table('message_correlation_property',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True),
sa.Column('name', mysql.VARCHAR(length=50), nullable=True),
sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False)
op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False)
op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False)
op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False)
op.create_table('message_correlation_message_instance',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
op.create_table('message_correlation',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
@ -119,21 +136,18 @@ def downgrade():
op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False)
op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False)
op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False)
op.create_table('message_correlation_property',
op.create_table('message_model',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True),
sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'),
sa.Column('name', mysql.VARCHAR(length=50), nullable=True),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False)
op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False)
op.drop_index(op.f('ix_message_instance_correlation_message_instance_id'), table_name='message_instance_correlation')
op.drop_index(op.f('ix_message_instance_correlation_expected_value'), table_name='message_instance_correlation')
op.drop_table('message_instance_correlation')
op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False)
op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False)
op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule')
op.drop_table('message_instance_correlation_rule')
op.drop_table('correlation_property_cache')
# ### end Alembic commands ###

View File

@ -1,33 +0,0 @@
"""empty message
Revision ID: ac40af4ddef3
Revises: 63fc8d693b9f
Create Date: 2023-02-16 14:54:14.533029
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'ac40af4ddef3'
down_revision = '63fc8d693b9f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance')
op.drop_column('message_correlation_message_instance', 'id')
op.create_primary_key('mcmi_pirmary_key','message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('message_correlation_message_instance', sa.Column('id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
# ### end Alembic commands ###

View File

@ -1,35 +0,0 @@
"""empty message
Revision ID: b581ff2351ad
Revises: ac40af4ddef3
Create Date: 2023-02-19 11:19:24.371528
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b581ff2351ad'
down_revision = 'ac40af4ddef3'
branch_labels = None
depends_on = None
def upgrade():
# Just some table cleanup so the join table is properly built
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey")
op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey")
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id'])
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id'])
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property')
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
# ### end Alembic commands ###

View File

@ -19,7 +19,7 @@ from spiffworkflow_backend.models.user import UserModel
if TYPE_CHECKING:
from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401
MessageInstanceCorrelationModel,
MessageInstanceCorrelationRuleModel,
)
@ -49,7 +49,9 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore
name: str = db.Column(db.String(255))
message_type: str = db.Column(db.String(20), nullable=False)
# Only Send Messages have a payload
payload: dict = db.Column(db.JSON)
correlation_keys: dict = db.Column(db.JSON) # The correlation keys of the process at the time the message was created.
status: str = db.Column(db.String(20), nullable=False, default="ready")
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
user = relationship("UserModel")
@ -59,8 +61,8 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
failure_cause: str = db.Column(db.Text())
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
correlations = relationship(
"MessageInstanceCorrelationModel", back_populates="message_instance"
correlation_rules = relationship(
"MessageInstanceCorrelationRuleModel", back_populates="message_instance"
)
@validates("message_type")
@ -74,40 +76,59 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
return self.validate_enum_field(key, value, MessageStatuses)
def correlates(
self, other_message_instance: Any, expression_engine: PythonScriptEngine
self, other: Any, expression_engine: PythonScriptEngine
) -> bool:
# This must be a receive message, and the other must be a send (otherwise we reverse the call)
# We evaluate the other messages payload and run our correlation's
# retrieval expressions against it, then compare it against our
# expected values -- IF we don't have an expected value, we accept
# any non-erroring result from the retrieval expression.
if self.name != other_message_instance.name:
""" Returns true if the this Message correlates with the given message.
This must be a 'receive' message, and the other must be a 'send' or vice/versa. We evaluate
the other messages payload and run our correlation's retrieval expressions against it, then
compare it against our expected values (as stored in this messages' correlation_keys)
IF we don't have an expected value, we accept any non-error result from the retrieval
expression. """
if self.is_send() and other.is_receive():
# Flip the call.
return other.correlates(self, expression_engine) # type: ignore
if self.name != other.name:
return False
if self.message_type == MessageTypes.receive.value:
if other_message_instance.message_type != MessageTypes.send.value:
return False
payload = other_message_instance.payload
for corr in self.correlations:
try:
result = expression_engine._evaluate(
corr.retrieval_expression, payload
)
except Exception:
# the failure of a payload evaluation may not mean that matches for these
# message instances can't happen with other messages. So don't error up.
# fixme: Perhaps log some sort of error.
return False
if corr.expected_value is None:
continue # We will accept any value
elif corr.expected_value != str(
result
): # fixme: Don't require conversion to string
return False
if not self.is_receive():
return False
if isinstance(self.correlation_keys, dict) and self.correlation_keys == other.correlation_keys:
# We know we have a match, and we can just return if we don't have to figure out the key
return True
elif other_message_instance.message_type == MessageTypes.receive.value:
return other_message_instance.correlates(self, expression_engine) # type: ignore
# Loop over the receives' correlation keys - if any of the keys fully match, then we match.
for expected_values in self.correlation_keys.values():
if self.payload_matches_expected_values(other.payload, expected_values, expression_engine):
return True
return False
def is_receive(self):
return self.message_type == MessageTypes.receive.value
def is_send(self):
return self.message_type == MessageTypes.send.value
def payload_matches_expected_values(
self, payload: dict,
expected_values: dict,
expression_engine: PythonScriptEngine) -> bool:
"""Compares the payload of a 'send' message against a single correlation key's expected values."""
for correlation_key in self.correlation_rules:
expected_value = expected_values.get(correlation_key.name, None)
if expected_value is None: # This key is not required for this instance to match.
continue
try:
result = expression_engine._evaluate(correlation_key.retrieval_expression, payload)
except Exception:
# the failure of a payload evaluation may not mean that matches for these
# message instances can't happen with other messages. So don't error up.
# fixme: Perhaps log some sort of error.
return False
if result != expected_value:
return False
return True
# This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class
# so this may not be worth it or there may be a better way to do it

View File

@ -10,16 +10,16 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@dataclass
class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel):
class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel):
"""These are the correlations of a specific Message Instance.
These will only exist on receive messages. It provides the expression to run on
a send messages payload which must match the expected value to be considered
a valid match. If the expected value is null, then it does not need to
a send messages payload which must match receive messages correlation_key dictionary
to be considered a valid match. If the expected value is null, then it does not need to
match, but the expression should still evaluate and produce a result.
"""
__tablename__ = "message_instance_correlation"
__tablename__ = "message_instance_correlation_rule"
__table_args__ = (
db.UniqueConstraint(
"message_instance_id",
@ -33,10 +33,9 @@ class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel):
ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
)
name: str = db.Column(db.String(50), nullable=False)
expected_value: str = db.Column(db.String(255), nullable=True, index=True)
retrieval_expression: str = db.Column(db.String(255))
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_instance = relationship(
"MessageInstanceModel", back_populates="correlations"
"MessageInstanceModel", back_populates="correlation_rules"
)

View File

@ -86,7 +86,6 @@ def message_send(
name=message_name,
payload=body["payload"],
user_id=g.user.id,
correlations=[],
)
db.session.add(message_instance)
db.session.commit()

View File

@ -94,8 +94,7 @@ class ErrorHandlingService:
message_type="send",
name=message_name,
payload=message_payload,
user_id=g.user.id,
correlations=[],
user_id=g.user.id
)
db.session.add(message_instance)
db.session.commit()

View File

@ -62,7 +62,7 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationModel,
MessageInstanceCorrelationRuleModel
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
@ -1352,7 +1352,7 @@ class ProcessInstanceProcessor:
message_type="send",
name=bpmn_message.name,
payload=bpmn_message.payload,
correlations=[],
correlation_keys=self.bpmn_process_instance.correlations
)
db.session.add(message_instance)
db.session.commit()
@ -1382,15 +1382,15 @@ class ProcessInstanceProcessor:
user_id=self.process_instance_model.process_initiator_id,
message_type="receive",
name=event["name"],
correlation_keys=self.bpmn_process_instance.correlations
)
for correlation_property in event["value"]:
message_correlation = MessageInstanceCorrelationModel(
message_correlation = MessageInstanceCorrelationRuleModel(
message_instance_id=message_instance.id,
name=correlation_property.name,
expected_value=correlation_property.expected_value,
retrieval_expression=correlation_property.retrieval_expression,
)
message_instance.correlations.append(message_correlation)
message_instance.correlation_rules.append(message_correlation)
db.session.add(message_instance)
db.session.commit()

View File

@ -54,7 +54,7 @@ class TestMessageInstance(BaseTest):
user_id=process_instance.process_initiator_id,
message_type="send",
name=message_name,
payload={"Word": "Eat At Mashita's, delicious!"},
payload={"Word": "Eat At Mashita's, its delicious!"},
)
db.session.add(queued_message)
db.session.commit()

View File

@ -43,7 +43,7 @@ class TestMessageService(BaseTest):
"amount": "100.00",
}
self.start_sender_process(client, with_super_admin_user)
self.start_sender_process(client, with_super_admin_user, "test_from_api")
self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message()
@ -105,7 +105,7 @@ class TestMessageService(BaseTest):
)
# Now start the main process
self.start_sender_process(client, with_super_admin_user)
self.start_sender_process(client, with_super_admin_user, "test_between_processes")
self.assure_a_message_was_sent()
# This is typically called in a background cron process, so we will manually call it
@ -145,9 +145,9 @@ class TestMessageService(BaseTest):
assert message_receiver_process.status == "complete"
def start_sender_process(
self, client: FlaskClient, with_super_admin_user: UserModel
self, client: FlaskClient, with_super_admin_user: UserModel, group_name:str = "test_group"
) -> None:
process_group_id = "test_group"
process_group_id = group_name
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
@ -186,7 +186,7 @@ class TestMessageService(BaseTest):
assert len(send_messages) == 1
send_message = send_messages[0]
assert (
send_message.payload == self.payload
send_message.payload == self.payload
), "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
@ -207,12 +207,10 @@ class TestMessageService(BaseTest):
self, message: MessageInstanceModel
) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlations if c.name == "po_number")
customer_curr = next(c for c in message.correlations if c.name == "customer_id")
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None
assert po_curr.expected_value == "1001"
assert customer_curr.expected_value == "Sartography"
def test_can_send_message_to_multiple_process_models(
self,