Merge remote-tracking branch 'origin/main' into feature/script_get_last_user_completing_task

This commit is contained in:
jasquat 2023-02-27 14:28:23 -05:00
commit dfac872606
39 changed files with 1201 additions and 992 deletions

View File

@ -8,8 +8,6 @@ from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
@ -46,11 +44,10 @@ def app() -> Flask:
@pytest.fixture()
def with_db_and_bpmn_file_cleanup() -> None:
"""Process_group_resource."""
db.session.query(HumanTaskUserModel).delete()
for model in SpiffworkflowBaseDBModel._all_subclasses():
db.session.query(model).delete()
"""Do it cleanly!"""
meta = db.metadata
for table in reversed(meta.sorted_tables):
db.session.execute(table.delete())
db.session.commit()
try:

View File

@ -0,0 +1,153 @@
"""empty message
Revision ID: 9f0b1662a8af
Revises: 63fc8d693b9f
Create Date: 2023-02-24 14:30:05.970959
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '9f0b1662a8af'
down_revision = '63fc8d693b9f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('correlation_property_cache',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('message_name', sa.String(length=50), nullable=False),
sa.Column('process_model_id', sa.String(length=255), nullable=False),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('message_instance_correlation_rule',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('message_instance_id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('retrieval_expression', sa.String(length=255), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique')
)
op.create_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), 'message_instance_correlation_rule', ['message_instance_id'], unique=False)
op.drop_index('ix_message_model_identifier', table_name='message_model')
op.drop_index('ix_message_model_name', table_name='message_model')
op.drop_constraint('message_correlation_property_ibfk_1', 'message_correlation_property', type_='foreignkey')
op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey')
op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey')
op.drop_table('message_model')
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey')
op.drop_index('ix_message_correlation_name', table_name='message_correlation')
op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation')
op.drop_index('ix_message_correlation_value', table_name='message_correlation')
# op.drop_index('message_instance_id_name_unique', table_name='message_correlation')
# op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation')
op.drop_table('message_correlation')
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
# op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance')
op.drop_table('message_correlation_message_instance')
op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property')
op.drop_index('message_correlation_property_unique', table_name='message_correlation_property')
op.drop_table('message_correlation_property')
op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True))
op.add_column('message_instance', sa.Column('correlation_keys', sa.JSON(), nullable=True))
op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False))
op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True))
op.alter_column('message_instance', 'process_instance_id',
existing_type=mysql.INTEGER(),
nullable=True)
op.create_foreign_key(None, 'message_instance', 'user', ['user_id'], ['id'])
op.drop_column('message_instance', 'message_model_id')
op.add_column('message_triggerable_process_model', sa.Column('message_name', sa.String(length=255), nullable=True))
op.drop_index('message_model_id', table_name='message_triggerable_process_model')
op.drop_column('message_triggerable_process_model', 'message_model_id')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False)
op.drop_column('message_triggerable_process_model', 'message_name')
op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.drop_constraint(None, 'message_instance', type_='foreignkey')
op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id'])
op.alter_column('message_instance', 'process_instance_id',
existing_type=mysql.INTEGER(),
nullable=False)
op.drop_column('message_instance', 'counterpart_id')
op.drop_column('message_instance', 'user_id')
op.drop_column('message_instance', 'correlation_keys')
op.drop_column('message_instance', 'name')
op.create_table('message_correlation_property',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True),
sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False)
op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False)
op.create_table('message_correlation_message_instance',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'),
sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
op.create_table('message_correlation',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('message_correlation_property_id', mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column('name', mysql.VARCHAR(length=255), nullable=False),
sa.Column('value', mysql.VARCHAR(length=255), nullable=False),
sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['message_correlation_property_id'], ['message_correlation_property.id'], name='message_correlation_ibfk_1'),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], name='message_correlation_ibfk_2'),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('message_instance_id_name_unique', 'message_correlation', ['process_instance_id', 'message_correlation_property_id', 'name'], unique=False)
op.create_index('ix_message_correlation_value', 'message_correlation', ['value'], unique=False)
op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False)
op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False)
op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False)
op.create_table('message_model',
sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False),
sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True),
sa.Column('name', mysql.VARCHAR(length=50), nullable=True),
sa.PrimaryKeyConstraint('id'),
mysql_collate='utf8mb4_0900_ai_ci',
mysql_default_charset='utf8mb4',
mysql_engine='InnoDB'
)
op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False)
op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False)
op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule')
op.drop_table('message_instance_correlation_rule')
op.drop_table('correlation_property_cache')
# ### end Alembic commands ###

View File

@ -2933,7 +2933,7 @@ lxml = "*"
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "2ca6ebf800d4ff1d54f3e1c48798a2cb879560f7"
resolved_reference = "76ecbf7cc8d47185fa410f301ffa3452079ee672"
[[package]]
name = "SQLAlchemy"

View File

@ -27,8 +27,8 @@ flask-marshmallow = "*"
flask-migrate = "*"
flask-restful = "*"
werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
#SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0"
flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}

View File

@ -1,6 +1,7 @@
"""__init__."""
import faulthandler
import os
import sys
from typing import Any
import connexion # type: ignore
@ -94,14 +95,6 @@ def create_app() -> flask.app.Flask:
app.config["CONNEXION_APP"] = connexion_app
app.config["SESSION_TYPE"] = "filesystem"
if os.environ.get("FLASK_SESSION_SECRET_KEY") is None:
raise KeyError(
"Cannot find the secret_key from the environment. Please set"
" FLASK_SESSION_SECRET_KEY"
)
app.secret_key = os.environ.get("FLASK_SESSION_SECRET_KEY")
setup_config(app)
db.init_app(app)
migrate.init_app(app, db)

View File

@ -1878,19 +1878,19 @@ paths:
schema:
$ref: "#/components/schemas/Workflow"
/messages/{message_identifier}:
/messages/{message_name}:
parameters:
- name: message_identifier
- name: message_name
in: path
required: true
description: The unique identifier of the message model.
description: The unique name of the message.
schema:
type: string
post:
tags:
- Messages
operationId: spiffworkflow_backend.routes.messages_controller.message_send
summary: Instantiate and run a given process model with a message start event matching given identifier
summary: Instantiate and run a given process model with a message start event matching given name
requestBody:
content:
application/json:

View File

@ -129,6 +129,14 @@ def setup_config(app: Flask) -> None:
"SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR config must be set"
)
if app.config["FLASK_SESSION_SECRET_KEY"] is None:
raise KeyError(
"Cannot find the secret_key from the environment. Please set"
" FLASK_SESSION_SECRET_KEY"
)
app.secret_key = os.environ.get("FLASK_SESSION_SECRET_KEY")
app.config["PROCESS_UUID"] = uuid.uuid4()
setup_database_uri(app)

View File

@ -6,6 +6,8 @@ from os import environ
# and from_prefixed_env(), though we want to ensure that these variables are all documented, so that
# is a benefit of the status quo and having them all in this file explicitly.
FLASK_SESSION_SECRET_KEY = environ.get("FLASK_SESSION_SECRET_KEY")
SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR = environ.get(
"SPIFFWORKFLOW_BACKEND_BPMN_SPEC_ABSOLUTE_DIR"
)

View File

@ -28,6 +28,7 @@ groups:
users:
[
admin@spiffworkflow.org,
nelson@spiffworkflow.org
]
permissions:

View File

@ -157,7 +157,7 @@ class ApiError(Exception):
error_line=exp.error_line,
task_trace=exp.task_trace,
)
elif isinstance(exp, WorkflowException):
elif isinstance(exp, WorkflowException) and exp.task_spec:
return ApiError.from_task_spec(error_code, message, exp.task_spec)
else:
return ApiError("workflow_error", str(exp))
@ -253,7 +253,7 @@ def handle_exception(exception: Exception) -> flask.wrappers.Response:
else:
api_exception = ApiError(
error_code=error_code,
message=f"{exception.__class__.__name__}",
message=f"{exception.__class__.__name__} {str(exception)}",
sentry_link=sentry_link,
status_code=status_code,
)

View File

@ -21,13 +21,9 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel # noqa: F401
from spiffworkflow_backend.models.spec_reference import (
SpecReferenceCache,
) # noqa: F401
from spiffworkflow_backend.models.message_correlation_property import (
MessageCorrelationPropertyModel,
) # noqa: F401
from spiffworkflow_backend.models.message_instance import (
MessageInstanceModel,
) # noqa: F401
from spiffworkflow_backend.models.message_model import MessageModel # noqa: F401
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
) # noqa: F401

View File

@ -0,0 +1,24 @@
"""Message_correlation."""
from dataclasses import dataclass
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@dataclass
class CorrelationPropertyCache(SpiffworkflowBaseDBModel):
"""A list of known correlation properties as read from BPMN files.
This correlation properties are not directly linked to anything
but it provides a way to know what processes are talking about
what messages and correlation keys. And could be useful as an
api endpoint if you wanted to know what another process model
is using.
"""
__tablename__ = "correlation_property_cache"
id = db.Column(db.Integer, primary_key=True)
name: str = db.Column(db.String(50), nullable=False)
message_name: str = db.Column(db.String(50), nullable=False)
process_model_id: str = db.Column(db.String(255), nullable=False)
retrieval_expression: str = db.Column(db.String(255))

View File

@ -1,50 +0,0 @@
"""Message_correlation."""
from dataclasses import dataclass
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_correlation_property import (
MessageCorrelationPropertyModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
if TYPE_CHECKING:
from spiffworkflow_backend.models.message_correlation_message_instance import ( # noqa: F401
MessageCorrelationMessageInstanceModel,
)
@dataclass
class MessageCorrelationModel(SpiffworkflowBaseDBModel):
"""Message Correlations to relate queued messages together."""
__tablename__ = "message_correlation"
__table_args__ = (
db.UniqueConstraint(
"process_instance_id",
"message_correlation_property_id",
"name",
name="message_instance_id_name_unique",
),
)
id = db.Column(db.Integer, primary_key=True)
process_instance_id = db.Column(
ForeignKey(ProcessInstanceModel.id), nullable=False, index=True # type: ignore
)
message_correlation_property_id = db.Column(
ForeignKey(MessageCorrelationPropertyModel.id), nullable=False, index=True
)
name = db.Column(db.String(255), nullable=False, index=True)
value = db.Column(db.String(255), nullable=False, index=True)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_correlation_property = relationship("MessageCorrelationPropertyModel")
message_correlations_message_instances = relationship(
"MessageCorrelationMessageInstanceModel", cascade="delete"
)

View File

@ -1,32 +0,0 @@
"""Message_correlation_message_instance."""
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@dataclass
class MessageCorrelationMessageInstanceModel(SpiffworkflowBaseDBModel):
"""MessageCorrelationMessageInstanceModel."""
__tablename__ = "message_correlation_message_instance"
__table_args__ = (
db.UniqueConstraint(
"message_instance_id",
"message_correlation_id",
name="message_correlation_message_instance_unique",
),
)
id = db.Column(db.Integer, primary_key=True)
message_instance_id = db.Column(
ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
)
message_correlation_id = db.Column(
ForeignKey(MessageCorrelationModel.id), nullable=False, index=True
)

View File

@ -1,25 +0,0 @@
"""Message_correlation_property."""
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_model import MessageModel
class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel):
"""MessageCorrelationPropertyModel."""
__tablename__ = "message_correlation_property"
__table_args__ = (
db.UniqueConstraint(
"identifier",
"message_model_id",
name="message_correlation_property_unique",
),
)
id = db.Column(db.Integer, primary_key=True)
identifier = db.Column(db.String(50), index=True)
message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -5,6 +5,7 @@ from typing import Any
from typing import Optional
from typing import TYPE_CHECKING
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore
from sqlalchemy import ForeignKey
from sqlalchemy.event import listens_for
from sqlalchemy.orm import relationship
@ -13,12 +14,12 @@ from sqlalchemy.orm import validates
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
if TYPE_CHECKING:
from spiffworkflow_backend.models.message_correlation_message_instance import ( # noqa: F401
MessageCorrelationMessageInstanceModel,
from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401
MessageInstanceCorrelationRuleModel,
)
@ -45,21 +46,25 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
__tablename__ = "message_instance"
id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False)
message_model = relationship("MessageModel")
message_correlations_message_instances = relationship(
"MessageCorrelationMessageInstanceModel", cascade="delete"
)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore
name: str = db.Column(db.String(255))
message_type: str = db.Column(db.String(20), nullable=False)
payload: str = db.Column(db.JSON)
# Only Send Messages have a payload
payload: dict = db.Column(db.JSON)
# The correlation keys of the process at the time the message was created.
correlation_keys: dict = db.Column(db.JSON)
status: str = db.Column(db.String(20), nullable=False, default="ready")
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
user = relationship("UserModel")
counterpart_id: int = db.Column(
db.Integer
) # Not enforcing self-referential foreign key so we can delete messages.
failure_cause: str = db.Column(db.Text())
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_correlations: Optional[dict] = None
correlation_rules = relationship(
"MessageInstanceCorrelationRuleModel", back_populates="message_instance"
)
@validates("message_type")
def validate_message_type(self, key: str, value: Any) -> Any:
@ -71,12 +76,82 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
def correlates(self, other: Any, expression_engine: PythonScriptEngine) -> bool:
"""Returns true if the this Message correlates with the given message.
This must be a 'receive' message, and the other must be a 'send' or vice/versa.
If both messages have identical correlation_keys, they are a match. Otherwise
we check through this messages correlation properties and use the retrieval expressions
to extract the correlation keys from the send's payload, and verify that these
match up with correlation keys on this message.
"""
if self.is_send() and other.is_receive():
# Flip the call.
return other.correlates(self, expression_engine) # type: ignore
if self.name != other.name:
return False
if not self.is_receive():
return False
if (
isinstance(self.correlation_keys, dict)
and self.correlation_keys == other.correlation_keys
):
# We know we have a match, and we can just return if we don't have to figure out the key
return True
if self.correlation_keys == {}:
# Then there is nothing more to match on -- we accept any message with the given name.
return True
# Loop over the receives' correlation keys - if any of the keys fully match, then we match.
for expected_values in self.correlation_keys.values():
if self.payload_matches_expected_values(
other.payload, expected_values, expression_engine
):
return True
return False
def is_receive(self) -> bool:
return self.message_type == MessageTypes.receive.value
def is_send(self) -> bool:
return self.message_type == MessageTypes.send.value
def payload_matches_expected_values(
self,
payload: dict,
expected_values: dict,
expression_engine: PythonScriptEngine,
) -> bool:
"""Compares the payload of a 'send' message against a single correlation key's expected values."""
for correlation_key in self.correlation_rules:
expected_value = expected_values.get(correlation_key.name, None)
if (
expected_value is None
): # This key is not required for this instance to match.
continue
try:
result = expression_engine._evaluate(
correlation_key.retrieval_expression, payload
)
except Exception:
# the failure of a payload evaluation may not mean that matches for these
# message instances can't happen with other messages. So don't error up.
# fixme: Perhaps log some sort of error.
return False
if result != expected_value:
return False
return True
# This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class
# so this may not be worth it or there may be a better way to do it
#
# https://stackoverflow.com/questions/32555829/flask-validates-decorator-multiple-fields-simultaneously/33025472#33025472
# https://docs.sqlalchemy.org/en/14/orm/session_events.html#before-flush
@listens_for(Session, "before_flush") # type: ignore
def ensure_failure_cause_is_set_if_message_instance_failed(
session: Any, _flush_context: Optional[Any], _instances: Optional[Any]

View File

@ -0,0 +1,41 @@
"""Message_correlation."""
from dataclasses import dataclass
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
@dataclass
class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel):
"""These are the correlations of a specific Message Instance.
These will only exist on receive messages. It provides the expression to run on
a send messages payload which must match receive messages correlation_key dictionary
to be considered a valid match. If the expected value is null, then it does not need to
match, but the expression should still evaluate and produce a result.
"""
__tablename__ = "message_instance_correlation_rule"
__table_args__ = (
db.UniqueConstraint(
"message_instance_id",
"name",
name="message_instance_id_name_unique",
),
)
id = db.Column(db.Integer, primary_key=True)
message_instance_id = db.Column(
ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore
)
name: str = db.Column(db.String(50), nullable=False)
retrieval_expression: str = db.Column(db.String(255))
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_instance = relationship(
"MessageInstanceModel", back_populates="correlation_rules"
)

