# SpiffWorkflow:

1) Type Safe checking on correlation properties (no more str())
2) A running workflows Correlations are once again at the key level.

# Backend
1) Both send and receive messages can have correlation_keys - and we compare these to each other to quickly assure a match (if they both exist - otherwise we fall back to comparing the properties on the receive to the sending messages payload)
2) Cleaned up the migrations to just one file
This commit is contained in:
Dan 2023-02-24 14:53:22 -05:00
parent 5a3b4b8792
commit 0dc2bc3316
11 changed files with 133 additions and 206 deletions

View File

@ -1,35 +0,0 @@
"""empty message
Revision ID: 4d47598d7181
Revises: 4ab08bf12666
Create Date: 2023-02-23 16:12:11.603900
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '4d47598d7181'
down_revision = '4ab08bf12666'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('correlation_property_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('message_name', sa.String(length=50), nullable=False),
sa.Column('process_model_id', sa.String(length=255), nullable=False),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('correlation_property_cache')
# ### end Alembic commands ###

View File

@ -1,8 +1,8 @@
"""empty message """empty message
Revision ID: 4ab08bf12666 Revision ID: 9f0b1662a8af
Revises: b581ff2351ad Revises: 63fc8d693b9f
Create Date: 2023-02-23 07:25:54.135765 Create Date: 2023-02-24 14:30:05.970959
""" """
from alembic import op from alembic import op
@ -10,19 +10,26 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '4ab08bf12666' revision = '9f0b1662a8af'
down_revision = 'b581ff2351ad' down_revision = '63fc8d693b9f'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
def upgrade(): def upgrade():
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.create_table('message_instance_correlation', op.create_table('correlation_property_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('message_name', sa.String(length=50), nullable=False),
sa.Column('process_model_id', sa.String(length=255), nullable=False),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('message_instance_correlation_rule',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('message_instance_id', sa.Integer(), nullable=False), sa.Column('message_instance_id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False), sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('expected_value', sa.String(length=255), nullable=True),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True), sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
@ -30,26 +37,29 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique') sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique')
) )
op.create_index(op.f('ix_message_instance_correlation_expected_value'), 'message_instance_correlation', ['expected_value'], unique=False) op.create_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), 'message_instance_correlation_rule', ['message_instance_id'], unique=False)
op.create_index(op.f('ix_message_instance_correlation_message_instance_id'), 'message_instance_correlation', ['message_instance_id'], unique=False) op.drop_index('ix_message_model_identifier', table_name='message_model')
op.drop_index('ix_message_model_name', table_name='message_model')
op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property') op.drop_constraint('message_correlation_property_ibfk_1', 'message_correlation_property', type_='foreignkey')
op.drop_index('message_correlation_property_unique', table_name='message_correlation_property') op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey')
op.drop_constraint('message_correlation_ibfk_1', 'message_correlation', type_='foreignkey') op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey')
op.drop_table('message_correlation_property') op.drop_table('message_model')
op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation') op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey')
op.drop_index('ix_message_correlation_name', table_name='message_correlation') op.drop_index('ix_message_correlation_name', table_name='message_correlation')
op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation') op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation')
op.drop_index('ix_message_correlation_value', table_name='message_correlation') op.drop_index('ix_message_correlation_value', table_name='message_correlation')
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey') # op.drop_index('message_instance_id_name_unique', table_name='message_correlation')
op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey') # op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation')
op.drop_table('message_correlation') op.drop_table('message_correlation')
op.drop_index('ix_message_model_identifier', table_name='message_model') op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_model_name', table_name='message_model') op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey') # op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance')
op.drop_table('message_model')
op.drop_table('message_correlation_message_instance') op.drop_table('message_correlation_message_instance')
op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property')
op.drop_index('message_correlation_property_unique', table_name='message_correlation_property')
op.drop_table('message_correlation_property')
op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True)) op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True))
op.add_column('message_instance', sa.Column('correlation_keys', sa.JSON(), nullable=True))
op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False)) op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False))
op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True)) op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True))
op.alter_column('message_instance', 'process_instance_id', op.alter_column('message_instance', 'process_instance_id',
@ -66,9 +76,8 @@ def upgrade():
def downgrade(): def downgrade():
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', 'message_model', ['message_model_id'], ['id'])
op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False) op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False)
op.drop_column('message_triggerable_process_model', 'name') op.drop_column('message_triggerable_process_model', 'message_name')
op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.drop_constraint(None, 'message_instance', type_='foreignkey') op.drop_constraint(None, 'message_instance', type_='foreignkey')
op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id']) op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id'])
@ -77,28 +86,36 @@ def downgrade():
nullable=False) nullable=False)
op.drop_column('message_instance', 'counterpart_id') op.drop_column('message_instance', 'counterpart_id')
op.drop_column('message_instance', 'user_id') op.drop_column('message_instance', 'user_id')
op.drop_column('message_instance', 'correlation_keys')
op.drop_column('message_instance', 'name') op.drop_column('message_instance', 'name')
op.create_table('message_correlation_message_instance', op.create_table('message_correlation_property',
sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'),
sa.PrimaryKeyConstraint('message_instance_id', 'message_correlation_id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_table('message_model',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True),
sa.Column('name', mysql.VARCHAR(length=50), nullable=True), sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'),
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci', mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4', mysql_default_charset='utf8mb4',
mysql_engine='InnoDB' mysql_engine='InnoDB'
) )
op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False) op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False)
op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False) op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False)
op.create_table('message_correlation_message_instance',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
op.create_table('message_correlation', op.create_table('message_correlation',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
@ -119,21 +136,18 @@ def downgrade():
op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False) op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False)
op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False) op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False)
op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False) op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False)
op.create_table('message_correlation_property', op.create_table('message_model',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True),
sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False), sa.Column('name', mysql.VARCHAR(length=50), nullable=True),
sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'),
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci', mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4', mysql_default_charset='utf8mb4',
mysql_engine='InnoDB' mysql_engine='InnoDB'
) )
op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False) op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False)
op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False) op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False)
op.drop_index(op.f('ix_message_instance_correlation_message_instance_id'), table_name='message_instance_correlation') op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule')
op.drop_index(op.f('ix_message_instance_correlation_expected_value'), table_name='message_instance_correlation') op.drop_table('message_instance_correlation_rule')
op.drop_table('message_instance_correlation') op.drop_table('correlation_property_cache')
# ### end Alembic commands ### # ### end Alembic commands ###

View File

@ -1,33 +0,0 @@
"""empty message
Revision ID: ac40af4ddef3
Revises: 63fc8d693b9f
Create Date: 2023-02-16 14:54:14.533029
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'ac40af4ddef3'
down_revision = '63fc8d693b9f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance')
op.drop_column('message_correlation_message_instance', 'id')
op.create_primary_key('mcmi_pirmary_key','message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('message_correlation_message_instance', sa.Column('id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
# ### end Alembic commands ###

View File

@ -1,35 +0,0 @@
"""empty message
Revision ID: b581ff2351ad
Revises: ac40af4ddef3
Create Date: 2023-02-19 11:19:24.371528
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b581ff2351ad'
down_revision = 'ac40af4ddef3'
branch_labels = None
depends_on = None
def upgrade():
# Just some table cleanup so the join table is properly built
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey")
op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey")
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id'])
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id'])
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property')
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
# ### end Alembic commands ###

View File

@ -19,7 +19,7 @@ from spiffworkflow_backend.models.user import UserModel
if TYPE_CHECKING: if TYPE_CHECKING:
from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401 from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401
MessageInstanceCorrelationModel, MessageInstanceCorrelationRuleModel,
) )
@ -49,7 +49,9 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore
name: str = db.Column(db.String(255)) name: str = db.Column(db.String(255))
message_type: str = db.Column(db.String(20), nullable=False) message_type: str = db.Column(db.String(20), nullable=False)
# Only Send Messages have a payload
payload: dict = db.Column(db.JSON) payload: dict = db.Column(db.JSON)
correlation_keys: dict = db.Column(db.JSON) # The correlation keys of the process at the time the message was created.
status: str = db.Column(db.String(20), nullable=False, default="ready") status: str = db.Column(db.String(20), nullable=False, default="ready")
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
user = relationship("UserModel") user = relationship("UserModel")
@ -59,8 +61,8 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
failure_cause: str = db.Column(db.Text()) failure_cause: str = db.Column(db.Text())
updated_at_in_seconds: int = db.Column(db.Integer) updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer)
correlations = relationship( correlation_rules = relationship(
"MessageInstanceCorrelationModel", back_populates="message_instance" "MessageInstanceCorrelationRuleModel", back_populates="message_instance"
) )
@validates("message_type") @validates("message_type")
@ -74,40 +76,59 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
return self.validate_enum_field(key, value, MessageStatuses) return self.validate_enum_field(key, value, MessageStatuses)
def correlates( def correlates(
self, other_message_instance: Any, expression_engine: PythonScriptEngine self, other: Any, expression_engine: PythonScriptEngine
) -> bool: ) -> bool:
# This must be a receive message, and the other must be a send (otherwise we reverse the call) """ Returns true if the this Message correlates with the given message.
# We evaluate the other messages payload and run our correlation's
# retrieval expressions against it, then compare it against our This must be a 'receive' message, and the other must be a 'send' or vice/versa. We evaluate
# expected values -- IF we don't have an expected value, we accept the other messages payload and run our correlation's retrieval expressions against it, then
# any non-erroring result from the retrieval expression. compare it against our expected values (as stored in this messages' correlation_keys)
if self.name != other_message_instance.name: IF we don't have an expected value, we accept any non-error result from the retrieval
expression. """
if self.is_send() and other.is_receive():
# Flip the call.
return other.correlates(self, expression_engine) # type: ignore
if self.name != other.name:
return False return False
if self.message_type == MessageTypes.receive.value: if not self.is_receive():
if other_message_instance.message_type != MessageTypes.send.value: return False
return False if isinstance(self.correlation_keys, dict) and self.correlation_keys == other.correlation_keys:
payload = other_message_instance.payload # We know we have a match, and we can just return if we don't have to figure out the key
for corr in self.correlations:
try:
result = expression_engine._evaluate(
corr.retrieval_expression, payload
)
except Exception:
# the failure of a payload evaluation may not mean that matches for these
# message instances can't happen with other messages. So don't error up.
# fixme: Perhaps log some sort of error.
return False
if corr.expected_value is None:
continue # We will accept any value
elif corr.expected_value != str(
result
): # fixme: Don't require conversion to string
return False
return True return True
elif other_message_instance.message_type == MessageTypes.receive.value:
return other_message_instance.correlates(self, expression_engine) # type: ignore # Loop over the receives' correlation keys - if any of the keys fully match, then we match.
for expected_values in self.correlation_keys.values():
if self.payload_matches_expected_values(other.payload, expected_values, expression_engine):
return True
return False return False
def is_receive(self):
return self.message_type == MessageTypes.receive.value
def is_send(self):
return self.message_type == MessageTypes.send.value
def payload_matches_expected_values(
self, payload: dict,
expected_values: dict,
expression_engine: PythonScriptEngine) -> bool:
"""Compares the payload of a 'send' message against a single correlation key's expected values."""
for correlation_key in self.correlation_rules:
expected_value = expected_values.get(correlation_key.name, None)
if expected_value is None: # This key is not required for this instance to match.
continue
try:
result = expression_engine._evaluate(correlation_key.retrieval_expression, payload)
except Exception:
# the failure of a payload evaluation may not mean that matches for these
# message instances can't happen with other messages. So don't error up.
# fixme: Perhaps log some sort of error.
return False
if result != expected_value:
return False
return True
# This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class # This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class
# so this may not be worth it or there may be a better way to do it # so this may not be worth it or there may be a better way to do it

View File

@ -10,16 +10,16 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@dataclass @dataclass
class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel):
"""These are the correlations of a specific Message Instance. """These are the correlations of a specific Message Instance.
These will only exist on receive messages. It provides the expression to run on These will only exist on receive messages. It provides the expression to run on
a send messages payload which must match the expected value to be considered a send messages payload which must match receive messages correlation_key dictionary
a valid match. If the expected value is null, then it does not need to to be considered a valid match. If the expected value is null, then it does not need to
match, but the expression should still evaluate and produce a result. match, but the expression should still evaluate and produce a result.
""" """
__tablename__ = "message_instance_correlation" __tablename__ = "message_instance_correlation_rule"
__table_args__ = ( __table_args__ = (
db.UniqueConstraint( db.UniqueConstraint(
"message_instance_id", "message_instance_id",
@ -33,10 +33,9 @@ class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel):
ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
) )
name: str = db.Column(db.String(50), nullable=False) name: str = db.Column(db.String(50), nullable=False)
expected_value: str = db.Column(db.String(255), nullable=True, index=True)
retrieval_expression: str = db.Column(db.String(255)) retrieval_expression: str = db.Column(db.String(255))
updated_at_in_seconds: int = db.Column(db.Integer) updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer)
message_instance = relationship( message_instance = relationship(
"MessageInstanceModel", back_populates="correlations" "MessageInstanceModel", back_populates="correlation_rules"
) )

