Improve version handling of files. Consolidate more of this logic in FileService. Place the version on the actual data model, not the file model, so the file model remains the same, and we just version the data associated with it.

This commit is contained in:
Dan Funk 2020-03-04 13:40:25 -05:00
parent 94f828dfd6
commit c5cee4761e
9 changed files with 140 additions and 103 deletions

View File

@@ -8,6 +8,7 @@ from flask import send_file
from crc import session from crc import session
from crc.api.common import ApiErrorSchema, ApiError from crc.api.common import ApiErrorSchema, ApiError
from crc.models.file import FileModelSchema, FileModel, FileDataModel, FileType from crc.models.file import FileModelSchema, FileModel, FileDataModel, FileType
from crc.models.workflow import WorkflowSpecModel
from crc.services.file_service import FileService from crc.services.file_service import FileService
@@ -34,7 +35,8 @@ def add_file(workflow_spec_id=None, study_id=None, workflow_id=None, task_id=Non
file = connexion.request.files['file'] file = connexion.request.files['file']
if workflow_spec_id: if workflow_spec_id:
file_model = FileService.add_workflow_spec_file(workflow_spec_id, file.filename, file.content_type, file.stream.read()) workflow_spec = session.query(WorkflowSpecModel).filter_by(id=workflow_spec_id).first()
file_model = FileService.add_workflow_spec_file(workflow_spec, file.filename, file.content_type, file.stream.read())
else: else:
file_model = FileService.add_form_field_file(study_id, workflow_id, task_id, form_field_key, file.filename, file.content_type, file.stream.read()) file_model = FileService.add_form_field_file(study_id, workflow_id, task_id, form_field_key, file.filename, file.content_type, file.stream.read())

View File

@@ -76,7 +76,7 @@ def __get_workflow_api_model(processor: WorkflowProcessor):
user_tasks=user_tasks, user_tasks=user_tasks,
workflow_spec_id=processor.workflow_spec_id workflow_spec_id=processor.workflow_spec_id
) )
if(processor.next_task()): if processor.next_task():
workflow_api.next_task = Task.from_spiff(processor.next_task()) workflow_api.next_task = Task.from_spiff(processor.next_task())
return workflow_api return workflow_api

View File

@@ -3,6 +3,7 @@ import enum
from marshmallow_enum import EnumField from marshmallow_enum import EnumField
from marshmallow_sqlalchemy import ModelSchema from marshmallow_sqlalchemy import ModelSchema
from sqlalchemy import func from sqlalchemy import func
from sqlalchemy.dialects.postgresql import UUID
from crc import db from crc import db
@@ -30,10 +31,36 @@ class FileType(enum.Enum):
zip = 'zip' zip = 'zip'
CONTENT_TYPES = {
"bpmn": "text/xml",
"csv": "text/csv",
"dmn": "text/xml",
"doc": "application/msword",
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"gif": "image/gif",
"jpg": "image/jpeg",
"md" : "text/plain",
"pdf": "application/pdf",
"png": "image/png",
"ppt": "application/vnd.ms-powerpoint",
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
"rtf": "application/rtf",
"svg": "image/svg+xml",
"svg_xml": "image/svg+xml",
"txt": "text/plain",
"xls": "application/vnd.ms-excel",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"xml": "application/xml",
"zip": "application/zip"
}
class FileDataModel(db.Model): class FileDataModel(db.Model):
__tablename__ = 'file_data' __tablename__ = 'file_data'
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
md5_hash = db.Column(UUID(as_uuid=True), unique=False, nullable=False)
data = db.Column(db.LargeBinary) data = db.Column(db.LargeBinary)
version = db.Column(db.Integer, default=0)
last_updated = db.Column(db.DateTime(timezone=True), default=func.now())
file_model_id = db.Column(db.Integer, db.ForeignKey('file.id')) file_model_id = db.Column(db.Integer, db.ForeignKey('file.id'))
file_model = db.relationship("FileModel") file_model = db.relationship("FileModel")
@@ -42,8 +69,6 @@ class FileModel(db.Model):
__tablename__ = 'file' __tablename__ = 'file'
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String) name = db.Column(db.String)
version = db.Column(db.Integer, default=0)
last_updated = db.Column(db.DateTime(timezone=True), default=func.now())
type = db.Column(db.Enum(FileType)) type = db.Column(db.Enum(FileType))
primary = db.Column(db.Boolean) primary = db.Column(db.Boolean)
content_type = db.Column(db.String) content_type = db.Column(db.String)
@@ -52,6 +77,7 @@ class FileModel(db.Model):
study_id = db.Column(db.Integer, db.ForeignKey('study.id'), nullable=True) study_id = db.Column(db.Integer, db.ForeignKey('study.id'), nullable=True)
task_id = db.Column(db.String, nullable=True) task_id = db.Column(db.String, nullable=True)
form_field_key = db.Column(db.String, nullable=True) form_field_key = db.Column(db.String, nullable=True)
latest_version = db.Column(db.Integer, default=0)
class FileModelSchema(ModelSchema): class FileModelSchema(ModelSchema):

