Initial work on a "Previous" task.

This commit is contained in:
Dan Funk 2020-05-01 12:11:39 -04:00
parent 7c743d65ed
commit 1f5002680a
3 changed files with 49 additions and 7 deletions

View File

@ -98,13 +98,16 @@ def __get_workflow_api_model(processor: WorkflowProcessor, status_data=None):
status=processor.get_status(),
last_task=WorkflowService.spiff_task_to_api_task(processor.bpmn_workflow.last_task),
next_task=None,
previous_task=None,
user_tasks=user_tasks,
workflow_spec_id=processor.workflow_spec_id,
spec_version=processor.get_spec_version(),
is_latest_spec=processor.get_spec_version() == processor.get_latest_version_string(processor.workflow_spec_id),
)
if processor.next_task():
workflow_api.next_task = WorkflowService.spiff_task_to_api_task(processor.next_task())
next_task = processor.next_task()
if next_task:
workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task)
workflow_api.parent_task = processor.previous_task(next_task)
return workflow_api
@ -123,8 +126,10 @@ def set_current_task(workflow_id, task_id):
workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
processor = WorkflowProcessor(workflow_model)
task_id = uuid.UUID(task_id)
last_task = processor.bpmn_workflow.last_task
task = processor.bpmn_workflow.get_task(task_id)
task.reset_token(reset_data=False) # we could optionally clear the previous data.
#processor.bpmn_workflow.last_task = last_task
workflow_model.bpmn_workflow_json = processor.serialize()
session.add(workflow_model)
session.commit()

View File

@ -90,13 +90,14 @@ class TaskSchema(ma.Schema):
class WorkflowApi(object):
def __init__(self, id, status, user_tasks, last_task, next_task,
def __init__(self, id, status, user_tasks, last_task, next_task, previous_task,
spec_version, is_latest_spec, workflow_spec_id):
self.id = id
self.status = status
self.user_tasks = user_tasks
self.last_task = last_task
self.next_task = next_task
self.last_task = last_task # The last task that was completed, may be different than previous.
self.next_task = next_task # The next task that requires user input.
self.previous_task = previous_task # The opposite of next task.
self.workflow_spec_id = workflow_spec_id
self.spec_version = spec_version
self.is_latest_spec = is_latest_spec
@ -104,7 +105,7 @@ class WorkflowApi(object):
class WorkflowApiSchema(ma.Schema):
class Meta:
model = WorkflowApi
fields = ["id", "status", "user_tasks", "last_task", "next_task",
fields = ["id", "status", "user_tasks", "last_task", "next_task", "previous_task",
"workflow_spec_id", "spec_version", "is_latest_spec"]
unknown = INCLUDE
@ -112,10 +113,11 @@ class WorkflowApiSchema(ma.Schema):
user_tasks = marshmallow.fields.List(marshmallow.fields.Nested(TaskSchema, dump_only=True))
last_task = marshmallow.fields.Nested(TaskSchema, dump_only=True)
next_task = marshmallow.fields.Nested(TaskSchema, dump_only=True, required=False)
previous_task = marshmallow.fields.Nested(TaskSchema, dump_only=True, required=False)
@marshmallow.post_load
def make_workflow(self, data, **kwargs):
keys = ['id', 'status', 'user_tasks', 'last_task', 'next_task',
keys = ['id', 'status', 'user_tasks', 'last_task', 'next_task', 'previous_task',
'workflow_spec_id', 'spec_version', 'is_latest_spec']
filtered_fields = {key: data[key] for key in keys}
return WorkflowApi(**filtered_fields)

View File

@ -373,6 +373,41 @@ class WorkflowProcessor(object):
next_task = task
return next_task
def previous_task(self, next_task):
"""Attempts to find the opposite of "next_task", given where you are at, what user task
would you have wish to complete when moving in the opposite direction.
"""
# If the parent task is not an engine task, return that.
if not next_task.parent.is_engine_task:
return next_task.parent
if isinstance(Gateway)
# If the last task is a parent of this task,
# If the whole blessed mess is done, return the end_event task in the tree
if self.bpmn_workflow.is_completed():
for task in SpiffTask.Iterator(self.bpmn_workflow.task_tree, SpiffTask.ANY_MASK):
if isinstance(task.task_spec, EndEvent):
return task
# If there are ready tasks to complete, return the next ready task, but return the one
# in the active parallel path if possible.
ready_tasks = self.bpmn_workflow.get_tasks(SpiffTask.READY)
if len(ready_tasks) > 0:
for task in ready_tasks:
if task.parent == self.bpmn_workflow.last_task:
return task
return ready_tasks[0]
# If there are no ready tasks, but the thing isn't complete yet, find the first non-complete task
# and return that
next_task = None
for task in SpiffTask.Iterator(self.bpmn_workflow.task_tree, SpiffTask.NOT_FINISHED_MASK):
next_task = task
return next_task
def complete_task(self, task):
self.bpmn_workflow.complete_task_from_id(task.id)
self.workflow_model.total_tasks = len(self.get_all_user_tasks())