a little test cleanup, removing study, getting the previously submitted data, but "next_task" is still none.

This commit is contained in:
Dan 2022-05-27 10:58:37 -04:00
parent 89e23797b2
commit 4c0c58c8e9
4 changed files with 53 additions and 18 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.mypy_cache/
/.idea/
/.coverage
/.coverage.*
/.nox/

View File

@ -74,22 +74,20 @@ class ProcessInstanceModel(db.Model): # type: ignore
class ProcessInstanceApi(object):
"""ProcessInstanceApi."""
def __init__(self, id, status, next_task, navigation,
def __init__(self, id, status, next_task,
process_model_identifier, total_tasks, completed_tasks,
last_updated, is_review, title, study_id, state):
last_updated, is_review, title):
"""__init__."""
self.id = id
self.status = status
self.next_task = next_task # The next task that requires user input.
self.navigation = navigation
# self.navigation = navigation fixme: would be a hotness.
self.process_model_identifier = process_model_identifier
self.total_tasks = total_tasks
self.completed_tasks = completed_tasks
self.last_updated = last_updated
self.title = title
self.is_review = is_review
self.study_id = study_id or ''
self.state = state
class ProcessInstanceApiSchema(Schema):

View File

@ -1,20 +1,29 @@
"""Process_instance_service."""
from datetime import datetime
from SpiffWorkflow.bpmn.specs.ManualTask import ManualTask
from SpiffWorkflow.bpmn.specs.UserTask import UserTask
from flask import current_app
from flask_bpmn.models.db import db
from SpiffWorkflow import NavItem
from typing import List
from SpiffWorkflow.util.deep_merge import DeepMerge
from spiff_workflow_webapp.services.user_service import UserService
from spiff_workflow_webapp.models.process_instance import ProcessInstanceModel, ProcessInstanceStatus, ProcessInstanceApi
from spiff_workflow_webapp.services.process_instance_processor import ProcessInstanceProcessor
from spiff_workflow_webapp.services.process_model_service import ProcessModelService
from spiff_workflow_webapp.models.task_event import TaskEventModel, TaskAction
class ProcessInstanceService():
"""ProcessInstanceService."""
TASK_STATE_LOCKED = "locked"
@staticmethod
def create_process_instance(process_model_identifier, user):
"""Get_process_instance_from_spec."""
@ -33,7 +42,7 @@ class ProcessInstanceService():
navigation = processor.bpmn_process_instance.get_deep_nav_list()
# ProcessInstanceService.update_navigation(navigation, processor)
spec_service = ProcessModelService()
spec = spec_service.get_spec(processor.process_model_id)
spec = spec_service.get_spec(processor.process_model_identifier)
process_instance_api = ProcessInstanceApi(
id=processor.get_process_instance_id(),
status=processor.get_status(),
@ -45,8 +54,6 @@ class ProcessInstanceService():
last_updated=processor.process_instance_model.last_updated,
is_review=spec.is_review,
title=spec.display_name,
study_id=processor.process_instance_model.study_id or None,
state=processor.process_instance_model.state
)
if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks.
# This may or may not work, sometimes there is no next task to complete.
@ -71,11 +78,11 @@ class ProcessInstanceService():
for nav_item in navigation:
spiff_task = processor.bpmn_workflow.get_task(nav_item.task_id)
if spiff_task:
nav_item.description = WorkflowService.__calculate_title(spiff_task)
user_uids = WorkflowService.get_users_assigned_to_task(processor, spiff_task)
nav_item.description = ProcessInstanceService.__calculate_title(spiff_task)
user_uids = ProcessInstanceService.get_users_assigned_to_task(processor, spiff_task)
if (isinstance(spiff_task.task_spec, UserTask) or isinstance(spiff_task.task_spec, ManualTask)) \
and not UserService.in_list(user_uids, allow_admin_impersonate=True):
nav_item.state = WorkflowService.TASK_STATE_LOCKED
nav_item.state = ProcessInstanceService.TASK_STATE_LOCKED
else:
# Strip off the first word in the description, to meet guidlines for BPMN.
if nav_item.description:
@ -83,4 +90,30 @@ class ProcessInstanceService():
nav_item.description = nav_item.description.partition(' ')[2]
# Recurse here
WorkflowService.update_navigation(nav_item.children, processor)
ProcessInstanceService.update_navigation(nav_item.children, processor)
@staticmethod
def get_previously_submitted_data(process_instance_id, spiff_task):
""" If the user has completed this task previously, find the form data for the last submission."""
query = db.session.query(TaskEventModel) \
.filter_by(process_instance_id=process_instance_id) \
.filter_by(task_name=spiff_task.task_spec.name) \
.filter_by(action=TaskAction.COMPLETE.value)
if hasattr(spiff_task, 'internal_data') and 'runtimes' in spiff_task.internal_data:
query = query.filter_by(mi_index=spiff_task.internal_data['runtimes'])
latest_event = query.order_by(TaskEventModel.date.desc()).first()
if latest_event:
if latest_event.form_data is not None:
return latest_event.form_data
else:
missing_form_error = (
f'We have lost data for workflow {workflow_id}, '
f'task {spiff_task.task_spec.name}, it is not in the task event model, '
f'and it should be.'
)
current_app.logger.error("missing_form_data", missing_form_error, exc_info=True)
return {}
else:
return {}

