# -*- coding: utf-8 -*-

# Copyright (C) 2007 Samuel Abels
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import copy
import logging
import time
import warnings
from uuid import uuid4

from .util.deep_merge import DeepMerge
from .exceptions import WorkflowException

logger = logging.getLogger('spiff')
metrics = logging.getLogger('spiff.metrics')
data_log = logging.getLogger('spiff.data')


class TaskState:
    """
    The following states may exist:

    - FUTURE: The task will definitely be reached in the future,
      regardless of which choices the user makes within the workflow.

    - LIKELY: The task may or may not be reached in the future. It
      is likely because the specification lists it as the default
      option for the ExclusiveChoice.

    - MAYBE: The task may or may not be reached in the future. It
      is not LIKELY, because the specification does not list it as the
      default choice for the ExclusiveChoice.

    - WAITING: The task is still waiting for an event before it
      completes. For example, a Join task will be WAITING until all
      predecessors are completed.

    - READY: The conditions for completing the task are now satisfied.
      Usually this means that all predecessors have completed and the
      task may now be completed using
      :class:`Workflow.complete_task_from_id()`.

    - CANCELLED: The task was cancelled by a CancelTask or
      CancelWorkflow task.

    - COMPLETED: The task was regularly completed.

    Note that the LIKELY and MAYBE tasks are merely predicted/guessed, so
    those tasks may be removed from the tree at runtime later. They are
    created to allow for visualizing the workflow at a time where
    the required decisions have not yet been made.
    """
    # Note: The states in this list are ordered in the sequence in which
    # they may appear. Do not change.
    MAYBE = 1
    LIKELY = 2
    FUTURE = 4
    WAITING = 8
    READY = 16
    COMPLETED = 32
    CANCELLED = 64

    FINISHED_MASK = CANCELLED | COMPLETED
    DEFINITE_MASK = FUTURE | WAITING | READY | FINISHED_MASK
    PREDICTED_MASK = FUTURE | LIKELY | MAYBE
    NOT_FINISHED_MASK = PREDICTED_MASK | WAITING | READY
    ANY_MASK = FINISHED_MASK | NOT_FINISHED_MASK


# Maps each single state flag to its display name.
TaskStateNames = {TaskState.FUTURE: 'FUTURE',
                  TaskState.WAITING: 'WAITING',
                  TaskState.READY: 'READY',
                  TaskState.CANCELLED: 'CANCELLED',
                  TaskState.COMPLETED: 'COMPLETED',
                  TaskState.LIKELY: 'LIKELY',
                  TaskState.MAYBE: 'MAYBE'}

# Maps each combined state mask to its display name.
TaskStateMasks = {
    TaskState.FINISHED_MASK: 'FINISHED_MASK',
    TaskState.DEFINITE_MASK: 'DEFINITE_MASK',
    TaskState.PREDICTED_MASK: 'PREDICTED_MASK',
    TaskState.NOT_FINISHED_MASK: 'NOT_FINISHED_MASK',
    TaskState.ANY_MASK: 'ANY_MASK',
}


class DeprecatedMetaTask(type):
    """
    Handle deprecated methods that are now moved to TaskState
    """
    # value -> name for every state and mask, and the reverse lookup.
    TaskNames = {**TaskStateNames, **TaskStateMasks}
    TaskStateFromNames = {v: k for k, v in TaskNames.items()}

    def __getattribute__(self, item):
        """
        Resolve class-level attribute access.

        State and mask names (e.g. ``Task.READY``) emit a
        DeprecationWarning and return the matching TaskState value; every
        other attribute is resolved normally.
        """
        # The keys of TaskStateFromNames are exactly the values of TaskNames.
        if item in DeprecatedMetaTask.TaskStateFromNames:
            warnings.warn(f'Task.{item} is deprecated. '
                          f'Please use TaskState.{item}',
                          DeprecationWarning, stacklevel=2)
            return DeprecatedMetaTask.TaskStateFromNames[item]
        return type.__getattribute__(self, item)


