cr-connect-workflow/migrations/versions/7225d990740e_move_files_to_...

324 lines
14 KiB
Python

"""Move files to filesystem
Revision ID: 7225d990740e
Revises: 65b5ed6ae05b
Create Date: 2021-12-14 10:52:50.785342
"""
from alembic import op
import sqlalchemy as sa
# import crc
from crc import app, session
from crc.models.file import FileModel, FileModelSchema, FileDataModel, LookupFileModel, CONTENT_TYPES
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowSpecCategoryModel, WorkflowSpecCategoryModelSchema
from crc.services.spec_file_service import SpecFileService
from crc.services.reference_file_service import ReferenceFileService
from crc.services.workflow_service import WorkflowService
# from crc.services.temp_migration_service import FromFilesystemService, ToFilesystemService
from shutil import rmtree
import json
import os
# revision identifiers, used by Alembic.
# `revision` is this migration's id; `down_revision` is the migration it is
# applied on top of (and the one reached by downgrading from this revision).
revision = '7225d990740e'
down_revision = '65b5ed6ae05b'
# No branch label and no cross-branch dependency are declared.
branch_labels = None
depends_on = None
class FromFilesystemService(object):
    """Load category, workflow spec and spec-file records from a directory tree.

    The tree is walked as follows:

    * ``<root>/<name>.json`` -- metadata for one category
    * ``<root>/<category>/<name>.json`` -- metadata for one workflow spec
    * ``<root>/<category>/<spec>/<file>`` together with ``<file>.json`` --
      a spec file and its metadata

    All database access goes through the module-level ``session``.
    """

    @staticmethod
    def _child_path(directory, item):
        """Return the path of ``item`` inside ``directory``.

        ``item`` may be a plain name or an ``os.DirEntry``. A DirEntry's own
        path already contains the directory it was scanned from, so joining
        the entry itself onto ``directory`` would repeat a relative prefix
        (``specs/specs/a.json``). Only its bare name is joined instead.
        """
        if isinstance(item, os.DirEntry):
            return os.path.join(directory, item.name)
        return os.path.join(directory, item)

    @staticmethod
    def process_directory(directory):
        """Split the immediate children of ``directory`` into files and directories.

        Returns a ``(files, directories)`` tuple of ``os.DirEntry`` lists.
        Entries that are neither a file nor a directory are ignored.
        """
        files = []
        directories = []
        # The context manager closes the scandir iterator (and its OS handle)
        # deterministically instead of waiting for garbage collection.
        with os.scandir(directory) as directory_items:
            for item in directory_items:
                if item.is_dir():
                    directories.append(item)
                elif item.is_file():
                    files.append(item)
        return files, directories

    @staticmethod
    def process_workflow_spec(json_file, directory):
        """Return the WorkflowSpecModel described by ``json_file``, creating it if missing.

        An existing spec (matched on ``id``) is returned untouched. A new spec
        is linked to the category whose ``display_name`` matches the one in
        the JSON, if the JSON carries a category at all.
        """
        file_path = FromFilesystemService._child_path(directory, json_file)
        with open(file_path, 'r') as f_open:
            data_obj = json.load(f_open)

        workflow_spec_model = session.query(WorkflowSpecModel).\
            filter(WorkflowSpecModel.id == data_obj['id']).\
            first()
        if not workflow_spec_model:
            category_id = None
            if data_obj['category'] is not None:
                category_id = session.query(WorkflowSpecCategoryModel.id).filter(
                    WorkflowSpecCategoryModel.display_name == data_obj['category']['display_name']).scalar()
            workflow_spec_model = WorkflowSpecModel(id=data_obj['id'],
                                                    display_name=data_obj['display_name'],
                                                    description=data_obj['description'],
                                                    is_master_spec=data_obj['is_master_spec'],
                                                    category_id=category_id,
                                                    display_order=data_obj['display_order'],
                                                    standalone=data_obj['standalone'],
                                                    library=data_obj['library'])
            session.add(workflow_spec_model)
            session.commit()
        return workflow_spec_model

    @staticmethod
    def process_workflow_spec_file(json_file, spec_directory):
        """Create or update the FileModel for the spec file described by ``json_file``.

        ``json_file`` must expose ``.name`` (e.g. an ``os.DirEntry``). The spec
        file itself is the sibling whose name is the JSON file's name without
        its final extension. Its content type is looked up in CONTENT_TYPES by
        the spec file's extension.
        """
        # Imported at call time, as in the original code
        # (presumably to avoid an import cycle -- confirm).
        from crc.services.file_service import FileService

        file_path = FromFilesystemService._child_path(spec_directory, json_file)
        with open(file_path, 'r') as json_handle:
            data_obj = json.load(json_handle)

        # 'diagram.bpmn.json' -> 'diagram.bpmn'
        spec_file_name = '.'.join(json_file.name.split('.')[:-1])
        spec_file_path = os.path.join(spec_directory, spec_file_name)
        with open(spec_file_path, 'rb') as spec_handle:
            binary_data = spec_handle.read()
        content_type = CONTENT_TYPES[spec_file_name.split('.')[-1]]

        workflow_spec_file_model = session.query(FileModel).\
            filter(FileModel.workflow_spec_id == data_obj['workflow_spec_id']).\
            filter(FileModel.name == spec_file_name).\
            first()
        if workflow_spec_file_model:
            # update workflow_spec_file_model
            FileService.update_file(workflow_spec_file_model, binary_data, content_type)
        else:
            # create new model
            workflow_spec = session.query(WorkflowSpecModel).\
                filter(WorkflowSpecModel.id == data_obj['workflow_spec_id']).\
                first()
            workflow_spec_file_model = FileService.add_workflow_spec_file(workflow_spec,
                                                                          name=spec_file_name,
                                                                          content_type=content_type,
                                                                          binary_data=binary_data)
        print(f'process_workflow_spec_file: data_obj: {data_obj}')
        return workflow_spec_file_model

    @staticmethod
    def process_category(json_file, root):
        """Create or update the category described by ``json_file`` and return it.

        Categories are matched on ``display_name``. For an existing category
        only ``display_order`` and ``admin`` are refreshed.
        """
        print(f'process_category: json_file: {json_file}')
        file_path = FromFilesystemService._child_path(root, json_file)
        with open(file_path, 'r') as f_open:
            data_obj = json.load(f_open)

        category = session.query(WorkflowSpecCategoryModel).filter(
            WorkflowSpecCategoryModel.display_name == data_obj['display_name']).first()
        if not category:
            category = WorkflowSpecCategoryModel(display_name=data_obj['display_name'],
                                                 display_order=data_obj['display_order'],
                                                 admin=data_obj['admin'])
            session.add(category)
        else:
            category.display_order = data_obj['display_order']
            category.admin = data_obj['admin']
        print(f'process_category: category: {category}')
        session.commit()
        return category

    def process_workflow_spec_directory(self, spec_directory):
        """Import every ``*.json``-described spec file found in ``spec_directory``."""
        print(f'process_workflow_spec_directory: {spec_directory}')
        files, _ = self.process_directory(spec_directory)
        for file in files:
            if file.name.endswith('.json'):
                self.process_workflow_spec_file(file, spec_directory)

    def process_category_directory(self, category_directory):
        """Import the workflow specs of one category, then each spec's files."""
        print(f'process_category_directory: {category_directory}')
        files, directories = self.process_directory(category_directory)
        # Specs first, so the spec rows exist before their files are attached.
        for file in files:
            if file.name.endswith('.json'):
                self.process_workflow_spec(file, category_directory)
        for workflow_spec_directory in directories:
            directory_path = self._child_path(category_directory, workflow_spec_directory)
            self.process_workflow_spec_directory(directory_path)

    def process_root_directory(self, root_directory):
        """Import all categories under ``root_directory``, then each category directory."""
        files, directories = self.process_directory(root_directory)
        for file in files:
            if file.name.endswith('.json'):
                self.process_category(file, root_directory)
                # NOTE(review): the original indentation was lost; this cleanup
                # is assumed to run after each imported category -- confirm.
                WorkflowService.cleanup_workflow_spec_category_display_order()
        for directory in directories:
            directory_path = self._child_path(root_directory, directory)
            self.process_category_directory(directory_path)

    def update_file_metadata_from_filesystem(self, root_directory):
        """Entry point: import everything found under ``root_directory``."""
        self.process_root_directory(root_directory)