View File

@@ -1,12 +1,14 @@
import enum import enum
import jinja2
import marshmallow import marshmallow
from jinja2 import Environment, BaseLoader, Undefined, Template from jinja2 import Template
from marshmallow import INCLUDE from marshmallow import INCLUDE
from marshmallow_enum import EnumField from marshmallow_enum import EnumField
from marshmallow_sqlalchemy import ModelSchema from marshmallow_sqlalchemy import ModelSchema
from crc import db, ma from crc import db, ma
from crc.api.common import ApiError
class WorkflowSpecModel(db.Model): class WorkflowSpecModel(db.Model):
@@ -38,6 +40,7 @@ class WorkflowModel(db.Model):
study_id = db.Column(db.Integer, db.ForeignKey('study.id')) study_id = db.Column(db.Integer, db.ForeignKey('study.id'))
workflow_spec_id = db.Column(db.String, db.ForeignKey('workflow_spec.id')) workflow_spec_id = db.Column(db.String, db.ForeignKey('workflow_spec.id'))
class Task(object): class Task(object):
def __init__(self, id, name, title, type, state, form, documentation, data): def __init__(self, id, name, title, type, state, form, documentation, data):
self.id = id self.id = id
@@ -72,8 +75,11 @@ class Task(object):
create loops, etc...''' create loops, etc...'''
template = Template(documentation) template = Template(documentation)
self.documentation = template.render(**self.data) try:
self.documentation = template.render(**self.data)
except jinja2.exceptions.UndefinedError as ue:
raise ApiError(code="template_error", message="Error processing template for task %s: %s" %
(self.name, str(ue)), status_code=500)
class OptionSchema(ma.Schema): class OptionSchema(ma.Schema):
class Meta: class Meta:

View File

@@ -4,7 +4,7 @@ from jinja2 import UndefinedError
from crc import session from crc import session
from crc.api.common import ApiError from crc.api.common import ApiError
from crc.models.file import FileModel, FileDataModel from crc.models.file import FileModel, FileDataModel, CONTENT_TYPES
from crc.models.workflow import WorkflowSpecModel from crc.models.workflow import WorkflowSpecModel
from docxtpl import DocxTemplate from docxtpl import DocxTemplate
import jinja2 import jinja2
@@ -48,7 +48,7 @@ class CompleteTemplate(Script):
workflow_id = task.workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] workflow_id = task.workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY]
FileService.add_task_file(study_id=study_id, workflow_id=workflow_id, task_id=task.id, FileService.add_task_file(study_id=study_id, workflow_id=workflow_id, task_id=task.id,
name=file_name, name=file_name,
content_type=FileService.DOCX_MIME, content_type=CONTENT_TYPES['docx'],
binary_data=final_document_stream.read()) binary_data=final_document_stream.read())
print("Complete Task was called with %s" % str(args)) print("Complete Task was called with %s" % str(args))

View File

