* SpiffWorkflow event_definitions wanted to return a message event's correlation properties mested within correlation keys. But messages are directly related to properties, not to keys - and it forced a number of conversions that made for tricky code. So Messages now contain a dictionary of correlation properties only.

* SpiffWorkflow did not serialize correlations - so they were lost between save and retrieve.

* When comparing Correlation Property values - we are storing these values as strings in the database and can't convert them back to integers later, so I'm changing everying everywhere to compare after conversion to a string.  Don't feel great about this one.
* By using an SQL Alchemy join table, there is a lot of db queries we don't need to write.
* A few handy fucntions on db models to make it easier to work with correlations.
* Updated tests because I changed some of the BPMN models we were testing against.
* Database migration to use the new constraint names with the alternate form of the join table between correlation mesages to instance messages.
This commit is contained in:
Dan 2023-02-20 11:50:24 -05:00
parent d46502cd2b
commit 5f6a61c93f
9 changed files with 138 additions and 108 deletions

View File

@ -0,0 +1,35 @@
"""empty message
Revision ID: b581ff2351ad
Revises: ac40af4ddef3
Create Date: 2023-02-19 11:19:24.371528
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b581ff2351ad'
down_revision = 'ac40af4ddef3'
branch_labels = None
depends_on = None
def upgrade():
# Just some table cleanup so the join table is properly built
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey")
op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey")
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id'])
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id'])
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property')
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
# ### end Alembic commands ###

View File

@ -9,5 +9,5 @@ message_correlation_message_instance_table \
db.Column('message_instance_id',
ForeignKey('message_instance.id'), primary_key=True),
db.Column('message_correlation_id',
ForeignKey('message_correlation.id'),primary_key=True)
ForeignKey('message_correlation.id'),primary_key=True),
)

View File

@ -23,3 +23,5 @@ class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel):
message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_model = db.relationship("MessageModel",
backref="correlation_properties")

View File

@ -68,19 +68,23 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
def correlation_dictionary(self):
correlation_dict = {}
for c in self.message_correlations:
correlation_dict[c.name]=c.value
return correlation_dict
def correlates(self, other_message_instance: Self) -> bool:
if other_message_instance.message_model_id != self.message_model_id:
return False
correlation_dict = {}
for c in other_message_instance.message_correlations:
correlation_dict[c.name]=c.value
return self.correlates_with_dictionary(correlation_dict)
return self.correlates_with_dictionary(other_message_instance.correlation_dictionary())
def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool:
"""Returns true if the given dictionary matches the correlation names and values connected to this message instance"""
def correlates_with_dictionary(self, dict: dict) -> bool:
"""Returns true if the given dictionary matches the correlation
names and values connected to this message instance"""
for c in self.message_correlations:
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
if c.name in correlation_dictionary and str(correlation_dictionary[c.name]) == c.value:
if c.name in dict and str(dict[c.name]) == c.value:
continue
else:
return False

View File

@ -11,3 +11,10 @@ class MessageModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True)
identifier = db.Column(db.String(50), unique=True, index=True)
name = db.Column(db.String(50), unique=True, index=True)
# correlation_properties is a backref and defined in the MessageCorrelationProperties class.
def get_correlation_property(self, identifier):
for corr_prop in self.correlation_properties:
if corr_prop.identifier == identifier:
return corr_prop;
return None

View File

@ -56,26 +56,6 @@ def message_instance_list(
.paginate(page=page, per_page=per_page, error_out=False)
)
for message_instance in message_instances:
message_correlations: dict = {}
for (
mcmi
) in (
message_instance.MessageInstanceModel.message_correlations_message_instances
):
mc = MessageCorrelationModel.query.filter_by(
id=mcmi.message_correlation_id
).all()
for m in mc:
if m.name not in message_correlations:
message_correlations[m.name] = {}
message_correlations[m.name][
m.message_correlation_property.identifier
] = m.value
message_instance.MessageInstanceModel.message_correlations = (
message_correlations
)
response_json = {
"results": message_instances.items,
"pagination": {
@ -120,7 +100,6 @@ def message_send(
# Is there a running instance that is waiting for this message?
message_instances = MessageInstanceModel.query.filter_by(message_model_id=message_model.id).all()
correlations = MessageCorrelationPropertyModel.query.filter_by(message_model_id=message_model.id).all()
# do any waiting message instances have matching correlations?
matching_message = None
@ -133,7 +112,7 @@ def message_send(
process_instance = ProcessInstanceModel.query.filter_by(id = matching_message.process_instance_id).first()
if matching_message and process_instance and process_instance.status != ProcessInstanceStatus.waiting.value:
ApiError(
raise ApiError(
error_code="message_not_accepted",
message=(
f"The process that can accept message '{message_identifier}' with the given correlation keys"

View File

@ -149,11 +149,12 @@ class MessageService:
)
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload,
correlations={},
correlations=message_instance_receive.correlation_dictionary(),
)
processor_receive.do_engine_steps(save=True)
message_instance_receive.status = MessageStatuses.completed.value

View File

@ -1338,10 +1338,9 @@ class ProcessInstanceProcessor:
db.session.add(self.process_instance_model)
db.session.commit()
# messages have one correlation key (possibly wrong)
# correlation keys may have many correlation properties
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
# only message sends are in get_bpmn_messages
@ -1352,6 +1351,7 @@ class ProcessInstanceProcessor:
f"Invalid message name: {bpmn_message.name}.",
)
if not bpmn_message.correlations:
raise ApiError(
"message_correlations_missing",
@ -1362,54 +1362,28 @@ class ProcessInstanceProcessor:
)
message_correlations = []
for (
message_correlation_key,
message_correlation_properties,
) in bpmn_message.correlations.items():
for (
message_correlation_property_identifier,
message_correlation_property_value,
) in message_correlation_properties.items():
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
identifier=message_correlation_property_identifier,
).first()
)
for (name, value) in bpmn_message.correlations.items():
message_correlation_property = message_model.get_correlation_property(name)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
(
"Could not find a known message correlation with"
f" identifier:{message_correlation_property_identifier}"
f" identifier:{name}"
),
)
message_correlations.append(
{
"message_correlation_property": (
message_correlation_property
),
"name": message_correlation_property_identifier,
"value": message_correlation_property_value,
}
)
message_correlations.append(MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation_property.id,
name=name,
value=value))
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
message_type="send",
message_model_id=message_model.id,
payload=bpmn_message.payload,
message_correlations=message_correlations
)
correlation_models = []
for message_correlation in message_correlations:
correlation_models.append(MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation[
"message_correlation_property"
].id,
name=message_correlation["name"],
value=message_correlation["value"],
))
message_instance.message_correlations = correlation_models
db.session.add(message_instance)
db.session.commit()

View File

@ -1347,11 +1347,12 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_send"
message_model_identifier = "request_approval"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
@ -1372,7 +1373,7 @@ class TestProcessApi(BaseTest):
processor = ProcessInstanceProcessor(process_instance)
process_instance_data = processor.get_data()
assert process_instance_data
assert process_instance_data["the_payload"] == payload
assert process_instance_data["invoice"] == payload
def test_message_send_when_providing_message_to_running_process_instance(
self,
@ -1395,13 +1396,12 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_response"
message_model_identifier = "approval_result"
payload = {
"the_payload": {
"topica": "the_payload.topica_string",
"topicb": "the_payload.topicb_string",
"andThis": "another_item_non_key",
}
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "Ya!, a-ok bud!"
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
@ -1415,9 +1415,26 @@ class TestProcessApi(BaseTest):
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor,
task,
payload,
with_super_admin_user,
human_task,
)
processor.save()
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1462,14 +1479,14 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_response"
message_model_identifier = "approval_result"
payload = {
"the_payload": {
"topica": "the_payload.topica_string",
"topicb": "the_payload.topicb_string",
"andThis": "another_item_non_key",
}
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
process_model_identifier,
@ -1478,20 +1495,29 @@ class TestProcessApi(BaseTest):
assert response.json is not None
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(
id=process_instance_id
).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor,
task,
payload,
with_super_admin_user,
human_task,
)
processor.save()
processor.suspend()
payload['description'] = "Message To Suspended"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1502,15 +1528,16 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 400
assert response.json
assert response.json["error_code"] == "process_instance_is_suspended"
assert response.json["error_code"] == "message_not_accepted"
processor.resume()
payload['description'] = "Message To Resumed"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps(
{"payload": payload, "process_instance_id": process_instance_id}
{"payload": payload}
),
)
assert response.status_code == 200
@ -1538,7 +1565,7 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 400
assert response.json
assert response.json["error_code"] == "process_instance_is_terminated"
assert response.json["error_code"] == "message_not_accepted"
def test_process_instance_can_be_terminated(
self,
@ -2293,11 +2320,12 @@ class TestProcessApi(BaseTest):
# process_model_source_directory="message_send_one_conversation",
# bpmn_file_name="message_receiver",
# )
message_model_identifier = "message_send"
message_model_identifier = "request_approval"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",