class ToFilesystemService(object):
    """Write workflow spec files and reference files from the database to disk.

    Files end up under ``<location>/<category>/<spec display name>/<file name>``
    (spec files) or ``<location>/Reference/<file name>`` (reference files).
    Each category, spec and file gets a sibling ``.json`` dump of its model.
    """

    @staticmethod
    def process_category(location, category):
        """Ensure the category directory exists and dump the category to JSON.

        The JSON file is written next to the directory, as
        ``<location>/<display_name>.json``.
        """
        # Make sure a directory exists for the category.
        # (makedirs on the category path also creates `location` itself.)
        category_path = os.path.join(location, category.display_name)
        os.makedirs(category_path, exist_ok=True)
        # Add a json file dumped from the category model
        json_file_path = os.path.join(location, f'{category.display_name}.json')
        category_json = WorkflowSpecCategoryModelSchema().dumps(category)
        with open(json_file_path, 'w') as j_handle:
            j_handle.write(category_json)

    @staticmethod
    def process_workflow_spec(location, workflow_spec, category_name_string):
        """Ensure the spec directory exists and dump the workflow spec to JSON.

        The JSON file is written next to the directory, as
        ``<location>/<category>/<display_name>.json``.
        """
        # Make sure a directory exists for the workflow spec.
        # (This also creates the enclosing category directory.)
        category_path = os.path.join(location, category_name_string)
        workflow_spec_path = os.path.join(category_path, workflow_spec.display_name)
        os.makedirs(workflow_spec_path, exist_ok=True)
        # Add a json file dumped from the workflow spec model
        json_file_path = os.path.join(category_path, f'{workflow_spec.display_name}.json')
        workflow_spec_json = WorkflowSpecModelSchema().dumps(workflow_spec)
        with open(json_file_path, 'w') as j_handle:
            j_handle.write(workflow_spec_json)

    @staticmethod
    def process_workflow_spec_file(session, workflow_spec_file, workflow_spec_file_path):
        """Write the newest stored version of a file, plus its metadata JSON.

        The data comes from the FileDataModel row with the highest ``version``
        for this file. If the file has no stored data at all, nothing is
        written and a message is printed instead of failing on ``None``.
        """
        file_data_model = session.query(FileDataModel). \
            filter(FileDataModel.file_model_id == workflow_spec_file.id). \
            order_by(sa.desc(FileDataModel.version)). \
            first()
        if file_data_model is None:
            print(f'process_workflow_spec_file: no file data for file id {workflow_spec_file.id}; skipping')
            return

        os.makedirs(os.path.dirname(workflow_spec_file_path), exist_ok=True)
        with open(workflow_spec_file_path, 'wb') as f_handle:
            f_handle.write(file_data_model.data)

        # The FileModel is re-read through the given session before dumping,
        # as in the original code (the object passed in may have been loaded
        # through a different session -- confirm).
        workflow_spec_file_model = session.query(FileModel).\
            filter(FileModel.id == file_data_model.file_model_id).\
            first()
        workflow_spec_file_json = FileModelSchema().dumps(workflow_spec_file_model)
        with open(f'{workflow_spec_file_path}.json', 'w') as j_handle:
            j_handle.write(workflow_spec_file_json)

    def write_file_to_system(self, session, file_model, location):
        """Export one FileModel beneath ``location``.

        Spec files go under their category directory. A spec without a
        category falls back to 'Master Specification', 'Library Specs' or
        'Standalone' according to its flags. Reference files go under
        'Reference'. Nothing is written when the spec cannot be found, when
        no category name can be determined, or when the file is neither a
        spec file nor a reference file.
        """
        if file_model.workflow_spec_id is not None:
            # we have a workflow spec file
            workflow_spec_model = session.query(WorkflowSpecModel).\
                filter(WorkflowSpecModel.id == file_model.workflow_spec_id).\
                first()
            if not workflow_spec_model:
                return

            category_name = None
            if workflow_spec_model.category_id is not None:
                category_model = session.query(WorkflowSpecCategoryModel).\
                    filter(WorkflowSpecCategoryModel.id == workflow_spec_model.category_id).\
                    first()
                self.process_category(location, category_model)
                category_name = category_model.display_name
            elif workflow_spec_model.is_master_spec:
                category_name = 'Master Specification'
            elif workflow_spec_model.library:
                category_name = 'Library Specs'
            elif workflow_spec_model.standalone:
                category_name = 'Standalone'

            # Only process if we have a workflow_spec_model and category_name
            if category_name is None:
                return
            self.process_workflow_spec(location, workflow_spec_model, category_name)
            file_path = os.path.join(location,
                                     category_name,
                                     workflow_spec_model.display_name,
                                     file_model.name)
            self.process_workflow_spec_file(session, file_model, file_path)

        elif file_model.is_reference:
            # we have a reference file
            file_path = os.path.join(location, 'Reference', file_model.name)
            self.process_workflow_spec_file(session, file_model, file_path)
