* SpiffWorkflow event_definitions wanted to return a message event's correlation properties mested within correlation keys. But messages are directly related to properties, not to keys - and it forced a number of conversions that made for tricky code. So Messages now contain a dictionary of correlation properties only.

* SpiffWorkflow did not serialize correlations - so they were lost between save and retrieve.

* When comparing Correlation Property values - we are storing these values as strings in the database and can't convert them back to integers later, so I'm changing everying everywhere to compare after conversion to a string.  Don't feel great about this one.
* By using an SQL Alchemy join table, there is a lot of db queries we don't need to write.
* A few handy fucntions on db models to make it easier to work with correlations.
* Updated tests because I changed some of the BPMN models we were testing against.
* Database migration to use the new constraint names with the alternate form of the join table between correlation mesages to instance messages.
This commit is contained in:
Dan 2023-02-20 11:50:24 -05:00
parent 6e7f36e55f
commit 5171e53240
14 changed files with 174 additions and 136 deletions

View File

@ -32,7 +32,7 @@ class BpmnWorkflowSerializer:
The goal is to provide modular serialization capabilities.
You'll need to configure a Workflow Spec Converter with converters for any task, data, or event types
present in your workflows.
present in your workflows.
If you have implemented any custom specs, you'll need to write a converter to handle them and
replace the converter from the default confiuration with your own.
@ -63,13 +63,13 @@ class BpmnWorkflowSerializer:
"""
This method can be used to create a spec converter that uses custom specs.
The task specs may contain arbitrary data, though none of the default task specs use it. We don't
recommend that you do this, as we may disallow it in the future. However, if you have task spec data,
The task specs may contain arbitrary data, though none of the default task specs use it. We don't
recommend that you do this, as we may disallow it in the future. However, if you have task spec data,
then you'll also need to make sure it can be serialized.
The workflow spec serializer is based on the `DictionaryConverter` in the `helpers` package. You can
create one of your own, add custom data serializtion to that and pass that in as the `registry`. The
conversion classes in the spec_config will be added this "registry" and any classes with entries there
create one of your own, add custom data serializtion to that and pass that in as the `registry`. The
conversion classes in the spec_config will be added this "registry" and any classes with entries there
will be serialized/deserialized.
See the documentation for `helpers.spec.BpmnSpecConverter` for more information about what's going
@ -85,7 +85,7 @@ class BpmnWorkflowSerializer:
cls(spec_converter)
return spec_converter
def __init__(self, spec_converter=None, data_converter=None, wf_class=None, version=VERSION,
def __init__(self, spec_converter=None, data_converter=None, wf_class=None, version=VERSION,
json_encoder_cls=DEFAULT_JSON_ENCODER_CLS, json_decoder_cls=DEFAULT_JSON_DECODER_CLS):
"""Intializes a Workflow Serializer with the given Workflow, Task and Data Converters.
@ -156,6 +156,8 @@ class BpmnWorkflowSerializer:
(str(task_id), self.process_to_dict(sp)) for task_id, sp in workflow.subprocesses.items()
)
dct['bpmn_messages'] = [self.message_to_dict(msg) for msg in workflow.bpmn_messages]
dct['correlations'] = workflow.correlations
return dct
def workflow_from_dict(self, dct):
@ -186,6 +188,8 @@ class BpmnWorkflowSerializer:
# Restore any unretrieve messages
workflow.bpmn_messages = [ self.message_from_dict(msg) for msg in dct.get('bpmn_messages', []) ]
workflow.correlations = dct_copy.pop('correlations', {})
# Restore the remainder of the workflow
workflow.data = self.data_converter.restore(dct_copy.pop('data'))
workflow.success = dct_copy.pop('success')

View File

