Merge pull request #126 from sartography/dev

Data Refactor going from dev to rrt/dev
This commit is contained in:
Dan Funk 2020-06-19 08:32:25 -04:00 committed by GitHub
commit 898cf43093
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 484 additions and 167 deletions

View File

@ -41,6 +41,7 @@ gunicorn = "*"
werkzeug = "*"
sentry-sdk = {extras = ["flask"],version = "==0.14.4"}
flask-mail = "*"
flask-admin = "*"
[requires]
python_version = "3.7"

58
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "faaf0e1f31f4bf99df366e52df20bb148a05996a0e6467767660665c514af2d7"
"sha256": "78a8da35dec2fb58b02a58afc8ffabe8b1c22bec8f054295e8b1ba3b4a6f4ec0"
},
"pipfile-spec": 6,
"requires": {
@ -261,6 +261,13 @@
"index": "pypi",
"version": "==1.1.2"
},
"flask-admin": {
"hashes": [
"sha256:68c761d8582d59b1f7702013e944a7ad11d7659a72f3006b89b68b0bd8df61b8"
],
"index": "pypi",
"version": "==1.5.6"
},
"flask-bcrypt": {
"hashes": [
"sha256:d71c8585b2ee1c62024392ebdbc447438564e2c8c02b4e57b56a4cafd8d13c5f"
@ -558,25 +565,25 @@
},
"pandas": {
"hashes": [
"sha256:034185bb615dc96d08fa13aacba8862949db19d5e7804d6ee242d086f07bcc46",
"sha256:0c9b7f1933e3226cc16129cf2093338d63ace5c85db7c9588e3e1ac5c1937ad5",
"sha256:1f6fcf0404626ca0475715da045a878c7062ed39bc859afc4ccf0ba0a586a0aa",
"sha256:1fc963ba33c299973e92d45466e576d11f28611f3549469aec4a35658ef9f4cc",
"sha256:29b4cfee5df2bc885607b8f016e901e63df7ffc8f00209000471778f46cc6678",
"sha256:2a8b6c28607e3f3c344fe3e9b3cd76d2bf9f59bc8c0f2e582e3728b80e1786dc",
"sha256:2bc2ff52091a6ac481cc75d514f06227dc1b10887df1eb72d535475e7b825e31",
"sha256:415e4d52fcfd68c3d8f1851cef4d947399232741cc994c8f6aa5e6a9f2e4b1d8",
"sha256:519678882fd0587410ece91e3ff7f73ad6ded60f6fcb8aa7bcc85c1dc20ecac6",
"sha256:51e0abe6e9f5096d246232b461649b0aa627f46de8f6344597ca908f2240cbaa",
"sha256:698e26372dba93f3aeb09cd7da2bb6dd6ade248338cfe423792c07116297f8f4",
"sha256:83af85c8e539a7876d23b78433d90f6a0e8aa913e37320785cf3888c946ee874",
"sha256:982cda36d1773076a415ec62766b3c0a21cdbae84525135bdb8f460c489bb5dd",
"sha256:a647e44ba1b3344ebc5991c8aafeb7cca2b930010923657a273b41d86ae225c4",
"sha256:b35d625282baa7b51e82e52622c300a1ca9f786711b2af7cbe64f1e6831f4126",
"sha256:bab51855f8b318ef39c2af2c11095f45a10b74cbab4e3c8199efcc5af314c648"
"sha256:02f1e8f71cd994ed7fcb9a35b6ddddeb4314822a0e09a9c5b2d278f8cb5d4096",
"sha256:13f75fb18486759da3ff40f5345d9dd20e7d78f2a39c5884d013456cec9876f0",
"sha256:35b670b0abcfed7cad76f2834041dcf7ae47fd9b22b63622d67cdc933d79f453",
"sha256:4c73f373b0800eb3062ffd13d4a7a2a6d522792fa6eb204d67a4fad0a40f03dc",
"sha256:5759edf0b686b6f25a5d4a447ea588983a33afc8a0081a0954184a4a87fd0dd7",
"sha256:5a7cf6044467c1356b2b49ef69e50bf4d231e773c3ca0558807cdba56b76820b",
"sha256:69c5d920a0b2a9838e677f78f4dde506b95ea8e4d30da25859db6469ded84fa8",
"sha256:8778a5cc5a8437a561e3276b85367412e10ae9fff07db1eed986e427d9a674f8",
"sha256:9871ef5ee17f388f1cb35f76dc6106d40cb8165c562d573470672f4cdefa59ef",
"sha256:9c31d52f1a7dd2bb4681d9f62646c7aa554f19e8e9addc17e8b1b20011d7522d",
"sha256:ab8173a8efe5418bbe50e43f321994ac6673afc5c7c4839014cf6401bbdd0705",
"sha256:ae961f1f0e270f1e4e2273f6a539b2ea33248e0e3a11ffb479d757918a5e03a9",
"sha256:b3c4f93fcb6e97d993bf87cdd917883b7dab7d20c627699f360a8fb49e9e0b91",
"sha256:c9410ce8a3dee77653bc0684cfa1535a7f9c291663bd7ad79e39f5ab58f67ab3",
"sha256:f69e0f7b7c09f1f612b1f8f59e2df72faa8a6b41c5a436dde5b615aaf948f107",
"sha256:faa42a78d1350b02a7d2f0dbe3c80791cf785663d6997891549d0f86dc49125e"
],
"index": "pypi",
"version": "==1.0.4"
"version": "==1.0.5"
},
"psycopg2-binary": {
"hashes": [
@ -711,11 +718,11 @@
},
"requests": {
"hashes": [
"sha256:43999036bfa82904b6af1d99e4882b560e5e2c68e5c4b0aa03b655f3d7d73fee",
"sha256:b3f43d496c6daba4493e7c431722aeb7dbc6288f52a6e04e7b6023b0247817e6"
"sha256:b3559a131db72c33ee969480840fff4bb6dd111de7dd27c8ee1f820f4f00231b",
"sha256:fe75cc94a9443b9246fc7049224f75604b113c36acb93f87b80ed42c44cbb898"
],
"index": "pypi",
"version": "==2.23.0"
"version": "==2.24.0"
},
"sentry-sdk": {
"extras": [
@ -802,7 +809,7 @@
"spiffworkflow": {
"editable": true,
"git": "https://github.com/sartography/SpiffWorkflow.git",
"ref": "b8a064a0bb76c705a1be04ee9bb8ac7beee56eb0"
"ref": "5450dc0463a95811d386b7de063d950bf6179d2b"
},
"sqlalchemy": {
"hashes": [
@ -890,6 +897,13 @@
"index": "pypi",
"version": "==1.0.1"
},
"wtforms": {
"hashes": [
"sha256:6ff8635f4caeed9f38641d48cfe019d0d3896f41910ab04494143fc027866e1b",
"sha256:861a13b3ae521d6700dac3b2771970bd354a63ba7043ecc3a82b5288596a1972"
],
"version": "==2.3.1"
},
"xlrd": {
"hashes": [
"sha256:546eb36cee8db40c3eaa46c351e67ffee6eeb5fa2650b71bc4c758a29a1b29b2",

View File

@ -4,6 +4,8 @@ import sentry_sdk
import connexion
from jinja2 import Environment, FileSystemLoader
from flask_admin import Admin
from flask_admin.contrib.sqla import ModelView
from flask_cors import CORS
from flask_marshmallow import Marshmallow
from flask_mail import Mail
@ -37,13 +39,16 @@ ma = Marshmallow(app)
from crc import models
from crc import api
from crc.api import admin
connexion_app.add_api('api.yml', base_path='/v1.0')
# Convert list of allowed origins to list of regexes
origins_re = [r"^https?:\/\/%s(.*)" % o.replace('.', '\.') for o in app.config['CORS_ALLOW_ORIGINS']]
cors = CORS(connexion_app.app, origins=origins_re)
# Sentry error handling
if app.config['ENABLE_SENTRY']:
sentry_sdk.init(
dsn="https://25342ca4e2d443c6a5c49707d68e9f40@o401361.ingest.sentry.io/5260915",
@ -88,3 +93,10 @@ def clear_db():
"""Load example data into the database."""
from example_data import ExampleDataLoader
ExampleDataLoader.clean_db()
@app.cli.command()
def rrt_data_fix():
"""Finds all the empty task event logs, and populates
them with good wholesome data."""
from crc.services.workflow_service import WorkflowService
WorkflowService.fix_legacy_data_model_for_rrt()

72
crc/api/admin.py Normal file
View File

@ -0,0 +1,72 @@
# Admin app
import json
from flask import url_for
from flask_admin import Admin
from flask_admin.contrib import sqla
from flask_admin.contrib.sqla import ModelView
from werkzeug.utils import redirect
from jinja2 import Markup
from crc import db, app
from crc.api.user import verify_token, verify_token_admin
from crc.models.approval import ApprovalModel
from crc.models.file import FileModel
from crc.models.stats import TaskEventModel
from crc.models.study import StudyModel
from crc.models.user import UserModel
from crc.models.workflow import WorkflowModel
class AdminModelView(sqla.ModelView):
can_create = False
can_edit = False
can_delete = False
page_size = 50 # the number of entries to display on the list view
column_exclude_list = ['bpmn_workflow_json', ]
column_display_pk = True
can_export = True
def is_accessible(self):
return verify_token_admin()
def inaccessible_callback(self, name, **kwargs):
# redirect to login page if user doesn't have access
return redirect(url_for('home'))
class UserView(AdminModelView):
column_filters = ['uid']
class StudyView(AdminModelView):
column_filters = ['id', 'primary_investigator_id']
column_searchable_list = ['title']
class ApprovalView(AdminModelView):
column_filters = ['study_id', 'approver_uid']
class WorkflowView(AdminModelView):
column_filters = ['study_id', 'id']
class FileView(AdminModelView):
column_filters = ['workflow_id']
def json_formatter(view, context, model, name):
value = getattr(model, name)
json_value = json.dumps(value, ensure_ascii=False, indent=2)
return Markup('<pre>{}</pre>'.format(json_value))
class TaskEventView(AdminModelView):
column_filters = ['workflow_id', 'action']
column_list = ['study_id', 'user_id', 'workflow_id', 'action', 'task_title', 'form_data', 'date']
column_formatters = {
'form_data': json_formatter,
}
admin = Admin(app)
admin.add_view(StudyView(StudyModel, db.session))
admin.add_view(ApprovalView(ApprovalModel, db.session))
admin.add_view(UserView(UserModel, db.session))
admin.add_view(WorkflowView(WorkflowModel, db.session))
admin.add_view(FileView(FileModel, db.session))
admin.add_view(TaskEventView(TaskEventModel, db.session))

View File

@ -1,7 +1,7 @@
import uuid
from SpiffWorkflow.util.deep_merge import DeepMerge
from flask import g
from crc import session, app
from crc.api.common import ApiError, ApiErrorSchema
from crc.models.api_models import WorkflowApi, WorkflowApiSchema, NavigationItem, NavigationItemSchema
@ -96,59 +96,10 @@ def delete_workflow_specification(spec_id):
session.commit()
def __get_workflow_api_model(processor: WorkflowProcessor, next_task = None):
"""Returns an API model representing the state of the current workflow, if requested, and
possible, next_task is set to the current_task."""
nav_dict = processor.bpmn_workflow.get_nav_list()
navigation = []
for nav_item in nav_dict:
spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
if 'description' in nav_item:
nav_item['title'] = nav_item.pop('description')
# fixme: duplicate code from the workflow_service. Should only do this in one place.
if ' ' in nav_item['title']:
nav_item['title'] = nav_item['title'].partition(' ')[2]
else:
nav_item['title'] = ""
if spiff_task:
nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
nav_item['title'] = nav_item['task'].title # Prefer the task title.
else:
nav_item['task'] = None
if not 'is_decision' in nav_item:
nav_item['is_decision'] = False
navigation.append(NavigationItem(**nav_item))
NavigationItemSchema().dump(nav_item)
spec = session.query(WorkflowSpecModel).filter_by(id=processor.workflow_spec_id).first()
workflow_api = WorkflowApi(
id=processor.get_workflow_id(),
status=processor.get_status(),
next_task=None,
navigation=navigation,
workflow_spec_id=processor.workflow_spec_id,
spec_version=processor.get_version_string(),
is_latest_spec=processor.is_latest_spec,
total_tasks=len(navigation),
completed_tasks=processor.workflow_model.completed_tasks,
last_updated=processor.workflow_model.last_updated,
title=spec.display_name
)
if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks.
# This may or may not work, sometimes there is no next task to complete.
next_task = processor.next_task()
if next_task:
workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
return workflow_api
def get_workflow(workflow_id, soft_reset=False, hard_reset=False):
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
workflow_api_model = __get_workflow_api_model(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)
@ -161,17 +112,20 @@ def set_current_task(workflow_id, task_id):
user_uid = __get_user_uid(workflow_model.study.user_uid)
processor = WorkflowProcessor(workflow_model)
task_id = uuid.UUID(task_id)
task = processor.bpmn_workflow.get_task(task_id)
if task.state != task.COMPLETED and task.state != task.READY:
spiff_task = processor.bpmn_workflow.get_task(task_id)
if spiff_task.state != spiff_task.COMPLETED and spiff_task.state != spiff_task.READY:
raise ApiError("invalid_state", "You may not move the token to a task who's state is not "
"currently set to COMPLETE or READY.")
# Only reset the token if the task doesn't already have it.
if task.state == task.COMPLETED:
task.reset_token(reset_data=False) # we could optionally clear the previous data.
if spiff_task.state == spiff_task.COMPLETED:
spiff_task.reset_token(reset_data=True) # Don't try to copy the existing data back into this task.
processor.save()
WorkflowService.log_task_action(user_uid, processor, task, WorkflowService.TASK_ACTION_TOKEN_RESET)
workflow_api_model = __get_workflow_api_model(processor, task)
WorkflowService.log_task_action(user_uid, workflow_model, spiff_task,
WorkflowService.TASK_ACTION_TOKEN_RESET,
version=processor.get_version_string())
workflow_api_model = WorkflowService.processor_to_workflow_api(processor, spiff_task)
return WorkflowApiSchema().dump(workflow_api_model)
@ -187,17 +141,19 @@ def update_task(workflow_id, task_id, body):
user_uid = __get_user_uid(workflow_model.study.user_uid)
processor = WorkflowProcessor(workflow_model)
task_id = uuid.UUID(task_id)
task = processor.bpmn_workflow.get_task(task_id)
if task.state != task.READY:
spiff_task = processor.bpmn_workflow.get_task(task_id)
if spiff_task.state != spiff_task.READY:
raise ApiError("invalid_state", "You may not update a task unless it is in the READY state. "
"Consider calling a token reset to make this task Ready.")
task.update_data(body)
processor.complete_task(task)
if body: # IF and only if we get the body back, update the task data with the content.
spiff_task.data = body # Accept the data from the front end as complete. Do not merge it in, as then it is impossible to remove items.
processor.complete_task(spiff_task)
processor.do_engine_steps()
processor.save()
WorkflowService.log_task_action(user_uid, processor, task, WorkflowService.TASK_ACTION_COMPLETE)
workflow_api_model = __get_workflow_api_model(processor)
WorkflowService.log_task_action(user_uid, workflow_model, spiff_task, WorkflowService.TASK_ACTION_COMPLETE,
version=processor.get_version_string())
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)

View File

@ -36,6 +36,7 @@ class Task(object):
PROP_OPTIONS_FILE = "spreadsheet.name"
PROP_OPTIONS_VALUE_COLUMN = "spreadsheet.value.column"
PROP_OPTIONS_LABEL_COL = "spreadsheet.label.column"
PROP_OPTIONS_READ_ONLY = "read_only"
PROP_LDAP_LOOKUP = "ldap.lookup"
VALIDATION_REQUIRED = "required"
FIELD_TYPE_AUTO_COMPLETE = "autocomplete"

View File

@ -17,6 +17,7 @@ class TaskEventModel(db.Model):
task_title = db.Column(db.String)
task_type = db.Column(db.String)
task_state = db.Column(db.String)
form_data = db.Column(db.JSON) # And form data submitted when the task was completed.
mi_type = db.Column(db.String)
mi_count = db.Column(db.Integer)
mi_index = db.Column(db.Integer)

View File

@ -102,14 +102,15 @@ class WorkflowProcessor(object):
def __init__(self, workflow_model: WorkflowModel, soft_reset=False, hard_reset=False, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model.
If soft_reset is set to true, it will try to use the latest version of the workflow specification.
If hard_reset is set to true, it will create a new Workflow, but embed the data from the last
completed task in the previous workflow.
If soft_reset is set to true, it will try to use the latest version of the workflow specification
without resetting to the beginning of the workflow. This will work for some minor changes to the spec.
If hard_reset is set to true, it will use the latest spec, and start the workflow over from the beginning.
which should work in casees where a soft reset fails.
If neither flag is set, it will use the same version of the specification that was used to originally
create the workflow model. """
self.workflow_model = workflow_model
if soft_reset or len(workflow_model.dependencies) == 0:
if soft_reset or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started.
self.spec_data_files = FileService.get_spec_data_files(
workflow_spec_id=workflow_model.workflow_spec_id)
else:
@ -216,8 +217,6 @@ class WorkflowProcessor(object):
full_version = "v%s (%s)" % (version, files)
return full_version
def update_dependencies(self, spec_data_files):
existing_dependencies = FileService.get_spec_data_files(
workflow_spec_id=self.workflow_model.workflow_spec_id,
@ -299,25 +298,12 @@ class WorkflowProcessor(object):
return WorkflowStatus.waiting
def hard_reset(self):
"""Recreate this workflow, but keep the data from the last completed task and add
it back into the first task. This may be useful when a workflow specification changes,
and users need to review all the prior steps, but they don't need to reenter all the previous data.
Returns the new version.
"""Recreate this workflow. This will be useful when a workflow specification changes.
"""
# Create a new workflow based on the latest specs.
self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
new_bpmn_workflow.data = self.bpmn_workflow.data
# Reset the current workflow to the beginning - which we will consider to be the first task after the root
# element. This feels a little sketchy, but I think it is safe to assume root will have one child.
first_task = self.bpmn_workflow.task_tree.children[0]
first_task.reset_token(reset_data=False)
for task in new_bpmn_workflow.get_tasks(SpiffTask.READY):
task.data = first_task.data
new_bpmn_workflow.do_engine_steps()
self.bpmn_workflow = new_bpmn_workflow

View File

@ -1,3 +1,4 @@
import copy
import string
from datetime import datetime
import random
@ -5,25 +6,26 @@ import random
import jinja2
from SpiffWorkflow import Task as SpiffTask, WorkflowException
from SpiffWorkflow.bpmn.specs.ManualTask import ManualTask
from SpiffWorkflow.bpmn.specs.MultiInstanceTask import MultiInstanceTask
from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask
from SpiffWorkflow.bpmn.specs.UserTask import UserTask
from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask
from SpiffWorkflow.specs import CancelTask, StartTask
from flask import g
from SpiffWorkflow.util.deep_merge import DeepMerge
from jinja2 import Template
from crc import db, app
from crc.api.common import ApiError
from crc.models.api_models import Task, MultiInstanceType
from crc.models.api_models import Task, MultiInstanceType, NavigationItem, NavigationItemSchema, WorkflowApi
from crc.models.file import LookupDataModel
from crc.models.stats import TaskEventModel
from crc.models.study import StudyModel
from crc.models.user import UserModel
from crc.models.workflow import WorkflowModel, WorkflowStatus
from crc.models.workflow import WorkflowModel, WorkflowStatus, WorkflowSpecModel
from crc.services.file_service import FileService
from crc.services.lookup_service import LookupService
from crc.services.study_service import StudyService
from crc.services.workflow_processor import WorkflowProcessor, CustomBpmnScriptEngine
from crc.services.workflow_processor import WorkflowProcessor
class WorkflowService(object):
@ -180,13 +182,83 @@ class WorkflowService(object):
def __get_options(self):
pass
@staticmethod
def _random_string(string_length=10):
"""Generate a random string of fixed length """
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(string_length))
@staticmethod
def processor_to_workflow_api(processor: WorkflowProcessor, next_task=None):
"""Returns an API model representing the state of the current workflow, if requested, and
possible, next_task is set to the current_task."""
nav_dict = processor.bpmn_workflow.get_nav_list()
navigation = []
for nav_item in nav_dict:
spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
if 'description' in nav_item:
nav_item['title'] = nav_item.pop('description')
# fixme: duplicate code from the workflow_service. Should only do this in one place.
if ' ' in nav_item['title']:
nav_item['title'] = nav_item['title'].partition(' ')[2]
else:
nav_item['title'] = ""
if spiff_task:
nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
nav_item['title'] = nav_item['task'].title # Prefer the task title.
else:
nav_item['task'] = None
if not 'is_decision' in nav_item:
nav_item['is_decision'] = False
navigation.append(NavigationItem(**nav_item))
NavigationItemSchema().dump(nav_item)
spec = db.session.query(WorkflowSpecModel).filter_by(id=processor.workflow_spec_id).first()
workflow_api = WorkflowApi(
id=processor.get_workflow_id(),
status=processor.get_status(),
next_task=None,
navigation=navigation,
workflow_spec_id=processor.workflow_spec_id,
spec_version=processor.get_version_string(),
is_latest_spec=processor.is_latest_spec,
total_tasks=len(navigation),
completed_tasks=processor.workflow_model.completed_tasks,
last_updated=processor.workflow_model.last_updated,
title=spec.display_name
)
if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks.
# This may or may not work, sometimes there is no next task to complete.
next_task = processor.next_task()
if next_task:
previous_form_data = WorkflowService.get_previously_submitted_data(processor.workflow_model.id, next_task)
DeepMerge.merge(next_task.data, previous_form_data)
workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
return workflow_api
@staticmethod
def get_previously_submitted_data(workflow_id, task):
""" If the user has completed this task previously, find the form data for the last submission."""
latest_event = db.session.query(TaskEventModel) \
.filter_by(workflow_id=workflow_id) \
.filter_by(task_name=task.task_spec.name) \
.filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
.order_by(TaskEventModel.date.desc()).first()
if latest_event:
if latest_event.form_data is not None:
return latest_event.form_data
else:
app.logger.error("missing_form_dat", "We have lost data for workflow %i, task %s, it is not "
"in the task event model, "
"and it should be." % (workflow_id, task.task_spec.name))
return {}
else:
return {}
@staticmethod
def spiff_task_to_api_task(spiff_task, add_docs_and_forms=False):
task_type = spiff_task.task_spec.__class__.__name__
@ -318,21 +390,22 @@ class WorkflowService(object):
field.options.append({"id": d.value, "name": d.label})
@staticmethod
def log_task_action(user_uid, processor, spiff_task, action):
def log_task_action(user_uid, workflow_model, spiff_task, action, version):
task = WorkflowService.spiff_task_to_api_task(spiff_task)
workflow_model = processor.workflow_model
form_data = WorkflowService.extract_form_data(spiff_task.data, spiff_task)
task_event = TaskEventModel(
study_id=workflow_model.study_id,
user_uid=user_uid,
workflow_id=workflow_model.id,
workflow_spec_id=workflow_model.workflow_spec_id,
spec_version=processor.get_version_string(),
spec_version=version,
action=action,
task_id=task.id,
task_name=task.name,
task_title=task.title,
task_type=str(task.type),
task_state=task.state,
form_data=form_data,
mi_type=task.multi_instance_type.value, # Some tasks have a repeat behavior.
mi_count=task.multi_instance_count, # This is the number of times the task could repeat.
mi_index=task.multi_instance_index, # And the index of the currently repeating task.
@ -342,3 +415,64 @@ class WorkflowService(object):
db.session.add(task_event)
db.session.commit()
@staticmethod
def fix_legacy_data_model_for_rrt():
""" Remove this after use! This is just to fix RRT so the data is handled correctly.
Utility that is likely called via the flask command line, it will loop through all the
workflows in the system and attempt to add the right data into the task action log so that
users do not have to re fill out all of the forms if they start over or go back in the workflow.
Viciously inefficient, but should only have to run one time for RRT"""
workflows = db.session.query(WorkflowModel).all()
for workflow_model in workflows:
task_logs = db.session.query(TaskEventModel) \
.filter(TaskEventModel.workflow_id == workflow_model.id) \
.filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
.order_by(TaskEventModel.date.desc()).all()
processor = WorkflowProcessor(workflow_model)
# Grab all the data from last task completed, which will be everything in this
# rrt situation because of how we were keeping all the data at the time.
latest_data = processor.next_task().data
# Move forward in the task spec tree, dropping any data that would have been
# added in subsequent tasks, just looking at form data, will not track the automated
# task data additions, hopefully this doesn't hang us.
for log in task_logs:
# if log.task_data is not None: # Only do this if the task event does not have data populated in it.
# continue
data = copy.deepcopy(latest_data) # Or you end up with insane crazy issues.
# In the simple case of RRT, there is exactly one task for the given task_spec
task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
data = WorkflowService.extract_form_data(data, task)
log.form_data = data
db.session.add(log)
db.session.commit()
@staticmethod
def extract_form_data(latest_data, task):
"""Removes data from latest_data that would be added by the child task or any of it's children."""
data = {}
if hasattr(task.task_spec, 'form'):
for field in task.task_spec.form.fields:
if field.has_property(Task.PROP_OPTIONS_READ_ONLY) and \
field.get_property(Task.PROP_OPTIONS_READ_ONLY).lower().strip() == "true":
continue # Don't add read-only data
elif field.has_property(Task.PROP_OPTIONS_REPEAT):
group = field.get_property(Task.PROP_OPTIONS_REPEAT)
if group in latest_data:
data[group] = latest_data[group]
elif isinstance(task.task_spec, MultiInstanceTask):
group = task.task_spec.elementVar
if group in latest_data:
data[group] = latest_data[group]
else:
if field.id in latest_data:
data[field.id] = latest_data[field.id]
return data

View File

@ -0,0 +1,28 @@
"""empty message
Revision ID: 1fdd1bdb600e
Revises: 17597692d0b0
Create Date: 2020-06-17 16:44:16.427988
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1fdd1bdb600e'
down_revision = '17597692d0b0'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('task_event', sa.Column('task_data', sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('task_event', 'task_data')
# ### end Alembic commands ###

View File

@ -0,0 +1,30 @@
"""empty message
Revision ID: de30304ff5e6
Revises: 1fdd1bdb600e
Create Date: 2020-06-18 16:19:11.133665
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'de30304ff5e6'
down_revision = '1fdd1bdb600e'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('task_event', sa.Column('form_data', sa.JSON(), nullable=True))
op.drop_column('task_event', 'task_data')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('task_event', sa.Column('task_data', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True))
op.drop_column('task_event', 'form_data')
# ### end Alembic commands ###

View File

@ -4,14 +4,86 @@ import random
from unittest.mock import patch
from tests.base_test import BaseTest
from crc import session, app
from crc.models.api_models import WorkflowApiSchema, MultiInstanceType, TaskSchema
from crc.models.file import FileModelSchema
from crc.models.workflow import WorkflowStatus
from crc.services.workflow_service import WorkflowService
from crc.models.stats import TaskEventModel
class TestTasksApi(BaseTest):
def get_workflow_api(self, workflow, soft_reset=False, hard_reset=False):
rv = self.app.get('/v1.0/workflow/%i?soft_reset=%s&hard_reset=%s' %
(workflow.id, str(soft_reset), str(hard_reset)),
headers=self.logged_in_headers(),
content_type="application/json")
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
workflow_api = WorkflowApiSchema().load(json_data)
self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
return workflow_api
def complete_form(self, workflow_in, task_in, dict_data, error_code = None):
prev_completed_task_count = workflow_in.completed_tasks
if isinstance(task_in, dict):
task_id = task_in["id"]
else:
task_id = task_in.id
rv = self.app.put('/v1.0/workflow/%i/task/%s/data' % (workflow_in.id, task_id),
headers=self.logged_in_headers(),
content_type="application/json",
data=json.dumps(dict_data))
if error_code:
self.assert_failure(rv, error_code=error_code)
return
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
# Assure stats are updated on the model
workflow = WorkflowApiSchema().load(json_data)
# The total number of tasks may change over time, as users move through gateways
# branches may be pruned. As we hit parallel Multi-Instance new tasks may be created...
self.assertIsNotNone(workflow.total_tasks)
self.assertEquals(prev_completed_task_count + 1, workflow.completed_tasks)
# Assure a record exists in the Task Events
task_events = session.query(TaskEventModel) \
.filter_by(workflow_id=workflow.id) \
.filter_by(task_id=task_id) \
.order_by(TaskEventModel.date.desc()).all()
self.assertGreater(len(task_events), 0)
event = task_events[0]
self.assertIsNotNone(event.study_id)
self.assertEquals("dhf8r", event.user_uid)
self.assertEquals(workflow.id, event.workflow_id)
self.assertEquals(workflow.workflow_spec_id, event.workflow_spec_id)
self.assertEquals(workflow.spec_version, event.spec_version)
self.assertEquals(WorkflowService.TASK_ACTION_COMPLETE, event.action)
self.assertEquals(task_in.id, task_id)
self.assertEquals(task_in.name, event.task_name)
self.assertEquals(task_in.title, event.task_title)
self.assertEquals(task_in.type, event.task_type)
self.assertEquals("COMPLETED", event.task_state)
# Not sure what vodoo is happening inside of marshmallow to get me in this state.
if isinstance(task_in.multi_instance_type, MultiInstanceType):
self.assertEquals(task_in.multi_instance_type.value, event.mi_type)
else:
self.assertEquals(task_in.multi_instance_type, event.mi_type)
self.assertEquals(task_in.multi_instance_count, event.mi_count)
self.assertEquals(task_in.multi_instance_index, event.mi_index)
self.assertEquals(task_in.process_name, event.process_name)
self.assertIsNotNone(event.date)
# Assure that there is data in the form_data
self.assertIsNotNone(event.form_data)
workflow = WorkflowApiSchema().load(json_data)
return workflow
def test_get_current_user_tasks(self):
self.load_example_data()
workflow = self.create_workflow('random_fact')
@ -299,13 +371,13 @@ class TestTasksApi(BaseTest):
self.assertEqual("UserTask", task.type)
self.assertEqual("Activity_A", task.name)
self.assertEqual("My Sub Process", task.process_name)
workflow_api = self.complete_form(workflow, task, {"name": "Dan"})
workflow_api = self.complete_form(workflow, task, {"FieldA": "Dan"})
task = workflow_api.next_task
self.assertIsNotNone(task)
self.assertEqual("Activity_B", task.name)
self.assertEqual("Sub Workflow Example", task.process_name)
workflow_api = self.complete_form(workflow, task, {"name": "Dan"})
workflow_api = self.complete_form(workflow, task, {"FieldB": "Dan"})
self.assertEqual(WorkflowStatus.complete, workflow_api.status)
def test_update_task_resets_token(self):
@ -373,7 +445,9 @@ class TestTasksApi(BaseTest):
for i in random.sample(range(9), 9):
task = TaskSchema().load(ready_items[i]['task'])
self.complete_form(workflow, task, {"investigator":{"email": "dhf8r@virginia.edu"}})
data = workflow_api.next_task.data
data['investigator']['email'] = "dhf8r@virginia.edu"
self.complete_form(workflow, task, data)
#tasks = self.get_workflow_api(workflow).user_tasks
workflow = self.get_workflow_api(workflow)

View File

@ -270,53 +270,6 @@ class TestWorkflowProcessor(BaseTest):
processor = self.get_processor(study, workflow_spec_model)
self.assertTrue(processor.get_version_string().startswith('v2.1.1'))
def test_restart_workflow(self):
self.load_example_data()
study = session.query(StudyModel).first()
workflow_spec_model = self.load_test_spec("two_forms")
processor = self.get_processor(study, workflow_spec_model)
self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
task = processor.next_task()
task.data = {"key": "Value"}
processor.complete_task(task)
task_before_restart = processor.next_task()
processor.hard_reset()
task_after_restart = processor.next_task()
self.assertNotEqual(task.get_name(), task_before_restart.get_name())
self.assertEqual(task.get_name(), task_after_restart.get_name())
self.assertEqual(task.data, task_after_restart.data)
def test_soft_reset(self):
self.load_example_data()
# Start the two_forms workflow, and enter some data in the first form.
study = session.query(StudyModel).first()
workflow_spec_model = self.load_test_spec("two_forms")
processor = self.get_processor(study, workflow_spec_model)
self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
task = processor.next_task()
task.data = {"color": "blue"}
processor.complete_task(task)
# Modify the specification, with a minor text change.
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path)
# Setting up another processor should not error out, but doesn't pick up the update.
processor.workflow_model.bpmn_workflow_json = processor.serialize()
processor2 = WorkflowProcessor(processor.workflow_model)
self.assertEqual("Step 1", processor2.bpmn_workflow.last_task.task_spec.description)
self.assertNotEqual("# This is some documentation I wanted to add.",
processor2.bpmn_workflow.last_task.task_spec.documentation)
# You can do a soft update and get the right response.
processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True)
self.assertEqual("Step 1", processor3.bpmn_workflow.last_task.task_spec.description)
self.assertEqual("# This is some documentation I wanted to add.",
processor3.bpmn_workflow.last_task.task_spec.documentation)
def test_hard_reset(self):
self.load_example_data()
@ -344,8 +297,10 @@ class TestWorkflowProcessor(BaseTest):
# Do a hard reset, which should bring us back to the beginning, but retain the data.
processor3 = WorkflowProcessor(processor.workflow_model, hard_reset=True)
self.assertEqual("Step 1", processor3.next_task().task_spec.description)
self.assertEqual({"color": "blue"}, processor3.next_task().data)
processor3.complete_task(processor3.next_task())
self.assertTrue(processor3.is_latest_spec) # Now at version 2.
task = processor3.next_task()
task.data = {"color": "blue"}
processor3.complete_task(task)
self.assertEqual("New Step", processor3.next_task().task_spec.description)
self.assertEqual("blue", processor3.next_task().data["color"])

View File

@ -1,7 +1,14 @@
import json
from tests.base_test import BaseTest
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
from SpiffWorkflow import Task as SpiffTask, WorkflowException
from example_data import ExampleDataLoader
from crc import db
from crc.models.stats import TaskEventModel
from crc.models.api_models import Task
class TestWorkflowService(BaseTest):
@ -78,4 +85,50 @@ class TestWorkflowService(BaseTest):
task = processor.next_task()
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
WorkflowService.populate_form_with_random_data(task, task_api, required_only=False)
self.assertTrue(isinstance(task.data["sponsor"], dict))
self.assertTrue(isinstance(task.data["sponsor"], dict))
def test_fix_legacy_data_model_for_rrt(self):
ExampleDataLoader().load_rrt() # Make sure the research_rampup is loaded, as it's not a test spec.
workflow = self.create_workflow('research_rampup')
processor = WorkflowProcessor(workflow, validate_only=True)
# Use the test spec code to complete the workflow of research rampup.
while not processor.bpmn_workflow.is_completed():
processor.bpmn_workflow.do_engine_steps()
tasks = processor.bpmn_workflow.get_tasks(SpiffTask.READY)
for task in tasks:
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
WorkflowService.populate_form_with_random_data(task, task_api, False)
task.complete()
# create the task events
WorkflowService.log_task_action('dhf8r', workflow, task,
WorkflowService.TASK_ACTION_COMPLETE,
version=processor.get_version_string())
processor.save()
db.session.commit()
WorkflowService.fix_legacy_data_model_for_rrt()
# All tasks should now have data associated with them.
task_logs = db.session.query(TaskEventModel) \
.filter(TaskEventModel.workflow_id == workflow.id) \
.filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
.order_by(TaskEventModel.date).all() # Get them back in order.
self.assertEqual(17, len(task_logs))
for log in task_logs:
task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
self.assertIsNotNone(log.form_data)
# Each task should have the data in the form for that task in the task event.
if hasattr(task.task_spec, 'form'):
for field in task.task_spec.form.fields:
if field.has_property(Task.PROP_OPTIONS_REPEAT):
self.assertIn(field.get_property(Task.PROP_OPTIONS_REPEAT), log.form_data)
else:
self.assertIn(field.id, log.form_data)
# Some spot checks:
# The first task should be empty, with all the data removed.
self.assertEqual({}, task_logs[0].form_data)