* SpiffWorkflow event_definitions wanted to return a message event's correlation properties mested within correlation keys. But messages are directly related to properties, not to keys - and it forced a number of conversions that made for tricky code. So Messages now contain a dictionary of correlation properties only.
* SpiffWorkflow did not serialize correlations - so they were lost between save and retrieve. * When comparing Correlation Property values - we are storing these values as strings in the database and can't convert them back to integers later, so I'm changing everying everywhere to compare after conversion to a string. Don't feel great about this one. * By using an SQL Alchemy join table, there is a lot of db queries we don't need to write. * A few handy fucntions on db models to make it easier to work with correlations. * Updated tests because I changed some of the BPMN models we were testing against. * Database migration to use the new constraint names with the alternate form of the join table between correlation mesages to instance messages.
This commit is contained in:
parent
f451034069
commit
384c272afa
|
@ -0,0 +1,35 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: b581ff2351ad
|
||||
Revises: ac40af4ddef3
|
||||
Create Date: 2023-02-19 11:19:24.371528
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'b581ff2351ad'
|
||||
down_revision = 'ac40af4ddef3'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
|
||||
# Just some table cleanup so the join table is properly built
|
||||
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey")
|
||||
op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey")
|
||||
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
|
||||
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
|
||||
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id'])
|
||||
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id'])
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property')
|
||||
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
|
||||
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
|
||||
# ### end Alembic commands ###
|
|
@ -9,5 +9,5 @@ message_correlation_message_instance_table \
|
|||
db.Column('message_instance_id',
|
||||
ForeignKey('message_instance.id'), primary_key=True),
|
||||
db.Column('message_correlation_id',
|
||||
ForeignKey('message_correlation.id'),primary_key=True)
|
||||
)
|
||||
ForeignKey('message_correlation.id'),primary_key=True),
|
||||
)
|
|
@ -23,3 +23,5 @@ class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel):
|
|||
message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False)
|
||||
updated_at_in_seconds: int = db.Column(db.Integer)
|
||||
created_at_in_seconds: int = db.Column(db.Integer)
|
||||
message_model = db.relationship("MessageModel",
|
||||
backref="correlation_properties")
|
||||
|
|
|
@ -68,19 +68,23 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
|
|||
"""Validate_status."""
|
||||
return self.validate_enum_field(key, value, MessageStatuses)
|
||||
|
||||
def correlation_dictionary(self):
|
||||
correlation_dict = {}
|
||||
for c in self.message_correlations:
|
||||
correlation_dict[c.name]=c.value
|
||||
return correlation_dict
|
||||
|
||||
def correlates(self, other_message_instance: Self) -> bool:
|
||||
if other_message_instance.message_model_id != self.message_model_id:
|
||||
return False
|
||||
correlation_dict = {}
|
||||
for c in other_message_instance.message_correlations:
|
||||
correlation_dict[c.name]=c.value
|
||||
return self.correlates_with_dictionary(correlation_dict)
|
||||
return self.correlates_with_dictionary(other_message_instance.correlation_dictionary())
|
||||
|
||||
def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool:
|
||||
"""Returns true if the given dictionary matches the correlation names and values connected to this message instance"""
|
||||
def correlates_with_dictionary(self, dict: dict) -> bool:
|
||||
"""Returns true if the given dictionary matches the correlation
|
||||
names and values connected to this message instance"""
|
||||
for c in self.message_correlations:
|
||||
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
|
||||
if c.name in correlation_dictionary and str(correlation_dictionary[c.name]) == c.value:
|
||||
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
|
||||
if c.name in dict and str(dict[c.name]) == c.value:
|
||||
continue
|
||||
else:
|
||||
return False
|
||||
|
|
|
@ -11,3 +11,10 @@ class MessageModel(SpiffworkflowBaseDBModel):
|
|||
id = db.Column(db.Integer, primary_key=True)
|
||||
identifier = db.Column(db.String(50), unique=True, index=True)
|
||||
name = db.Column(db.String(50), unique=True, index=True)
|
||||
# correlation_properties is a backref and defined in the MessageCorrelationProperties class.
|
||||
|
||||
def get_correlation_property(self, identifier):
|
||||
for corr_prop in self.correlation_properties:
|
||||
if corr_prop.identifier == identifier:
|
||||
return corr_prop;
|
||||
return None
|
|
@ -56,26 +56,6 @@ def message_instance_list(
|
|||
.paginate(page=page, per_page=per_page, error_out=False)
|
||||
)
|
||||
|
||||
for message_instance in message_instances:
|
||||
message_correlations: dict = {}
|
||||
for (
|
||||
mcmi
|
||||
) in (
|
||||
message_instance.MessageInstanceModel.message_correlations_message_instances
|
||||
):
|
||||
mc = MessageCorrelationModel.query.filter_by(
|
||||
id=mcmi.message_correlation_id
|
||||
).all()
|
||||
for m in mc:
|
||||
if m.name not in message_correlations:
|
||||
message_correlations[m.name] = {}
|
||||
message_correlations[m.name][
|
||||
m.message_correlation_property.identifier
|
||||
] = m.value
|
||||
message_instance.MessageInstanceModel.message_correlations = (
|
||||
message_correlations
|
||||
)
|
||||
|
||||
response_json = {
|
||||
"results": message_instances.items,
|
||||
"pagination": {
|
||||
|
@ -120,7 +100,6 @@ def message_send(
|
|||
|
||||
# Is there a running instance that is waiting for this message?
|
||||
message_instances = MessageInstanceModel.query.filter_by(message_model_id=message_model.id).all()
|
||||
correlations = MessageCorrelationPropertyModel.query.filter_by(message_model_id=message_model.id).all()
|
||||
|
||||
# do any waiting message instances have matching correlations?
|
||||
matching_message = None
|
||||
|
@ -133,7 +112,7 @@ def message_send(
|
|||
process_instance = ProcessInstanceModel.query.filter_by(id = matching_message.process_instance_id).first()
|
||||
|
||||
if matching_message and process_instance and process_instance.status != ProcessInstanceStatus.waiting.value:
|
||||
ApiError(
|
||||
raise ApiError(
|
||||
error_code="message_not_accepted",
|
||||
message=(
|
||||
f"The process that can accept message '{message_identifier}' with the given correlation keys"
|
||||
|
|
|
@ -149,11 +149,12 @@ class MessageService:
|
|||
)
|
||||
)
|
||||
|
||||
|
||||
processor_receive = ProcessInstanceProcessor(process_instance_receive)
|
||||
processor_receive.bpmn_process_instance.catch_bpmn_message(
|
||||
message_model_name,
|
||||
message_payload,
|
||||
correlations={},
|
||||
correlations=message_instance_receive.correlation_dictionary(),
|
||||
)
|
||||
processor_receive.do_engine_steps(save=True)
|
||||
message_instance_receive.status = MessageStatuses.completed.value
|
||||
|
|
|
@ -1338,10 +1338,9 @@ class ProcessInstanceProcessor:
|
|||
db.session.add(self.process_instance_model)
|
||||
db.session.commit()
|
||||
|
||||
# messages have one correlation key (possibly wrong)
|
||||
# correlation keys may have many correlation properties
|
||||
def process_bpmn_messages(self) -> None:
|
||||
"""Process_bpmn_messages."""
|
||||
|
||||
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
|
||||
for bpmn_message in bpmn_messages:
|
||||
# only message sends are in get_bpmn_messages
|
||||
|
@ -1352,6 +1351,7 @@ class ProcessInstanceProcessor:
|
|||
f"Invalid message name: {bpmn_message.name}.",
|
||||
)
|
||||
|
||||
|
||||
if not bpmn_message.correlations:
|
||||
raise ApiError(
|
||||
"message_correlations_missing",
|
||||
|
@ -1362,54 +1362,28 @@ class ProcessInstanceProcessor:
|
|||
)
|
||||
|
||||
message_correlations = []
|
||||
for (
|
||||
message_correlation_key,
|
||||
message_correlation_properties,
|
||||
) in bpmn_message.correlations.items():
|
||||
for (
|
||||
message_correlation_property_identifier,
|
||||
message_correlation_property_value,
|
||||
) in message_correlation_properties.items():
|
||||
message_correlation_property = (
|
||||
MessageCorrelationPropertyModel.query.filter_by(
|
||||
identifier=message_correlation_property_identifier,
|
||||
).first()
|
||||
)
|
||||
if message_correlation_property is None:
|
||||
raise ApiError(
|
||||
"message_correlations_missing_from_process",
|
||||
(
|
||||
"Could not find a known message correlation with"
|
||||
f" identifier:{message_correlation_property_identifier}"
|
||||
),
|
||||
)
|
||||
message_correlations.append(
|
||||
{
|
||||
"message_correlation_property": (
|
||||
message_correlation_property
|
||||
),
|
||||
"name": message_correlation_property_identifier,
|
||||
"value": message_correlation_property_value,
|
||||
}
|
||||
for (name, value) in bpmn_message.correlations.items():
|
||||
message_correlation_property = message_model.get_correlation_property(name)
|
||||
if message_correlation_property is None:
|
||||
raise ApiError(
|
||||
"message_correlations_missing_from_process",
|
||||
(
|
||||
"Could not find a known message correlation with"
|
||||
f" identifier:{name}"
|
||||
),
|
||||
)
|
||||
message_correlations.append(MessageCorrelationModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_correlation_property_id=message_correlation_property.id,
|
||||
name=name,
|
||||
value=value))
|
||||
message_instance = MessageInstanceModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_type="send",
|
||||
message_model_id=message_model.id,
|
||||
payload=bpmn_message.payload,
|
||||
message_correlations=message_correlations
|
||||
)
|
||||
|
||||
correlation_models = []
|
||||
for message_correlation in message_correlations:
|
||||
correlation_models.append(MessageCorrelationModel(
|
||||
process_instance_id=self.process_instance_model.id,
|
||||
message_correlation_property_id=message_correlation[
|
||||
"message_correlation_property"
|
||||
].id,
|
||||
name=message_correlation["name"],
|
||||
value=message_correlation["value"],
|
||||
))
|
||||
message_instance.message_correlations = correlation_models
|
||||
db.session.add(message_instance)
|
||||
db.session.commit()
|
||||
|
||||
|
|
|
@ -1347,11 +1347,12 @@ class TestProcessApi(BaseTest):
|
|||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
message_model_identifier = "message_send"
|
||||
message_model_identifier = "request_approval"
|
||||
payload = {
|
||||
"topica": "the_topica_string",
|
||||
"topicb": "the_topicb_string",
|
||||
"andThis": "another_item_non_key",
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
"amount": "One Billion Dollars! Mwhahahahahaha",
|
||||
"description": "But seriously."
|
||||
}
|
||||
response = client.post(
|
||||
f"/v1.0/messages/{message_model_identifier}",
|
||||
|
@ -1372,7 +1373,7 @@ class TestProcessApi(BaseTest):
|
|||
processor = ProcessInstanceProcessor(process_instance)
|
||||
process_instance_data = processor.get_data()
|
||||
assert process_instance_data
|
||||
assert process_instance_data["the_payload"] == payload
|
||||
assert process_instance_data["invoice"] == payload
|
||||
|
||||
def test_message_send_when_providing_message_to_running_process_instance(
|
||||
self,
|
||||
|
@ -1395,13 +1396,12 @@ class TestProcessApi(BaseTest):
|
|||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
message_model_identifier = "message_response"
|
||||
message_model_identifier = "approval_result"
|
||||
payload = {
|
||||
"the_payload": {
|
||||
"topica": "the_payload.topica_string",
|
||||
"topicb": "the_payload.topicb_string",
|
||||
"andThis": "another_item_non_key",
|
||||
}
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
"amount": "One Billion Dollars! Mwhahahahahaha",
|
||||
"description": "Ya!, a-ok bud!"
|
||||
}
|
||||
response = self.create_process_instance_from_process_model_id_with_api(
|
||||
client,
|
||||
|
@ -1415,9 +1415,26 @@ class TestProcessApi(BaseTest):
|
|||
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
|
||||
assert response.json is not None
|
||||
|
||||
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
processor.do_engine_steps(save=True)
|
||||
task = processor.get_all_user_tasks()[0]
|
||||
human_task = process_instance.active_human_tasks[0]
|
||||
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
|
||||
human_task.task_name, processor.bpmn_process_instance
|
||||
)
|
||||
|
||||
ProcessInstanceService.complete_form_task(
|
||||
processor,
|
||||
task,
|
||||
payload,
|
||||
with_super_admin_user,
|
||||
human_task,
|
||||
)
|
||||
processor.save()
|
||||
|
||||
response = client.post(
|
||||
f"/v1.0/messages/{message_model_identifier}",
|
||||
content_type="application/json",
|
||||
|
@ -1462,14 +1479,14 @@ class TestProcessApi(BaseTest):
|
|||
bpmn_file_location=bpmn_file_location,
|
||||
)
|
||||
|
||||
message_model_identifier = "message_response"
|
||||
message_model_identifier = "approval_result"
|
||||
payload = {
|
||||
"the_payload": {
|
||||
"topica": "the_payload.topica_string",
|
||||
"topicb": "the_payload.topicb_string",
|
||||
"andThis": "another_item_non_key",
|
||||
}
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
"amount": "One Billion Dollars! Mwhahahahahaha",
|
||||
"description": "But seriously."
|
||||
}
|
||||
|
||||
response = self.create_process_instance_from_process_model_id_with_api(
|
||||
client,
|
||||
process_model_identifier,
|
||||
|
@ -1478,20 +1495,29 @@ class TestProcessApi(BaseTest):
|
|||
assert response.json is not None
|
||||
process_instance_id = response.json["id"]
|
||||
|
||||
response = client.post(
|
||||
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json is not None
|
||||
|
||||
process_instance = ProcessInstanceModel.query.filter_by(
|
||||
id=process_instance_id
|
||||
).first()
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
processor.do_engine_steps(save=True)
|
||||
task = processor.get_all_user_tasks()[0]
|
||||
human_task = process_instance.active_human_tasks[0]
|
||||
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
|
||||
human_task.task_name, processor.bpmn_process_instance
|
||||
)
|
||||
|
||||
ProcessInstanceService.complete_form_task(
|
||||
processor,
|
||||
task,
|
||||
payload,
|
||||
with_super_admin_user,
|
||||
human_task,
|
||||
)
|
||||
processor.save()
|
||||
|
||||
processor.suspend()
|
||||
payload['description'] = "Message To Suspended"
|
||||
response = client.post(
|
||||
f"/v1.0/messages/{message_model_identifier}",
|
||||
content_type="application/json",
|
||||
|
@ -1502,15 +1528,16 @@ class TestProcessApi(BaseTest):
|
|||
)
|
||||
assert response.status_code == 400
|
||||
assert response.json
|
||||
assert response.json["error_code"] == "process_instance_is_suspended"
|
||||
assert response.json["error_code"] == "message_not_accepted"
|
||||
|
||||
processor.resume()
|
||||
payload['description'] = "Message To Resumed"
|
||||
response = client.post(
|
||||
f"/v1.0/messages/{message_model_identifier}",
|
||||
content_type="application/json",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
data=json.dumps(
|
||||
{"payload": payload, "process_instance_id": process_instance_id}
|
||||
{"payload": payload}
|
||||
),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
@ -1538,7 +1565,7 @@ class TestProcessApi(BaseTest):
|
|||
)
|
||||
assert response.status_code == 400
|
||||
assert response.json
|
||||
assert response.json["error_code"] == "process_instance_is_terminated"
|
||||
assert response.json["error_code"] == "message_not_accepted"
|
||||
|
||||
def test_process_instance_can_be_terminated(
|
||||
self,
|
||||
|
@ -2293,11 +2320,12 @@ class TestProcessApi(BaseTest):
|
|||
# process_model_source_directory="message_send_one_conversation",
|
||||
# bpmn_file_name="message_receiver",
|
||||
# )
|
||||
message_model_identifier = "message_send"
|
||||
message_model_identifier = "request_approval"
|
||||
payload = {
|
||||
"topica": "the_topica_string",
|
||||
"topicb": "the_topicb_string",
|
||||
"andThis": "another_item_non_key",
|
||||
"customer_id": "sartography",
|
||||
"po_number": "1001",
|
||||
"amount": "One Billion Dollars! Mwhahahahahaha",
|
||||
"description": "But seriously."
|
||||
}
|
||||
response = client.post(
|
||||
f"/v1.0/messages/{message_model_identifier}",
|
||||
|
|
Loading…
Reference in New Issue