@ -209,19 +209,15 @@ class MessageEventDefinition(NamedEventDefinition):
def get_correlations(self, task, payload):
correlations = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
if key not in correlations:
correlations[key] = {}
try:
correlations[key][property.name] = task.workflow.script_engine._evaluate(property.expression, payload)
except WorkflowException as we:
we.add_note(f"Failed to evaluate correlation key '{key}'"
f" invalid expression '{property.expression}'")
we.task_spec = task.task_spec
raise we
try:
correlations[property.name] = task.workflow.script_engine._evaluate(property.expression, payload)
except WorkflowException as we:
we.add_note(f"Failed to evaluate correlation property '{property.name}'"
f" invalid expression '{property.expression}'")
we.task_spec = task.task_spec
raise we
return correlations
class NoneEventDefinition(EventDefinition):
"""
This class defines behavior for NoneEvents. We override throw to do nothing.

View File

@ -36,7 +36,7 @@ class CatchingEvent(Simple, BpmnSpecMixin):
def catches(self, my_task, event_definition, correlations=None):
if self.event_definition == event_definition:
return all([ correlations.get(key) == my_task.workflow.correlations.get(key) for key in correlations ])
return all([ str(correlations.get(key)) == str(my_task.workflow.correlations.get(key)) for key in correlations ])
else:
return False

View File

@ -17,8 +17,8 @@
# 02110-1301 USA
from SpiffWorkflow.bpmn.specs.events.event_definitions import (
MessageEventDefinition,
MultipleEventDefinition,
MessageEventDefinition,
MultipleEventDefinition,
NamedEventDefinition,
TimerEventDefinition,
)
@ -117,7 +117,7 @@ class BpmnWorkflow(Workflow):
return sp
return self.connect_subprocess(wf_spec.name, f'{wf_spec.name}_{len(self.subprocesses)}')
def catch(self, event_definition, correlations=None):
def catch(self, event_definition, correlations=None, external_origin=False):
"""
Send an event definition to any tasks that catch it.
@ -128,6 +128,10 @@ class BpmnWorkflow(Workflow):
should call the event_definition's reset method before executing to
clear out a stale message.
We might be catching an event that was thrown from some other part of
our own workflow, and it needs to continue out, but if it originated
externally, we should not pass it on.
:param event_definition: the thrown event
"""
# Start a subprocess for known specs with start events that catch this
@ -149,8 +153,8 @@ class BpmnWorkflow(Workflow):
# Move any tasks that received message to READY
self.refresh_waiting_tasks()
# Figure out if we need to create an extenal message
if len(tasks) == 0 and isinstance(event_definition, MessageEventDefinition):
# Figure out if we need to create an external message
if len(tasks) == 0 and isinstance(event_definition, MessageEventDefinition) and not external_origin:
self.bpmn_messages.append(
BpmnMessage(correlations, event_definition.name, event_definition.payload))
@ -162,7 +166,7 @@ class BpmnWorkflow(Workflow):
def catch_bpmn_message(self, name, payload, correlations=None):
event_definition = MessageEventDefinition(name)
event_definition.payload = payload
self.catch(event_definition, correlations=correlations)
self.catch(event_definition, correlations=correlations, external_origin=True)
def waiting_events(self):
# Ultimately I'd like to add an event class so that EventDefinitions would not so double duty as both specs

View File

@ -49,7 +49,11 @@ class DualConversationTest(BaseTestCase):
self.assertEqual(len(messages), 2)
message_one = [ msg for msg in messages if msg.name== 'Message Send One' ][0]
message_two = [ msg for msg in messages if msg.name== 'Message Send Two' ][0]
self.assertIn('message_correlation_key_one', message_one.correlations)
self.assertNotIn('message_correlation_key_one', message_two.correlations)
self.assertIn('message_correlation_key_two', message_two.correlations)
self.assertNotIn('message_correlation_key_two', message_one.correlations)
# fixme: This seemed to test that we get a nested structure of correlation keys and correlation properties
# Perhaps there should be a way to get the keys and thier associated properties - but things should not default to a nested structure.
# self.assertIn('message_correlation_key_one', message_one.correlations)
# self.assertNotIn('message_correlation_key_one', message_two.correlations)
# self.assertIn('message_correlation_key_two', message_two.correlations)
# self.assertNotIn('message_correlation_key_two', message_one.correlations)

View File

@ -0,0 +1,35 @@
"""empty message
Revision ID: b581ff2351ad
Revises: ac40af4ddef3
Create Date: 2023-02-19 11:19:24.371528
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b581ff2351ad'
down_revision = 'ac40af4ddef3'
branch_labels = None
depends_on = None
def upgrade():
# Just some table cleanup so the join table is properly built
op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey")
op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey")
op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance')
op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance')
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id'])
op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id'])
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property')
op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False)
op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False)
# ### end Alembic commands ###

View File

@ -9,5 +9,5 @@ message_correlation_message_instance_table \
db.Column('message_instance_id',
ForeignKey('message_instance.id'), primary_key=True),
db.Column('message_correlation_id',
ForeignKey('message_correlation.id'),primary_key=True)
)
ForeignKey('message_correlation.id'),primary_key=True),
)