@@ -1,24 +1,33 @@
import os import os
from datetime import datetime from datetime import datetime
from uuid import UUID
from xml.etree import ElementTree
from crc import session from crc import session
from crc.api.common import ApiErrorSchema, ApiError from crc.api.common import ApiErrorSchema, ApiError
from crc.models.file import FileType, FileDataModel, FileModelSchema, FileModel from crc.models.file import FileType, FileDataModel, FileModelSchema, FileModel, CONTENT_TYPES
from crc.models.workflow import WorkflowSpecModel
from crc.services.workflow_processor import WorkflowProcessor
import hashlib
class FileService(object): class FileService(object):
"""Provides consistent management and rules for storing, retrieving and processing files.""" """Provides consistent management and rules for storing, retrieving and processing files."""
DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
@staticmethod @staticmethod
def add_workflow_spec_file(workflow_spec_id, name, content_type, binary_data): def add_workflow_spec_file(workflow_spec: WorkflowSpecModel,
name, content_type, binary_data, primary=False):
"""Create a new file and associate it with a workflow spec.""" """Create a new file and associate it with a workflow spec."""
file_model = FileModel( file_model = FileModel(
version=0, workflow_spec_id=workflow_spec.id,
workflow_spec_id=workflow_spec_id,
name=name, name=name,
primary=primary
) )
if primary:
bpmn: ElementTree.Element = ElementTree.fromstring(binary_data)
workflow_spec.primary_process_id = WorkflowProcessor.get_process_id(bpmn)
print("Locating Process Id for " + name + " " + workflow_spec.primary_process_id)
return FileService.update_file(file_model, binary_data, content_type) return FileService.update_file(file_model, binary_data, content_type)
@staticmethod @staticmethod
@@ -38,7 +47,6 @@ class FileService(object):
def add_task_file(study_id, workflow_id, task_id, name, content_type, binary_data): def add_task_file(study_id, workflow_id, task_id, name, content_type, binary_data):
"""Create a new file and associate it with an executing task within a workflow.""" """Create a new file and associate it with an executing task within a workflow."""
file_model = FileModel( file_model = FileModel(
version=0,
study_id=study_id, study_id=study_id,
workflow_id=workflow_id, workflow_id=workflow_id,
task_id=task_id, task_id=task_id,
@@ -49,9 +57,14 @@
@staticmethod @staticmethod
def update_file(file_model, binary_data, content_type): def update_file(file_model, binary_data, content_type):
file_model.version = file_model.version + 1 file_data_model = session.query(FileDataModel).\
file_model.last_updated = datetime.now() filter_by(file_model_id=file_model.id,
file_model.content_type = content_type version=file_model.latest_version
).with_for_update().first()
md5_checksum = UUID(hashlib.md5(binary_data).hexdigest())
if(file_data_model is not None and md5_checksum == file_data_model.md5_hash):
# This file does not need to be updated, it's the same file.
return file_model
# Verify the extension # Verify the extension
basename, file_extension = os.path.splitext(file_model.name) basename, file_extension = os.path.splitext(file_model.name)
@@ -62,12 +75,16 @@
file_extension)), 404 file_extension)), 404
else: else:
file_model.type = FileType[file_extension] file_model.type = FileType[file_extension]
file_model.content_type = content_type
file_data_model = session.query(FileDataModel).filter_by(file_model_id=file_model.id).with_for_update().first()
if file_data_model is None: if file_data_model is None:
file_data_model = FileDataModel(data=binary_data, file_model=file_model) version = 1
else: else:
file_data_model.data = binary_data version = file_data_model.version + 1
file_model.latest_version = version
file_data_model = FileDataModel(data=binary_data, file_model=file_model, version=version,
md5_hash=md5_checksum)
session.add_all([file_model, file_data_model]) session.add_all([file_model, file_data_model])
session.commit() session.commit()
@@ -94,4 +111,8 @@ class FileService(object):
@staticmethod @staticmethod
def get_file_data(file_id): def get_file_data(file_id):
"""Returns the file_data that is associated with the file model id""" """Returns the file_data that is associated with the file model id"""
return session.query(FileDataModel).filter(FileDataModel.file_model_id == file_id).first() file_model = session.query(FileModel).filter(FileModel.id == file_id).first()
return session.query(FileDataModel)\
.filter(FileDataModel.file_model_id == file_id)\
.filter(FileDataModel.version == file_model.latest_version)\
.first()

View File