class Task(object, metaclass=DeprecatedMetaTask):
    """
    Used internally for composing a tree that represents the path that
    is taken (or predicted) within the workflow.

    Each Task has a state. For an explanation, consider the following task
    specification::

                                    ,-> Simple (default choice)
        StartTask -> ExclusiveChoice
                                    `-> Simple

    The initial task tree for this specification looks like so::

                                                   ,-> Simple LIKELY
        StartTask WAITING -> ExclusiveChoice FUTURE
                                                   `-> Simple MAYBE

    See TaskState for the available states on a Task.
    """

    class Iterator(object):
        """
        This is a tree iterator that supports filtering such that a client
        may walk through all tasks that have a specific state.

        The walk starts at (and includes) the task it was created with.
        """

        # Upper bound on steps before the walk is considered to be looping.
        MAX_ITERATIONS = 10000

        def __init__(self, current, filter=None):
            """
            Constructor.

            :type  current: Task
            :param current: The task at which the walk starts.
            :type  filter: integer
            :param filter: Bitmask of states to yield, or None for all.
            """
            self.filter = filter
            self.path = [current]
            self.count = 1

        def __iter__(self):
            return self

        def _next(self):
            """
            Advance the walk by one task.

            :rtype:  Task
            :returns: The visited task, or None if it is rejected by the
                      filter (the caller then simply steps again).
            """
            # Make sure that the end is not yet reached.
            if not self.path:
                raise StopIteration()

            current = self.path[-1]

            # Assure we don't recurse forever.
            self.count += 1
            if self.count > self.MAX_ITERATIONS:
                # Report the spec (not the task), as the other raises in
                # this module do.
                raise WorkflowException(
                    "Task Iterator entered infinite recursion loop",
                    task_spec=current.task_spec)

            # If the current task has children, the first child is the next
            # item. If the current task is LIKELY, and predicted tasks are
            # not specifically searched, we can ignore the children, because
            # predicted tasks should only have predicted children.
            ignore_task = False
            if self.filter is not None:
                search_predicted = (self.filter & TaskState.LIKELY) != 0
                is_predicted = (current.state & TaskState.LIKELY) != 0
                ignore_task = is_predicted and not search_predicted

            matches = (self.filter is None
                       or (current.state & self.filter) != 0)

            if current.children and not ignore_task:
                self.path.append(current.children[0])
                return current if matches else None

            # Ending up here, this task has no children. Crop the path until
            # we reach a task that has unvisited children, or until we hit
            # the end.
            while True:
                old_child = self.path.pop(-1)
                if not self.path:
                    break

                # If this task has a sibling, choose it.
                parent = self.path[-1]
                pos = parent.children.index(old_child)
                if len(parent.children) > pos + 1:
                    self.path.append(parent.children[pos + 1])
                    break

            return current if matches else None

        def __next__(self):
            """Return the next task that passes the filter."""
            # By using this loop we avoid an (expensive) recursive call.
            while True:
                task = self._next()
                if task is not None:
                    return task

        # Alias kept for callers that invoke ``.next()`` directly.
        next = __next__

    # Pool for assigning a unique thread id to every new Task.
    thread_id_pool = 0

    def __init__(self, workflow, task_spec, parent=None,
                 state=TaskState.MAYBE):
        """
        Constructor.

        :param workflow: The workflow this task belongs to (required).
        :param task_spec: The spec assigned to this task (required).
        :type  parent: Task
        :param parent: The parent task; if given, it is notified so that
                       this task is appended to its children.
        :type  state: integer
        :param state: The initial state.
        """
        assert workflow is not None
        assert task_spec is not None
        self.workflow = workflow
        self.parent = parent
        self.children = []
        self._state = state
        self.triggered = False
        self.task_spec = task_spec
        self.id = uuid4()
        self.thread_id = self.__class__.thread_id_pool
        self.data = {}
        self.internal_data = {}
        self.last_state_change = time.time()
        if parent is not None:
            self.parent._child_added_notify(self)

    @property
    def state(self):
        """The current state of the task."""
        return self._state

    @state.setter
    def state(self, value):
        """
        Set the state, refusing "backwards" changes.

        :raises WorkflowException: if value is lower than the current state.
        """
        if value < self._state:
            # Use .get so an unknown value still produces the intended
            # WorkflowException rather than a KeyError.
            raise WorkflowException(
                'state went from %s to %s!' % (
                    self.get_state_name(),
                    TaskStateNames.get(value, value)),
                task_spec=self.task_spec
            )
        self._set_state(value)

    def _set_state(self, value):
        """Using the setter method will raise an error on a "backwards" state
        change. Call this method directly to force the state change.
        """
        if value != self.state:
            logger.info(f'State change to {TaskStateNames[value]}',
                        extra=self.log_info())
            self.last_state_change = time.time()
            self._state = value
        else:
            logger.debug(f'State set to {TaskStateNames[value]}',
                         extra=self.log_info())

    def __repr__(self):
        return '<Task object (%s) in state %s at %s>' % (
            self.task_spec.name,
            self.get_state_name(),
            hex(id(self)))

    def log_info(self, dct=None):
        """
        Build the ``extra`` dict attached to log records for this task.

        :type  dct: dict
        :param dct: Optional dict to extend; it is updated in place.
        :rtype:  dict
        :returns: The dict with workflow and task details added. ``data``
                  is only included when ``logger.level`` is below 20 and
                  ``internal_data`` only when it is at most 10.
        """
        extra = dct or {}
        extra.update({
            'workflow': self.workflow.spec.name,
            'task_spec': self.task_spec.name,
            'task_name': self.task_spec.description,
            'task_id': self.id,
            'task_type': self.task_spec.spec_type,
            'data': self.data if logger.level < 20 else None,
            'internal_data': (self.internal_data
                              if logger.level <= 10 else None),
        })
        return extra

    def update_data(self, data):
        """
        If the task.data needs to be updated from a UserTask form or
        a Script task then use this function rather than updating task.data
        directly. It will handle deeper merges of data,
        and MultiInstance tasks will be updated correctly.
        """
        self.data = DeepMerge.merge(self.data, data)
        data_log.info('Data update', extra=self.log_info())

    def set_children_future(self):
        """
        for a parallel gateway, we need to set up our
        children so that the gateway figures out that it needs to join up
        the inputs - otherwise our child process never gets marked as
        'READY'
        """
        if not self.task_spec.task_should_set_children_future(self):
            return

        self.task_spec.task_will_set_children_future(self)

        # now we set this one to execute
        self._set_state(TaskState.MAYBE)
        self._sync_children(self.task_spec.outputs)
        for child in self.children:
            child.set_children_future()

    def reset_token(self, data, reset_data=False):
        """
        Resets the token to this task. This should allow a trip 'back in time'
        as it were to items that have already been completed.

        :param data: Deep-copied onto the parent's data, unless reset_data
                     is set or the workflow's last task has no data.
        :type  reset_data: bool
        :param reset_data: Do we want to have the data be where we left off
                           in this task or not
        """
        self.internal_data = {}
        if (not reset_data and self.workflow.last_task
                and self.workflow.last_task.data):
            # This is a little sly, the data that will get inherited should
            # be from the last completed task, but we don't want to alter
            # the tree, so we just set the parent's data to the given data.
            self.parent.data = copy.deepcopy(data)
        self.workflow.last_task = self.parent

        self.set_children_future()  # this method actually fixes the problem
        self._set_state(TaskState.FUTURE)
        self.task_spec._update(self)

    def __iter__(self):
        return Task.Iterator(self)

    def _get_root(self):
        """
        Returns the top level parent.
        """
        if self.parent is None:
            return self
        return self.parent._get_root()

    def _get_depth(self):
        """Returns the number of ancestors of this task."""
        depth = 0
        task = self.parent
        while task is not None:
            depth += 1
            task = task.parent
        return depth

    def _child_added_notify(self, child):
        """
        Called by another Task to let us know that a child was added.
        """
        assert child is not None
        self.children.append(child)

    def _drop_children(self, force=False):
        """
        Removes children that are not finished (or all of them if force
        is set). Children that are kept get the same treatment, without
        force.
        """
        drop = []
        for child in self.children:
            if force or (not child._is_finished()):
                drop.append(child)
            else:
                child._drop_children()
        # Remove in a second pass so the list is not mutated while iterating.
        for task in drop:
            self.children.remove(task)

    def _has_state(self, state):
        """
        Returns True if the Task has the given state flag set.
        """
        return (self.state & state) != 0

    def _is_finished(self):
        return self._has_state(TaskState.FINISHED_MASK)

    def _is_predicted(self):
        return self._has_state(TaskState.PREDICTED_MASK)

    def _is_definite(self):
        return self._has_state(TaskState.DEFINITE_MASK)

    def _add_child(self, task_spec, state=TaskState.MAYBE):
        """
        Adds a new child and assigns the given TaskSpec to it.

        :type  task_spec: TaskSpec
        :param task_spec: The task spec that is assigned to the new child.
        :type  state: integer
        :param state: The bitmask of states for the new child.
        :rtype:  Task
        :returns: The new child task.
        :raises ValueError: if task_spec is None.
        :raises WorkflowException: if a non-predicted child is added to a
                                   predicted task.
        """
        if task_spec is None:
            raise ValueError(self, '_add_child() requires a TaskSpec')
        if self._is_predicted() and state & TaskState.PREDICTED_MASK == 0:
            msg = 'Attempt to add non-predicted child to predicted task'
            raise WorkflowException(msg, task_spec=self.task_spec)
        task = Task(self.workflow, task_spec, self, state=state)
        task.thread_id = self.thread_id
        if state == TaskState.READY:
            task._ready()
        return task

    def _assign_new_thread_id(self, recursive=True):
        """
        Assigns a new thread id to the task.

        :type  recursive: bool
        :param recursive: Whether to assign the id to children recursively.
        :rtype:  integer
        :returns: The new thread id.
        """
        self.__class__.thread_id_pool += 1
        self.thread_id = self.__class__.thread_id_pool
        if not recursive:
            return self.thread_id
        for child in self:
            child.thread_id = self.thread_id
        return self.thread_id

    def _sync_children(self, task_specs, state=TaskState.MAYBE):
        """
        This method syncs up the task's children with the given list of task
        specs. In other words::

            - Add one child for each given TaskSpec, unless that child already
              exists.
            - Remove all children for which there is no spec in the given list,
              unless it is a "triggered" task.
            - Handle looping back to previous tasks, so we don't end up with
              an infinitely large tree.

        .. note::

           It is an error if the task has a non-predicted child that is
           not given in the TaskSpecs.

        :type  task_specs: list(TaskSpec)
        :param task_specs: The list of task specs that may become children.
        :type  state: integer
        :param state: The bitmask of states for the new children.
        :raises ValueError: if task_specs is None.
        :raises WorkflowException: if a definite child would be removed.
        """
        if task_specs is None:
            raise ValueError('"task_specs" argument is None')
        new_children = task_specs[:]

        # If a child task_spec is also an ancestor, we are looping back,
        # replace those specs with a loopReset task.
        root_task = self._get_root()
        for index, task_spec in enumerate(new_children):
            ancestor_task = self._find_ancestor(task_spec)
            if ancestor_task and ancestor_task != root_task:
                destination = ancestor_task
                new_spec = self.workflow.get_reset_task_spec(destination)
                new_spec.outputs = []
                new_spec.inputs = task_spec.inputs
                new_children[index] = new_spec

        # Create a list of all children that are no longer needed.
        unneeded_children = []
        for child in self.children:
            # Triggered tasks are never removed.
            if child.triggered:
                continue

            # If the task already exists, remove it from to-be-added
            if child.task_spec in new_children:
                new_children.remove(child.task_spec)
                # We should set the state here but that breaks everything
                continue

            # Definite tasks must not be removed, so they HAVE to be in the
            # given task spec list.
            if child._is_definite():
                raise WorkflowException(
                    f'removal of non-predicted child {child}',
                    task_spec=self.task_spec)
            unneeded_children.append(child)

        # Remove and add the children accordingly.
        for child in unneeded_children:
            self.children.remove(child)
        for task_spec in new_children:
            self._add_child(task_spec, state)

    def _set_likely_task(self, task_specs):
        """
        Sets every non-definite child whose spec is among the given
        spec(s) to LIKELY.

        :param task_specs: A single TaskSpec or a list of TaskSpecs.
        """
        if not isinstance(task_specs, list):
            task_specs = [task_specs]
        for task_spec in task_specs:
            for child in self.children:
                if child.task_spec != task_spec:
                    continue
                if child._is_definite():
                    continue
                child._set_state(TaskState.LIKELY)

    def _is_descendant_of(self, parent):
        """
        Returns True if parent is in the list of ancestors, returns False
        otherwise.

        :type  parent: Task
        :param parent: The parent that is searched in the ancestors.
        :rtype:  bool
        :returns: Whether the parent was found.
        """
        if self.parent is None:
            return False
        if self.parent == parent:
            return True
        return self.parent._is_descendant_of(parent)

    def _find_child_of(self, parent_task_spec):
        """
        Returns the ancestor that has a task with the given task spec
        as a parent.
        If no such ancestor was found, the root task is returned.

        :type  parent_task_spec: TaskSpec
        :param parent_task_spec: The wanted ancestor.
        :rtype:  Task
        :returns: The child of the given ancestor.
        """
        if self.parent is None:
            return self
        if self.parent.task_spec == parent_task_spec:
            return self
        return self.parent._find_child_of(parent_task_spec)

    def _find_any(self, task_spec):
        """
        Returns this task and/or any descendants that have the given task
        spec assigned.

        :type  task_spec: TaskSpec
        :param task_spec: The wanted task spec.
        :rtype:  list(Task)
        :returns: The tasks objects that are attached to the given task spec.
        """
        # Iterating over a task yields the task itself first, followed by
        # its descendants, so one pass covers everything. Checking self
        # separately as well would list a matching self twice.
        return [task for task in self if task.task_spec == task_spec]

    def _find_ancestor(self, task_spec):
        """
        Returns the ancestor that has the given task spec assigned.
        If no such ancestor was found, the root task is returned.

        :type  task_spec: TaskSpec
        :param task_spec: The wanted task spec.
        :rtype:  Task
        :returns: The ancestor.
        """
        if self.parent is None:
            return self
        if self.parent.task_spec == task_spec:
            return self.parent
        return self.parent._find_ancestor(task_spec)

    def _find_ancestor_from_name(self, name):
        """
        Returns the ancestor that has a task with the given name assigned.
        Returns None if no such ancestor was found.

        :type  name: str
        :param name: The name of the wanted task.
        :rtype:  Task
        :returns: The ancestor.
        """
        if self.parent is None:
            return None
        if self.parent.get_name() == name:
            return self.parent
        return self.parent._find_ancestor_from_name(name)

    def _ready(self):
        """
        Marks the task as ready for execution.

        Does nothing if the task is already COMPLETED or CANCELLED.
        """
        if self._is_finished():
            return
        self._set_state(TaskState.READY)
        self.task_spec._on_ready(self)

    def get_name(self):
        """Returns the name of the task spec as a string."""
        return str(self.task_spec.name)

    def get_description(self):
        """Returns the description of the task spec as a string."""
        return str(self.task_spec.description)

    def get_state_name(self):
        """
        Returns a textual representation of this Task's state.
        """
        return '|'.join(name for state, name in TaskStateNames.items()
                        if self._has_state(state))

    def get_spec_data(self, name=None, default=None):
        """
        Returns the value of the spec data with the given name, or the given
        default value if the spec data does not exist.

        :type  name: str
        :param name: The name of the spec data field.
        :type  default: obj
        :param default: Return this value if the spec data does not exist.
        :rtype:  obj
        :returns: The value of the spec data.
        """
        return self.task_spec.get_data(name, default)

    def _set_internal_data(self, **kwargs):
        """
        Defines the given attribute/value pairs.
        """
        self.internal_data.update(kwargs)

    def _get_internal_data(self, name, default=None):
        """
        Returns the internal data field with the given name, or the given
        default value if it does not exist.
        """
        return self.internal_data.get(name, default)

    def set_data(self, **kwargs):
        """
        Defines the given attribute/value pairs.
        """
        self.data.update(kwargs)

    def _inherit_data(self):
        """
        Inherits the data from the parent.
        """
        self.set_data(**self.parent.data)

    def get_data(self, name, default=None):
        """
        Returns the value of the data field with the given name, or the given
        default value if the data field does not exist.

        :type  name: str
        :param name: A data field name.
        :type  default: obj
        :param default: Return this value if the data field does not exist.
        :rtype:  obj
        :returns: The value of the data field
        """
        return self.data.get(name, default)

    def cancel(self):
        """
        Cancels the item if it was not yet completed, and removes
        any children that are not finished. If the task is already
        finished, the cancellation is passed on to its children instead.
        """
        if self._is_finished():
            for child in self.children:
                child.cancel()
            return
        self._set_state(TaskState.CANCELLED)
        self._drop_children()
        self.task_spec._on_cancel(self)

    def complete(self):
        """
        Called by the associated task to let us know that its state
        has changed (e.g. from FUTURE to COMPLETED.)

        :returns: Whatever the task spec's ``_on_complete()`` returns.
        """
        self._set_state(TaskState.COMPLETED)
        # I am taking back my previous comment about running the task after
        # it's completed being "CRAZY". Turns out that tasks are in fact
        # supposed to be complete at this point and I've been wrong all along
        # about when tasks should actually be executed
        start = time.time()
        retval = self.task_spec._on_complete(self)
        extra = self.log_info({
            'action': 'Complete',
            'elapsed': time.time() - start
        })
        metrics.debug('', extra=extra)
        return retval

    def trigger(self, *args):
        """
        Forwards the trigger, together with any given arguments, to the
        task spec's ``_on_trigger()``.
        """
        self.task_spec._on_trigger(self, *args)

    def get_dump(self, indent=0, recursive=True):
        """
        Returns the subtree as a string for debugging.

        :type  indent: integer
        :param indent: The indentation level of this task's line.
        :type  recursive: bool
        :param recursive: Whether to include the children.
        :rtype:  str
        :returns: The debug information.
        """
        dbg = (' ' * indent * 2)
        dbg += '%s/' % self.id
        dbg += '%s:' % self.thread_id
        dbg += ' Task of %s' % self.get_name()
        if self.task_spec.description:
            dbg += ' (%s)' % self.get_description()
        dbg += ' State: %s' % self.get_state_name()
        dbg += ' Children: %s' % len(self.children)
        if recursive:
            for child in self.children:
                dbg += '\n' + child.get_dump(indent + 1)
        return dbg

    def dump(self, indent=0):
        """
        Prints the subtree as a string for debugging.

        :type  indent: integer
        :param indent: The indentation level of the first line.
        """
        print(self.get_dump(indent))