View File

@ -23,3 +23,5 @@ class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel):
message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)
message_model = db.relationship("MessageModel",
backref="correlation_properties")

View File

@ -68,19 +68,23 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel):
"""Validate_status."""
return self.validate_enum_field(key, value, MessageStatuses)
def correlation_dictionary(self):
correlation_dict = {}
for c in self.message_correlations:
correlation_dict[c.name]=c.value
return correlation_dict
def correlates(self, other_message_instance: Self) -> bool:
if other_message_instance.message_model_id != self.message_model_id:
return False
correlation_dict = {}
for c in other_message_instance.message_correlations:
correlation_dict[c.name]=c.value
return self.correlates_with_dictionary(correlation_dict)
return self.correlates_with_dictionary(other_message_instance.correlation_dictionary())
def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool:
"""Returns true if the given dictionary matches the correlation names and values connected to this message instance"""
def correlates_with_dictionary(self, dict: dict) -> bool:
"""Returns true if the given dictionary matches the correlation
names and values connected to this message instance"""
for c in self.message_correlations:
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
if c.name in correlation_dictionary and str(correlation_dictionary[c.name]) == c.value:
# Fixme: Maybe we should look at typing the correlations and not forcing them to strings?
if c.name in dict and str(dict[c.name]) == c.value:
continue
else:
return False

View File

@ -11,3 +11,10 @@ class MessageModel(SpiffworkflowBaseDBModel):
id = db.Column(db.Integer, primary_key=True)
identifier = db.Column(db.String(50), unique=True, index=True)
name = db.Column(db.String(50), unique=True, index=True)
# correlation_properties is a backref and defined in the MessageCorrelationProperties class.
def get_correlation_property(self, identifier):
for corr_prop in self.correlation_properties:
if corr_prop.identifier == identifier:
return corr_prop;
return None

View File