View File

@ -86,7 +86,6 @@ def message_send(
name=message_name, name=message_name,
payload=body["payload"], payload=body["payload"],
user_id=g.user.id, user_id=g.user.id,
correlations=[],
) )
db.session.add(message_instance) db.session.add(message_instance)
db.session.commit() db.session.commit()

View File

@ -94,8 +94,7 @@ class ErrorHandlingService:
message_type="send", message_type="send",
name=message_name, name=message_name,
payload=message_payload, payload=message_payload,
user_id=g.user.id, user_id=g.user.id
correlations=[],
) )
db.session.add(message_instance) db.session.add(message_instance)
db.session.commit() db.session.commit()

View File

@ -62,7 +62,7 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import ( from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationModel, MessageInstanceCorrelationRuleModel
) )
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
@ -1352,7 +1352,7 @@ class ProcessInstanceProcessor:
message_type="send", message_type="send",
name=bpmn_message.name, name=bpmn_message.name,
payload=bpmn_message.payload, payload=bpmn_message.payload,
correlations=[], correlation_keys=self.bpmn_process_instance.correlations
) )
db.session.add(message_instance) db.session.add(message_instance)
db.session.commit() db.session.commit()
@ -1382,15 +1382,15 @@ class ProcessInstanceProcessor:
user_id=self.process_instance_model.process_initiator_id, user_id=self.process_instance_model.process_initiator_id,
message_type="receive", message_type="receive",
name=event["name"], name=event["name"],
correlation_keys=self.bpmn_process_instance.correlations
) )
for correlation_property in event["value"]: for correlation_property in event["value"]:
message_correlation = MessageInstanceCorrelationModel( message_correlation = MessageInstanceCorrelationRuleModel(
message_instance_id=message_instance.id, message_instance_id=message_instance.id,
name=correlation_property.name, name=correlation_property.name,
expected_value=correlation_property.expected_value,
retrieval_expression=correlation_property.retrieval_expression, retrieval_expression=correlation_property.retrieval_expression,
) )
message_instance.correlations.append(message_correlation) message_instance.correlation_rules.append(message_correlation)
db.session.add(message_instance) db.session.add(message_instance)
db.session.commit() db.session.commit()

