burnettk 8e3b905b07 Squashed 'SpiffWorkflow/' changes from 11e4b4f9..2ca6ebf8
2ca6ebf8 Data stores (#298)
c2fc9d22 Merge pull request #297 from sartography/bugfix/copy-all-data-when-input-or-output-list-empty
07e3b582 add checks for len == 0 when copying based on io spec
b439f69f Merge pull request #296 from sartography/bugfix/subprocess-access-to-data-objects
6d2a2031 update spiff subworkflow tasks too
992c3867 make data objects referenceable within subprocesses
6c8ff5cd allow subprocesses & call activities to have different data copy policies
2b14f3a4 initialize subprocesses in _update_hook instead of _on_ready_before
791f335d Merge pull request #295 from sartography/improvement/remove-camunda-from-base-and-misc-cleanup
28b579be remove a few unused, duplicative, and debugging methods
8f14d109 remove some other unused diagrams and tests
408bc673 rely on top level camunda parser for almost all namespace references
895b2cc9 remove camunda namespace from base bpmn parser
76ecbf7c Merge pull request #294 from sartography/bugfix/reactivate-boundary-event
82b6c8ad hack to ensure timers (and other events) are reset if returned to via loop reset
590903f4 Merge pull request #292 from sartography/feature/multiinstance-refactor
53749004 fix bug & typo
f31726db raise error on attempting to migrate workflows with MI
44e6d08d create spiff multiinstance task
2168c022 create camunda MI that approximates what it used to do
9894cea5 some improvements and bugfixes
f857ad5d remove some now unused functionality & tests, create a few more tests
6fead9d0 updated serializer & fixes for most tests
ec662ecd add parallel multiinstance
bd19b2a8 working sequential multiinstance
2f9c192b further cleanup around _update_hook
947792bf fix bug in exclusive gateway migration
d3d87b28 add io spec to all tasks
f1586e27 add support for standard loop tasks

git-subtree-dir: SpiffWorkflow
git-subtree-split: 2ca6ebf800d4ff1d54f3e1c48798a2cb879560f7
2023-02-23 10:42:56 -05:00

712 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2007 Samuel Abels
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
import copy
import logging
import time
import warnings
from uuid import uuid4
from .util.deep_merge import DeepMerge
from .exceptions import WorkflowException

# Module-level loggers: 'spiff' for general task events, 'spiff.metrics'
# for per-task timing records emitted on completion, and 'spiff.data' for
# task-data change events.
logger = logging.getLogger('spiff')
metrics = logging.getLogger('spiff.metrics')
data_log = logging.getLogger('spiff.data')
class TaskState:
    """Bit-flag constants describing the lifecycle state of a :class:`Task`.

    The possible states are:

    - FUTURE: the task is certain to be reached, whatever choices the
      user makes later in the workflow.
    - LIKELY: the task is merely predicted, but the specification lists
      it as the default branch of its ExclusiveChoice, so it is the
      probable outcome.
    - MAYBE: the task is merely predicted and is not the default branch
      of its ExclusiveChoice.
    - WAITING: the task still awaits an event before it can complete;
      e.g. a Join stays WAITING until all predecessors have finished.
    - READY: every precondition is satisfied (usually: all predecessors
      completed) and the task may be completed, typically via
      :class:`Workflow.complete_task_from_id()`.
    - CANCELLED: the task was cancelled by a CancelTask or
      CancelWorkflow task.
    - COMPLETED: the task finished normally.

    LIKELY and MAYBE tasks are predictions only and may be pruned from
    the tree at runtime.  They exist so the workflow can be visualised
    before the deciding choices have actually been made.
    """

    # The numeric values are ordered by lifecycle progression; the state
    # setter relies on this ordering to reject backwards transitions, so
    # do not change them.
    MAYBE = 1
    LIKELY = 2
    FUTURE = 4
    WAITING = 8
    READY = 16
    COMPLETED = 32
    CANCELLED = 64

    # Convenience masks combining several of the flags above.
    FINISHED_MASK = CANCELLED | COMPLETED
    DEFINITE_MASK = FUTURE | WAITING | READY | FINISHED_MASK
    PREDICTED_MASK = FUTURE | LIKELY | MAYBE
    NOT_FINISHED_MASK = PREDICTED_MASK | WAITING | READY
    ANY_MASK = FINISHED_MASK | NOT_FINISHED_MASK
# Human-readable names for the single-state flags.  NOTE: the insertion
# order of this dict determines the order of names produced by
# Task.get_state_name(), so entries must not be reordered.
TaskStateNames = {TaskState.FUTURE: 'FUTURE',
                  TaskState.WAITING: 'WAITING',
                  TaskState.READY: 'READY',
                  TaskState.CANCELLED: 'CANCELLED',
                  TaskState.COMPLETED: 'COMPLETED',
                  TaskState.LIKELY: 'LIKELY',
                  TaskState.MAYBE: 'MAYBE'}
# Names for the combined masks; used by DeprecatedMetaTask to resolve
# legacy Task.<MASK> attribute access.
TaskStateMasks = {
    TaskState.FINISHED_MASK: 'FINISHED_MASK',
    TaskState.DEFINITE_MASK: 'DEFINITE_MASK',
    TaskState.PREDICTED_MASK: 'PREDICTED_MASK',
    TaskState.NOT_FINISHED_MASK: 'NOT_FINISHED_MASK',
    TaskState.ANY_MASK: 'ANY_MASK',
}
class DeprecatedMetaTask(type):
    """Metaclass keeping the legacy ``Task.<STATE>`` constants working.

    The state flags used to live on ``Task`` itself and now live on
    ``TaskState``.  Accessing one of the old names on the class emits a
    DeprecationWarning and transparently returns the ``TaskState`` value.
    """
    TaskNames = {**TaskStateNames, **TaskStateMasks}
    TaskStateFromNames = {v: k for k, v in TaskNames.items()}

    def __getattribute__(self, item):
        # Only the legacy state-name attributes are intercepted; anything
        # else follows normal class attribute lookup.  (The keys of
        # TaskStateFromNames are exactly the values of TaskNames.)
        if item not in DeprecatedMetaTask.TaskStateFromNames:
            return type.__getattribute__(self, item)
        warnings.warn(f'Task.{item} is deprecated. '
                      f'Please use TaskState.{item}',
                      DeprecationWarning, stacklevel=2)
        return DeprecatedMetaTask.TaskStateFromNames[item]
class Task(object, metaclass=DeprecatedMetaTask):
    """
    A node in the tree that records the path taken (or predicted) through
    a workflow.  Each Task pairs a TaskSpec with runtime state and data.

    Each Task has a state. For an explanation, consider the following task
    specification::

                                        ,-> Simple (default choice)
            StartTask -> ExclusiveChoice
                                        `-> Simple

    The initial task tree for this specification looks like so::

                                                ,-> Simple LIKELY
            StartTask WAITING -> ExclusiveChoice FUTURE
                                                `-> Simple MAYBE

    See TaskState for the available states on a Task.
    """
    class Iterator(object):
        """
        Depth-first, pre-order iterator over a task tree.  An optional
        TaskState bitmask filter restricts which tasks are yielded.
        """

        # Safety bound: the walk aborts if it exceeds this many steps,
        # which indicates a cycle in the tree.
        MAX_ITERATIONS = 10000

        def __init__(self, current, filter=None):
            """
            Constructor.

            :param current: the root task of the subtree to iterate
                (yielded first, if it matches the filter).
            :param filter: optional TaskState bitmask; when given, only
                tasks whose state matches it are yielded.
            """
            self.filter = filter
            # Stack of tasks from the root down to the current position.
            self.path = [current]
            self.count = 1

        def __iter__(self):
            return self

        def _next(self):
            # Make sure that the end is not yet reached.
            if len(self.path) == 0:
                raise StopIteration()
            current = self.path[-1]

            # Assure we don't recurse forever.
            self.count += 1
            if self.count > self.MAX_ITERATIONS:
                raise WorkflowException("Task Iterator entered infinite recursion loop", task_spec=current)

            # If the current task has children, the first child is the next
            # item. If the current task is LIKELY, and predicted tasks are not
            # specifically searched, we can ignore the children, because
            # predicted tasks should only have predicted children.
            ignore_task = False
            if self.filter is not None:
                search_predicted = self.filter & TaskState.LIKELY != 0
                is_predicted = current.state & TaskState.LIKELY != 0
                ignore_task = is_predicted and not search_predicted
            if current.children and not ignore_task:
                self.path.append(current.children[0])
                # Returning None means "skip this task, keep walking".
                if (self.filter is not None and
                        current.state & self.filter == 0):
                    return None
                return current

            # Ending up here, this task has no children. Crop the path until we
            # reach a task that has unvisited children, or until we hit the
            # end.
            while True:
                old_child = self.path.pop(-1)
                if len(self.path) == 0:
                    break

                # If this task has a sibling, choose it.
                parent = self.path[-1]
                pos = parent.children.index(old_child)
                if len(parent.children) > pos + 1:
                    self.path.append(parent.children[pos + 1])
                    break
            if self.filter is not None and current.state & self.filter == 0:
                return None
            return current

        def __next__(self):
            # By using this loop we avoid an (expensive) recursive call.
            while True:
                next = self._next()
                if next is not None:
                    return next

        # Python 3 iterator protocol
        next = __next__
    # Pool for assigning a unique thread id to every new Task.
    thread_id_pool = 0

    def __init__(self, workflow, task_spec, parent=None, state=TaskState.MAYBE):
        """
        Constructor.

        :param workflow: the workflow this task belongs to (required).
        :param task_spec: the TaskSpec this task executes (required).
        :param parent: optional parent Task; when given, the parent is
            notified so it can append this task to its children.
        :param state: initial TaskState bitmask (defaults to MAYBE).
        """
        assert workflow is not None
        assert task_spec is not None
        self.workflow = workflow
        self.parent = parent
        self.children = []
        # Backing field for the `state` property; set directly here to
        # bypass the setter's backwards-transition check.
        self._state = state
        self.triggered = False
        self.task_spec = task_spec
        self.id = uuid4()
        self.thread_id = self.__class__.thread_id_pool
        self.data = {}
        self.internal_data = {}
        # Timestamp (epoch seconds) of the most recent state transition.
        self.last_state_change = time.time()
        if parent is not None:
            self.parent._child_added_notify(self)
@property
def state(self):
return self._state
@state.setter
def state(self, value):
if value < self._state:
raise WorkflowException(
'state went from %s to %s!' % (self.get_state_name(), TaskStateNames[value]),
task_spec=self.task_spec
)
self._set_state(value)
def _set_state(self, value):
"""Using the setter method will raise an error on a "backwards" state change.
Call this method directly to force the state change.
"""
if value != self.state:
logger.info(f'State change to {TaskStateNames[value]}', extra=self.log_info())
self.last_state_change = time.time()
self._state = value
else:
logger.debug(f'State set to {TaskStateNames[value]}', extra=self.log_info())
def __repr__(self):
return '<Task object (%s) in state %s at %s>' % (
self.task_spec.name,
self.get_state_name(),
hex(id(self)))
def log_info(self, dct=None):
extra = dct or {}
extra.update({
'workflow': self.workflow.spec.name,
'task_spec': self.task_spec.name,
'task_name': self.task_spec.description,
'task_id': self.id,
'task_type': self.task_spec.spec_type,
'data': self.data if logger.level < 20 else None,
'internal_data': self.internal_data if logger.level <= 10 else None,
})
return extra
def update_data(self, data):
"""
If the task.data needs to be updated from a UserTask form or
a Script task then use this function rather than updating task.data
directly. It will handle deeper merges of data,
and MultiInstance tasks will be updated correctly.
"""
self.data = DeepMerge.merge(self.data, data)
data_log.info('Data update', extra=self.log_info())
def set_children_future(self):
"""
for a parallel gateway, we need to set up our
children so that the gateway figures out that it needs to join up
the inputs - otherwise our child process never gets marked as
'READY'
"""
if not self.task_spec.task_should_set_children_future(self):
return
self.task_spec.task_will_set_children_future(self)
# now we set this one to execute
self._set_state(TaskState.MAYBE)
self._sync_children(self.task_spec.outputs)
for child in self.children:
child.set_children_future()
def reset_token(self, data, reset_data=False):
"""
Resets the token to this task. This should allow a trip 'back in time'
as it were to items that have already been completed.
:type reset_data: bool
:param reset_data: Do we want to have the data be where we left of in
this task or not
"""
self.internal_data = {}
if not reset_data and self.workflow.last_task and self.workflow.last_task.data:
# This is a little sly, the data that will get inherited should
# be from the last completed task, but we don't want to alter
# the tree, so we just set the parent's data to the given data.
self.parent.data = copy.deepcopy(data)
self.workflow.last_task = self.parent
self.set_children_future() # this method actually fixes the problem
self._set_state(TaskState.FUTURE)
self.task_spec._update(self)
def __iter__(self):
return Task.Iterator(self)
def _get_root(self):
"""
Returns the top level parent.
"""
if self.parent is None:
return self
return self.parent._get_root()
def _get_depth(self):
depth = 0
task = self.parent
while task is not None:
depth += 1
task = task.parent
return depth
def _child_added_notify(self, child):
"""
Called by another Task to let us know that a child was added.
"""
assert child is not None
self.children.append(child)
def _drop_children(self, force=False):
drop = []
for child in self.children:
if force or (not child._is_finished()):
drop.append(child)
else:
child._drop_children()
for task in drop:
self.children.remove(task)
def _has_state(self, state):
"""
Returns True if the Task has the given state flag set.
"""
return (self.state & state) != 0
def _is_finished(self):
return self._has_state(TaskState.FINISHED_MASK)
def _is_predicted(self):
return self._has_state(TaskState.PREDICTED_MASK)
def _is_definite(self):
return self._has_state(TaskState.DEFINITE_MASK)
def _add_child(self, task_spec, state=TaskState.MAYBE):
"""
Adds a new child and assigns the given TaskSpec to it.
:type task_spec: TaskSpec
:param task_spec: The task spec that is assigned to the new child.
:type state: integer
:param state: The bitmask of states for the new child.
:rtype: Task
:returns: The new child task.
"""
if task_spec is None:
raise ValueError(self, '_add_child() requires a TaskSpec')
if self._is_predicted() and state & TaskState.PREDICTED_MASK == 0:
msg = 'Attempt to add non-predicted child to predicted task'
raise WorkflowException(msg, task_spec=self.task_spec)
task = Task(self.workflow, task_spec, self, state=state)
task.thread_id = self.thread_id
if state == TaskState.READY:
task._ready()
return task
def _assign_new_thread_id(self, recursive=True):
"""
Assigns a new thread id to the task.
:type recursive: bool
:param recursive: Whether to assign the id to children recursively.
:rtype: bool
:returns: The new thread id.
"""
self.__class__.thread_id_pool += 1
self.thread_id = self.__class__.thread_id_pool
if not recursive:
return self.thread_id
for child in self:
child.thread_id = self.thread_id
return self.thread_id
    def _sync_children(self, task_specs, state=TaskState.MAYBE):
        """
        This method syncs up the task's children with the given list of task
        specs. In other words::

            - Add one child for each given TaskSpec, unless that child already
              exists.
            - Remove all children for which there is no spec in the given list,
              unless it is a "triggered" task.
            - Handle looping back to previous tasks, so we don't end up with
              an infinitely large tree.

        .. note::

           It is an error if the task has a non-predicted child that is
           not given in the TaskSpecs.

        :type  task_specs: list(TaskSpec)
        :param task_specs: The list of task specs that may become children.
        :type  state: integer
        :param state: The bitmask of states for the new children.
        """
        if task_specs is None:
            raise ValueError('"task_specs" argument is None')
        # Work on a copy; entries are removed as existing children match.
        new_children = task_specs[:]

        # If a child task_spec is also an ancestor, we are looping back,
        # replace those specs with a loopReset task.
        root_task = self._get_root()
        for index, task_spec in enumerate(new_children):
            ancestor_task = self._find_ancestor(task_spec)
            if ancestor_task and ancestor_task != root_task:
                destination = ancestor_task
                new_spec = self.workflow.get_reset_task_spec(destination)
                new_spec.outputs = []
                new_spec.inputs = task_spec.inputs
                new_children[index] = new_spec

        # Create a list of all children that are no longer needed.
        unneeded_children = []
        for child in self.children:
            # Triggered tasks are never removed.
            if child.triggered:
                continue
            # If the task already exists, remove it from to-be-added
            if child.task_spec in new_children:
                new_children.remove(child.task_spec)
                # We should set the state here but that breaks everything
                continue
            # Definite tasks must not be removed, so they HAVE to be in the given task spec list.
            if child._is_definite():
                raise WorkflowException(f'removal of non-predicted child {child}', task_spec=self.task_spec)
            unneeded_children.append(child)

        # Remove and add the children accordingly.
        for child in unneeded_children:
            self.children.remove(child)
        for task_spec in new_children:
            self._add_child(task_spec, state)
def _set_likely_task(self, task_specs):
if not isinstance(task_specs, list):
task_specs = [task_specs]
for task_spec in task_specs:
for child in self.children:
if child.task_spec != task_spec:
continue
if child._is_definite():
continue
child._set_state(TaskState.LIKELY)
def _is_descendant_of(self, parent):
"""
Returns True if parent is in the list of ancestors, returns False
otherwise.
:type parent: Task
:param parent: The parent that is searched in the ancestors.
:rtype: bool
:returns: Whether the parent was found.
"""
if self.parent is None:
return False
if self.parent == parent:
return True
return self.parent._is_descendant_of(parent)
def _find_child_of(self, parent_task_spec):
"""
Returns the ancestor that has a task with the given task spec
as a parent.
If no such ancestor was found, the root task is returned.
:type parent_task_spec: TaskSpec
:param parent_task_spec: The wanted ancestor.
:rtype: Task
:returns: The child of the given ancestor.
"""
if self.parent is None:
return self
if self.parent.task_spec == parent_task_spec:
return self
return self.parent._find_child_of(parent_task_spec)
def _find_any(self, task_spec):
"""
Returns any descendants that have the given task spec assigned.
:type task_spec: TaskSpec
:param task_spec: The wanted task spec.
:rtype: list(Task)
:returns: The tasks objects that are attached to the given task spec.
"""
tasks = []
if self.task_spec == task_spec:
tasks.append(self)
for child in self:
if child.task_spec != task_spec:
continue
tasks.append(child)
return tasks
def _find_ancestor(self, task_spec):
"""
Returns the ancestor that has the given task spec assigned.
If no such ancestor was found, the root task is returned.
:type task_spec: TaskSpec
:param task_spec: The wanted task spec.
:rtype: Task
:returns: The ancestor.
"""
if self.parent is None:
return self
if self.parent.task_spec == task_spec:
return self.parent
return self.parent._find_ancestor(task_spec)
def _find_ancestor_from_name(self, name):
"""
Returns the ancestor that has a task with the given name assigned.
Returns None if no such ancestor was found.
:type name: str
:param name: The name of the wanted task.
:rtype: Task
:returns: The ancestor.
"""
if self.parent is None:
return None
if self.parent.get_name() == name:
return self.parent
return self.parent._find_ancestor_from_name(name)
def _ready(self):
"""
Marks the task as ready for execution.
"""
if self._has_state(TaskState.COMPLETED) or self._has_state(TaskState.CANCELLED):
return
self._set_state(TaskState.READY)
self.task_spec._on_ready(self)
def get_name(self):
return str(self.task_spec.name)
def get_description(self):
return str(self.task_spec.description)
def get_state_name(self):
"""
Returns a textual representation of this Task's state.
"""
state_name = []
for state, name in list(TaskStateNames.items()):
if self._has_state(state):
state_name.append(name)
return '|'.join(state_name)
def get_spec_data(self, name=None, default=None):
"""
Returns the value of the spec data with the given name, or the given
default value if the spec data does not exist.
:type name: str
:param name: The name of the spec data field.
:type default: obj
:param default: Return this value if the spec data does not exist.
:rtype: obj
:returns: The value of the spec data.
"""
return self.task_spec.get_data(name, default)
def _set_internal_data(self, **kwargs):
"""
Defines the given attribute/value pairs.
"""
self.internal_data.update(kwargs)
def _get_internal_data(self, name, default=None):
return self.internal_data.get(name, default)
def set_data(self, **kwargs):
"""
Defines the given attribute/value pairs.
"""
self.data.update(kwargs)
def _inherit_data(self):
"""
Inherits the data from the parent.
"""
self.set_data(**self.parent.data)
def get_data(self, name, default=None):
"""
Returns the value of the data field with the given name, or the given
default value if the data field does not exist.
:type name: str
:param name: A data field name.
:type default: obj
:param default: Return this value if the data field does not exist.
:rtype: obj
:returns: The value of the data field
"""
return self.data.get(name, default)
def cancel(self):
"""
Cancels the item if it was not yet completed, and removes
any children that are LIKELY.
"""
if self._is_finished():
for child in self.children:
child.cancel()
return
self._set_state(TaskState.CANCELLED)
self._drop_children()
self.task_spec._on_cancel(self)
def complete(self):
"""
Called by the associated task to let us know that its state
has changed (e.g. from FUTURE to COMPLETED.)
"""
self._set_state(TaskState.COMPLETED)
# I am taking back my previous comment about running the task after it's completed being "CRAZY"
# Turns out that tasks are in fact supposed to be complete at this point and I've been wrong all along
# about when tasks should actually be executed
start = time.time()
retval = self.task_spec._on_complete(self)
extra = self.log_info({
'action': 'Complete',
'elapsed': time.time() - start
})
metrics.debug('', extra=extra)
return retval
    def trigger(self, *args):
        """
        Forward a trigger event to this task's spec.

        The meaning of ``args`` is defined by the task spec's
        ``_on_trigger`` implementation.  (The previous docstring about a
        ``recursive`` flag did not match this method's signature.)
        """
        self.task_spec._on_trigger(self, *args)
def get_dump(self, indent=0, recursive=True):
"""
Returns the subtree as a string for debugging.
:rtype: str
:returns: The debug information.
"""
dbg = (' ' * indent * 2)
dbg += '%s/' % self.id
dbg += '%s:' % self.thread_id
dbg += ' Task of %s' % self.get_name()
if self.task_spec.description:
dbg += ' (%s)' % self.get_description()
dbg += ' State: %s' % self.get_state_name()
dbg += ' Children: %s' % len(self.children)
if recursive:
for child in self.children:
dbg += '\n' + child.get_dump(indent + 1)
return dbg
def dump(self, indent=0):
"""
Prints the subtree as a string for debugging.
"""
print(self.get_dump())