mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-01-27 17:55:04 +00:00
BPMN.io -- Just show the message names not the ids - to assure we are only exposing the names.
SpiffWorkflow - - start_messages function should return message names, not ids. - don't catch external thrown messages within the same workflow process - add an expected value to the Correlation Property Model so we can use this well defined class as an external communication tool (rather than building an arbitrary dictionary) - Added a "get_awaiting_correlations" to an event, so we can get a list of the correlation properties related to the workflows currently defined correlation values. - workflows.waiting_events() function now returns the above awaiting correlations as the value on returned message events Backend - Dropping MessageModel and MessageCorrelationProperties - at least for now. We don't need them to send / receive messages though we may eventually want to track the messages and correlations defined across the system - these things (which are ever changing) should not be directly connected to the Messages which may be in flux - and the cross relationships between the tables could cause unexpected and unceissary errors. Commented out the caching logic so we can turn this back on later. - Slight improvement to API Errors - MessageInstances are no longer in a many-to-many relationship with Correlations - Each message instance has a unique set of message correlations specific to the instance. - Message Instances have users, and can be linked through a "counterpart_id" so you can see what send is connected to what recieve. - Message Correlations are connected to recieving message instances. It is not to a process instance, and not to a message model. They now include the expected value and retrieval expression required to validate an incoming message. - A process instance is not connected to message correlations. - Message Instances are not always tied to a process instance (for example, a Send Message from an API) - API calls to create a message use the same logic as all other message catching code. - Make use of the new waiting_events() method to check for any new recieve messages in the workflow (much easier than churning through all of the tasks) - One giant mother of a migration.
This commit is contained in:
parent
790483a421
commit
0f3ef00d72
1879
spiffworkflow-backend/poetry.lock
generated
1879
spiffworkflow-backend/poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1880,19 +1880,19 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Workflow"
|
||||
|
||||
/messages/{message_identifier}:
|
||||
/messages/{message_name}:
|
||||
parameters:
|
||||
- name: message_identifier
|
||||
- name: message_name
|
||||
in: path
|
||||
required: true
|
||||
description: The unique identifier of the message model.
|
||||
description: The unique name of the message.
|
||||
schema:
|
||||
type: string
|
||||
post:
|
||||
tags:
|
||||
- Messages
|
||||
operationId: spiffworkflow_backend.routes.messages_controller.message_send
|
||||
summary: Instantiate and run a given process model with a message start event matching given identifier
|
||||
summary: Instantiate and run a given process model with a message start event matching given name
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
|
@ -253,7 +253,7 @@ def handle_exception(exception: Exception) -> flask.wrappers.Response:
|
||||
else:
|
||||
api_exception = ApiError(
|
||||
error_code=error_code,
|
||||
message=f"{exception.__class__.__name__}",
|
||||
message=f"{exception.__class__.__name__} {str(exception)}",
|
||||
sentry_link=sentry_link,
|
||||
status_code=status_code,
|
||||
)
|
||||
|
@ -21,13 +21,9 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel # noqa: F401
|
||||
from spiffworkflow_backend.models.spec_reference import (
|
||||
SpecReferenceCache,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.message_correlation_property import (
|
||||
MessageCorrelationPropertyModel,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.message_instance import (
|
||||
MessageInstanceModel,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.message_model import MessageModel # noqa: F401
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
) # noqa: F401
|
||||
|
@ -1,41 +0,0 @@
|
||||
"""Message_correlation."""
|
||||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_correlation_property import (
|
||||
MessageCorrelationPropertyModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessageCorrelationModel(SpiffworkflowBaseDBModel):
|
||||
"""Message Correlations to relate queued messages together."""
|
||||
|
||||
__tablename__ = "message_correlation"
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint(
|
||||
"process_instance_id",
|
||||
"message_correlation_property_id",
|
||||
"name",
|
||||
name="message_instance_id_name_unique",
|
||||
),
|
||||
)
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
process_instance_id = db.Column(
|
||||
ForeignKey(ProcessInstanceModel.id), nullable=False, index=True # type: ignore
|
||||
)
|
||||
message_correlation_property_id = db.Column(
|
||||
ForeignKey(MessageCorrelationPropertyModel.id), nullable=False, index=True
|
||||
)
|
||||
name = db.Column(db.String(255), nullable=False, index=True)
|
||||
value = db.Column(db.String(255), nullable=False, index=True)
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
|
||||
message_correlation_property = relationship("MessageCorrelationPropertyModel")
|
@ -1,14 +0,0 @@
|
||||
"""Message_correlation_message_instance."""
|
||||
from sqlalchemy import ForeignKey
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
|
||||
message_correlation_message_instance_table = db.Table(
|
||||
"message_correlation_message_instance",
|
||||
db.Column(
|
||||
"message_instance_id", ForeignKey("message_instance.id"), primary_key=True
|
||||
),
|
||||
db.Column(
|
||||
"message_correlation_id", ForeignKey("message_correlation.id"), primary_key=True
|
||||
),
|
||||
)
|
@ -1,26 +0,0 @@
|
||||
"""Message_correlation_property."""
|
||||
from sqlalchemy import ForeignKey
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
|
||||
|
||||
class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel):
|
||||
"""MessageCorrelationPropertyModel."""
|
||||
|
||||
__tablename__ = "message_correlation_property"
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint(
|
||||
"identifier",
|
||||
"message_model_id",
|
||||
name="message_correlation_property_unique",
|
||||
),
|
||||
)
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
identifier = db.Column(db.String(50), index=True)
|
||||
message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False)
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
message_model = db.relationship("MessageModel", backref="correlation_properties")
|
@ -6,16 +6,14 @@ from typing import Optional
|
||||
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.event import listens_for
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.orm import Session, relationship
|
||||
from sqlalchemy.orm import validates
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_correlation_message_instance import (
|
||||
message_correlation_message_instance_table,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore
|
||||
|
||||
|
||||
class MessageTypes(enum.Enum):
|
||||
@ -41,22 +39,18 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||
__tablename__ = "message_instance"
|
||||
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
|
||||
message_model_id: int = db.Column(ForeignKey(MessageModel.id), nullable=False)
|
||||
message_model = db.relationship("MessageModel")
|
||||
message_correlations = db.relationship(
|
||||
"MessageCorrelationModel",
|
||||
secondary=message_correlation_message_instance_table,
|
||||
backref="message_instances",
|
||||
cascade="all,delete",
|
||||
)
|
||||
|
||||
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore
|
||||
name: str = db.Column(db.String(255))
|
||||
message_type: str = db.Column(db.String(20), nullable=False)
|
||||
payload: str = db.Column(db.JSON)
|
||||
payload: dict = db.Column(db.JSON)
|
||||
status: str = db.Column(db.String(20), nullable=False, default="ready")
|
||||
user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
|
||||
user = relationship("UserModel")
|
||||
counterpart_id: int = db.Column(db.Integer) # Not enforcing self-referential foreign key so we can delete messages.
|
||||
failure_cause: str = db.Column(db.Text())
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
correlations = relationship("MessageInstanceCorrelationModel", back_populates="message_instance")
|
||||
|
||||
@validates("message_type")
|
||||
def validate_message_type(self, key: str, value: Any) -> Any:
|
||||
@ -68,29 +62,34 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
||||
"""Validate_status."""
|
||||
return self.validate_enum_field(key, value, MessageStatuses)
|
||||
|
||||
def correlation_dictionary(self) -> dict:
|
||||
correlation_dict = {}
|
||||
for c in self.message_correlations:
|
||||
correlation_dict[c.name] = c.value
|
||||
return correlation_dict
|
||||
|
||||
def correlates(self, other_message_instance: Any) -> bool:
|
||||
if other_message_instance.message_model_id != self.message_model_id:
|
||||
def correlates(self, other_message_instance: Any, expression_engine: PythonScriptEngine) -> bool:
|
||||
# This must be a receive message, and the other must be a send (otherwise we reverse the call)
|
||||
# We evaluate the other messages payload and run our correlation's
|
||||
# retrieval expressions against it, then compare it against our
|
||||
# expected values -- IF we don't have an expected value, we accept
|
||||
# any non-erroring result from the retrieval expression.
|
||||
if self.name != other_message_instance.name:
|
||||
return False
|
||||
return self.correlates_with_dictionary(
|
||||
other_message_instance.correlation_dictionary()
|
||||
)
|
||||
|
||||
def correlates_with_dictionary(self, dict: dict) -> bool:
|
||||
"""Returns true if the given dictionary matches the correlation names and values connected to this instance."""
|
||||
for c in self.message_correlations:
|
||||
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
|
||||
if c.name in dict and str(dict[c.name]) == c.value:
|
||||
continue
|
||||
else:
|
||||
if self.message_type == MessageTypes.receive.value:
|
||||
if other_message_instance.message_type != MessageTypes.send.value:
|
||||
return False
|
||||
return True
|
||||
|
||||
payload = other_message_instance.payload
|
||||
for corr in self.correlations:
|
||||
try:
|
||||
result = expression_engine._evaluate(corr.retrieval_expression, payload)
|
||||
except Exception as e:
|
||||
# the failure of a payload evaluation may not mean that matches for these
|
||||
# message instances can't happen with other messages. So don't error up.
|
||||
# fixme: Perhaps log some sort of error.
|
||||
return False
|
||||
if corr.expected_value is None:
|
||||
continue # We will accept any value
|
||||
elif corr.expected_value != str(result): # fixme: Don't require conversion to string
|
||||
return False
|
||||
return True
|
||||
elif other_message_instance.message_type == MessageTypes.receive.value:
|
||||
return other_message_instance.correlates(self, expression_engine)
|
||||
return False
|
||||
|
||||
# This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class
|
||||
# so this may not be worth it or there may be a better way to do it
|
||||
|
@ -0,0 +1,37 @@
|
||||
"""Message_correlation."""
|
||||
from dataclasses import dataclass
|
||||
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
|
||||
@dataclass
|
||||
class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel):
|
||||
"""These are the correlations of a specific Message Instance - these will
|
||||
only exist on receive messages. It provides the expression to run on a
|
||||
send messages payload which must match the expected value to be considered
|
||||
a valid match. If the expected value is null, then it does not need to
|
||||
match, but the expression should still evaluate and produce a result."""
|
||||
|
||||
__tablename__ = "message_instance_correlation"
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint(
|
||||
"message_instance_id",
|
||||
"name",
|
||||
name="message_instance_id_name_unique",
|
||||
),
|
||||
)
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
message_instance_id = db.Column(
|
||||
ForeignKey(MessageInstanceModel.id), nullable=False, index=True
|
||||
)
|
||||
name: str = db.Column(db.String(50), nullable=False)
|
||||
expected_value: str = db.Column(db.String(255), nullable=True, index=True)
|
||||
retrieval_expression: str = db.Column(db.String(255))
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
message_instance = relationship("MessageInstanceModel", back_populates="correlations")
|
@ -1,22 +0,0 @@
|
||||
"""Message_model."""
|
||||
from typing import Any
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
|
||||
|
||||
class MessageModel(SpiffworkflowBaseDBModel):
|
||||
"""MessageModel."""
|
||||
|
||||
__tablename__ = "message_model"
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
identifier = db.Column(db.String(50), unique=True, index=True)
|
||||
name = db.Column(db.String(50), unique=True, index=True)
|
||||
# correlation_properties is a backref and defined in the MessageCorrelationProperties class.
|
||||
|
||||
def get_correlation_property(self, identifier: str) -> Any | None:
|
||||
for corr_prop in self.correlation_properties:
|
||||
if corr_prop.identifier == identifier:
|
||||
return corr_prop
|
||||
return None
|
@ -3,7 +3,6 @@ from sqlalchemy import ForeignKey
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
|
||||
|
||||
class MessageTriggerableProcessModel(SpiffworkflowBaseDBModel):
|
||||
@ -12,10 +11,7 @@ class MessageTriggerableProcessModel(SpiffworkflowBaseDBModel):
|
||||
__tablename__ = "message_triggerable_process_model"
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
message_model_id = db.Column(
|
||||
ForeignKey(MessageModel.id), nullable=False, unique=True
|
||||
)
|
||||
message_name: str = db.Column(db.String(255))
|
||||
process_model_identifier: str = db.Column(db.String(50), nullable=False, index=True)
|
||||
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
|
@ -74,7 +74,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
|
||||
overlaps="active_human_tasks",
|
||||
) # type: ignore
|
||||
message_instances = relationship("MessageInstanceModel", cascade="delete") # type: ignore
|
||||
message_correlations = relationship("MessageCorrelationModel", cascade="delete") # type: ignore
|
||||
process_metadata = relationship(
|
||||
"ProcessInstanceMetadataModel",
|
||||
cascade="delete",
|
||||
@ -144,6 +143,10 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
|
||||
"""Can_submit_task."""
|
||||
return not self.has_terminal_status() and self.status != "suspended"
|
||||
|
||||
def can_receive_message(self) -> bool:
|
||||
"""If this process can currently accept messages."""
|
||||
return not self.has_terminal_status() and self.status != "suspended"
|
||||
|
||||
def has_terminal_status(self) -> bool:
|
||||
"""Has_terminal_status."""
|
||||
return self.status in self.terminal_statuses()
|
||||
|
@ -10,15 +10,11 @@ from flask import jsonify
|
||||
from flask import make_response
|
||||
from flask.wrappers import Response
|
||||
|
||||
from spiffworkflow_backend import db
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel, MessageStatuses
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.services.message_service import MessageService
|
||||
|
||||
|
||||
@ -41,10 +37,8 @@ def message_instance_list(
|
||||
MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore
|
||||
MessageInstanceModel.id.desc(), # type: ignore
|
||||
)
|
||||
.join(MessageModel, MessageModel.id == MessageInstanceModel.message_model_id)
|
||||
.join(ProcessInstanceModel)
|
||||
.outerjoin(ProcessInstanceModel) # Not all messages were created by a process
|
||||
.add_columns(
|
||||
MessageModel.identifier.label("message_identifier"),
|
||||
ProcessInstanceModel.process_model_identifier,
|
||||
ProcessInstanceModel.process_model_display_name,
|
||||
)
|
||||
@ -68,19 +62,10 @@ def message_instance_list(
|
||||
# process_instance_id: Optional[int],
|
||||
# }
|
||||
def message_send(
|
||||
message_identifier: str,
|
||||
message_name: str,
|
||||
body: Dict[str, Any],
|
||||
) -> flask.wrappers.Response:
|
||||
"""Message_start."""
|
||||
message_model = MessageModel.query.filter_by(identifier=message_identifier).first()
|
||||
if message_model is None:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="unknown_message",
|
||||
message=f"Could not find message with identifier: {message_identifier}",
|
||||
status_code=404,
|
||||
)
|
||||
)
|
||||
|
||||
if "payload" not in body:
|
||||
raise (
|
||||
@ -96,69 +81,41 @@ def message_send(
|
||||
|
||||
process_instance = None
|
||||
|
||||
# Is there a running instance that is waiting for this message?
|
||||
message_instances = MessageInstanceModel.query.filter_by(
|
||||
message_model_id=message_model.id
|
||||
).all()
|
||||
|
||||
# do any waiting message instances have matching correlations?
|
||||
matching_message = None
|
||||
for message_instance in message_instances:
|
||||
if message_instance.correlates_with_dictionary(body["payload"]):
|
||||
matching_message = message_instance
|
||||
|
||||
process_instance = None
|
||||
if matching_message:
|
||||
process_instance = ProcessInstanceModel.query.filter_by(
|
||||
id=matching_message.process_instance_id
|
||||
).first()
|
||||
|
||||
if (
|
||||
matching_message
|
||||
and process_instance
|
||||
and process_instance.status != ProcessInstanceStatus.waiting.value
|
||||
):
|
||||
raise ApiError(
|
||||
error_code="message_not_accepted",
|
||||
message=(
|
||||
f"The process that can accept message '{message_identifier}' with the"
|
||||
" given correlation keys is not currently waiting for that message. "
|
||||
f" It is currently in the a '{process_instance.status}' state."
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
elif matching_message and process_instance:
|
||||
MessageService.process_message_receive(
|
||||
message_instance, message_model.name, body["payload"]
|
||||
)
|
||||
else:
|
||||
# We don't have a process model waiting on this message, perhaps some process should be started?
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_model_id=message_model.id
|
||||
).first()
|
||||
)
|
||||
if message_triggerable_process_model is None:
|
||||
raise (
|
||||
# Create the send message
|
||||
message_instance = MessageInstanceModel(
|
||||
process_instance_id=None,
|
||||
message_type="send",
|
||||
name=message_name,
|
||||
payload=body['payload'],
|
||||
user_id=g.user.id,
|
||||
correlations=[],
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
try:
|
||||
receiver_message = MessageService.correlate_send_message(message_instance)
|
||||
except Exception as e:
|
||||
db.session.delete(message_instance)
|
||||
db.session.commit()
|
||||
raise e
|
||||
if not receiver_message:
|
||||
db.session.delete(message_instance)
|
||||
db.session.commit()
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="cannot_start_message",
|
||||
error_code="message_not_accepted",
|
||||
message=(
|
||||
"No process instances correlate with the given message id of"
|
||||
f" '{message_identifier}'. And this message name is not"
|
||||
" currently associated with any process Start Event."
|
||||
"No running process instances correlate with the given message name of"
|
||||
f" '{message_name}'. And this message name is not"
|
||||
" currently associated with any process Start Event. Nothing to do."
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
)
|
||||
process_instance = MessageService.process_message_triggerable_process_model(
|
||||
message_triggerable_process_model,
|
||||
message_model.name,
|
||||
body["payload"],
|
||||
g.user,
|
||||
)
|
||||
|
||||
process_instance = ProcessInstanceModel.query.filter_by(id=receiver_message.process_instance_id).first()
|
||||
return Response(
|
||||
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
|
||||
status=200,
|
||||
mimetype="application/json",
|
||||
)
|
||||
)
|
@ -137,7 +137,7 @@ def process_instance_run(
|
||||
processor.unlock_process_instance("Web")
|
||||
|
||||
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
|
||||
MessageService.process_message_instances()
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
||||
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(
|
||||
processor
|
||||
|
@ -22,4 +22,4 @@ class BackgroundProcessingService:
|
||||
def process_message_instances_with_app_context(self) -> None:
|
||||
"""Since this runs in a scheduler, we need to specify the app context as well."""
|
||||
with self.app.app_context():
|
||||
MessageService.process_message_instances()
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
@ -8,7 +8,7 @@ from flask.wrappers import Response
|
||||
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
)
|
||||
@ -80,22 +80,30 @@ class ErrorHandlingService:
|
||||
f" Error:\n{error.__repr__()}"
|
||||
)
|
||||
message_payload = {"message_text": message_text, "recipients": recipients}
|
||||
message_identifier = current_app.config[
|
||||
message_name = current_app.config[
|
||||
"SPIFFWORKFLOW_BACKEND_SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID"
|
||||
]
|
||||
message_model = MessageModel.query.filter_by(
|
||||
identifier=message_identifier
|
||||
).first()
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_model_id=message_model.id
|
||||
message_name=message_name
|
||||
).first()
|
||||
)
|
||||
process_instance = MessageService.process_message_triggerable_process_model(
|
||||
|
||||
# Create the send message
|
||||
message_instance = MessageInstanceModel(
|
||||
process_instance_id=None,
|
||||
message_type="send",
|
||||
name=message_name,
|
||||
payload=message_payload,
|
||||
user_id=g.user.id,
|
||||
correlations=[],
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
process_instance = MessageService.start_process_with_message(
|
||||
message_triggerable_process_model,
|
||||
message_identifier,
|
||||
message_payload,
|
||||
g.user,
|
||||
message_instance
|
||||
)
|
||||
|
||||
return Response(
|
||||
|
@ -1,7 +1,4 @@
|
||||
"""Message_service."""
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageStatuses
|
||||
@ -9,9 +6,8 @@ from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
ProcessInstanceProcessor, CustomBpmnScriptEngine,
|
||||
)
|
||||
from spiffworkflow_backend.services.process_instance_service import (
|
||||
ProcessInstanceService,
|
||||
@ -20,115 +16,119 @@ from spiffworkflow_backend.services.process_instance_service import (
|
||||
|
||||
class MessageServiceError(Exception):
|
||||
"""MessageServiceError."""
|
||||
|
||||
def __init__(self, msg, is_fatal=False):
|
||||
self.is_fatal = is_fatal
|
||||
super().__init__(msg)
|
||||
|
||||
class MessageService:
|
||||
"""MessageService."""
|
||||
|
||||
@classmethod
|
||||
def process_message_instances(cls) -> None:
|
||||
"""Process_message_instances."""
|
||||
def correlate_send_message(cls, message_instance_send: MessageInstanceModel):
|
||||
|
||||
# Thread safe via db locking - don't try to progress the same send message over multiple instances
|
||||
if message_instance_send.status != MessageStatuses.ready.value:
|
||||
return None
|
||||
message_instance_send.status = MessageStatuses.running.value
|
||||
db.session.add(message_instance_send)
|
||||
db.session.commit()
|
||||
|
||||
# Find available messages that might match
|
||||
available_receive_messages = MessageInstanceModel.query.filter_by(
|
||||
name=message_instance_send.name,
|
||||
status=MessageStatuses.ready.value
|
||||
).all()
|
||||
message_instance_receive = None
|
||||
try:
|
||||
for message_instance in available_receive_messages:
|
||||
if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()):
|
||||
message_instance_receive = message_instance
|
||||
|
||||
if message_instance_receive is None:
|
||||
# Check for a message triggerable process and start that to create a new message_instance_receive
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_name=message_instance_send.name
|
||||
).first()
|
||||
)
|
||||
if message_triggerable_process_model:
|
||||
receiving_process = MessageService.start_process_with_message(message_triggerable_process_model,
|
||||
message_instance_send)
|
||||
message_instance_receive = MessageInstanceModel.query.filter_by(
|
||||
process_instance_id=receiving_process.id,
|
||||
message_type="receive", status="ready"
|
||||
).first()
|
||||
else:
|
||||
receiving_process = MessageService.get_process_instance_for_message_instance(
|
||||
message_instance_receive)
|
||||
|
||||
# Assure we can send the message, otherwise keep going.
|
||||
if message_instance_receive is None or not receiving_process.can_receive_message():
|
||||
message_instance_send.status = "ready"
|
||||
message_instance_send.status = "ready"
|
||||
db.session.add(message_instance_send)
|
||||
db.session.commit()
|
||||
return
|
||||
|
||||
# Set the receiving message to running, so it is not altered elswhere ...
|
||||
message_instance_receive.status = "running"
|
||||
|
||||
cls.process_message_receive(
|
||||
receiving_process,
|
||||
message_instance_receive,
|
||||
message_instance_send.name,
|
||||
message_instance_send.payload,
|
||||
)
|
||||
message_instance_receive.status = "completed"
|
||||
message_instance_receive.counterpart_id = message_instance_send.id
|
||||
db.session.add(message_instance_receive)
|
||||
message_instance_send.status = "completed"
|
||||
message_instance_send.counterpart_id = message_instance_receive.id
|
||||
db.session.add(message_instance_send)
|
||||
db.session.commit()
|
||||
return message_instance_receive
|
||||
|
||||
except Exception as exception:
|
||||
db.session.rollback()
|
||||
message_instance_send.status = "failed"
|
||||
message_instance_send.failure_cause = str(exception)
|
||||
db.session.add(message_instance_send)
|
||||
if message_instance_receive:
|
||||
message_instance_receive.status = "failed"
|
||||
message_instance_receive.failure_cause = str(exception)
|
||||
db.session.add(message_instance_receive)
|
||||
db.session.commit()
|
||||
raise exception
|
||||
|
||||
@classmethod
|
||||
def correlate_all_message_instances(cls) -> None:
|
||||
"""Look at ALL the Send and Receive Messages and attempt to find correlations """
|
||||
message_instances_send = MessageInstanceModel.query.filter_by(
|
||||
message_type="send", status="ready"
|
||||
).all()
|
||||
message_instances_receive = MessageInstanceModel.query.filter_by(
|
||||
message_type="receive", status="ready"
|
||||
).all()
|
||||
|
||||
for message_instance_send in message_instances_send:
|
||||
# check again in case another background process picked up the message
|
||||
# while the previous one was running
|
||||
if message_instance_send.status != "ready":
|
||||
continue
|
||||
cls.correlate_send_message(message_instance_send)
|
||||
|
||||
message_instance_send.status = "running"
|
||||
db.session.add(message_instance_send)
|
||||
db.session.commit()
|
||||
|
||||
message_instance_receive = None
|
||||
try:
|
||||
message_instance_receive = cls.get_message_instance_receive(
|
||||
message_instance_send, message_instances_receive
|
||||
)
|
||||
if message_instance_receive is None:
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_model_id=message_instance_send.message_model_id
|
||||
).first()
|
||||
)
|
||||
if message_triggerable_process_model:
|
||||
process_instance_send = ProcessInstanceModel.query.filter_by(
|
||||
id=message_instance_send.process_instance_id,
|
||||
).first()
|
||||
# TODO: use the correct swimlane user when that is set up
|
||||
cls.process_message_triggerable_process_model(
|
||||
message_triggerable_process_model,
|
||||
message_instance_send.message_model.name,
|
||||
message_instance_send.payload,
|
||||
process_instance_send.process_initiator,
|
||||
)
|
||||
message_instance_send.status = "completed"
|
||||
else:
|
||||
# if we can't get a queued message then put it back in the queue
|
||||
message_instance_send.status = "ready"
|
||||
|
||||
else:
|
||||
if message_instance_receive.status != "ready":
|
||||
continue
|
||||
message_instance_receive.status = "running"
|
||||
|
||||
cls.process_message_receive(
|
||||
message_instance_receive,
|
||||
message_instance_send.message_model.name,
|
||||
message_instance_send.payload,
|
||||
)
|
||||
message_instance_receive.status = "completed"
|
||||
db.session.add(message_instance_receive)
|
||||
message_instance_send.status = "completed"
|
||||
|
||||
db.session.add(message_instance_send)
|
||||
db.session.commit()
|
||||
except Exception as exception:
|
||||
db.session.rollback()
|
||||
message_instance_send.status = "failed"
|
||||
message_instance_send.failure_cause = str(exception)
|
||||
db.session.add(message_instance_send)
|
||||
|
||||
if message_instance_receive:
|
||||
message_instance_receive.status = "failed"
|
||||
message_instance_receive.failure_cause = str(exception)
|
||||
db.session.add(message_instance_receive)
|
||||
|
||||
db.session.commit()
|
||||
raise exception
|
||||
|
||||
@staticmethod
|
||||
def process_message_triggerable_process_model(
|
||||
message_triggerable_process_model: MessageTriggerableProcessModel,
|
||||
message_model_name: str,
|
||||
message_payload: dict,
|
||||
user: UserModel,
|
||||
def start_process_with_message(
|
||||
message_triggerable_process_model: MessageTriggerableProcessModel,
|
||||
message_instance: MessageInstanceModel
|
||||
) -> ProcessInstanceModel:
|
||||
"""Process_message_triggerable_process_model."""
|
||||
"""Start up a process instance, so it is ready to catch the event."""
|
||||
process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
||||
message_triggerable_process_model.process_model_identifier,
|
||||
user,
|
||||
message_instance.user,
|
||||
)
|
||||
processor_receive = ProcessInstanceProcessor(process_instance_receive)
|
||||
processor_receive.do_engine_steps(save=False)
|
||||
processor_receive.bpmn_process_instance.catch_bpmn_message(
|
||||
message_model_name, message_payload
|
||||
)
|
||||
processor_receive.do_engine_steps(save=True)
|
||||
|
||||
return process_instance_receive
|
||||
|
||||
@staticmethod
|
||||
def process_message_receive(
|
||||
message_instance_receive: MessageInstanceModel,
|
||||
message_model_name: str,
|
||||
message_payload: dict,
|
||||
) -> None:
|
||||
def get_process_instance_for_message_instance (
|
||||
message_instance_receive: MessageInstanceModel,
|
||||
) -> ProcessInstanceModel:
|
||||
"""Process_message_receive."""
|
||||
process_instance_receive = ProcessInstanceModel.query.filter_by(
|
||||
id=message_instance_receive.process_instance_id
|
||||
@ -141,43 +141,25 @@ class MessageService:
|
||||
f" {message_instance_receive.id}.Tried with id"
|
||||
f" {message_instance_receive.process_instance_id}"
|
||||
),
|
||||
)
|
||||
), is_fatal=True
|
||||
)
|
||||
return process_instance_receive
|
||||
|
||||
@staticmethod
|
||||
def process_message_receive (
|
||||
process_instance_receive: ProcessInstanceModel,
|
||||
message_instance_receive: MessageInstanceModel,
|
||||
message_model_name: str,
|
||||
message_payload: dict,
|
||||
) -> None:
|
||||
""" """
|
||||
|
||||
processor_receive = ProcessInstanceProcessor(process_instance_receive)
|
||||
processor_receive.bpmn_process_instance.catch_bpmn_message(
|
||||
message_model_name,
|
||||
message_payload,
|
||||
correlations=message_instance_receive.correlation_dictionary(),
|
||||
message_payload
|
||||
)
|
||||
processor_receive.do_engine_steps(save=True)
|
||||
message_instance_receive.status = MessageStatuses.completed.value
|
||||
db.session.add(message_instance_receive)
|
||||
db.session.commit()
|
||||
|
||||
@staticmethod
|
||||
def get_message_instance_receive(
|
||||
message_instance_send: MessageInstanceModel,
|
||||
message_instances_receive: list[MessageInstanceModel],
|
||||
) -> Optional[MessageInstanceModel]:
|
||||
"""Returns the message instance that correlates to the send message, or None if nothing correlates."""
|
||||
for message_instance in message_instances_receive:
|
||||
if message_instance.correlates(message_instance_send):
|
||||
return message_instance
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_process_instance_for_message_instance(
|
||||
message_instance: MessageInstanceModel,
|
||||
) -> Any:
|
||||
"""Get_process_instance_for_message_instance."""
|
||||
process_instance = ProcessInstanceModel.query.filter_by(
|
||||
id=message_instance.process_instance_id
|
||||
).first()
|
||||
if process_instance is None:
|
||||
raise MessageServiceError(
|
||||
f"Process instance cannot be found for message: {message_instance.id}."
|
||||
f"Tried with id {message_instance.process_instance_id}"
|
||||
)
|
||||
|
||||
return process_instance
|
||||
|
@ -60,12 +60,8 @@ from spiffworkflow_backend.models.file import FileType
|
||||
from spiffworkflow_backend.models.group import GroupModel
|
||||
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
|
||||
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
|
||||
from spiffworkflow_backend.models.message_correlation_property import (
|
||||
MessageCorrelationPropertyModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_instance import MessageModel
|
||||
from spiffworkflow_backend.models.message_instance_correlation import MessageInstanceCorrelationModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.process_instance_metadata import (
|
||||
@ -1342,132 +1338,46 @@ class ProcessInstanceProcessor:
|
||||
"""Process_bpmn_messages."""
|
||||
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
|
||||
for bpmn_message in bpmn_messages:
|
||||
# only message sends are in get_bpmn_messages
|
||||
message_model = MessageModel.query.filter_by(name=bpmn_message.name).first()
|
||||
if message_model is None:
|
||||
raise ApiError(
|
||||
"invalid_message_name",
|
||||
f"Invalid message name: {bpmn_message.name}.",
|
||||
)
|
||||
|
||||
if not bpmn_message.correlations:
|
||||
raise ApiError(
|
||||
"message_correlations_missing",
|
||||
(
|
||||
"Could not find any message correlations bpmn_message:"
|
||||
f" {bpmn_message.name}"
|
||||
),
|
||||
)
|
||||
|
||||
message_correlations = []
|
||||
for name, value in bpmn_message.correlations.items():
|
||||
message_correlation_property = message_model.get_correlation_property(
|
||||
name
|
||||
)
|
||||
if message_correlation_property is None:
|
||||
raise ApiError(
|
||||
"message_correlations_missing_from_process",
|
||||
(
|
||||
"Could not find a known message correlation with"
|
||||
f" identifier:{name}"
|
||||
),
|
||||
)
|
||||
message_correlations.append(
|
||||
MessageCorrelationModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_correlation_property_id=message_correlation_property.id,
|
||||
name=name,
|
||||
value=value,
|
||||
)
|
||||
)
|
||||
message_instance = MessageInstanceModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name=bpmn_message.name,
|
||||
payload=bpmn_message.payload,
|
||||
message_correlations=message_correlations,
|
||||
correlations=[],
|
||||
)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
def queue_waiting_receive_messages(self) -> None:
|
||||
"""Queue_waiting_receive_messages."""
|
||||
waiting_tasks = self.get_all_waiting_tasks()
|
||||
for waiting_task in waiting_tasks:
|
||||
# if it's not something that can wait for a message, skip it
|
||||
if waiting_task.task_spec.__class__.__name__ not in [
|
||||
"IntermediateCatchEvent",
|
||||
"ReceiveTask",
|
||||
]:
|
||||
continue
|
||||
waiting_events = self.bpmn_process_instance.waiting_events()
|
||||
waiting_message_events = filter(lambda e: e['event_type'] == "Message", waiting_events)
|
||||
|
||||
# timer events are not related to messaging, so ignore them for these purposes
|
||||
if waiting_task.task_spec.event_definition.__class__.__name__.endswith(
|
||||
"TimerEventDefinition"
|
||||
):
|
||||
continue
|
||||
|
||||
message_model = MessageModel.query.filter_by(
|
||||
name=waiting_task.task_spec.event_definition.name
|
||||
).first()
|
||||
if message_model is None:
|
||||
raise ApiError(
|
||||
"invalid_message_name",
|
||||
(
|
||||
"Invalid message name:"
|
||||
f" {waiting_task.task_spec.event_definition.name}."
|
||||
),
|
||||
)
|
||||
for event in waiting_message_events:
|
||||
|
||||
# Ensure we are only creating one message instance for each waiting message
|
||||
message_instance = MessageInstanceModel.query.filter_by(
|
||||
if MessageInstanceModel.query.filter_by(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_type="receive",
|
||||
message_model_id=message_model.id,
|
||||
).first()
|
||||
if message_instance:
|
||||
name=event['name']).count() > 0:
|
||||
continue
|
||||
|
||||
# Create a new Message Instance
|
||||
message_instance = MessageInstanceModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
user_id=self.process_instance_model.process_initiator_id,
|
||||
message_type="receive",
|
||||
message_model_id=message_model.id,
|
||||
name=event['name'],
|
||||
)
|
||||
|
||||
for (
|
||||
spiff_correlation_property
|
||||
) in waiting_task.task_spec.event_definition.correlation_properties:
|
||||
message_correlation = next(
|
||||
(
|
||||
mc
|
||||
for mc in message_instance.message_correlations
|
||||
if mc.name == spiff_correlation_property.name
|
||||
),
|
||||
None,
|
||||
for correlation_property in event['value']:
|
||||
message_correlation = MessageInstanceCorrelationModel (
|
||||
message_instance_id=message_instance.id,
|
||||
name=correlation_property.name,
|
||||
expected_value=correlation_property.expected_value,
|
||||
retrieval_expression=correlation_property.retrieval_expression
|
||||
)
|
||||
if not message_correlation:
|
||||
expression = spiff_correlation_property.expression
|
||||
correlation_value = (
|
||||
ProcessInstanceProcessor._script_engine.evaluate(
|
||||
waiting_task, expression
|
||||
)
|
||||
)
|
||||
correlation_name = spiff_correlation_property.name
|
||||
message_prop = (
|
||||
MessageCorrelationPropertyModel.query.filter_by(
|
||||
identifier=correlation_name
|
||||
)
|
||||
.filter_by(message_model_id=message_model.id)
|
||||
.first()
|
||||
)
|
||||
|
||||
message_correlation = MessageCorrelationModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_correlation_property_id=message_prop.id,
|
||||
name=correlation_name,
|
||||
value=correlation_value,
|
||||
)
|
||||
message_instance.message_correlations.append(message_correlation)
|
||||
message_instance.correlations.append(message_correlation)
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
|
@ -8,8 +8,8 @@ import sentry_sdk
|
||||
from flask import current_app
|
||||
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
|
||||
|
||||
from spiffworkflow_backend import db
|
||||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceApi
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
@ -41,13 +41,14 @@ class ProcessInstanceService:
|
||||
user: UserModel,
|
||||
) -> ProcessInstanceModel:
|
||||
"""Get_process_instance_from_spec."""
|
||||
db.session.commit()
|
||||
try:
|
||||
current_git_revision = GitService.get_current_revision()
|
||||
except GitCommandError:
|
||||
current_git_revision = ""
|
||||
process_instance_model = ProcessInstanceModel(
|
||||
status=ProcessInstanceStatus.not_started.value,
|
||||
process_initiator=user,
|
||||
process_initiator_id=user.id,
|
||||
process_model_identifier=process_model.id,
|
||||
process_model_display_name=process_model.display_name,
|
||||
start_in_seconds=round(time.time()),
|
||||
|
@ -12,10 +12,6 @@ from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.file import File
|
||||
from spiffworkflow_backend.models.file import FileType
|
||||
from spiffworkflow_backend.models.file import SpecReference
|
||||
from spiffworkflow_backend.models.message_correlation_property import (
|
||||
MessageCorrelationPropertyModel,
|
||||
)
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.message_triggerable_process_model import (
|
||||
MessageTriggerableProcessModel,
|
||||
)
|
||||
@ -336,39 +332,31 @@ class SpecFileService(FileSystemService):
|
||||
@staticmethod
|
||||
def update_message_cache(ref: SpecReference) -> None:
|
||||
"""Assure we have a record in the database of all possible message ids and names."""
|
||||
for message_model_identifier in ref.messages.keys():
|
||||
message_model = MessageModel.query.filter_by(
|
||||
identifier=message_model_identifier
|
||||
).first()
|
||||
if message_model is None:
|
||||
message_model = MessageModel(
|
||||
identifier=message_model_identifier,
|
||||
name=ref.messages[message_model_identifier],
|
||||
)
|
||||
db.session.add(message_model)
|
||||
db.session.commit()
|
||||
pass
|
||||
# for message_model_identifier in ref.messages.keys():
|
||||
# message_model = MessageModel.query.filter_by(
|
||||
# identifier=message_model_identifier
|
||||
# ).first()
|
||||
# if message_model is None:
|
||||
# message_model = MessageModel(
|
||||
# identifier=message_model_identifier,
|
||||
# name=ref.messages[message_model_identifier],
|
||||
# )
|
||||
# db.session.add(message_model)
|
||||
# db.session.commit()
|
||||
|
||||
@staticmethod
|
||||
def update_message_trigger_cache(ref: SpecReference) -> None:
|
||||
"""Assure we know which messages can trigger the start of a process."""
|
||||
for message_model_identifier in ref.start_messages:
|
||||
message_model = MessageModel.query.filter_by(
|
||||
identifier=message_model_identifier
|
||||
).first()
|
||||
if message_model is None:
|
||||
raise ProcessModelFileInvalidError(
|
||||
"Could not find message model with identifier"
|
||||
f" '{message_model_identifier}'Required by a Start Event in :"
|
||||
f" {ref.file_name}"
|
||||
)
|
||||
for message_name in ref.start_messages:
|
||||
message_triggerable_process_model = (
|
||||
MessageTriggerableProcessModel.query.filter_by(
|
||||
message_model_id=message_model.id,
|
||||
message_name=message_name,
|
||||
).first()
|
||||
)
|
||||
if message_triggerable_process_model is None:
|
||||
message_triggerable_process_model = MessageTriggerableProcessModel(
|
||||
message_model_id=message_model.id,
|
||||
message_name=message_name,
|
||||
process_model_identifier=ref.process_model_id,
|
||||
)
|
||||
db.session.add(message_triggerable_process_model)
|
||||
@ -386,33 +374,34 @@ class SpecFileService(FileSystemService):
|
||||
@staticmethod
|
||||
def update_correlation_cache(ref: SpecReference) -> None:
|
||||
"""Update_correlation_cache."""
|
||||
for correlation_identifier in ref.correlations.keys():
|
||||
correlation_property_retrieval_expressions = ref.correlations[
|
||||
correlation_identifier
|
||||
]["retrieval_expressions"]
|
||||
|
||||
for cpre in correlation_property_retrieval_expressions:
|
||||
message_model_identifier = cpre["messageRef"]
|
||||
message_model = MessageModel.query.filter_by(
|
||||
identifier=message_model_identifier
|
||||
).first()
|
||||
if message_model is None:
|
||||
raise ProcessModelFileInvalidError(
|
||||
"Could not find message model with identifier"
|
||||
f" '{message_model_identifier}'specified by correlation"
|
||||
f" property: {cpre}"
|
||||
)
|
||||
# fixme: I think we are currently ignoring the correction properties.
|
||||
message_correlation_property = (
|
||||
MessageCorrelationPropertyModel.query.filter_by(
|
||||
identifier=correlation_identifier,
|
||||
message_model_id=message_model.id,
|
||||
).first()
|
||||
)
|
||||
if message_correlation_property is None:
|
||||
message_correlation_property = MessageCorrelationPropertyModel(
|
||||
identifier=correlation_identifier,
|
||||
message_model_id=message_model.id,
|
||||
)
|
||||
db.session.add(message_correlation_property)
|
||||
db.session.commit()
|
||||
pass
|
||||
# for correlation_identifier in ref.correlations.keys():
|
||||
# correlation_property_retrieval_expressions = ref.correlations[
|
||||
# correlation_identifier
|
||||
# ]["retrieval_expressions"]
|
||||
#
|
||||
# for cpre in correlation_property_retrieval_expressions:
|
||||
# message_model_identifier = cpre["messageRef"]
|
||||
# message_model = MessageModel.query.filter_by(
|
||||
# identifier=message_model_identifier
|
||||
# ).first()
|
||||
# if message_model is None:
|
||||
# raise ProcessModelFileInvalidError(
|
||||
# "Could not find message model with identifier"
|
||||
# f" '{message_model_identifier}'specified by correlation"
|
||||
# f" property: {cpre}"
|
||||
# )
|
||||
# message_correlation_property = (
|
||||
# MessageCorrelationPropertyModel.query.filter_by(
|
||||
# identifier=correlation_identifier,
|
||||
# message_model_id=message_model.id,
|
||||
# ).first()
|
||||
# )
|
||||
#
|
||||
# if message_correlation_property is None:
|
||||
# message_correlation_property = MessageCorrelationPropertyModel(
|
||||
# identifier=correlation_identifier,
|
||||
# message_model_id=message_model.id,
|
||||
# )
|
||||
# db.session.add(message_correlation_property)
|
||||
# db.session.commit()
|
||||
|
@ -12,18 +12,18 @@
|
||||
</bpmn:collaboration>
|
||||
<bpmn:correlationProperty id="mcp_topica_two" name="MCP TopicA Two">
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
|
||||
<bpmn:messagePath>topica_two</bpmn:messagePath>
|
||||
<bpmn:formalExpression>topica_two</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
|
||||
<bpmn:messagePath>topica_two</bpmn:messagePath>
|
||||
<bpmn:formalExpression>topic_two_a</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:correlationProperty id="mcp_topicb_two" name="MCP TopicB_two">
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="message_send_two">
|
||||
<bpmn:messagePath>topicb_two</bpmn:messagePath>
|
||||
<bpmn:formalExpression>topicb_two</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_two">
|
||||
<bpmn:messagePath>topicb_two</bpmn:messagePath>
|
||||
<bpmn:formalExpression>topic_two_b</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:message id="message_send_two" name="Message Send Two">
|
||||
@ -34,8 +34,8 @@
|
||||
<bpmn:message id="message_response_two" name="Message Response Two">
|
||||
<bpmn:extensionElements>
|
||||
<spiffworkflow:messagePayload>{
|
||||
"topica_two": payload_var_two.topica_two,
|
||||
"topicb_two": payload_var_two.topicb_two,
|
||||
"topic_two_a": payload_var_two.topica_two,
|
||||
"topic_two_b": payload_var_two.topicb_two,
|
||||
"second_var_two": second_var_two
|
||||
}</spiffworkflow:messagePayload>
|
||||
</bpmn:extensionElements>
|
||||
|
@ -22,7 +22,7 @@
|
||||
<bpmn:formalExpression>topica_one</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_one">
|
||||
<bpmn:formalExpression>topic_one_a</bpmn:formalExpression>
|
||||
<bpmn:formalExpression>topica_one</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:correlationProperty id="mcp_topicb_one" name="MCP TopicB_one">
|
||||
@ -30,7 +30,7 @@
|
||||
<bpmn:formalExpression>topicb_one</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
<bpmn:correlationPropertyRetrievalExpression messageRef="message_response_one">
|
||||
<bpmn:formalExpression>topic_one_b</bpmn:formalExpression>
|
||||
<bpmn:formalExpression>topicb_one</bpmn:formalExpression>
|
||||
</bpmn:correlationPropertyRetrievalExpression>
|
||||
</bpmn:correlationProperty>
|
||||
<bpmn:process id="message_send_process" name="Message Send Process" isExecutable="true">
|
||||
|
@ -1347,7 +1347,7 @@ class TestProcessApi(BaseTest):
|
||||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
message_model_identifier = "request_approval"
|
||||
message_model_identifier = "Request Approval"
|
||||
payload = {
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
@ -1396,7 +1396,7 @@ class TestProcessApi(BaseTest):
|
||||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
message_model_identifier = "approval_result"
|
||||
message_model_identifier = "Approval Result"
|
||||
payload = {
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
@ -1478,7 +1478,7 @@ class TestProcessApi(BaseTest):
|
||||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
message_model_identifier = "approval_result"
|
||||
message_model_identifier = "Approval Result"
|
||||
payload = {
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
@ -2313,7 +2313,7 @@ class TestProcessApi(BaseTest):
|
||||
# process_model_source_directory="message_send_one_conversation",
|
||||
# bpmn_file_name="message_receiver",
|
||||
# )
|
||||
message_model_identifier = "request_approval"
|
||||
message_model_identifier = "Request Approval"
|
||||
payload = {
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
@ -2330,6 +2330,7 @@ class TestProcessApi(BaseTest):
|
||||
assert response.json is not None
|
||||
process_instance_id_one = response.json["id"]
|
||||
|
||||
payload['po_number'] = "1002"
|
||||
response = client.post(
|
||||
f"/v1.0/messages/{message_model_identifier}",
|
||||
content_type="application/json",
|
||||
@ -2346,7 +2347,7 @@ class TestProcessApi(BaseTest):
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json is not None
|
||||
assert len(response.json["results"]) == 1
|
||||
assert len(response.json["results"]) == 2 # Two messages, one is the completed receive, the other is new send
|
||||
assert (
|
||||
response.json["results"][0]["process_instance_id"]
|
||||
== process_instance_id_one
|
||||
@ -2358,7 +2359,7 @@ class TestProcessApi(BaseTest):
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json is not None
|
||||
assert len(response.json["results"]) == 1
|
||||
assert len(response.json["results"]) == 2
|
||||
assert (
|
||||
response.json["results"][0]["process_instance_id"]
|
||||
== process_instance_id_two
|
||||
@ -2370,7 +2371,9 @@ class TestProcessApi(BaseTest):
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json is not None
|
||||
assert len(response.json["results"]) == 2
|
||||
# 4 -Two messages for each process (a record of the completed receive, and then a send created)
|
||||
# + 2 -Two messages logged for the API Calls used to create the processes.
|
||||
assert len(response.json["results"]) == 6
|
||||
|
||||
def test_correct_user_can_get_and_update_a_task(
|
||||
self,
|
||||
|
@ -6,7 +6,6 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
from spiffworkflow_backend.models.db import db
|
||||
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
|
||||
from spiffworkflow_backend.models.message_model import MessageModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
@ -38,8 +37,7 @@ class TestMessageInstance(BaseTest):
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_can_create_message_instance."""
|
||||
message_model_identifier = "message_model_one"
|
||||
message_model = self.create_message_model(message_model_identifier)
|
||||
message_name = "Message Model One"
|
||||
process_model_identifier = self.setup_message_tests(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
@ -53,8 +51,10 @@ class TestMessageInstance(BaseTest):
|
||||
|
||||
queued_message = MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name =message_name,
|
||||
payload={"Word":"Eat At Mashita's, delicious!"}
|
||||
)
|
||||
db.session.add(queued_message)
|
||||
db.session.commit()
|
||||
@ -75,12 +75,10 @@ class TestMessageInstance(BaseTest):
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_cannot_set_invalid_status."""
|
||||
message_model_identifier = "message_model_one"
|
||||
message_model = self.create_message_model(message_model_identifier)
|
||||
message_name = "message_model_one"
|
||||
process_model_identifier = self.setup_message_tests(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
|
||||
process_model = ProcessModelService.get_process_model(
|
||||
process_model_id=process_model_identifier
|
||||
)
|
||||
@ -91,8 +89,9 @@ class TestMessageInstance(BaseTest):
|
||||
with pytest.raises(ValueError) as exception:
|
||||
MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name=message_name,
|
||||
status="BAD_STATUS",
|
||||
)
|
||||
assert (
|
||||
@ -101,8 +100,9 @@ class TestMessageInstance(BaseTest):
|
||||
|
||||
queued_message = MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name=message_name,
|
||||
)
|
||||
db.session.add(queued_message)
|
||||
db.session.commit()
|
||||
@ -121,8 +121,7 @@ class TestMessageInstance(BaseTest):
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_cannot_set_invalid_message_type."""
|
||||
message_model_identifier = "message_model_one"
|
||||
message_model = self.create_message_model(message_model_identifier)
|
||||
message_name = "message_model_one"
|
||||
process_model_identifier = self.setup_message_tests(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
@ -137,8 +136,9 @@ class TestMessageInstance(BaseTest):
|
||||
with pytest.raises(ValueError) as exception:
|
||||
MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="BAD_MESSAGE_TYPE",
|
||||
message_model_id=message_model.id,
|
||||
name=message_name
|
||||
)
|
||||
assert (
|
||||
str(exception.value)
|
||||
@ -147,8 +147,9 @@ class TestMessageInstance(BaseTest):
|
||||
|
||||
queued_message = MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name=message_name,
|
||||
)
|
||||
db.session.add(queued_message)
|
||||
db.session.commit()
|
||||
@ -168,8 +169,7 @@ class TestMessageInstance(BaseTest):
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_force_failure_cause_if_status_is_failure."""
|
||||
message_model_identifier = "message_model_one"
|
||||
message_model = self.create_message_model(message_model_identifier)
|
||||
message_name = "message_model_one"
|
||||
process_model_identifier = self.setup_message_tests(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
@ -183,8 +183,9 @@ class TestMessageInstance(BaseTest):
|
||||
|
||||
queued_message = MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name=message_name,
|
||||
status="failed",
|
||||
)
|
||||
db.session.add(queued_message)
|
||||
@ -199,8 +200,9 @@ class TestMessageInstance(BaseTest):
|
||||
|
||||
queued_message = MessageInstanceModel(
|
||||
process_instance_id=process_instance.id,
|
||||
user_id=process_instance.process_initiator_id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
name=message_name,
|
||||
)
|
||||
db.session.add(queued_message)
|
||||
db.session.commit()
|
||||
@ -212,10 +214,4 @@ class TestMessageInstance(BaseTest):
|
||||
assert queued_message.id is not None
|
||||
assert queued_message.failure_cause == "THIS TEST FAILURE"
|
||||
|
||||
@staticmethod
|
||||
def create_message_model(message_model_identifier: str) -> MessageModel:
|
||||
"""Create_message_model."""
|
||||
message_model = MessageModel(identifier=message_model_identifier)
|
||||
db.session.add(message_model)
|
||||
db.session.commit()
|
||||
return message_model
|
||||
|
||||
|
@ -49,18 +49,18 @@ class TestMessageService(BaseTest):
|
||||
|
||||
# Make an API call to the service endpoint, but use the wrong po number
|
||||
with pytest.raises(ApiError):
|
||||
message_send("approval_result", {"payload": {"po_number": 5001}})
|
||||
message_send("Approval Result", {"payload": {"po_number": 5001}})
|
||||
|
||||
# Sound return an error when making an API call for right po number, wrong client
|
||||
# Should return an error when making an API call for right po number, wrong client
|
||||
with pytest.raises(ApiError):
|
||||
message_send(
|
||||
"approval_result",
|
||||
"Approval Result",
|
||||
{"payload": {"po_number": 1001, "customer_id": "jon"}},
|
||||
)
|
||||
|
||||
# No error when calling with the correct parameters
|
||||
message_send(
|
||||
"approval_result",
|
||||
"Approval Result",
|
||||
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
|
||||
)
|
||||
|
||||
@ -111,14 +111,14 @@ class TestMessageService(BaseTest):
|
||||
# This is typically called in a background cron process, so we will manually call it
|
||||
# here in the tests
|
||||
# The first time it is called, it will instantiate a new instance of the message_recieve process
|
||||
MessageService.process_message_instances()
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
||||
# The sender process should still be waiting on a message to be returned to it ...
|
||||
self.assure_there_is_a_process_waiting_on_a_message()
|
||||
|
||||
# The second time we call ths process_message_isntances (again it would typically be running on cron)
|
||||
# it will deliver the message that was sent from the receiver back to the original sender.
|
||||
MessageService.process_message_instances()
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
||||
# But there should be no send message waiting for delivery, because
|
||||
# the message receiving process should pick it up instantly via
|
||||
@ -130,6 +130,10 @@ class TestMessageService(BaseTest):
|
||||
.all()
|
||||
)
|
||||
assert len(waiting_messages) == 0
|
||||
MessageService.correlate_all_message_instances()
|
||||
MessageService.correlate_all_message_instances()
|
||||
MessageService.correlate_all_message_instances()
|
||||
assert len(waiting_messages) == 0
|
||||
|
||||
# The message sender process is complete
|
||||
assert self.process_instance.status == "complete"
|
||||
@ -181,16 +185,11 @@ class TestMessageService(BaseTest):
|
||||
)
|
||||
assert len(send_messages) == 1
|
||||
send_message = send_messages[0]
|
||||
|
||||
# The payload should match because of how it is written in the Send task.
|
||||
assert (
|
||||
send_message.payload == self.payload
|
||||
), "The send message should match up with the payload"
|
||||
assert send_message.message_model.identifier == "request_approval"
|
||||
assert send_message.name == "Request Approval"
|
||||
assert send_message.status == "ready"
|
||||
assert len(send_message.message_correlations) == 2
|
||||
MessageInstanceModel.query.all()
|
||||
self.assure_correlation_properties_are_right(send_message)
|
||||
|
||||
def assure_there_is_a_process_waiting_on_a_message(self) -> None:
|
||||
# There should be one new send message for the given process instance.
|
||||
@ -208,14 +207,14 @@ class TestMessageService(BaseTest):
|
||||
self, message: MessageInstanceModel
|
||||
) -> None:
|
||||
# Correlation Properties should match up
|
||||
po_curr = next(c for c in message.message_correlations if c.name == "po_number")
|
||||
po_curr = next(c for c in message.correlations if c.name == "po_number")
|
||||
customer_curr = next(
|
||||
c for c in message.message_correlations if c.name == "customer_id"
|
||||
c for c in message.correlations if c.name == "customer_id"
|
||||
)
|
||||
assert po_curr is not None
|
||||
assert customer_curr is not None
|
||||
assert po_curr.value == "1001"
|
||||
assert customer_curr.value == "Sartography"
|
||||
assert po_curr.expected_value == "1001"
|
||||
assert customer_curr.expected_value == "Sartography"
|
||||
|
||||
def test_can_send_message_to_multiple_process_models(
|
||||
self,
|
||||
@ -274,17 +273,13 @@ class TestMessageService(BaseTest):
|
||||
assert len(orig_send_messages) == 2
|
||||
assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1
|
||||
|
||||
message_instances = MessageInstanceModel.query.all()
|
||||
# Each message instance should have two correlations
|
||||
for mi in message_instances:
|
||||
assert len(mi.message_correlations) == 2
|
||||
|
||||
# process message instances
|
||||
MessageService.process_message_instances()
|
||||
# Once complete the original send messages should be completed and two new instanges
|
||||
MessageService.correlate_all_message_instances()
|
||||
# Once complete the original send messages should be completed and two new instances
|
||||
# should now exist, one for each of the process instances ...
|
||||
for osm in orig_send_messages:
|
||||
assert osm.status == "completed"
|
||||
# for osm in orig_send_messages:
|
||||
# assert osm.status == "completed"
|
||||
|
||||
process_instance_result = ProcessInstanceModel.query.all()
|
||||
assert len(process_instance_result) == 3
|
||||
@ -312,7 +307,7 @@ class TestMessageService(BaseTest):
|
||||
assert process_instance_receiver_two.status == "complete"
|
||||
|
||||
message_instance_result = MessageInstanceModel.query.all()
|
||||
assert len(message_instance_result) == 5
|
||||
assert len(message_instance_result) == 7
|
||||
|
||||
message_instance_receiver_one = [
|
||||
x
|
||||
@ -326,15 +321,15 @@ class TestMessageService(BaseTest):
|
||||
][0]
|
||||
assert message_instance_receiver_one is not None
|
||||
assert message_instance_receiver_two is not None
|
||||
assert message_instance_receiver_one.status == "ready"
|
||||
assert message_instance_receiver_two.status == "ready"
|
||||
|
||||
# process second message
|
||||
MessageService.process_message_instances()
|
||||
MessageService.process_message_instances()
|
||||
# Cause a currelation event
|
||||
MessageService.correlate_all_message_instances()
|
||||
# We have to run it a second time because instances are firing
|
||||
# more messages that need to be picked up.
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
||||
message_instance_result = MessageInstanceModel.query.all()
|
||||
assert len(message_instance_result) == 6
|
||||
assert len(message_instance_result) == 8
|
||||
for message_instance in message_instance_result:
|
||||
assert message_instance.status == "completed"
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user