View File

@ -54,7 +54,7 @@ class TestMessageInstance(BaseTest):
user_id=process_instance.process_initiator_id, user_id=process_instance.process_initiator_id,
message_type="send", message_type="send",
name=message_name, name=message_name,
payload={"Word": "Eat At Mashita's, delicious!"}, payload={"Word": "Eat At Mashita's, its delicious!"},
) )
db.session.add(queued_message) db.session.add(queued_message)
db.session.commit() db.session.commit()

View File

@ -43,7 +43,7 @@ class TestMessageService(BaseTest):
"amount": "100.00", "amount": "100.00",
} }
self.start_sender_process(client, with_super_admin_user) self.start_sender_process(client, with_super_admin_user, "test_from_api")
self.assure_a_message_was_sent() self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message() self.assure_there_is_a_process_waiting_on_a_message()
@ -105,7 +105,7 @@ class TestMessageService(BaseTest):
) )
# Now start the main process # Now start the main process
self.start_sender_process(client, with_super_admin_user) self.start_sender_process(client, with_super_admin_user, "test_between_processes")
self.assure_a_message_was_sent() self.assure_a_message_was_sent()
# This is typically called in a background cron process, so we will manually call it # This is typically called in a background cron process, so we will manually call it
@ -145,9 +145,9 @@ class TestMessageService(BaseTest):
assert message_receiver_process.status == "complete" assert message_receiver_process.status == "complete"
def start_sender_process( def start_sender_process(
self, client: FlaskClient, with_super_admin_user: UserModel self, client: FlaskClient, with_super_admin_user: UserModel, group_name:str = "test_group"
) -> None: ) -> None:
process_group_id = "test_group" process_group_id = group_name
self.create_process_group( self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id client, with_super_admin_user, process_group_id, process_group_id
) )
@ -186,7 +186,7 @@ class TestMessageService(BaseTest):
assert len(send_messages) == 1 assert len(send_messages) == 1
send_message = send_messages[0] send_message = send_messages[0]
assert ( assert (
send_message.payload == self.payload send_message.payload == self.payload
), "The send message should match up with the payload" ), "The send message should match up with the payload"
assert send_message.name == "Request Approval" assert send_message.name == "Request Approval"
assert send_message.status == "ready" assert send_message.status == "ready"
@ -207,12 +207,10 @@ class TestMessageService(BaseTest):
self, message: MessageInstanceModel self, message: MessageInstanceModel
) -> None: ) -> None:
# Correlation Properties should match up # Correlation Properties should match up
po_curr = next(c for c in message.correlations if c.name == "po_number") po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlations if c.name == "customer_id") customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None assert po_curr is not None
assert customer_curr is not None assert customer_curr is not None
assert po_curr.expected_value == "1001"
assert customer_curr.expected_value == "Sartography"
def test_can_send_message_to_multiple_process_models( def test_can_send_message_to_multiple_process_models(
self, self,