Squashed 'SpiffWorkflow/' changes from 161cb7a45..bee868d38
bee868d38 Merge pull request #163 from sartography/feature/process_name_for_log_list c0da286d9 use workflow_spec to match task_spec naming w/ burnettk ac9e11927 Merge commit '71f8c94096534112c8a08f202f8bb0e6f81ed92f' into main 5bf6f3814 prefer the bpmn process name over the identifier on the logs list page w/ burnettk dc511b082 workflow.catch() was nice, in that it is where we could send events and messages. With this change sending an event to catch will behave incorrectly for BPMN Messages. Only sending it to the right method will create the desired result. It also adds a lot of additional code. Would love a careful review of this, and any optimizations anyone can think of. git-subtree-dir: SpiffWorkflow git-subtree-split: bee868d38b2c3da680c7a96b6a634d16b90d5861
This commit is contained in:
parent
71f8c94096
commit
cd211a455e
|
@ -221,6 +221,20 @@ class MessageEventDefinition(NamedEventDefinition):
|
|||
raise we
|
||||
return correlation_keys
|
||||
|
||||
def conversation(self):
|
||||
"""An event may have many correlation properties, this figures out
|
||||
which conversation exists across all of them, or return None if they
|
||||
do not share a topic. """
|
||||
conversation = None
|
||||
if len(self.correlation_properties) > 0:
|
||||
for prop in self.correlation_properties:
|
||||
for key in prop.correlation_keys:
|
||||
conversation = key
|
||||
for prop in self.correlation_properties:
|
||||
if conversation not in prop.correlation_keys:
|
||||
break
|
||||
return conversation
|
||||
return None
|
||||
|
||||
|
||||
class NoneEventDefinition(EventDefinition):
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# License along with this library; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
||||
# 02110-1301 USA
|
||||
import copy
|
||||
|
||||
from SpiffWorkflow.bpmn.specs.events.event_definitions import (
|
||||
MessageEventDefinition,
|
||||
|
@ -28,7 +29,7 @@ from .specs.events.StartEvent import StartEvent
|
|||
from .specs.SubWorkflowTask import CallActivity
|
||||
from ..task import TaskState, Task
|
||||
from ..workflow import Workflow
|
||||
from ..exceptions import WorkflowException
|
||||
from ..exceptions import WorkflowException, WorkflowTaskException
|
||||
|
||||
|
||||
class BpmnMessage:
|
||||
|
@ -117,9 +118,8 @@ class BpmnWorkflow(Workflow):
|
|||
return sp
|
||||
return self.connect_subprocess(wf_spec.name, f'{wf_spec.name}_{len(self.subprocesses)}')
|
||||
|
||||
def catch(self, event_definition, correlations=None, external_origin=False):
|
||||
def catch(self, event_definition, correlations=None):
|
||||
"""
|
||||
Send an event definition to any tasks that catch it.
|
||||
|
||||
Tasks can always catch events, regardless of their state. The
|
||||
event information is stored in the tasks internal data and processed
|
||||
|
@ -154,7 +154,7 @@ class BpmnWorkflow(Workflow):
|
|||
self.refresh_waiting_tasks()
|
||||
|
||||
# Figure out if we need to create an external message
|
||||
if len(tasks) == 0 and isinstance(event_definition, MessageEventDefinition) and not external_origin:
|
||||
if len(tasks) == 0 and isinstance(event_definition, MessageEventDefinition):
|
||||
self.bpmn_messages.append(
|
||||
BpmnMessage(correlations, event_definition.name, event_definition.payload))
|
||||
|
||||
|
@ -163,10 +163,58 @@ class BpmnWorkflow(Workflow):
|
|||
self.bpmn_messages = []
|
||||
return messages
|
||||
|
||||
def catch_bpmn_message(self, name, payload, correlations=None):
|
||||
def catch_bpmn_message(self, name, payload):
|
||||
"""Allows this workflow to catch an externally generated bpmn message.
|
||||
Raises an error if this workflow is not waiting on the given message."""
|
||||
event_definition = MessageEventDefinition(name)
|
||||
event_definition.payload = payload
|
||||
self.catch(event_definition, correlations=correlations, external_origin=True)
|
||||
|
||||
# There should be one and only be one task that can accept the message
|
||||
# (messages are one to one, not one to many)
|
||||
tasks = [t for t in self.get_waiting_tasks() if t.task_spec.event_definition == event_definition]
|
||||
if len(tasks) == 0:
|
||||
raise WorkflowException(
|
||||
f"This process is not waiting on a message named '{event_definition.name}'")
|
||||
if len(tasks) > 1:
|
||||
raise WorkflowException(
|
||||
f"This process has multiple tasks waiting on the same message '{event_definition.name}', which is not supported. ")
|
||||
|
||||
task = tasks[0]
|
||||
conversation = task.task_spec.event_definition.conversation()
|
||||
if not conversation:
|
||||
raise WorkflowTaskException(
|
||||
f"The waiting task and message payload can not be matched to any correlation key (conversation topic). "
|
||||
f"And is therefor unable to respond to the given message.", task)
|
||||
updated_props = self._correlate(conversation, payload, task)
|
||||
task.task_spec.catch(task, event_definition)
|
||||
self.refresh_waiting_tasks()
|
||||
self.correlations[conversation] = updated_props
|
||||
|
||||
def _correlate(self, conversation, payload, task):
|
||||
"""Assures that the provided payload correlates to the given
|
||||
task's event definition and this workflows own correlation
|
||||
properties. Returning an updated property list if successful"""
|
||||
receive_event = task.task_spec.event_definition
|
||||
current_props = self.correlations.get(conversation, {})
|
||||
updated_props = copy.copy(current_props)
|
||||
for prop in receive_event.correlation_properties:
|
||||
try:
|
||||
new_val = self.script_engine._evaluate(
|
||||
prop.retrieval_expression, payload
|
||||
)
|
||||
except Exception as e:
|
||||
raise WorkflowTaskException("Unable to accept the BPMN message. "
|
||||
"The payload must contain "
|
||||
f"'{prop.retrieval_expression}'", task, e)
|
||||
if prop.name in current_props and \
|
||||
new_val != updated_props[prop.name]:
|
||||
raise WorkflowTaskException("Unable to accept the BPMN message. "
|
||||
"The payload does not match. Expected "
|
||||
f"'{prop.retrieval_expression}' to equal "
|
||||
f"{current_props[prop.name]}.", task)
|
||||
else:
|
||||
updated_props[prop.name] = new_val
|
||||
return updated_props
|
||||
|
||||
def waiting_events(self):
|
||||
# Ultimately I'd like to add an event class so that EventDefinitions would not so double duty as both specs
|
||||
|
|
|
@ -271,7 +271,8 @@ class Task(object, metaclass=DeprecatedMetaTask):
|
|||
def log_info(self, dct=None):
|
||||
extra = dct or {}
|
||||
extra.update({
|
||||
'workflow': self.workflow.spec.name,
|
||||
'workflow_spec': self.workflow.spec.name,
|
||||
'workflow_name': self.workflow.spec.description,
|
||||
'task_spec': self.task_spec.name,
|
||||
'task_name': self.task_spec.description,
|
||||
'task_id': self.id,
|
||||
|
|
|
@ -83,7 +83,8 @@ class Workflow(object):
|
|||
def log_info(self, dct=None):
|
||||
extra = dct or {}
|
||||
extra.update({
|
||||
'workflow': self.spec.name,
|
||||
'workflow_spec': self.spec.name,
|
||||
'workflow_name': self.spec.description,
|
||||
'task_spec': '-',
|
||||
'task_type': None,
|
||||
'task_id': None,
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from SpiffWorkflow.bpmn.specs.SubWorkflowTask import CallActivity
|
||||
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
|
||||
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow, BpmnMessage
|
||||
from SpiffWorkflow.task import TaskState
|
||||
|
||||
from tests.SpiffWorkflow.bpmn.BpmnWorkflowTestCase import BpmnWorkflowTestCase
|
||||
|
@ -60,12 +60,18 @@ class CollaborationTest(BpmnWorkflowTestCase):
|
|||
self.assertEqual('from_name', events[0]['value'][0].retrieval_expression)
|
||||
self.assertEqual('lover_name', events[0]['value'][0].name)
|
||||
|
||||
workflow.catch_bpmn_message('Love Letter Response', messages[0].payload,
|
||||
messages[0].correlations)
|
||||
# As shown above, the waiting event is looking for a payload with a
|
||||
# 'from_name' that should be used to retrieve the lover's name.
|
||||
new_message_payload = {'from_name': 'Peggy', 'other_nonsense': 1001}
|
||||
workflow.catch_bpmn_message('Love Letter Response', new_message_payload)
|
||||
workflow.do_engine_steps()
|
||||
# The external message created above should be caught
|
||||
self.assertEqual(receive.state, TaskState.COMPLETED)
|
||||
self.assertEqual(receive.data, messages[0].payload)
|
||||
# Spiff extensions allow us to specify the destination of a workflow
|
||||
# but base BPMN does not, and all keys are added directly to the
|
||||
# task data.
|
||||
self.assertEqual(workflow.last_task.data, {'from_name': 'Peggy', 'lover_name': 'Peggy', 'other_nonsense': 1001})
|
||||
self.assertEqual(workflow.correlations, {'lover':{'lover_name':'Peggy'}})
|
||||
self.assertEqual(workflow.is_completed(), True)
|
||||
|
||||
def testCorrelation(self):
|
||||
|
|
Loading…
Reference in New Issue