@@ -4,15 +4,23 @@ import os
import xml.etree.ElementTree as ElementTree import xml.etree.ElementTree as ElementTree
from crc import app, db, session from crc import app, db, session
from crc.models.file import FileType, FileModel, FileDataModel from crc.models.file import FileType, FileModel, FileDataModel, CONTENT_TYPES
from crc.models.study import StudyModel from crc.models.study import StudyModel
from crc.models.user import UserModel from crc.models.user import UserModel
from crc.models.workflow import WorkflowSpecModel from crc.models.workflow import WorkflowSpecModel
from crc.services.file_service import FileService
from crc.services.workflow_processor import WorkflowProcessor from crc.services.workflow_processor import WorkflowProcessor
class ExampleDataLoader: class ExampleDataLoader:
def make_data(self): @staticmethod
def clean_db():
session.flush() # Clear out any transactions before deleting it all to avoid spurious errors.
for table in reversed(db.metadata.sorted_tables):
session.execute(table.delete())
session.flush()
def load_all(self):
users = [ users = [
UserModel( UserModel(
uid='dhf8r', uid='dhf8r',
@@ -25,6 +33,8 @@ class ExampleDataLoader:
title='SOFTWARE ENGINEER V' title='SOFTWARE ENGINEER V'
) )
] ]
db.session.add_all(users)
db.session.commit()
studies = [ studies = [
StudyModel( StudyModel(
@@ -48,89 +58,47 @@
user_uid='dhf8r' user_uid='dhf8r'
), ),
] ]
db.session.add_all(studies)
db.session.commit()
workflow_specifications = \ self.create_spec(id="crc2_training_session_enter_core_info",
self.create_spec(id="crc2_training_session_enter_core_info", name="crc2_training_session_enter_core_info",
name="crc2_training_session_enter_core_info", display_name="CR Connect2 - Training Session - Core Info",
display_name="CR Connect2 - Training Session - Core Info", description='Part of Milestone 3 Deliverable')
description='Part of Milestone 3 Deliverable') self.create_spec(id="crc2_training_session_data_security_plan",
workflow_specifications += \ name="crc2_training_session_data_security_plan",
self.create_spec(id="crc2_training_session_data_security_plan", display_name="CR Connect2 - Training Session - Data Security Plan",
name="crc2_training_session_data_security_plan", description='Part of Milestone 3 Deliverable')
display_name="CR Connect2 - Training Session - Data Security Plan", self.create_spec(id="sponsor_funding_source",
description='Part of Milestone 3 Deliverable') name="sponsor_funding_source",
workflow_specifications += \ display_name="Sponsor and/or Funding Source ",
self.create_spec(id="sponsor_funding_source", description='TBD')
name="sponsor_funding_source",
display_name="Sponsor and/or Funding Source ",
description='TBD')
# workflow_specifications += \
# self.create_spec(id="m2_demo",
# name="m2_demo",
# display_name="Milestone 2 Demo",
# description='A simplified CR Connect workflow for demonstration purposes.')
# workflow_specifications += \
# self.create_spec(id="crc_study_workflow",
# name="crc_study_workflow",
# display_name="CR Connect Study Workflow",
# description='Draft workflow for CR Connect studies.')
all_data = users + studies + workflow_specifications
return all_data
def create_spec(self, id, name, display_name="", description="", filepath=None): def create_spec(self, id, name, display_name="", description="", filepath=None):
"""Assumes that a directory exists in static/bpmn with the same name as the given id. """Assumes that a directory exists in static/bpmn with the same name as the given id.
further assumes that the [id].bpmn is the primary file for the workflow. further assumes that the [id].bpmn is the primary file for the workflow.
returns an array of data models to be added to the database.""" returns an array of data models to be added to the database."""
models = [] file_service = FileService()
spec = WorkflowSpecModel(id=id, spec = WorkflowSpecModel(id=id,
name=name, name=name,
display_name=display_name, display_name=display_name,
description=description) description=description)
models.append(spec) db.session.add(spec)
db.session.commit()
if not filepath: if not filepath:
filepath = os.path.join(app.root_path, 'static', 'bpmn', id, "*") filepath = os.path.join(app.root_path, 'static', 'bpmn', id, "*")
files = glob.glob(filepath) files = glob.glob(filepath)
for file_path in files: for file_path in files:
noise, file_extension = os.path.splitext(file_path) noise, file_extension = os.path.splitext(file_path)
filename = os.path.basename(file_path) filename = os.path.basename(file_path)
if file_extension.lower() == '.bpmn':
type = FileType.bpmn
elif file_extension.lower() == '.dmn':
type = FileType.dmn
elif file_extension.lower() == '.svg':
type = FileType.svg
elif file_extension.lower() == '.docx':
type = FileType.docx
else:
raise Exception("Unsupported file type:" + file_path)
continue
is_primary = filename.lower() == id + ".bpmn" is_primary = filename.lower() == id + ".bpmn"
file_model = FileModel(name=filename, type=type, content_type='text/xml', version="1",
last_updated=datetime.datetime.now(), primary=is_primary,
workflow_spec_id=id)
models.append(file_model)
try: try:
file = open(file_path, "rb") file = open(file_path, "rb")
data = file.read() data = file.read()
if (is_primary): content_type = CONTENT_TYPES[file_extension[1:]]
bpmn: ElementTree.Element = ElementTree.fromstring(data) file_service.add_workflow_spec_file(workflow_spec=spec, name=filename, content_type=content_type,
spec.primary_process_id = WorkflowProcessor.get_process_id(bpmn) binary_data=data, primary=is_primary)
print("Locating Process Id for " + filename + " " + spec.primary_process_id)
models.append(FileDataModel(data=data, file_model=file_model))
finally: finally:
file.close() file.close()
return models return spec
@staticmethod
def clean_db():
session.flush() # Clear out any transactions before deleting it all to avoid spurious errors.
for table in reversed(db.metadata.sorted_tables):
session.execute(table.delete())
session.flush()
def load_all(self):
for data in self.make_data():
session.add(data)
session.commit()
session.flush()

