Merge pull request #161 from sartography/feature/messages_again_and_again

workflow.catch() was nice, in that it is where we could send events a…
This commit is contained in:
Dan Funk 2023-03-01 14:24:34 -05:00 committed by GitHub
commit 38a4fe18f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 11 deletions

View File

@ -221,6 +221,20 @@ class MessageEventDefinition(NamedEventDefinition):
raise we raise we
return correlation_keys return correlation_keys
def conversation(self):
"""An event may have many correlation properties, this figures out
which conversation exists across all of them, or return None if they
do not share a topic. """
conversation = None
if len(self.correlation_properties) > 0:
for prop in self.correlation_properties:
for key in prop.correlation_keys:
conversation = key
for prop in self.correlation_properties:
if conversation not in prop.correlation_keys:
break
return conversation
return None
class NoneEventDefinition(EventDefinition): class NoneEventDefinition(EventDefinition):

View File

@ -15,6 +15,7 @@
# License along with this library; if not, write to the Free Software # License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA # 02110-1301 USA
import copy
from SpiffWorkflow.bpmn.specs.events.event_definitions import ( from SpiffWorkflow.bpmn.specs.events.event_definitions import (
MessageEventDefinition, MessageEventDefinition,
@ -28,7 +29,7 @@ from .specs.events.StartEvent import StartEvent
from .specs.SubWorkflowTask import CallActivity from .specs.SubWorkflowTask import CallActivity
from ..task import TaskState, Task from ..task import TaskState, Task
from ..workflow import Workflow from ..workflow import Workflow
from ..exceptions import WorkflowException from ..exceptions import WorkflowException, WorkflowTaskException
class BpmnMessage: class BpmnMessage:
@ -117,9 +118,8 @@ class BpmnWorkflow(Workflow):
return sp return sp
return self.connect_subprocess(wf_spec.name, f'{wf_spec.name}_{len(self.subprocesses)}') return self.connect_subprocess(wf_spec.name, f'{wf_spec.name}_{len(self.subprocesses)}')
def catch(self, event_definition, correlations=None, external_origin=False): def catch(self, event_definition, correlations=None):
""" """
Send an event definition to any tasks that catch it.
Tasks can always catch events, regardless of their state. The Tasks can always catch events, regardless of their state. The
event information is stored in the tasks internal data and processed event information is stored in the tasks internal data and processed
@ -154,7 +154,7 @@ class BpmnWorkflow(Workflow):
self.refresh_waiting_tasks() self.refresh_waiting_tasks()
# Figure out if we need to create an external message # Figure out if we need to create an external message
if len(tasks) == 0 and isinstance(event_definition, MessageEventDefinition) and not external_origin: if len(tasks) == 0 and isinstance(event_definition, MessageEventDefinition):
self.bpmn_messages.append( self.bpmn_messages.append(
BpmnMessage(correlations, event_definition.name, event_definition.payload)) BpmnMessage(correlations, event_definition.name, event_definition.payload))
@ -163,10 +163,58 @@ class BpmnWorkflow(Workflow):
self.bpmn_messages = [] self.bpmn_messages = []
return messages return messages
def catch_bpmn_message(self, name, payload, correlations=None): def catch_bpmn_message(self, name, payload):
"""Allows this workflow to catch an externally generated bpmn message.
Raises an error if this workflow is not waiting on the given message."""
event_definition = MessageEventDefinition(name) event_definition = MessageEventDefinition(name)
event_definition.payload = payload event_definition.payload = payload
self.catch(event_definition, correlations=correlations, external_origin=True)
# There should be one and only be one task that can accept the message
# (messages are one to one, not one to many)
tasks = [t for t in self.get_waiting_tasks() if t.task_spec.event_definition == event_definition]
if len(tasks) == 0:
raise WorkflowException(
f"This process is not waiting on a message named '{event_definition.name}'")
if len(tasks) > 1:
raise WorkflowException(
f"This process has multiple tasks waiting on the same message '{event_definition.name}', which is not supported. ")
task = tasks[0]
conversation = task.task_spec.event_definition.conversation()
if not conversation:
raise WorkflowTaskException(
f"The waiting task and message payload can not be matched to any correlation key (conversation topic). "
f"And is therefor unable to respond to the given message.", task)
updated_props = self._correlate(conversation, payload, task)
task.task_spec.catch(task, event_definition)
self.refresh_waiting_tasks()
self.correlations[conversation] = updated_props
def _correlate(self, conversation, payload, task):
"""Assures that the provided payload correlates to the given
task's event definition and this workflows own correlation
properties. Returning an updated property list if successful"""
receive_event = task.task_spec.event_definition
current_props = self.correlations.get(conversation, {})
updated_props = copy.copy(current_props)
for prop in receive_event.correlation_properties:
try:
new_val = self.script_engine._evaluate(
prop.retrieval_expression, payload
)
except Exception as e:
raise WorkflowTaskException("Unable to accept the BPMN message. "
"The payload must contain "
f"'{prop.retrieval_expression}'", task, e)
if prop.name in current_props and \
new_val != updated_props[prop.name]:
raise WorkflowTaskException("Unable to accept the BPMN message. "
"The payload does not match. Expected "
f"'{prop.retrieval_expression}' to equal "
f"{current_props[prop.name]}.", task)
else:
updated_props[prop.name] = new_val
return updated_props
def waiting_events(self): def waiting_events(self):
# Ultimately I'd like to add an event class so that EventDefinitions would not so double duty as both specs # Ultimately I'd like to add an event class so that EventDefinitions would not so double duty as both specs

View File

@ -1,5 +1,5 @@
from SpiffWorkflow.bpmn.specs.SubWorkflowTask import CallActivity from SpiffWorkflow.bpmn.specs.SubWorkflowTask import CallActivity
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow from SpiffWorkflow.bpmn.workflow import BpmnWorkflow, BpmnMessage
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
from tests.SpiffWorkflow.bpmn.BpmnWorkflowTestCase import BpmnWorkflowTestCase from tests.SpiffWorkflow.bpmn.BpmnWorkflowTestCase import BpmnWorkflowTestCase
@ -60,12 +60,18 @@ class CollaborationTest(BpmnWorkflowTestCase):
self.assertEqual('from_name', events[0]['value'][0].retrieval_expression) self.assertEqual('from_name', events[0]['value'][0].retrieval_expression)
self.assertEqual('lover_name', events[0]['value'][0].name) self.assertEqual('lover_name', events[0]['value'][0].name)
workflow.catch_bpmn_message('Love Letter Response', messages[0].payload, # As shown above, the waiting event is looking for a payload with a
messages[0].correlations) # 'from_name' that should be used to retrieve the lover's name.
new_message_payload = {'from_name': 'Peggy', 'other_nonsense': 1001}
workflow.catch_bpmn_message('Love Letter Response', new_message_payload)
workflow.do_engine_steps() workflow.do_engine_steps()
# The external message created above should be caught # The external message created above should be caught
self.assertEqual(receive.state, TaskState.COMPLETED) self.assertEqual(receive.state, TaskState.COMPLETED)
self.assertEqual(receive.data, messages[0].payload) # Spiff extensions allow us to specify the destination of a workflow
# but base BPMN does not, and all keys are added directly to the
# task data.
self.assertEqual(workflow.last_task.data, {'from_name': 'Peggy', 'lover_name': 'Peggy', 'other_nonsense': 1001})
self.assertEqual(workflow.correlations, {'lover':{'lover_name':'Peggy'}})
self.assertEqual(workflow.is_completed(), True) self.assertEqual(workflow.is_completed(), True)
def testCorrelation(self): def testCorrelation(self):

View File

@ -30,6 +30,12 @@ groups:
admin@spiffworkflow.org, admin@spiffworkflow.org,
nelson@spiffworkflow.org nelson@spiffworkflow.org
] ]
approvers:
users:
[
malala@spiffworkflow.org,
oskar@spiffworkflow.org
]
permissions: permissions:
# Admins have access to everything. # Admins have access to everything.

View File

@ -350,6 +350,8 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
external_methods: Optional[Dict[str, Any]] = None, external_methods: Optional[Dict[str, Any]] = None,
) -> Any: ) -> Any:
"""_evaluate.""" """_evaluate."""
methods = {}
if task:
methods = self.__get_augment_methods(task) methods = self.__get_augment_methods(task)
if external_methods: if external_methods:
methods.update(external_methods) methods.update(external_methods)