def upgrade():
    """Move workflow spec files and reference files from the database to disk.

    Schema changes: drops ``workflow_spec_dependency_file`` and adds
    ``file_model_id`` (foreign key to ``file.id``) and ``last_updated`` to
    ``lookup_file``.

    Data changes: every non-archived workflow spec file and every reference
    file is written beneath the sync file root. All lookup_file rows are then
    deleted, followed by the FileDataModel rows of the exported files.
    """
    bind = op.get_bind()
    session = sa.orm.Session(bind=bind)

    op.drop_table('workflow_spec_dependency_file')
    op.add_column('lookup_file', sa.Column('file_model_id', sa.Integer(), nullable=True))
    op.add_column('lookup_file', sa.Column('last_updated', sa.DateTime(), nullable=True))
    op.create_foreign_key(None, 'lookup_file', 'file', ['file_model_id'], ['id'])

    processed_files = []
    location = SpecFileService.get_sync_file_root()
    # Start from an empty export tree so no stale files survive.
    if os.path.exists(location):
        rmtree(location)

    exporter = ToFilesystemService()

    # Process workflow spec files.
    # The filter must be a SQL expression. `FileModel.workflow_spec_id is not
    # None` is a Python identity test on the column object, which is always
    # True: it selected every file, and the FileDataModel rows of non-spec
    # files were then deleted further down.
    files = session.query(FileModel).filter(FileModel.workflow_spec_id.isnot(None)).all()
    for file in files:
        if file.archived is True:
            continue
        # NOTE(review): write_file_to_system writes nothing when the spec row
        # is missing or no category name can be determined, yet the file is
        # still recorded here and its data deleted below -- confirm intended.
        exporter.write_file_to_system(session, file, location)
        processed_files.append(file.id)

    # Process reference files
    # get_reference_files only returns files where archived is False
    reference_files = ReferenceFileService.get_reference_files()
    for reference_file in reference_files:
        exporter.write_file_to_system(session, reference_file, location)
        processed_files.append(reference_file.id)

    session.flush()

    # Delete all lookup rows before the file data rows are removed.
    lookups = session.query(LookupFileModel).all()
    for lookup in lookups:
        session.delete(lookup)
    session.commit()

    # The exported files now live on disk; remove their stored data,
    # committing once per file.
    for file_id in processed_files:
        processed_data_models = session.query(FileDataModel).\
            filter(FileDataModel.file_model_id == file_id).\
            all()
        for processed_data_model in processed_data_models:
            session.delete(processed_data_model)
        session.commit()
        print(f'upgrade: in processed files: file_id: {file_id}')

    print('upgrade: done: ')