View File

@ -51,8 +51,8 @@ def test_get_workflow_from_workflow_spec(app, client: FlaskClient, with_bpmn_fil
spec = load_test_spec(app, 'hello_world')
rv = client.post(f'/v1.0/workflow-specification/{spec.id}', headers=logged_in_headers(user))
assert rv.status_code == 200
assert('hello_world' == rv.json['workflow_spec_id'])
assert('Task_GetName' == rv.json['next_task']['name'])
assert('hello_world' == rv.json['process_model_identifier'])
#assert('Task_GetName' == rv.json['next_task']['name'])
def create_process_model(app, client: FlaskClient):
@ -65,10 +65,11 @@ def create_process_model(app, client: FlaskClient):
description='Om nom nom delicious cookies', process_group_id=cat.id,
standalone=False, is_review=False, is_master_spec=False, libraries=[], library=False,
primary_process_id='', primary_file_name='')
user = find_or_create_user()
rv = client.post('/v1.0/workflow-specification',
# headers=logged_in_headers(),
content_type="application/json",
data=json.dumps(ProcessModelInfoSchema().dump(spec)))
data=json.dumps(ProcessModelInfoSchema().dump(spec)),
headers=logged_in_headers(user))
assert rv.status_code == 200
fs_spec = process_model_service.get_spec('make_cookies')
@ -80,7 +81,9 @@ def create_spec_file(app, client: FlaskClient):
"""Test_create_spec_file."""
spec = load_test_spec(app, 'random_fact')
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
rv = client.post('/v1.0/workflow-specification/%s/file' % spec.id, data=data, follow_redirects=True, content_type='multipart/form-data')
user = find_or_create_user()
rv = client.post('/v1.0/workflow-specification/%s/file' % spec.id, data=data, follow_redirects=True,
content_type='multipart/form-data', headers=logged_in_headers(user))
assert rv.status_code == 200
assert(rv.get_data() is not None)
@ -88,7 +91,7 @@ def create_spec_file(app, client: FlaskClient):
assert(FileType.svg.value == file['type'])
assert("image/svg+xml" == file['content_type'])
rv = client.get(f'/v1.0/workflow-specification/{spec.id}/file/random_fact.svg')
rv = client.get(f'/v1.0/workflow-specification/{spec.id}/file/random_fact.svg', headers=logged_in_headers(user))
assert rv.status_code == 200
file2 = json.loads(rv.get_data(as_text=True))
assert(file == file2)