View File

@ -1,13 +0,0 @@
"""Message_model."""
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class MessageModel(SpiffworkflowBaseDBModel):
"""MessageModel."""
__tablename__ = "message_model"
id = db.Column(db.Integer, primary_key=True)
identifier = db.Column(db.String(50), unique=True, index=True)
name = db.Column(db.String(50), unique=True, index=True)

View File

@ -1,9 +1,6 @@
"""Message_correlation_property."""
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.message_model import MessageModel
class MessageTriggerableProcessModel(SpiffworkflowBaseDBModel):
@ -12,10 +9,7 @@ class MessageTriggerableProcessModel(SpiffworkflowBaseDBModel):
__tablename__ = "message_triggerable_process_model"
id = db.Column(db.Integer, primary_key=True)
message_model_id = db.Column(
ForeignKey(MessageModel.id), nullable=False, unique=True
)
message_name: str = db.Column(db.String(255))
process_model_identifier: str = db.Column(db.String(50), nullable=False, index=True)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -74,7 +74,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
overlaps="active_human_tasks",
) # type: ignore
message_instances = relationship("MessageInstanceModel", cascade="delete") # type: ignore
message_correlations = relationship("MessageCorrelationModel", cascade="delete") # type: ignore
process_metadata = relationship(
"ProcessInstanceMetadataModel",
cascade="delete",
@ -144,6 +143,10 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
"""Can_submit_task."""
return not self.has_terminal_status() and self.status != "suspended"
def can_receive_message(self) -> bool:
"""If this process can currently accept messages."""
return not self.has_terminal_status() and self.status != "suspended"
def has_terminal_status(self) -> bool:
"""Has_terminal_status."""
return self.status in self.terminal_statuses()

View File

@ -10,19 +10,11 @@ from flask import jsonify
from flask import make_response
from flask.wrappers import Response
from spiffworkflow_backend import db
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.routes.process_api_blueprint import (
_find_process_instance_by_id_or_raise,
)
from spiffworkflow_backend.services.message_service import MessageService
@ -45,36 +37,14 @@ def message_instance_list(
MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore
MessageInstanceModel.id.desc(), # type: ignore
)
.join(MessageModel, MessageModel.id == MessageInstanceModel.message_model_id)
.join(ProcessInstanceModel)
.outerjoin(ProcessInstanceModel) # Not all messages were created by a process
.add_columns(
MessageModel.identifier.label("message_identifier"),
ProcessInstanceModel.process_model_identifier,
ProcessInstanceModel.process_model_display_name,
)
.paginate(page=page, per_page=per_page, error_out=False)
)
for message_instance in message_instances:
message_correlations: dict = {}
for (
mcmi
) in (
message_instance.MessageInstanceModel.message_correlations_message_instances
):
mc = MessageCorrelationModel.query.filter_by(
id=mcmi.message_correlation_id
).all()
for m in mc:
if m.name not in message_correlations:
message_correlations[m.name] = {}
message_correlations[m.name][
m.message_correlation_property.identifier
] = m.value
message_instance.MessageInstanceModel.message_correlations = (
message_correlations
)
response_json = {
"results": message_instances.items,
"pagination": {
@ -92,104 +62,58 @@ def message_instance_list(
# process_instance_id: Optional[int],
# }
def message_send(
message_identifier: str,
message_name: str,
body: Dict[str, Any],
) -> flask.wrappers.Response:
"""Message_start."""
message_model = MessageModel.query.filter_by(identifier=message_identifier).first()
if message_model is None:
raise (
ApiError(
error_code="unknown_message",
message=f"Could not find message with identifier: {message_identifier}",
status_code=404,
)
)
if "payload" not in body:
raise (
ApiError(
error_code="missing_payload",
message="Body is missing payload.",
message=(
"Please include a 'payload' in the JSON body that contains the"
" message contents."
),
status_code=400,
)
)
process_instance = None
if "process_instance_id" in body:
# to make sure we have a valid process_instance_id
process_instance = _find_process_instance_by_id_or_raise(
body["process_instance_id"]
)
if process_instance.status == ProcessInstanceStatus.suspended.value:
raise ApiError(
error_code="process_instance_is_suspended",
# Create the send message
message_instance = MessageInstanceModel(
message_type="send",
name=message_name,
payload=body["payload"],
user_id=g.user.id,
)
db.session.add(message_instance)
db.session.commit()
try:
receiver_message = MessageService.correlate_send_message(message_instance)
except Exception as e:
db.session.delete(message_instance)
db.session.commit()
raise e
if not receiver_message:
db.session.delete(message_instance)
db.session.commit()
raise (
ApiError(
error_code="message_not_accepted",
message=(
f"Process Instance '{process_instance.id}' is suspended and cannot"
" accept messages.'"
"No running process instances correlate with the given message"
f" name of '{message_name}'. And this message name is not"
" currently associated with any process Start Event. Nothing"
" to do."
),
status_code=400,
)
if process_instance.status == ProcessInstanceStatus.terminated.value:
raise ApiError(
error_code="process_instance_is_terminated",
message=(
f"Process Instance '{process_instance.id}' is terminated and cannot"
" accept messages.'"
),
status_code=400,
)
message_instance = MessageInstanceModel.query.filter_by(
process_instance_id=process_instance.id,
message_model_id=message_model.id,
message_type="receive",
status="ready",
).first()
if message_instance is None:
raise (
ApiError(
error_code="cannot_find_waiting_message",
message=(
"Could not find waiting message for identifier"
f" {message_identifier} and process instance"
f" {process_instance.id}"
),
status_code=400,
)
)
MessageService.process_message_receive(
message_instance, message_model.name, body["payload"]
)
else:
message_triggerable_process_model = (
MessageTriggerableProcessModel.query.filter_by(
message_model_id=message_model.id
).first()
)
if message_triggerable_process_model is None:
raise (
ApiError(
error_code="cannot_start_message",
message=(
"Message with identifier cannot be start with message:"
f" {message_identifier}"
),
status_code=400,
)
)
process_instance = MessageService.process_message_triggerable_process_model(
message_triggerable_process_model,
message_model.name,
body["payload"],
g.user,
)
process_instance = ProcessInstanceModel.query.filter_by(
id=receiver_message.process_instance_id
).first()
return Response(
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
status=200,

View File

@ -137,7 +137,7 @@ def process_instance_run(
processor.unlock_process_instance("Web")
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.process_message_instances()
MessageService.correlate_all_message_instances()
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(
processor

View File

@ -22,4 +22,4 @@ class BackgroundProcessingService:
def process_message_instances_with_app_context(self) -> None:
"""Since this runs in a scheduler, we need to specify the app context as well."""
with self.app.app_context():
MessageService.process_message_instances()
MessageService.correlate_all_message_instances()

View File

@ -8,7 +8,7 @@ from flask.wrappers import Response
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
@ -80,22 +80,27 @@ class ErrorHandlingService:
f" Error:\n{error.__repr__()}"
)
message_payload = {"message_text": message_text, "recipients": recipients}
message_identifier = current_app.config[
message_name = current_app.config[
"SPIFFWORKFLOW_BACKEND_SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID"
]
message_model = MessageModel.query.filter_by(
identifier=message_identifier
).first()
message_triggerable_process_model = (
MessageTriggerableProcessModel.query.filter_by(
message_model_id=message_model.id
message_name=message_name
).first()
)
process_instance = MessageService.process_message_triggerable_process_model(
message_triggerable_process_model,
message_identifier,
message_payload,
g.user,
# Create the send message
message_instance = MessageInstanceModel(
message_type="send",
name=message_name,
payload=message_payload,
user_id=g.user.id,
)
db.session.add(message_instance)
db.session.commit()
process_instance = MessageService.start_process_with_message(
message_triggerable_process_model, message_instance
)
return Response(

View File

@ -1,22 +1,15 @@
"""Message_service."""
from typing import Any
from typing import Optional
from sqlalchemy import and_
from sqlalchemy import or_
from sqlalchemy import select
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_correlation_message_instance import (
MessageCorrelationMessageInstanceModel,
)
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageStatuses
from spiffworkflow_backend.models.message_instance import MessageTypes
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
CustomBpmnScriptEngine,
)
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -33,114 +26,133 @@ class MessageService:
"""MessageService."""
@classmethod
def process_message_instances(cls) -> None:
"""Process_message_instances."""
def correlate_send_message(
cls, message_instance_send: MessageInstanceModel
) -> MessageInstanceModel | None:
"""Connects the given send message to a 'receive' message if possible.
:param message_instance_send:
:return: the message instance that received this message.
"""
# Thread safe via db locking - don't try to progress the same send message over multiple instances
if message_instance_send.status != MessageStatuses.ready.value:
return None
message_instance_send.status = MessageStatuses.running.value
db.session.add(message_instance_send)
db.session.commit()
# Find available messages that might match
available_receive_messages = MessageInstanceModel.query.filter_by(
name=message_instance_send.name,
status=MessageStatuses.ready.value,
message_type=MessageTypes.receive.value,
).all()
message_instance_receive: MessageInstanceModel | None = None
try:
for message_instance in available_receive_messages:
if message_instance.correlates(
message_instance_send, CustomBpmnScriptEngine()
):
message_instance_receive = message_instance
if message_instance_receive is None:
# Check for a message triggerable process and start that to create a new message_instance_receive
message_triggerable_process_model = (
MessageTriggerableProcessModel.query.filter_by(
message_name=message_instance_send.name
).first()
)
if message_triggerable_process_model:
receiving_process = MessageService.start_process_with_message(
message_triggerable_process_model, message_instance_send
)
message_instance_receive = MessageInstanceModel.query.filter_by(
process_instance_id=receiving_process.id,
message_type="receive",
status="ready",
).first()
else:
receiving_process = (
MessageService.get_process_instance_for_message_instance(
message_instance_receive
)
)
# Assure we can send the message, otherwise keep going.
if (
message_instance_receive is None
or not receiving_process.can_receive_message()
):
message_instance_send.status = "ready"
message_instance_send.status = "ready"
db.session.add(message_instance_send)
db.session.commit()
return None
# Set the receiving message to running, so it is not altered elswhere ...
message_instance_receive.status = "running"
cls.process_message_receive(
receiving_process,
message_instance_receive,
message_instance_send.name,
message_instance_send.payload,
)
message_instance_receive.status = "completed"
message_instance_receive.counterpart_id = message_instance_send.id
db.session.add(message_instance_receive)
message_instance_send.status = "completed"
message_instance_send.counterpart_id = message_instance_receive.id
db.session.add(message_instance_send)
db.session.commit()
return message_instance_receive
except Exception as exception:
db.session.rollback()
message_instance_send.status = "failed"
message_instance_send.failure_cause = str(exception)
db.session.add(message_instance_send)
if message_instance_receive:
message_instance_receive.status = "failed"
message_instance_receive.failure_cause = str(exception)
db.session.add(message_instance_receive)
db.session.commit()
raise exception
@classmethod
def correlate_all_message_instances(cls) -> None:
"""Look at ALL the Send and Receive Messages and attempt to find correlations."""
message_instances_send = MessageInstanceModel.query.filter_by(
message_type="send", status="ready"
).all()
message_instances_receive = MessageInstanceModel.query.filter_by(
message_type="receive", status="ready"
).all()
for message_instance_send in message_instances_send:
# check again in case another background process picked up the message
# while the previous one was running
if message_instance_send.status != "ready":
continue
message_instance_send.status = "running"
db.session.add(message_instance_send)
db.session.commit()
message_instance_receive = None
try:
message_instance_receive = cls.get_message_instance_receive(
message_instance_send, message_instances_receive
)
if message_instance_receive is None:
message_triggerable_process_model = (
MessageTriggerableProcessModel.query.filter_by(
message_model_id=message_instance_send.message_model_id
).first()
)
if message_triggerable_process_model:
process_instance_send = ProcessInstanceModel.query.filter_by(
id=message_instance_send.process_instance_id,
).first()
# TODO: use the correct swimlane user when that is set up
cls.process_message_triggerable_process_model(
message_triggerable_process_model,
message_instance_send.message_model.name,
message_instance_send.payload,
process_instance_send.process_initiator,
)
message_instance_send.status = "completed"
else:
# if we can't get a queued message then put it back in the queue
message_instance_send.status = "ready"
else:
if message_instance_receive.status != "ready":
continue
message_instance_receive.status = "running"
cls.process_message_receive(
message_instance_receive,
message_instance_send.message_model.name,
message_instance_send.payload,
)
message_instance_receive.status = "completed"
db.session.add(message_instance_receive)
message_instance_send.status = "completed"
db.session.add(message_instance_send)
db.session.commit()
except Exception as exception:
db.session.rollback()
message_instance_send.status = "failed"
message_instance_send.failure_cause = str(exception)
db.session.add(message_instance_send)
if message_instance_receive:
message_instance_receive.status = "failed"
message_instance_receive.failure_cause = str(exception)
db.session.add(message_instance_receive)
db.session.commit()
raise exception
cls.correlate_send_message(message_instance_send)
@staticmethod
def process_message_triggerable_process_model(
def start_process_with_message(
message_triggerable_process_model: MessageTriggerableProcessModel,
message_model_name: str,
message_payload: dict,
user: UserModel,
message_instance: MessageInstanceModel,
) -> ProcessInstanceModel:
"""Process_message_triggerable_process_model."""
"""Start up a process instance, so it is ready to catch the event."""
process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier(
message_triggerable_process_model.process_model_identifier,
user,
message_instance.user,
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.do_engine_steps(save=False)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload,
correlations={},
)
processor_receive.do_engine_steps(save=True)
return process_instance_receive
@staticmethod
def process_message_receive(
def get_process_instance_for_message_instance(
message_instance_receive: MessageInstanceModel,
message_model_name: str,
message_payload: dict,
) -> None:
) -> ProcessInstanceModel:
"""Process_message_receive."""
process_instance_receive = ProcessInstanceModel.query.filter_by(
id=message_instance_receive.process_instance_id
).first()
process_instance_receive: ProcessInstanceModel = (
ProcessInstanceModel.query.filter_by(
id=message_instance_receive.process_instance_id
).first()
)
if process_instance_receive is None:
raise MessageServiceError(
(
@ -151,83 +163,21 @@ class MessageService:
),
)
)
return process_instance_receive
@staticmethod
def process_message_receive(
process_instance_receive: ProcessInstanceModel,
message_instance_receive: MessageInstanceModel,
message_model_name: str,
message_payload: dict,
) -> None:
"""process_message_receive."""
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload,
correlations={},
message_model_name, message_payload
)
processor_receive.do_engine_steps(save=True)
@staticmethod
def get_message_instance_receive(
message_instance_send: MessageInstanceModel,
message_instances_receive: list[MessageInstanceModel],
) -> Optional[MessageInstanceModel]:
"""Get_message_instance_receive."""
message_correlations_send = (
MessageCorrelationModel.query.join(MessageCorrelationMessageInstanceModel)
.filter_by(message_instance_id=message_instance_send.id)
.all()
)
message_correlation_filter = []
for message_correlation_send in message_correlations_send:
message_correlation_filter.append(
and_(
MessageCorrelationModel.name == message_correlation_send.name,
MessageCorrelationModel.value == message_correlation_send.value,
MessageCorrelationModel.message_correlation_property_id
== message_correlation_send.message_correlation_property_id,
)
)
for message_instance_receive in message_instances_receive:
# sqlalchemy supports select / where statements like active record apparantly
# https://docs.sqlalchemy.org/en/14/core/tutorial.html#conjunctions
message_correlation_select = (
select([db.func.count()])
.select_from(MessageCorrelationModel) # type: ignore
.where(
and_(
MessageCorrelationModel.process_instance_id
== message_instance_receive.process_instance_id,
or_(*message_correlation_filter),
)
)
.join(MessageCorrelationMessageInstanceModel) # type: ignore
.filter_by(
message_instance_id=message_instance_receive.id,
)
)
message_correlations_receive = db.session.execute(
message_correlation_select
)
# since the query matches on name, value, and message_instance_receive.id, if the counts
# message correlations found are the same, then this should be the relevant message
if (
message_correlations_receive.scalar() == len(message_correlations_send)
and message_instance_receive.message_model_id
== message_instance_send.message_model_id
):
return message_instance_receive
return None
@staticmethod
def get_process_instance_for_message_instance(
message_instance: MessageInstanceModel,
) -> Any:
"""Get_process_instance_for_message_instance."""
process_instance = ProcessInstanceModel.query.filter_by(
id=message_instance.process_instance_id
).first()
if process_instance is None:
raise MessageServiceError(
f"Process instance cannot be found for message: {message_instance.id}."
f"Tried with id {message_instance.process_instance_id}"
)
return process_instance
message_instance_receive.status = MessageStatuses.completed.value
db.session.add(message_instance_receive)
db.session.commit()

View File

@ -60,15 +60,10 @@ from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_correlation_message_instance import (
MessageCorrelationMessageInstanceModel,
)
from spiffworkflow_backend.models.message_correlation_property import (
MessageCorrelationPropertyModel,
)
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageModel
from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel,
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_metadata import (
@ -1306,157 +1301,56 @@ class ProcessInstanceProcessor:
db.session.add(self.process_instance_model)
db.session.commit()
# messages have one correlation key (possibly wrong)
# correlation keys may have many correlation properties
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
# only message sends are in get_bpmn_messages
message_model = MessageModel.query.filter_by(name=bpmn_message.name).first()
if message_model is None:
raise ApiError(
"invalid_message_name",
f"Invalid message name: {bpmn_message.name}.",
)
if not bpmn_message.correlations:
raise ApiError(
"message_correlations_missing",
(
"Could not find any message correlations bpmn_message:"
f" {bpmn_message.name}"
),
)
message_correlations = []
for (
message_correlation_key,
message_correlation_properties,
) in bpmn_message.correlations.items():
for (
message_correlation_property_identifier,
message_correlation_property_value,
) in message_correlation_properties.items():
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
identifier=message_correlation_property_identifier,
).first()
)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
(
"Could not find a known message correlation with"
f" identifier:{message_correlation_property_identifier}"
),
)
message_correlations.append(
{
"message_correlation_property": (
message_correlation_property
),
"name": message_correlation_key,
"value": message_correlation_property_value,
}
)
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up
message_type="send",
message_model_id=message_model.id,
name=bpmn_message.name,
payload=bpmn_message.payload,
correlation_keys=self.bpmn_process_instance.correlations,
)
db.session.add(message_instance)
db.session.commit()
for message_correlation in message_correlations:
message_correlation = MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation[
"message_correlation_property"
].id,
name=message_correlation["name"],
value=message_correlation["value"],
)
db.session.add(message_correlation)
db.session.commit()
message_correlation_message_instance = (
MessageCorrelationMessageInstanceModel(
message_instance_id=message_instance.id,
message_correlation_id=message_correlation.id,
)
)
db.session.add(message_correlation_message_instance)
db.session.commit()
def queue_waiting_receive_messages(self) -> None:
"""Queue_waiting_receive_messages."""
waiting_tasks = self.get_all_waiting_tasks()
for waiting_task in waiting_tasks:
# if it's not something that can wait for a message, skip it
if waiting_task.task_spec.__class__.__name__ not in [
"IntermediateCatchEvent",
"ReceiveTask",
]:
continue
waiting_events = self.bpmn_process_instance.waiting_events()
waiting_message_events = filter(
lambda e: e["event_type"] == "Message", waiting_events
)
# timer events are not related to messaging, so ignore them for these purposes
if waiting_task.task_spec.event_definition.__class__.__name__.endswith(
"TimerEventDefinition"
for event in waiting_message_events:
# Ensure we are only creating one message instance for each waiting message
if (
MessageInstanceModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
message_type="receive",
name=event["name"],
).count()
> 0
):
continue
message_model = MessageModel.query.filter_by(
name=waiting_task.task_spec.event_definition.name
).first()
if message_model is None:
raise ApiError(
"invalid_message_name",
(
"Invalid message name:"
f" {waiting_task.task_spec.event_definition.name}."
),
)
# Ensure we are only creating one message instance for each waiting message
message_instance = MessageInstanceModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
message_type="receive",
message_model_id=message_model.id,
).first()
if message_instance:
continue
# Create a new Message Instance
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
user_id=self.process_instance_model.process_initiator_id,
message_type="receive",
message_model_id=message_model.id,
name=event["name"],
correlation_keys=self.bpmn_process_instance.correlations,
)
for correlation_property in event["value"]:
message_correlation = MessageInstanceCorrelationRuleModel(
message_instance_id=message_instance.id,
name=correlation_property.name,
retrieval_expression=correlation_property.retrieval_expression,
)
message_instance.correlation_rules.append(message_correlation)
db.session.add(message_instance)
for (
spiff_correlation_property
) in waiting_task.task_spec.event_definition.correlation_properties:
# NOTE: we may have to cycle through keys here
# not sure yet if it's valid for a property to be associated with multiple keys
correlation_key_name = spiff_correlation_property.correlation_keys[0]
message_correlation = (
MessageCorrelationModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
name=correlation_key_name,
)
.join(MessageCorrelationPropertyModel)
.filter_by(identifier=spiff_correlation_property.name)
.first()
)
message_correlation_message_instance = (
MessageCorrelationMessageInstanceModel(
message_instance_id=message_instance.id,
message_correlation_id=message_correlation.id,
)
)
db.session.add(message_correlation_message_instance)
db.session.commit()
def increment_spiff_step(self) -> None:

View File

@ -8,8 +8,8 @@ import sentry_sdk
from flask import current_app
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from spiffworkflow_backend import db
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceApi
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@ -41,13 +41,14 @@ class ProcessInstanceService:
user: UserModel,
) -> ProcessInstanceModel:
"""Get_process_instance_from_spec."""
db.session.commit()
try:
current_git_revision = GitService.get_current_revision()
except GitCommandError:
current_git_revision = ""
process_instance_model = ProcessInstanceModel(
status=ProcessInstanceStatus.not_started.value,
process_initiator=user,
process_initiator_id=user.id,
process_model_identifier=process_model.id,
process_model_display_name=process_model.display_name,
start_in_seconds=round(time.time()),

View File

@ -8,14 +8,13 @@ from typing import Optional
from lxml import etree # type: ignore
from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnValidator # type: ignore
from spiffworkflow_backend.models.correlation_property_cache import (
CorrelationPropertyCache,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.file import SpecReference
from spiffworkflow_backend.models.message_correlation_property import (
MessageCorrelationPropertyModel,
)
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.message_triggerable_process_model import (
MessageTriggerableProcessModel,
)
@ -175,8 +174,8 @@ class SpecFileService(FileSystemService):
"""Validate_bpmn_xml."""
file_type = FileSystemService.file_type(file_name)
if file_type.value == FileType.bpmn.value:
validator = BpmnValidator()
parser = MyCustomParser(validator=validator)
BpmnValidator()
parser = MyCustomParser()
try:
parser.add_bpmn_xml(
cls.get_etree_from_xml_bytes(binary_data), filename=file_name
@ -336,39 +335,30 @@ class SpecFileService(FileSystemService):
@staticmethod
def update_message_cache(ref: SpecReference) -> None:
"""Assure we have a record in the database of all possible message ids and names."""
for message_model_identifier in ref.messages.keys():
message_model = MessageModel.query.filter_by(
identifier=message_model_identifier
).first()
if message_model is None:
message_model = MessageModel(
identifier=message_model_identifier,
name=ref.messages[message_model_identifier],
)
db.session.add(message_model)
db.session.commit()
# for message_model_identifier in ref.messages.keys():
# message_model = MessageModel.query.filter_by(
# identifier=message_model_identifier
# ).first()
# if message_model is None:
# message_model = MessageModel(
# identifier=message_model_identifier,
# name=ref.messages[message_model_identifier],
# )
# db.session.add(message_model)
# db.session.commit()
@staticmethod
def update_message_trigger_cache(ref: SpecReference) -> None:
"""Assure we know which messages can trigger the start of a process."""
for message_model_identifier in ref.start_messages:
message_model = MessageModel.query.filter_by(
identifier=message_model_identifier
).first()
if message_model is None:
raise ProcessModelFileInvalidError(
"Could not find message model with identifier"
f" '{message_model_identifier}'Required by a Start Event in :"
f" {ref.file_name}"
)
for message_name in ref.start_messages:
message_triggerable_process_model = (
MessageTriggerableProcessModel.query.filter_by(
message_model_id=message_model.id,
message_name=message_name,
).first()
)
if message_triggerable_process_model is None:
message_triggerable_process_model = MessageTriggerableProcessModel(
message_model_id=message_model.id,
message_name=message_name,
process_model_identifier=ref.process_model_id,
)
db.session.add(message_triggerable_process_model)
@ -386,33 +376,28 @@ class SpecFileService(FileSystemService):
@staticmethod
def update_correlation_cache(ref: SpecReference) -> None:
"""Update_correlation_cache."""
for correlation_identifier in ref.correlations.keys():
correlation_property_retrieval_expressions = ref.correlations[
correlation_identifier
]["retrieval_expressions"]
for name in ref.correlations.keys():
correlation_property_retrieval_expressions = ref.correlations[name][
"retrieval_expressions"
]
for cpre in correlation_property_retrieval_expressions:
message_model_identifier = cpre["messageRef"]
message_model = MessageModel.query.filter_by(
identifier=message_model_identifier
message_name = ref.messages.get(cpre["messageRef"], None)
retrieval_expression = cpre["expression"]
process_model_id = ref.process_model_id
existing = CorrelationPropertyCache.query.filter_by(
name=name,
message_name=message_name,
process_model_id=process_model_id,
retrieval_expression=retrieval_expression,
).first()
if message_model is None:
raise ProcessModelFileInvalidError(
"Could not find message model with identifier"
f" '{message_model_identifier}'specified by correlation"
f" property: {cpre}"
if existing is None:
new_cache = CorrelationPropertyCache(
name=name,
message_name=message_name,
process_model_id=process_model_id,
retrieval_expression=retrieval_expression,
)
# fixme: I think we are currently ignoring the correction properties.
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
identifier=correlation_identifier,
message_model_id=message_model.id,
).first()
)
if message_correlation_property is None:
message_correlation_property = MessageCorrelationPropertyModel(
identifier=correlation_identifier,
message_model_id=message_model.id,
)
db.session.add(message_correlation_property)
db.session.add(new_cache)
db.session.commit()

View File

@ -0,0 +1,153 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:collaboration id="Collaboration_0oye1os">
<bpmn:participant id="message_initiator" name="Message Initiator" processRef="message_send_process" />
<bpmn:participant id="message-receiver" name="Message Receiver" />
<bpmn:messageFlow id="message_send_flow" name="Message Send Flow" sourceRef="send_message" targetRef="message-receiver" />
<bpmn:messageFlow id="message_response_flow" name="Message Response Flow" sourceRef="message-receiver" targetRef="receive_message_response" />
<bpmn:textAnnotation id="TextAnnotation_0oxbpew">
<bpmn:text>The messages sent here are about an Invoice that can be uniquely identified by the customer_id ("sartography") and a purchase order number (1001)
It will fire a message connected to the invoice keys above, starting another process, which can communicate back to this specific process instance using the correct key.</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1d6q7zd" sourceRef="message_initiator" targetRef="TextAnnotation_0oxbpew" />
<bpmn:correlationKey name="invoice">
<bpmn:correlationPropertyRef>po_number</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>customer_id</bpmn:correlationPropertyRef>
</bpmn:correlationKey>
</bpmn:collaboration>
<bpmn:correlationProperty id="po_number" name="Purchase Order Number">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="customer_id" name="Customer ID">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_10conab</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_037vpjk" sourceRef="send_message" targetRef="receive_message_response" />
<bpmn:sequenceFlow id="Flow_1qgz6p0" sourceRef="receive_message_response" targetRef="Event_0kndoyu" />
<bpmn:sequenceFlow id="Flow_10conab" sourceRef="StartEvent_1" targetRef="invoice_form" />
<bpmn:endEvent id="Event_0kndoyu">
<bpmn:incoming>Flow_1qgz6p0</bpmn:incoming>
</bpmn:endEvent>
<bpmn:intermediateCatchEvent id="receive_message_response" name="Receive Approval Result">
<bpmn:incoming>Flow_037vpjk</bpmn:incoming>
<bpmn:outgoing>Flow_1qgz6p0</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1l3n0zr" messageRef="approval_result" />
</bpmn:intermediateCatchEvent>
<bpmn:sendTask id="send_message" name="Request Approval" messageRef="request_approval">
<bpmn:extensionElements>
<spiffworkflow:preScript>the_topic = "first_conversation" </spiffworkflow:preScript>
</bpmn:extensionElements>
<bpmn:incoming>Flow_02lw0q9</bpmn:incoming>
<bpmn:outgoing>Flow_037vpjk</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_02lw0q9" sourceRef="invoice_form" targetRef="send_message" />
<bpmn:userTask id="invoice_form" name="Create Invoice">
<bpmn:extensionElements>
<spiffworkflow:properties>
<spiffworkflow:property name="formJsonSchemaFilename" value="invoice_form.json" />
<spiffworkflow:property name="formUiSchemaFilename" value="invoice_ui.json" />
</spiffworkflow:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_10conab</bpmn:incoming>
<bpmn:outgoing>Flow_02lw0q9</bpmn:outgoing>
</bpmn:userTask>
</bpmn:process>
<bpmn:message id="request_approval" name="Request Approval">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{
"customer_id": customer_id,
"po_number": po_number,
"amount": amount,
"description": description,
}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="approval_result" name="Approval Result">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>the_payload</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_0oye1os">
<bpmndi:BPMNShape id="Participant_0bjh770_di" bpmnElement="message_initiator" isHorizontal="true">
<dc:Bounds x="120" y="52" width="600" height="338" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0kndoyu_di" bpmnElement="Event_0kndoyu">
<dc:Bounds x="622" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0yt48xb_di" bpmnElement="receive_message_response">
<dc:Bounds x="532" y="159" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="508" y="129" width="86" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0vm33bu_di" bpmnElement="send_message">
<dc:Bounds x="390" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0798vfz_di" bpmnElement="invoice_form">
<dc:Bounds x="240" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_037vpjk_di" bpmnElement="Flow_037vpjk">
<di:waypoint x="490" y="177" />
<di:waypoint x="532" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1qgz6p0_di" bpmnElement="Flow_1qgz6p0">
<di:waypoint x="568" y="177" />
<di:waypoint x="622" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_10conab_di" bpmnElement="Flow_10conab">
<di:waypoint x="215" y="177" />
<di:waypoint x="240" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_02lw0q9_di" bpmnElement="Flow_02lw0q9">
<di:waypoint x="340" y="177" />
<di:waypoint x="390" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Participant_158b3ei_di" bpmnElement="message-receiver" isHorizontal="true">
<dc:Bounds x="120" y="350" width="600" height="60" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_0oxbpew_di" bpmnElement="TextAnnotation_0oxbpew">
<dc:Bounds x="760" y="-30" width="226.98863220214844" height="155.9943084716797" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Association_1d6q7zd_di" bpmnElement="Association_1d6q7zd">
<di:waypoint x="699" y="52" />
<di:waypoint x="760" y="15" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ueajoz_di" bpmnElement="message_send_flow">
<di:waypoint x="410" y="217" />
<di:waypoint x="410" y="350" />
<bpmndi:BPMNLabel>
<dc:Bounds x="413" y="302" width="74" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1n96n67_di" bpmnElement="message_response_flow">
<di:waypoint x="550" y="350" />
<di:waypoint x="550" y="195" />
<bpmndi:BPMNLabel>
<dc:Bounds x="552" y="294" width="77" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -1,42 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:collaboration id="Collaboration_0oye1os">
<bpmn:participant id="message_receiver" name="Message Receiver" processRef="message_receiver_process" />
<bpmn:participant id="message_receiver" name="Message Receiver (invoice approver)" processRef="message_receiver_process" />
<bpmn:participant id="message_sender" name="Message Sender" />
<bpmn:messageFlow id="message_send_flow" name="Message Send Flow" sourceRef="message_sender" targetRef="receive_message" />
<bpmn:messageFlow id="Flow_0ds946g" sourceRef="send_message_response" targetRef="message_sender" />
<bpmn:correlationKey name="message_correlation_key">
<bpmn:correlationPropertyRef>message_correlation_property_topica</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>message_correlation_property_topicb</bpmn:correlationPropertyRef>
<bpmn:correlationKey name="invoice">
<bpmn:correlationPropertyRef>customer_id</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>po_number</bpmn:correlationPropertyRef>
</bpmn:correlationKey>
</bpmn:collaboration>
<bpmn:correlationProperty id="message_correlation_property_topica" name="Message Correlation Property TopicA">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topica</bpmn:messagePath>
<bpmn:correlationProperty id="customer_id" name="Customer Id">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topica</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="message_correlation_property_topicb" name="Message Correlation Property TopicB">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topicb</bpmn:messagePath>
<bpmn:correlationProperty id="po_number" name="Purchase Order Number">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topicb</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:message id="message_send" name="Message Send">
<bpmn:message id="request_approval" name="Request Approval">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>the_payload</spiffworkflow:messageVariable>
<spiffworkflow:messageVariable>invoice</spiffworkflow:messageVariable>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="message_response" name="Message Response">
<bpmn:message id="approval_result" name="Approval Result">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{ "the_payload": {
"topica": the_payload.topica,
"topicb": the_payload.topicb,
}}</spiffworkflow:messagePayload>
<spiffworkflow:messagePayload>invoice</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:process id="message_receiver_process" name="Message Receiver Process" isExecutable="true">
@ -45,28 +42,21 @@
<bpmn:endEvent id="Event_0q5otqd">
<bpmn:incoming>Flow_11r9uiw</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sendTask id="send_message_response" name="Send Message Reponse" messageRef="message_response">
<bpmn:sendTask id="send_message_response" name="Send Message Reponse" messageRef="approval_result">
<bpmn:incoming>Flow_0fruoax</bpmn:incoming>
<bpmn:outgoing>Flow_11r9uiw</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:startEvent id="receive_message" name="Receive Message">
<bpmn:outgoing>Flow_0fruoax</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_08u7ksn" messageRef="message_send" />
<bpmn:messageEventDefinition id="MessageEventDefinition_08u7ksn" messageRef="request_approval" />
</bpmn:startEvent>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_0oye1os">
<bpmndi:BPMNShape id="Participant_0mr0gg1_di" bpmnElement="message_receiver" isHorizontal="true">
<dc:Bounds x="120" y="350" width="480" height="230" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_11r9uiw_di" bpmnElement="Flow_11r9uiw">
<di:waypoint x="480" y="480" />
<di:waypoint x="512" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0fruoax_di" bpmnElement="Flow_0fruoax">
<di:waypoint x="208" y="480" />
<di:waypoint x="380" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Event_0q5otqd_di" bpmnElement="Event_0q5otqd">
<dc:Bounds x="512" y="462" width="36" height="36" />
</bpmndi:BPMNShape>
@ -79,6 +69,14 @@
<dc:Bounds x="149" y="505" width="88" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0fruoax_di" bpmnElement="Flow_0fruoax">
<di:waypoint x="208" y="480" />
<di:waypoint x="380" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_11r9uiw_di" bpmnElement="Flow_11r9uiw">
<di:waypoint x="480" y="480" />
<di:waypoint x="512" y="480" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Participant_0xvqrmk_di" bpmnElement="message_sender" isHorizontal="true">
<dc:Bounds x="130" y="250" width="360" height="60" />
<bpmndi:BPMNLabel />

View File

@ -5,25 +5,31 @@
<bpmn:participant id="message-receiver" name="Message Receiver" />
<bpmn:messageFlow id="message_send_flow" name="Message Send Flow" sourceRef="send_message" targetRef="message-receiver" />
<bpmn:messageFlow id="message_response_flow" name="Message Response Flow" sourceRef="message-receiver" targetRef="receive_message_response" />
<bpmn:correlationKey name="message_correlation_key">
<bpmn:correlationPropertyRef>message_correlation_property_topica</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>message_correlation_property_topicb</bpmn:correlationPropertyRef>
<bpmn:textAnnotation id="TextAnnotation_0oxbpew">
<bpmn:text>The messages sent here are about an Invoice that can be uniquely identified by the customer_id ("sartography") and a purchase order number (1001)
It will fire a message connected to the invoice keys above, starting another process, which can communicate back to this specific process instance using the correct key.</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1d6q7zd" sourceRef="message_initiator" targetRef="TextAnnotation_0oxbpew" />
<bpmn:correlationKey name="invoice">
<bpmn:correlationPropertyRef>po_number</bpmn:correlationPropertyRef>
<bpmn:correlationPropertyRef>customer_id</bpmn:correlationPropertyRef>
</bpmn:correlationKey>
</bpmn:collaboration>
<bpmn:correlationProperty id="message_correlation_property_topica" name="Message Correlation Property TopicA">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topica</bpmn:messagePath>
<bpmn:correlationProperty id="po_number" name="Purchase Order Number">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topica</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>po_number</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="message_correlation_property_topicb" name="Message Correlation Property TopicB">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send">
<bpmn:messagePath>topicb</bpmn:messagePath>
<bpmn:correlationProperty id="customer_id" name="Customer ID">
<bpmn:correlationPropertyRetrievalExpression messageRef="request_approval">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response">
<bpmn:messagePath>the_payload.topicb</bpmn:messagePath>
<bpmn:correlationPropertyRetrievalExpression messageRef="approval_result">
<bpmn:formalExpression>customer_id</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
@ -32,42 +38,45 @@
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_037vpjk" sourceRef="send_message" targetRef="receive_message_response" />
<bpmn:sequenceFlow id="Flow_1qgz6p0" sourceRef="receive_message_response" targetRef="Event_0kndoyu" />
<bpmn:sequenceFlow id="Flow_10conab" sourceRef="StartEvent_1" targetRef="set_topic" />
<bpmn:sequenceFlow id="Flow_10conab" sourceRef="StartEvent_1" targetRef="invoice_form" />
<bpmn:endEvent id="Event_0kndoyu">
<bpmn:incoming>Flow_1qgz6p0</bpmn:incoming>
</bpmn:endEvent>
<bpmn:intermediateCatchEvent id="receive_message_response" name="Receive Message Response">
<bpmn:intermediateCatchEvent id="receive_message_response" name="Receive Approval Result">
<bpmn:incoming>Flow_037vpjk</bpmn:incoming>
<bpmn:outgoing>Flow_1qgz6p0</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1l3n0zr" messageRef="message_response" />
<bpmn:messageEventDefinition id="MessageEventDefinition_1l3n0zr" messageRef="approval_result" />
</bpmn:intermediateCatchEvent>
<bpmn:sendTask id="send_message" name="Send Message" messageRef="message_send">
<bpmn:sendTask id="send_message" name="Request Approval" messageRef="request_approval">
<bpmn:extensionElements>
<spiffworkflow:preScript>the_topic = "first_conversation" </spiffworkflow:preScript>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1ihr88m</bpmn:incoming>
<bpmn:incoming>Flow_02lw0q9</bpmn:incoming>
<bpmn:outgoing>Flow_037vpjk</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_1ihr88m" sourceRef="set_topic" targetRef="send_message" />
<bpmn:scriptTask id="set_topic" name="Set Topic" scriptFormat="python">
<bpmn:sequenceFlow id="Flow_02lw0q9" sourceRef="invoice_form" targetRef="send_message" />
<bpmn:userTask id="invoice_form" name="Create Invoice">
<bpmn:extensionElements>
<spiffworkflow:properties>
<spiffworkflow:property name="formJsonSchemaFilename" value="invoice_form.json" />
<spiffworkflow:property name="formUiSchemaFilename" value="invoice_ui.json" />
</spiffworkflow:properties>
</bpmn:extensionElements>
<bpmn:incoming>Flow_10conab</bpmn:incoming>
<bpmn:outgoing>Flow_1ihr88m</bpmn:outgoing>
<bpmn:script>
timestamp = time.time()
the_topica = f"first_conversation_a_{timestamp}"
the_topicb = f"first_conversation_b_{timestamp}"
del time</bpmn:script>
</bpmn:scriptTask>
<bpmn:outgoing>Flow_02lw0q9</bpmn:outgoing>
</bpmn:userTask>
</bpmn:process>
<bpmn:message id="message_send" name="Message Send">
<bpmn:message id="request_approval" name="Request Approval">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{
"topica": the_topica,
"topicb": the_topicb,
"customer_id": customer_id,
"po_number": po_number,
"amount": amount,
"description": description,
}</spiffworkflow:messagePayload>
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="message_response" name="Message Response">
<bpmn:message id="approval_result" name="Approval Result">
<bpmn:extensionElements>
<spiffworkflow:messageVariable>the_payload</spiffworkflow:messageVariable>
</bpmn:extensionElements>
@ -78,22 +87,6 @@ del time</bpmn:script>
<dc:Bounds x="120" y="52" width="600" height="338" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1ihr88m_di" bpmnElement="Flow_1ihr88m">
<di:waypoint x="350" y="177" />
<di:waypoint x="390" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_10conab_di" bpmnElement="Flow_10conab">
<di:waypoint x="215" y="177" />
<di:waypoint x="250" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1qgz6p0_di" bpmnElement="Flow_1qgz6p0">
<di:waypoint x="568" y="177" />
<di:waypoint x="622" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_037vpjk_di" bpmnElement="Flow_037vpjk">
<di:waypoint x="490" y="177" />
<di:waypoint x="532" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
@ -103,20 +96,44 @@ del time</bpmn:script>
<bpmndi:BPMNShape id="Event_0yt48xb_di" bpmnElement="receive_message_response">
<dc:Bounds x="532" y="159" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="507" y="129" width="88" height="27" />
<dc:Bounds x="508" y="129" width="86" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0vm33bu_di" bpmnElement="send_message">
<dc:Bounds x="390" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1t3nq1h_di" bpmnElement="set_topic">
<dc:Bounds x="250" y="137" width="100" height="80" />
<bpmndi:BPMNShape id="Activity_0798vfz_di" bpmnElement="invoice_form">
<dc:Bounds x="240" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_037vpjk_di" bpmnElement="Flow_037vpjk">
<di:waypoint x="490" y="177" />
<di:waypoint x="532" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1qgz6p0_di" bpmnElement="Flow_1qgz6p0">
<di:waypoint x="568" y="177" />
<di:waypoint x="622" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_10conab_di" bpmnElement="Flow_10conab">
<di:waypoint x="215" y="177" />
<di:waypoint x="240" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_02lw0q9_di" bpmnElement="Flow_02lw0q9">
<di:waypoint x="340" y="177" />
<di:waypoint x="390" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Participant_158b3ei_di" bpmnElement="message-receiver" isHorizontal="true">
<dc:Bounds x="120" y="350" width="600" height="60" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_0oxbpew_di" bpmnElement="TextAnnotation_0oxbpew">
<dc:Bounds x="760" y="-30" width="226.98863220214844" height="155.9943084716797" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Association_1d6q7zd_di" bpmnElement="Association_1d6q7zd">
<di:waypoint x="699" y="52" />
<di:waypoint x="760" y="15" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ueajoz_di" bpmnElement="message_send_flow">
<di:waypoint x="410" y="217" />
<di:waypoint x="410" y="350" />
@ -128,7 +145,7 @@ del time</bpmn:script>
<di:waypoint x="550" y="350" />
<di:waypoint x="550" y="195" />
<bpmndi:BPMNLabel>
<dc:Bounds x="552" y="294" width="76" height="27" />
<dc:Bounds x="552" y="294" width="77" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>

View File

@ -12,18 +12,18 @@
</bpmn:collaboration>
<bpmn:correlationProperty id="mcp_topica_two" name="MCP TopicA Two">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
<bpmn:messagePath>topica_two</bpmn:messagePath>
<bpmn:formalExpression>topica_two</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
<bpmn:messagePath>topica_two</bpmn:messagePath>
<bpmn:formalExpression>topic_two_a</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="mcp_topicb_two" name="MCP TopicB_two">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
<bpmn:messagePath>topicb_two</bpmn:messagePath>
<bpmn:formalExpression>topicb_two</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
<bpmn:messagePath>topicb_two</bpmn:messagePath>
<bpmn:formalExpression>topic_two_b</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:message id="message_send_two" name="Message Send Two">
@ -34,8 +34,8 @@
<bpmn:message id="message_response_two" name="Message Response Two">
<bpmn:extensionElements>
<spiffworkflow:messagePayload>{
"topica_two": payload_var_two.topica_two,
"topicb_two": payload_var_two.topicb_two,
"topic_two_a": payload_var_two.topica_two,
"topic_two_b": payload_var_two.topicb_two,
"second_var_two": second_var_two
}</spiffworkflow:messagePayload>
</bpmn:extensionElements>

View File

@ -19,18 +19,18 @@
</bpmn:collaboration>
<bpmn:correlationProperty id="mcp_topica_one" name="MCP TopicA One">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_one">
<bpmn:messagePath>topica_one</bpmn:messagePath>
<bpmn:formalExpression>topica_one</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_one">
<bpmn:messagePath>payload_var_one.topica_one</bpmn:messagePath>
<bpmn:formalExpression>topica_one</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="mcp_topicb_one" name="MCP TopicB_one">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_one">
<bpmn:messagePath>topicb_one</bpmn:messagePath>
<bpmn:formalExpression>topicb_one</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_one">
<bpmn:messagePath>payload_var_one.topicb</bpmn:messagePath>
<bpmn:formalExpression>topicb_one</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
@ -117,18 +117,18 @@ del time</bpmn:script>
</bpmn:message>
<bpmn:correlationProperty id="mcp_topica_two" name="MCP Topica Two">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
<bpmn:messagePath>topica_two</bpmn:messagePath>
<bpmn:formalExpression>topica_two</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
<bpmn:messagePath>topica_two</bpmn:messagePath>
<bpmn:formalExpression>topic_two_a</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmn:correlationProperty id="mcp_topicb_two" name="MCP Topicb Two">
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
<bpmn:messagePath>topicb_two</bpmn:messagePath>
<bpmn:formalExpression>topicb_two</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
<bpmn:messagePath>topicb_two</bpmn:messagePath>
<bpmn:formalExpression>topic_two_b</bpmn:formalExpression>
</bpmn:correlationPropertyRetrievalExpression>
</bpmn:correlationProperty>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">

View File

@ -1347,11 +1347,12 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_send"
message_model_identifier = "Request Approval"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously.",
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
@ -1372,7 +1373,7 @@ class TestProcessApi(BaseTest):
processor = ProcessInstanceProcessor(process_instance)
process_instance_data = processor.get_data()
assert process_instance_data
assert process_instance_data["the_payload"] == payload
assert process_instance_data["invoice"] == payload
def test_message_send_when_providing_message_to_running_process_instance(
self,
@ -1395,13 +1396,12 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_response"
message_model_identifier = "Approval Result"
payload = {
"the_payload": {
"topica": "the_payload.topica_string",
"topicb": "the_payload.topicb_string",
"andThis": "another_item_non_key",
}
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "Ya!, a-ok bud!",
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
@ -1415,9 +1415,25 @@ class TestProcessApi(BaseTest):
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(
id=process_instance_id
).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor,
task,
payload,
with_super_admin_user,
human_task,
)
processor.save()
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1462,14 +1478,14 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_response"
message_model_identifier = "Approval Result"
payload = {
"the_payload": {
"topica": "the_payload.topica_string",
"topicb": "the_payload.topicb_string",
"andThis": "another_item_non_key",
}
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously.",
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
process_model_identifier,
@ -1478,20 +1494,25 @@ class TestProcessApi(BaseTest):
assert response.json is not None
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(
id=process_instance_id
).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor,
task,
payload,
with_super_admin_user,
human_task,
)
processor.save()
processor.suspend()
payload["description"] = "Message To Suspended"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1502,16 +1523,15 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 400
assert response.json
assert response.json["error_code"] == "process_instance_is_suspended"
assert response.json["error_code"] == "message_not_accepted"
processor.resume()
payload["description"] = "Message To Resumed"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps(
{"payload": payload, "process_instance_id": process_instance_id}
),
data=json.dumps({"payload": payload}),
)
assert response.status_code == 200
json_data = response.json
@ -1538,7 +1558,7 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 400
assert response.json
assert response.json["error_code"] == "process_instance_is_terminated"
assert response.json["error_code"] == "message_not_accepted"
def test_process_instance_can_be_terminated(
self,
@ -2293,11 +2313,12 @@ class TestProcessApi(BaseTest):
# process_model_source_directory="message_send_one_conversation",
# bpmn_file_name="message_receiver",
# )
message_model_identifier = "message_send"
message_model_identifier = "Request Approval"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously.",
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
@ -2309,6 +2330,7 @@ class TestProcessApi(BaseTest):
assert response.json is not None
process_instance_id_one = response.json["id"]
payload["po_number"] = "1002"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -2325,7 +2347,9 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert (
len(response.json["results"]) == 2
) # Two messages, one is the completed receive, the other is new send
assert (
response.json["results"][0]["process_instance_id"]
== process_instance_id_one
@ -2337,7 +2361,7 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert len(response.json["results"]) == 2
assert (
response.json["results"][0]["process_instance_id"]
== process_instance_id_two
@ -2349,7 +2373,9 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 2
# 4 -Two messages for each process (a record of the completed receive, and then a send created)
# + 2 -Two messages logged for the API Calls used to create the processes.
assert len(response.json["results"]) == 6
@pytest.mark.skipif(
os.environ.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres",

View File

@ -6,7 +6,6 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -38,8 +37,7 @@ class TestMessageInstance(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_can_create_message_instance."""
message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier)
message_name = "Message Model One"
process_model_identifier = self.setup_message_tests(
client, with_super_admin_user
)
@ -53,8 +51,10 @@ class TestMessageInstance(BaseTest):
queued_message = MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="send",
message_model_id=message_model.id,
name=message_name,
payload={"Word": "Eat At Mashita's, its delicious!"},
)
db.session.add(queued_message)
db.session.commit()
@ -75,12 +75,10 @@ class TestMessageInstance(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_cannot_set_invalid_status."""
message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier)
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(
client, with_super_admin_user
)
process_model = ProcessModelService.get_process_model(
process_model_id=process_model_identifier
)
@ -91,8 +89,9 @@ class TestMessageInstance(BaseTest):
with pytest.raises(ValueError) as exception:
MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="send",
message_model_id=message_model.id,
name=message_name,
status="BAD_STATUS",
)
assert (
@ -101,8 +100,9 @@ class TestMessageInstance(BaseTest):
queued_message = MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="send",
message_model_id=message_model.id,
name=message_name,
)
db.session.add(queued_message)
db.session.commit()
@ -121,8 +121,7 @@ class TestMessageInstance(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_cannot_set_invalid_message_type."""
message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier)
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(
client, with_super_admin_user
)
@ -137,8 +136,9 @@ class TestMessageInstance(BaseTest):
with pytest.raises(ValueError) as exception:
MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="BAD_MESSAGE_TYPE",
message_model_id=message_model.id,
name=message_name,
)
assert (
str(exception.value)
@ -147,8 +147,9 @@ class TestMessageInstance(BaseTest):
queued_message = MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="send",
message_model_id=message_model.id,
name=message_name,
)
db.session.add(queued_message)
db.session.commit()
@ -168,8 +169,7 @@ class TestMessageInstance(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_force_failure_cause_if_status_is_failure."""
message_model_identifier = "message_model_one"
message_model = self.create_message_model(message_model_identifier)
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(
client, with_super_admin_user
)
@ -183,8 +183,9 @@ class TestMessageInstance(BaseTest):
queued_message = MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="send",
message_model_id=message_model.id,
name=message_name,
status="failed",
)
db.session.add(queued_message)
@ -199,8 +200,9 @@ class TestMessageInstance(BaseTest):
queued_message = MessageInstanceModel(
process_instance_id=process_instance.id,
user_id=process_instance.process_initiator_id,
message_type="send",
message_model_id=message_model.id,
name=message_name,
)
db.session.add(queued_message)
db.session.commit()
@ -211,11 +213,3 @@ class TestMessageInstance(BaseTest):
db.session.commit()
assert queued_message.id is not None
assert queued_message.failure_cause == "THIS TEST FAILURE"
@staticmethod
def create_message_model(message_model_identifier: str) -> MessageModel:
"""Create_message_model."""
message_model = MessageModel(identifier=message_model_identifier)
db.session.add(message_model)
db.session.commit()
return message_model

View File

@ -1,16 +1,15 @@
"""Test_message_service."""
import pytest
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_correlation_message_instance import (
MessageCorrelationMessageInstanceModel,
)
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.messages_controller import message_send
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -23,115 +22,209 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestMessageService(BaseTest):
"""TestMessageService."""
def test_can_send_message_to_waiting_message(
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_send_message_to_waiting_message."""
process_group_id = "test_group"
"""Test sending a message to a running process via the API.
This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called 'approval_result' that has the matching correlation properties,
as sent by an API Call.
"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00",
}
self.start_sender_process(client, with_super_admin_user, "test_from_api")
self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message()
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("Approval Result", {"payload": {"po_number": 5001}})
# Should return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "jon"}},
)
# No error when calling with the correct parameters
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
)
# There is no longer a waiting message
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert self.process_instance.status == "complete"
def test_single_conversation_between_two_processes(
self,
app: Flask,
client: FlaskClient,
with_super_admin_user: UserModel,
) -> None:
"""Test messages between two different running processes using a single conversation.
Assure that communication between two processes works the same as making a call through the API, here
we have two process instances that are communicating with each other using one conversation about an
Invoice whose details are defined in the following message payload
"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00",
}
# Load up the definition for the receiving process (it has a message start event that should cause it to
# fire when a unique message comes through.
# Fire up the first process
load_test_spec(
"test_group/message_receive",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver.bpmn",
)
# Now start the main process
self.start_sender_process(
client, with_super_admin_user, "test_between_processes"
)
self.assure_a_message_was_sent()
# This is typically called in a background cron process, so we will manually call it
# here in the tests
# The first time it is called, it will instantiate a new instance of the message_recieve process
MessageService.correlate_all_message_instances()
# The sender process should still be waiting on a message to be returned to it ...
self.assure_there_is_a_process_waiting_on_a_message()
# The second time we call ths process_message_isntances (again it would typically be running on cron)
# it will deliver the message that was sent from the receiver back to the original sender.
MessageService.correlate_all_message_instances()
# But there should be no send message waiting for delivery, because
# the message receiving process should pick it up instantly via
# it's start event.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 0
MessageService.correlate_all_message_instances()
MessageService.correlate_all_message_instances()
MessageService.correlate_all_message_instances()
assert len(waiting_messages) == 0
# The message sender process is complete
assert self.process_instance.status == "complete"
# The message receiver process is also complete
message_receiver_process = (
ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receive"
)
.order_by(ProcessInstanceModel.id)
.first()
)
assert message_receiver_process.status == "complete"
def start_sender_process(
self,
client: FlaskClient,
with_super_admin_user: UserModel,
group_name: str = "test_group",
) -> None:
process_group_id = group_name
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
load_test_spec(
"test_group/message_receiver",
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_receiver.bpmn",
)
process_model_sender = load_test_spec(
"test_group/message_sender",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id,
self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id,
with_super_admin_user,
)
processor_send_receive = ProcessInstanceProcessor(self.process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = self.process_instance.active_human_tasks[0]
processor_sender = ProcessInstanceProcessor(process_instance_sender)
processor_sender.do_engine_steps()
processor_sender.save()
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
self.payload,
with_super_admin_user,
human_task,
)
processor_send_receive.save()
message_instance_result = MessageInstanceModel.query.order_by(
MessageInstanceModel.id
).all()
assert len(message_instance_result) == 2
# ensure both message instances are for the same process instance
# it will be send_message and receive_message_response
def assure_a_message_was_sent(self) -> None:
# There should be one new send message for the given process instance.
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
assert (
message_instance_result[0].process_instance_id
== message_instance_result[1].process_instance_id
send_message.payload == self.payload
), "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
def assure_there_is_a_process_waiting_on_a_message(self) -> None:
# There should be one new send message for the given process instance.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
message_instance_sender = message_instance_result[0]
assert message_instance_sender.process_instance_id == process_instance_sender.id
message_correlations = MessageCorrelationModel.query.order_by(
MessageCorrelationModel.id
).all()
assert len(message_correlations) == 2
assert message_correlations[0].process_instance_id == process_instance_sender.id
message_correlations_message_instances = (
MessageCorrelationMessageInstanceModel.query.order_by(
MessageCorrelationMessageInstanceModel.id
).all()
def assure_correlation_properties_are_right(
self, message: MessageInstanceModel
) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(
c for c in message.correlation_rules if c.name == "customer_id"
)
assert len(message_correlations_message_instances) == 4
assert (
message_correlations_message_instances[0].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[1].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[2].message_instance_id
== message_instance_result[1].id
)
assert (
message_correlations_message_instances[3].message_instance_id
== message_instance_result[1].id
)
# process first message
MessageService.process_message_instances()
assert message_instance_sender.status == "completed"
process_instance_result = ProcessInstanceModel.query.order_by(
ProcessInstanceModel.id
).all()
assert len(process_instance_result) == 2
process_instance_receiver = process_instance_result[1]
# just make sure it's a different process instance
assert process_instance_receiver.id != process_instance_sender.id
assert process_instance_receiver.status == "complete"
message_instance_result = MessageInstanceModel.query.order_by(
MessageInstanceModel.id
).all()
assert len(message_instance_result) == 3
message_instance_receiver = message_instance_result[1]
assert message_instance_receiver.id != message_instance_sender.id
assert message_instance_receiver.status == "ready"
# process second message
MessageService.process_message_instances()
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 3
for message_instance in message_instance_result:
assert message_instance.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
assert len(process_instance_result) == 2
for process_instance in process_instance_result:
assert process_instance.status == "complete"
assert po_curr is not None
assert customer_curr is not None
def test_can_send_message_to_multiple_process_models(
self,
@ -141,7 +234,7 @@ class TestMessageService(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_can_send_message_to_multiple_process_models."""
process_group_id = "test_group"
process_group_id = "test_group_multi"
self.create_process_group(
client, with_super_admin_user, process_group_id, process_group_id
)
@ -165,64 +258,55 @@ class TestMessageService(BaseTest):
user = self.find_or_create_user()
process_instance_sender = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model_sender.id,
user,
# process_group_identifier=process_model_sender.process_group_id,
process_model_sender.id, user
)
processor_sender = ProcessInstanceProcessor(process_instance_sender)
processor_sender.do_engine_steps()
processor_sender.save()
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 3
# ensure both message instances are for the same process instance
# it will be send_message and receive_message_response
# At this point, the message_sender process has fired two different messages but those
# processes have not started, and it is now paused, waiting for to receive a message. so
# we should have two sends and a receive.
assert (
message_instance_result[0].process_instance_id
== message_instance_result[1].process_instance_id
MessageInstanceModel.query.filter_by(
process_instance_id=process_instance_sender.id
).count()
== 3
)
assert (
MessageInstanceModel.query.count() == 3
) # all messages are related to the instance
orig_send_messages = MessageInstanceModel.query.filter_by(
message_type="send"
).all()
assert len(orig_send_messages) == 2
assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1
message_instance_sender = message_instance_result[0]
assert message_instance_sender.process_instance_id == process_instance_sender.id
message_correlations = MessageCorrelationModel.query.all()
assert len(message_correlations) == 4
assert message_correlations[0].process_instance_id == process_instance_sender.id
message_correlations_message_instances = (
MessageCorrelationMessageInstanceModel.query.all()
)
assert len(message_correlations_message_instances) == 6
assert (
message_correlations_message_instances[0].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[1].message_instance_id
== message_instance_sender.id
)
assert (
message_correlations_message_instances[2].message_instance_id
== message_instance_result[1].id
)
assert (
message_correlations_message_instances[3].message_instance_id
== message_instance_result[1].id
)
# process first message
MessageService.process_message_instances()
assert message_instance_sender.status == "completed"
# process message instances
MessageService.correlate_all_message_instances()
# Once complete the original send messages should be completed and two new instances
# should now exist, one for each of the process instances ...
# for osm in orig_send_messages:
# assert osm.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
assert len(process_instance_result) == 3
process_instance_receiver_one = ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receiver_one"
).first()
process_instance_receiver_one = (
ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receiver_one"
)
.order_by(ProcessInstanceModel.id)
.first()
)
assert process_instance_receiver_one is not None
process_instance_receiver_two = ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receiver_two"
).first()
process_instance_receiver_two = (
ProcessInstanceModel.query.filter_by(
process_model_identifier="test_group/message_receiver_two"
)
.order_by(ProcessInstanceModel.id)
.first()
)
assert process_instance_receiver_two is not None
# just make sure it's a different process instance
@ -239,8 +323,12 @@ class TestMessageService(BaseTest):
assert process_instance_receiver_two.id != process_instance_sender.id
assert process_instance_receiver_two.status == "complete"
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 5
message_instance_result = (
MessageInstanceModel.query.order_by(MessageInstanceModel.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(message_instance_result) == 7
message_instance_receiver_one = [
x
@ -254,21 +342,25 @@ class TestMessageService(BaseTest):
][0]
assert message_instance_receiver_one is not None
assert message_instance_receiver_two is not None
assert message_instance_receiver_one.id != message_instance_sender.id
assert message_instance_receiver_one.status == "ready"
assert message_instance_receiver_two.id != message_instance_sender.id
assert message_instance_receiver_two.status == "ready"
# process second message
MessageService.process_message_instances()
MessageService.process_message_instances()
# Cause a currelation event
MessageService.correlate_all_message_instances()
# We have to run it a second time because instances are firing
# more messages that need to be picked up.
MessageService.correlate_all_message_instances()
message_instance_result = MessageInstanceModel.query.all()
assert len(message_instance_result) == 6
message_instance_result = (
MessageInstanceModel.query.order_by(MessageInstanceModel.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(message_instance_result) == 8
for message_instance in message_instance_result:
assert message_instance.status == "completed"
process_instance_result = ProcessInstanceModel.query.all()
process_instance_result = ProcessInstanceModel.query.order_by(
ProcessInstanceModel.id
).all()
assert len(process_instance_result) == 3
for process_instance in process_instance_result:
assert process_instance.status == "complete"

View File

@ -8065,7 +8065,7 @@
},
"node_modules/bpmn-js-spiffworkflow": {
"version": "0.0.8",
"resolved": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#aca23dc56e5d37aa1ed0a3cf11acb55f76a36da7",
"resolved": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#f1f008e3e39be43b016718fca6a38b248ab4ecf7",
"license": "MIT",
"dependencies": {
"inherits": "^2.0.4",
@ -38214,7 +38214,7 @@
}
},
"bpmn-js-spiffworkflow": {
"version": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#aca23dc56e5d37aa1ed0a3cf11acb55f76a36da7",
"version": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#f1f008e3e39be43b016718fca6a38b248ab4ecf7",
"from": "bpmn-js-spiffworkflow@sartography/bpmn-js-spiffworkflow#main",
"requires": {
"inherits": "^2.0.4",

View File

@ -112,12 +112,13 @@ export interface MessageInstance {
process_model_identifier: string;
process_model_display_name: string;
process_instance_id: number;
message_identifier: string;
name: string;
message_type: string;
failure_cause: string;
status: string;
created_at_in_seconds: number;
message_correlations?: MessageCorrelations;
correlation_keys: any;
}
export interface ReportFilter {

View File

@ -64,17 +64,13 @@ export default function MessageInstanceList() {
open={!!messageInstanceForModal}
passiveModal
onRequestClose={handleCorrelationDisplayClose}
modalHeading={`Message ${messageInstanceForModal.id} (${messageInstanceForModal.message_identifier}) ${messageInstanceForModal.message_type} data:`}
modalHeading={`Message ${messageInstanceForModal.id} (${messageInstanceForModal.name}) ${messageInstanceForModal.message_type} data:`}
modalLabel="Details"
>
{failureCausePre}
<p>Correlations:</p>
<pre>
{JSON.stringify(
messageInstanceForModal.message_correlations,
null,
2
)}
{JSON.stringify(messageInstanceForModal.correlation_keys, null, 2)}
</pre>
</Modal>
);
@ -95,21 +91,27 @@ export default function MessageInstanceList() {
</>
);
}
let processLink = <span>External Call</span>;
let instanceLink = <span />;
if (row.process_instance_id != null) {
processLink = FormatProcessModelDisplayName(row);
instanceLink = (
<Link
data-qa="process-instance-show-link"
to={`/admin/process-instances/${modifyProcessIdentifierForPathParam(
row.process_model_identifier
)}/${row.process_instance_id}`}
>
{row.process_instance_id}
</Link>
);
}
return (
<tr key={row.id}>
<td>{row.id}</td>
<td>{FormatProcessModelDisplayName(row)}</td>
<td>
<Link
data-qa="process-instance-show-link"
to={`/admin/process-instances/${modifyProcessIdentifierForPathParam(
row.process_model_identifier
)}/${row.process_instance_id}`}
>
{row.process_instance_id}
</Link>
</td>
<td>{row.message_identifier}</td>
<td>{processLink}</td>
<td>{instanceLink}</td>
<td>{row.name}</td>
<td>{row.message_type}</td>
<td>
<Button