@ -56,26 +56,6 @@ def message_instance_list(
.paginate(page=page, per_page=per_page, error_out=False)
)
for message_instance in message_instances:
message_correlations: dict = {}
for (
mcmi
) in (
message_instance.MessageInstanceModel.message_correlations_message_instances
):
mc = MessageCorrelationModel.query.filter_by(
id=mcmi.message_correlation_id
).all()
for m in mc:
if m.name not in message_correlations:
message_correlations[m.name] = {}
message_correlations[m.name][
m.message_correlation_property.identifier
] = m.value
message_instance.MessageInstanceModel.message_correlations = (
message_correlations
)
response_json = {
"results": message_instances.items,
"pagination": {
@ -120,7 +100,6 @@ def message_send(
# Is there a running instance that is waiting for this message?
message_instances = MessageInstanceModel.query.filter_by(message_model_id=message_model.id).all()
correlations = MessageCorrelationPropertyModel.query.filter_by(message_model_id=message_model.id).all()
# do any waiting message instances have matching correlations?
matching_message = None
@ -133,7 +112,7 @@ def message_send(
process_instance = ProcessInstanceModel.query.filter_by(id = matching_message.process_instance_id).first()
if matching_message and process_instance and process_instance.status != ProcessInstanceStatus.waiting.value:
ApiError(
raise ApiError(
error_code="message_not_accepted",
message=(
f"The process that can accept message '{message_identifier}' with the given correlation keys"

View File

@ -149,11 +149,12 @@ class MessageService:
)
)
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch_bpmn_message(
message_model_name,
message_payload,
correlations={},
correlations=message_instance_receive.correlation_dictionary(),
)
processor_receive.do_engine_steps(save=True)
message_instance_receive.status = MessageStatuses.completed.value

View File

@ -1338,10 +1338,9 @@ class ProcessInstanceProcessor:
db.session.add(self.process_instance_model)
db.session.commit()
# messages have one correlation key (possibly wrong)
# correlation keys may have many correlation properties
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
# only message sends are in get_bpmn_messages
@ -1352,6 +1351,7 @@ class ProcessInstanceProcessor:
f"Invalid message name: {bpmn_message.name}.",
)
if not bpmn_message.correlations:
raise ApiError(
"message_correlations_missing",
@ -1362,54 +1362,28 @@ class ProcessInstanceProcessor:
)
message_correlations = []
for (
message_correlation_key,
message_correlation_properties,
) in bpmn_message.correlations.items():
for (
message_correlation_property_identifier,
message_correlation_property_value,
) in message_correlation_properties.items():
message_correlation_property = (
MessageCorrelationPropertyModel.query.filter_by(
identifier=message_correlation_property_identifier,
).first()
)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
(
"Could not find a known message correlation with"
f" identifier:{message_correlation_property_identifier}"
),
)
message_correlations.append(
{
"message_correlation_property": (
message_correlation_property
),
"name": message_correlation_property_identifier,
"value": message_correlation_property_value,
}
for (name, value) in bpmn_message.correlations.items():
message_correlation_property = message_model.get_correlation_property(name)
if message_correlation_property is None:
raise ApiError(
"message_correlations_missing_from_process",
(
"Could not find a known message correlation with"
f" identifier:{name}"
),
)
message_correlations.append(MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation_property.id,
name=name,
value=value))
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
message_type="send",
message_model_id=message_model.id,
payload=bpmn_message.payload,
message_correlations=message_correlations
)
correlation_models = []
for message_correlation in message_correlations:
correlation_models.append(MessageCorrelationModel(
process_instance_id=self.process_instance_model.id,
message_correlation_property_id=message_correlation[
"message_correlation_property"
].id,
name=message_correlation["name"],
value=message_correlation["value"],
))
message_instance.message_correlations = correlation_models
db.session.add(message_instance)
db.session.commit()

View File

@ -1347,11 +1347,12 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_send"
message_model_identifier = "request_approval"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
@ -1372,7 +1373,7 @@ class TestProcessApi(BaseTest):
processor = ProcessInstanceProcessor(process_instance)
process_instance_data = processor.get_data()
assert process_instance_data
assert process_instance_data["the_payload"] == payload
assert process_instance_data["invoice"] == payload
def test_message_send_when_providing_message_to_running_process_instance(
self,
@ -1395,13 +1396,12 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_response"
message_model_identifier = "approval_result"
payload = {
"the_payload": {
"topica": "the_payload.topica_string",
"topicb": "the_payload.topicb_string",
"andThis": "another_item_non_key",
}
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "Ya!, a-ok bud!"
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
@ -1415,9 +1415,26 @@ class TestProcessApi(BaseTest):
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor,
task,
payload,
with_super_admin_user,
human_task,
)
processor.save()
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1462,14 +1479,14 @@ class TestProcessApi(BaseTest):
bpmn_file_location=bpmn_file_location,
)
message_model_identifier = "message_response"
message_model_identifier = "approval_result"
payload = {
"the_payload": {
"topica": "the_payload.topica_string",
"topicb": "the_payload.topicb_string",
"andThis": "another_item_non_key",
}
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
}
response = self.create_process_instance_from_process_model_id_with_api(
client,
process_model_identifier,
@ -1478,20 +1495,29 @@ class TestProcessApi(BaseTest):
assert response.json is not None
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
process_instance = ProcessInstanceModel.query.filter_by(
id=process_instance_id
).first()
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
task = processor.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
human_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor,
task,
payload,
with_super_admin_user,
human_task,
)
processor.save()
processor.suspend()
payload['description'] = "Message To Suspended"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
@ -1502,15 +1528,16 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 400
assert response.json
assert response.json["error_code"] == "process_instance_is_suspended"
assert response.json["error_code"] == "message_not_accepted"
processor.resume()
payload['description'] = "Message To Resumed"
response = client.post(
f"/v1.0/messages/{message_model_identifier}",
content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps(
{"payload": payload, "process_instance_id": process_instance_id}
{"payload": payload}
),
)
assert response.status_code == 200
@ -1538,7 +1565,7 @@ class TestProcessApi(BaseTest):
)
assert response.status_code == 400
assert response.json
assert response.json["error_code"] == "process_instance_is_terminated"
assert response.json["error_code"] == "message_not_accepted"
def test_process_instance_can_be_terminated(
self,
@ -2293,11 +2320,12 @@ class TestProcessApi(BaseTest):
# process_model_source_directory="message_send_one_conversation",
# bpmn_file_name="message_receiver",
# )
message_model_identifier = "message_send"
message_model_identifier = "request_approval"
payload = {
"topica": "the_topica_string",
"topicb": "the_topicb_string",
"andThis": "another_item_non_key",
"customer_id": "sartography",
"po_number": "1001",
"amount": "One Billion Dollars! Mwhahahahahaha",
"description": "But seriously."
}
response = client.post(
f"/v1.0/messages/{message_model_identifier}",