View File

@@ -92,15 +92,7 @@ class BaseTest(unittest.TestCase):
if session.query(WorkflowSpecModel).filter_by(id=dir_name).count() > 0: if session.query(WorkflowSpecModel).filter_by(id=dir_name).count() > 0:
return return
filepath = os.path.join(app.root_path, '..', 'tests', 'data', dir_name, "*") filepath = os.path.join(app.root_path, '..', 'tests', 'data', dir_name, "*")
models = ExampleDataLoader().create_spec(id=dir_name, name=dir_name, filepath=filepath) return ExampleDataLoader().create_spec(id=dir_name, name=dir_name, filepath=filepath)
spec = None
for model in models:
if isinstance(model, WorkflowSpecModel):
spec = model
session.add(model)
session.commit()
session.flush()
return spec
@staticmethod @staticmethod
def protocol_builder_response(file_name): def protocol_builder_response(file_name):

View File

@@ -25,7 +25,7 @@ class TestFilesApi(BaseTest):
def test_list_multiple_files_for_workflow_spec(self): def test_list_multiple_files_for_workflow_spec(self):
self.load_example_data() self.load_example_data()
spec = session.query(WorkflowSpecModel).first() spec = session.query(WorkflowSpecModel).first()
svgFile = FileModel(name="test.svg", type=FileType.svg, version=1, last_updated=datetime.now(), svgFile = FileModel(name="test.svg", type=FileType.svg,
primary=False, workflow_spec_id=spec.id) primary=False, workflow_spec_id=spec.id)
session.add(svgFile) session.add(svgFile)
session.flush() session.flush()
@@ -47,7 +47,6 @@ class TestFilesApi(BaseTest):
self.assertIsNotNone(rv.get_data()) self.assertIsNotNone(rv.get_data())
json_data = json.loads(rv.get_data(as_text=True)) json_data = json.loads(rv.get_data(as_text=True))
file = FileModelSchema().load(json_data, session=session) file = FileModelSchema().load(json_data, session=session)
self.assertEqual(1, file.version)
self.assertEqual(FileType.svg, file.type) self.assertEqual(FileType.svg, file.type)
self.assertFalse(file.primary) self.assertFalse(file.primary)
self.assertEqual("image/svg+xml", file.content_type) self.assertEqual("image/svg+xml", file.content_type)
@@ -89,13 +88,36 @@ class TestFilesApi(BaseTest):
self.assertIsNotNone(rv.get_data()) self.assertIsNotNone(rv.get_data())
json_data = json.loads(rv.get_data(as_text=True)) json_data = json.loads(rv.get_data(as_text=True))
file = FileModelSchema().load(json_data, session=session) file = FileModelSchema().load(json_data, session=session)
self.assertEqual(2, file.version) self.assertEqual(2, file.latest_version)
self.assertEqual(FileType.bpmn, file.type) self.assertEqual(FileType.bpmn, file.type)
self.assertEqual("application/octet-stream", file.content_type) self.assertEqual("application/octet-stream", file.content_type)
self.assertEqual(spec.id, file.workflow_spec_id) self.assertEqual(spec.id, file.workflow_spec_id)
data_model = session.query(FileDataModel).filter_by(file_model_id=file.id).first() rv = self.app.get('/v1.0/file/%i/data' % file.id)
self.assertEqual(b"hijklim", data_model.data) self.assert_success(rv)
data = rv.get_data()
self.assertIsNotNone(data)
self.assertEqual(b"hijklim", data)
def test_update_with_same_exact_data_does_not_increment_version(self):
self.load_example_data()
spec = session.query(WorkflowSpecModel).first()
data = {}
data['file'] = io.BytesIO(b"abcdef"), 'my_new_file.bpmn'
rv = self.app.post('/v1.0/file?workflow_spec_id=%s' % spec.id, data=data, follow_redirects=True,
content_type='multipart/form-data')
self.assertIsNotNone(rv.get_data())
json_data = json.loads(rv.get_data(as_text=True))
file = FileModelSchema().load(json_data, session=session)
self.assertEqual(1, file.latest_version)
data['file'] = io.BytesIO(b"abcdef"), 'my_new_file.bpmn'
rv = self.app.put('/v1.0/file/%i/data' % file.id, data=data, follow_redirects=True,
content_type='multipart/form-data')
self.assertIsNotNone(rv.get_data())
json_data = json.loads(rv.get_data(as_text=True))
file = FileModelSchema().load(json_data, session=session)
self.assertEqual(1, file.latest_version)
def test_get_file(self): def test_get_file(self):
self.load_example_data() self.load_example_data()