remove useless variable and add guard clause to reduce complexity

This commit is contained in:
burnettk 2022-05-23 19:08:36 -04:00
parent 1712182a1e
commit 7846ec6836
1 changed files with 36 additions and 37 deletions

View File

@ -171,56 +171,55 @@ def run(
answer: Optional[Dict[str, str]] = None,
) -> Union[ProcessStatus, Dict[str, str]]:
"""Run."""
step = True
workflow.do_engine_steps()
tasks_status = ProcessStatus()
if not workflow.is_completed():
if workflow.is_completed():
return tasks_status
ready_tasks = workflow.get_ready_user_tasks()
options = {}
formatted_options = {}
ready_tasks = workflow.get_ready_user_tasks()
options = {}
formatted_options = {}
for idx, task in enumerate(ready_tasks):
option = format_task(task, False)
options[str(idx + 1)] = task
formatted_options[str(idx + 1)] = option
for idx, task in enumerate(ready_tasks):
option = format_task(task, False)
options[str(idx + 1)] = task
formatted_options[str(idx + 1)] = option
if task_identifier is None:
return formatted_options
if task_identifier is None:
return formatted_options
next_task = options[task_identifier]
if isinstance(next_task.task_spec, UserTask):
if answer is None:
return complete_user_task(next_task)
else:
complete_user_task(next_task, answer)
next_task.complete()
elif isinstance(next_task.task_spec, ManualTask):
next_task.complete()
next_task = options[task_identifier]
if isinstance(next_task.task_spec, UserTask):
if answer is None:
return complete_user_task(next_task)
else:
complete_user_task(next_task, answer)
next_task.complete()
elif isinstance(next_task.task_spec, ManualTask):
next_task.complete()
else:
next_task.complete()
workflow.refresh_waiting_tasks()
workflow.do_engine_steps()
if step:
tasks_status = get_state(workflow)
workflow.refresh_waiting_tasks()
workflow.do_engine_steps()
tasks_status = get_state(workflow)
ready_tasks = workflow.get_ready_user_tasks()
formatted_options = {}
for idx, task in enumerate(ready_tasks):
option = format_task(task, False)
formatted_options[str(idx + 1)] = option
ready_tasks = workflow.get_ready_user_tasks()
formatted_options = {}
for idx, task in enumerate(ready_tasks):
option = format_task(task, False)
formatted_options[str(idx + 1)] = option
state = serializer.serialize_json(workflow)
process_instance = ProcessInstanceModel.query.filter().first()
state = serializer.serialize_json(workflow)
process_instance = ProcessInstanceModel.query.filter().first()
if process_instance is None:
process_instance = create_process_instance()
process_instance.bpmn_json = state
db.session.add(process_instance)
db.session.commit()
if process_instance is None:
process_instance = create_process_instance()
process_instance.bpmn_json = state
db.session.add(process_instance)
db.session.commit()
tasks_status["next_activity"] = formatted_options
tasks_status["next_activity"] = formatted_options
return tasks_status