def downgrade():
    """Revert the lookup_file schema changes and re-import files from disk.

    Re-adds ``lookup_file.file_data_model_id``, removes the ``file_model_id``
    and ``last_updated`` columns that upgrade() added, recreates
    ``workflow_spec_dependency_file``, and then loads category, workflow spec
    and spec-file records back from the sync file root.
    """
    # TODO: This is a work in progress, and depends on what we do in upgrade()
    op.add_column('lookup_file', sa.Column('file_data_model_id', sa.Integer(), nullable=True))
    # NOTE(review): this key points at `file`, while the column name suggests
    # `file_data` -- confirm the intended target table.
    op.create_foreign_key(None, 'lookup_file', 'file', ['file_data_model_id'], ['id'])
    # NOTE(review): upgrade() creates its foreign key without an explicit
    # name; confirm this is the name the database actually assigned.
    op.drop_constraint('lookup_file_file_model_id_key', 'lookup_file', type_='foreignkey')
    op.drop_column('lookup_file', 'file_model_id')
    # upgrade() also adds `last_updated`. Without dropping it here, running
    # upgrade() again after a downgrade fails on the duplicate column.
    op.drop_column('lookup_file', 'last_updated')

    op.create_table('workflow_spec_dependency_file',
                    sa.Column('file_data_id', sa.Integer(), nullable=False),
                    sa.Column('workflow_id', sa.Integer(), nullable=False),
                    sa.ForeignKeyConstraint(['file_data_id'], ['file_data.id'], ),
                    sa.ForeignKeyConstraint(['workflow_id'], ['workflow.id'], ),
                    sa.PrimaryKeyConstraint('file_data_id', 'workflow_id')
                    )

    location = SpecFileService.get_sync_file_root()
    FromFilesystemService().update_file_metadata_from_filesystem(location)
    print('downgrade: ')