*** WIP ***

Temp services `ToFilesystemService` and `FromFilesystemService` for migrating files to and from the filesystem - not sure yet where these classes will end up.
Tests that call the two services.
An upgrade method in the migration that writes workflow spec files and metadata to the filesystem.
This commit is contained in:
mike cullerton 2021-12-16 14:22:37 -05:00
parent 338352017b
commit 5c90ce01a6
5 changed files with 235 additions and 376 deletions
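
A minimal sketch of how the two services are exercised, mirroring the migration's upgrade()/downgrade() and the two tests in this commit:

from crc import session
from crc.models.file import FileModel
from crc.services.temp_migration_service import FromFilesystemService, ToFilesystemService, SYNC_FILE_ROOT

# DB -> filesystem: dump every non-archived file, plus json metadata for its spec and category
for file in session.query(FileModel).all():
    if file.archived is not True:
        ToFilesystemService().write_file_to_system(file)

# filesystem -> DB: walk the files/ tree and recreate category and spec records from the json dumps
FromFilesystemService().update_file_metadata_from_filesystem(SYNC_FILE_ROOT)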

View File

@@ -19,7 +19,7 @@ from sqlalchemy.exc import IntegrityError
from crc import session, app
from crc.api.common import ApiError
from crc.models.data_store import DataStoreModel
from crc.models.file import FileType, FileDataModel, FileModel, LookupFileModel, LookupDataModel
from crc.models.file import FileType, FileDataModel, FileModel, FileModelSchema, LookupFileModel, LookupDataModel
from crc.models.workflow import WorkflowSpecModel, WorkflowModel, WorkflowSpecDependencyFile, WorkflowLibraryModel, WorkflowSpecModelSchema, WorkflowSpecCategoryModel, WorkflowSpecCategoryModelSchema
from crc.services.cache_service import cache
from crc.services.user_service import UserService
@@ -594,80 +594,4 @@ class FileService(object):
            raise ApiError(code='bad_keep',
                           message='You must keep at least 1 version')

    # @staticmethod
    # def write_file_to_system(file, category):
    #     file_path = os.path.join(app.root_path, '..', 'files')
    #
    #     print(f'write_file_to_system: file_path: {file_path}')

    @staticmethod
    def write_file_to_system(file_model):

        SYNC_FILE_ROOT = os.path.join(app.root_path, '..', 'files')

        def process_category(category):
            # Make sure a directory exists for the category
            # Add a json file dumped from the category model
            category_path = os.path.join(SYNC_FILE_ROOT, category.display_name)
            os.makedirs(os.path.dirname(category_path), exist_ok=True)
            json_file_name = f'{category.display_name}.json'
            json_file_path = os.path.join(SYNC_FILE_ROOT, json_file_name)
            category_model_schema = WorkflowSpecCategoryModelSchema().dumps(category)
            with open(json_file_path, 'w') as j_handle:
                j_handle.write(category_model_schema)

        def process_workflow_spec(workflow_spec, category_name_string):
            # Make sure a directory exists for the workflow spec
            # Add a json file dumped from the workflow spec model
            workflow_spec_path = os.path.join(SYNC_FILE_ROOT, category_name_string, workflow_spec.display_name)
            os.makedirs(os.path.dirname(workflow_spec_path), exist_ok=True)
            json_file_name = f'{workflow_spec.display_name}.json'
            json_file_path = os.path.join(SYNC_FILE_ROOT, category_name_string, json_file_name)
            workflow_spec_schema = WorkflowSpecModelSchema().dumps(workflow_spec)
            with open(json_file_path, 'w') as j_handle:
                j_handle.write(workflow_spec_schema)

        file_path = category_name = None

        if file_model.workflow_spec_id is not None:
            # we have a workflow spec file
            workflow_spec_model = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == file_model.workflow_spec_id).first()
            if workflow_spec_model:
                if workflow_spec_model.category_id is not None:
                    category_model = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.id == workflow_spec_model.category_id).first()
                    process_category(category_model)
                    process_workflow_spec(workflow_spec_model, category_model.display_name)
                    category_name = category_model.display_name
                elif workflow_spec_model.is_master_spec:
                    category_name = 'Master Specification'
                elif workflow_spec_model.library:
                    category_name = 'Library Specs'
                if category_name is not None:
                    # ensure_category_folder_exists(category_name)
                    # ensure_spec_folder_exists(workflow_spec_model.display_name)
                    file_path = os.path.join(SYNC_FILE_ROOT,
                                             category_name,
                                             workflow_spec_model.display_name,
                                             file_model.name)

        elif file_model.workflow_id is not None:
            # we have a workflow file
            pass
        elif file_model.is_reference:
            # we have a reference file?
            print(f'Reference file: {file_model.name}')
        else:
            print(f'Not processed: {file_model.name}')

        if file_path is not None:
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            file_data_model = session.query(FileDataModel). \
                filter(FileDataModel.file_model_id == file_model.id). \
                order_by(desc(FileDataModel.version)). \
                first()
            with open(file_path, 'wb') as f_handle:
                f_handle.write(file_data_model.data)

        # print(f'write_file_to_system: file_path: {file_path}')

View File

@@ -0,0 +1,202 @@
from crc import app, session
from crc.models.file import FileModel, FileModelSchema, FileDataModel
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowSpecCategoryModel, WorkflowSpecCategoryModelSchema
from crc.services.workflow_service import WorkflowService

from sqlalchemy import desc

import json
import os

SYNC_FILE_ROOT = os.path.join(app.root_path, '..', 'files')


class FromFilesystemService(object):

    @staticmethod
    def process_directory(directory):
        files = []
        directories = []
        directory_items = os.scandir(directory)
        for item in directory_items:
            if item.is_dir():
                directories.append(item)
            elif item.is_file():
                files.append(item)
        return files, directories

    @staticmethod
    def process_workflow_spec(json_file, directory):
        file_path = os.path.join(directory, json_file)

        with open(file_path, 'r') as f_open:
            data = f_open.read()
            data_obj = json.loads(data)
            workflow_spec_model = session.query(WorkflowSpecModel).filter(
                WorkflowSpecModel.display_name == data_obj['display_name']).first()
            if not workflow_spec_model:
                category_id = session.query(WorkflowSpecCategoryModel.id).filter(
                    WorkflowSpecCategoryModel.display_name == data_obj['display_name']).scalar()
                workflow_spec_model = WorkflowSpecModel(id=data_obj['id'],
                                                        display_name=data_obj['display_name'],
                                                        description=data_obj['description'],
                                                        is_master_spec=data_obj['is_master_spec'],
                                                        category_id=category_id,
                                                        display_order=data_obj['display_order'],
                                                        standalone=data_obj['standalone'],
                                                        library=data_obj['library'])
                session.add(workflow_spec_model)
                session.commit()

        print(f'process_workflow_spec: workflow_spec_model: {workflow_spec_model}')
        return workflow_spec_model

    def process_workflow_spec_files(self):
        pass

    @staticmethod
    def process_category(json_file, root):
        print(f'process_category: json_file: {json_file}')
        file_path = os.path.join(root, json_file)

        with open(file_path, 'r') as f_open:
            data = f_open.read()
            data_obj = json.loads(data)
            category = session.query(WorkflowSpecCategoryModel).filter(
                WorkflowSpecCategoryModel.display_name == data_obj['display_name']).first()
            if not category:
                category = WorkflowSpecCategoryModel(display_name=data_obj['display_name'],
                                                     display_order=data_obj['display_order'],
                                                     admin=data_obj['admin'])
                session.add(category)
            else:
                category.display_order = data_obj['display_order']
                category.admin = data_obj['admin']
            # print(data)
        print(f'process_category: category: {category}')

        session.commit()
        return category

    def process_workflow_spec_directory(self, spec_directory):
        print(f'process_workflow_spec_directory: {spec_directory}')
        files, directories = self.process_directory(spec_directory)

        for file in files:
            print(f'process_workflow_spec_directory: file: {file}')

    def process_category_directory(self, category_directory):
        print(f'process_category_directory: {category_directory}')
        files, directories = self.process_directory(category_directory)

        for file in files:
            if file.name.endswith('.json'):
                workflow_spec = self.process_workflow_spec(file, category_directory)

        for workflow_spec_directory in directories:
            directory_path = os.path.join(category_directory, workflow_spec_directory)
            self.process_workflow_spec_directory(directory_path)

    def process_root_directory(self, root_directory):

        files, directories = self.process_directory(root_directory)
        for file in files:
            if file.name.endswith('.json'):
                category_model = self.process_category(file, root_directory)
        WorkflowService.cleanup_workflow_spec_category_display_order()

        for directory in directories:
            directory_path = os.path.join(root_directory, directory)
            self.process_category_directory(directory_path)

    def update_file_metadata_from_filesystem(self, root_directory):
        self.process_root_directory(root_directory)


class ToFilesystemService(object):

    def process_category(self, category):
        # Make sure a directory exists for the category
        # Add a json file dumped from the category model
        category_path = os.path.join(SYNC_FILE_ROOT, category.display_name)
        os.makedirs(os.path.dirname(category_path), exist_ok=True)
        json_file_name = f'{category.display_name}.json'
        json_file_path = os.path.join(SYNC_FILE_ROOT, json_file_name)
        category_model_schema = WorkflowSpecCategoryModelSchema().dumps(category)
        with open(json_file_path, 'w') as j_handle:
            j_handle.write(category_model_schema)

    def process_workflow_spec(self, workflow_spec, category_name_string):
        # Make sure a directory exists for the workflow spec
        # Add a json file dumped from the workflow spec model
        workflow_spec_path = os.path.join(SYNC_FILE_ROOT, category_name_string, workflow_spec.display_name)
        os.makedirs(os.path.dirname(workflow_spec_path), exist_ok=True)
        json_file_name = f'{workflow_spec.display_name}.json'
        json_file_path = os.path.join(SYNC_FILE_ROOT, category_name_string, json_file_name)
        workflow_spec_schema = WorkflowSpecModelSchema().dumps(workflow_spec)
        with open(json_file_path, 'w') as j_handle:
            j_handle.write(workflow_spec_schema)

    def process_workflow_spec_file(self, workflow_spec_file, workflow_spec_file_path):
        # workflow_spec_file_path = os.path.join
        os.makedirs(os.path.dirname(workflow_spec_file_path), exist_ok=True)

        file_data_model = session.query(FileDataModel). \
            filter(FileDataModel.file_model_id == workflow_spec_file.id). \
            order_by(desc(FileDataModel.version)). \
            first()
        with open(workflow_spec_file_path, 'wb') as f_handle:
            f_handle.write(file_data_model.data)

        json_file_path = f'{workflow_spec_file_path}.json'
        workflow_spec_file_model = session.query(FileModel).filter(FileModel.id==file_data_model.file_model_id).first()
        workflow_spec_file_schema = FileModelSchema().dumps(workflow_spec_file_model)
        with open(json_file_path, 'w') as j_handle:
            j_handle.write(workflow_spec_file_schema)

        print('process_workflow_spec_file: done: ')

    def write_file_to_system(self, file_model):

        category_name = None

        if file_model.workflow_spec_id is not None:
            # we have a workflow spec file
            workflow_spec_model = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == file_model.workflow_spec_id).first()
            if workflow_spec_model:
                if workflow_spec_model.category_id is not None:
                    category_model = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.id == workflow_spec_model.category_id).first()
                    self.process_category(category_model)
                    category_name = category_model.display_name
                elif workflow_spec_model.is_master_spec:
                    category_name = 'Master Specification'
                elif workflow_spec_model.library:
                    category_name = 'Library Specs'
                if category_name is not None:
                    self.process_workflow_spec(workflow_spec_model, category_name)
                    file_path = os.path.join(SYNC_FILE_ROOT,
                                             category_name,
                                             workflow_spec_model.display_name,
                                             file_model.name)
                    self.process_workflow_spec_file(file_model, file_path)

        elif file_model.workflow_id is not None:
            # we have a workflow file
            pass
        elif file_model.is_reference:
            # we have a reference file?
            print(f'Reference file: {file_model.name}')
        else:
            print(f'Not processed: {file_model.name}')

        print(f'write_file_to_system: done: ')
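
For orientation, ToFilesystemService lays files out under SYNC_FILE_ROOT (the files/ directory one level above app.root_path) roughly like this; the names in angle brackets are placeholders, not values from this commit:

files/
    <category display_name>.json        # WorkflowSpecCategoryModelSchema dump
    <category display_name>/
        <spec display_name>.json        # WorkflowSpecModelSchema dump
        <spec display_name>/
            <file name>                 # latest FileDataModel.data (binary)
            <file name>.json            # FileModelSchema dump

Master specs and library specs land under the literal 'Master Specification' and 'Library Specs' directory names. FromFilesystemService walks the same tree to rebuild category and workflow spec records; file data handling is still TODO in this commit.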

View File

@@ -5,17 +5,15 @@ Revises: 44dd9397c555
Create Date: 2021-12-14 10:52:50.785342
"""
import json
from alembic import op
import sqlalchemy as sa
from crc import app, db, session
# from crc.models.file import FileModel, FileDataModel
# from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowSpecCategoryModel, WorkflowSpecCategoryModelSchema
# import os
#
# from crc.services.file_service import FileService
from crc import app, session
from crc.models.file import FileModel
from crc.services.file_service import FileService
from crc.services.temp_migration_service import FromFilesystemService, ToFilesystemService
import os
# revision identifiers, used by Alembic.
revision = '7225d990740e'
@@ -24,169 +22,32 @@ branch_labels = None
depends_on = None


class TempCategoryModel(db.Model):
    __tablename__ = 'temp_category'
    id = db.Column(db.Integer, primary_key=True)
    display_name = db.Column(db.String)
    display_order = db.Column(db.Integer)
    admin = db.Column(db.Boolean)


class TempSpecModel(db.Model):
    __tablename__ = 'temp_spec'
    id = db.Column(db.String, primary_key=True)
    display_name = db.Column(db.String)
    display_order = db.Column(db.Integer, nullable=True)
    description = db.Column(db.Text)
    category_id = db.Column(db.Integer, db.ForeignKey('workflow_spec_category.id'), nullable=True)
    category = db.relationship("WorkflowSpecCategoryModel")
    is_master_spec = db.Column(db.Boolean, default=False)
    standalone = db.Column(db.Boolean, default=False)
    library = db.Column(db.Boolean, default=False)

#
# def process_directory(directory):
#     files = []
#     directories = []
#     directory_items = os.scandir(directory)
#     for item in directory_items:
#         if item.is_dir():
#             directories.append(item)
#         elif item.is_file():
#             files.append(item)
#
#     return files, directories
#
#
# def process_workflow_spec(json_file, directory):
#     file_path = os.path.join(directory, json_file)
#
#     with open(file_path, 'r') as f_open:
#         data = f_open.read()
#         data_obj = json.loads(data)
#         workflow_spec_model = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.display_name==data_obj['display_name']).first()
#         if not workflow_spec_model:
#             workflow_spec_model = WorkflowSpecModel(display_name=data_obj['display_name'],
#                                                     description=data_obj['description'],
#                                                     is_master_spec=data_obj['is_master_spec'],
#                                                     category_id=data_obj['category_id'],
#                                                     display_order=data_obj['display_order'],
#                                                     standalone=data_obj['standalone'],
#                                                     library=data_obj['library'])
#             session.add(workflow_spec_model)
#
#     # session.commit()
#
#     print(f'process_workflow_spec: workflow_spec_model: {workflow_spec_model}')
#     return workflow_spec_model
#
#
# def process_workflow_spec_files():
#     pass
#
#
# def process_category(json_file, root):
#     print(f'process_category: json_file: {json_file}')
#     file_path = os.path.join(root, json_file)
#
#     with open(file_path, 'r') as f_open:
#         data = f_open.read()
#         data_obj = json.loads(data)
#         category = session.query(TempCategoryModel).filter(
#             TempCategoryModel.display_name == data_obj['display_name']).first()
#         if not category:
#             category = TempCategoryModel(display_name=data_obj['display_name'],
#                                          display_order=data_obj['display_order'],
#                                          admin=data_obj['admin'])
#             session.add(category)
#         else:
#             category.display_order = data_obj['display_order']
#             category.admin = data_obj['admin']
#         # print(data)
#     print(f'process_category: category: {category}')
#
#     session.commit()
#     return category
#
#
# def process_workflow_spec_directory(spec_directory):
#     print(f'process_workflow_spec_directory: {spec_directory}')
#     files, directories = process_directory(spec_directory)
#
#     for file in files:
#         print(f'process_workflow_spec_directory: file: {file}')
#
#
# def process_category_directory(category_directory):
#     print(f'process_category_directory: {category_directory}')
#     files, directories = process_directory(category_directory)
#
#     for file in files:
#         if file.name.endswith('.json'):
#             workflow_spec = process_workflow_spec(file, category_directory)
#
#     for workflow_spec_directory in directories:
#         directory_path = os.path.join(category_directory, workflow_spec_directory)
#         process_workflow_spec_directory(directory_path)
#
#
# def process_root_directory(root_directory):
#
#     files, directories = process_directory(root_directory)
#     for file in files:
#         if file.name.endswith('.json'):
#             category_model = process_category(file, root_directory)
#
#     for directory in directories:
#         directory_path = os.path.join(root_directory, directory)
#         process_category_directory(directory_path)
#
#
# def update_file_metadata_from_filesystem(root_directory):
#     process_root_directory(root_directory)


def temp_tables():
    op.create_table('temp_category',
                    sa.Column('id', sa.Integer(), nullable=False),
                    sa.Column('display_name', sa.String(), nullable=True),
                    sa.Column('display_order', sa.String(), nullable=True),
                    sa.Column('admin', sa.Boolean(), nullable=True),
                    sa.PrimaryKeyConstraint('id')
                    )
    op.create_table('temp_spec',
                    sa.Column('id', sa.String(), nullable=False),
                    sa.Column('display_name', sa.String()),
                    sa.Column('description', sa.Text()),
                    sa.Column('is_master_spec', sa.Boolean(), nullable=True),
                    sa.Column('category_id', sa.Integer(), nullable=True),
                    sa.Column('category', sa.Integer(), nullable=True),
                    sa.Column('display_order', sa.Integer(), nullable=True),
                    sa.Column('standalone', sa.Boolean(), nullable=True),
                    sa.Column('library', sa.Boolean(), nullable=True),
                    sa.ForeignKeyConstraint(['category_id'], ['temp_category.id'], ),
                    sa.PrimaryKeyConstraint('id')
                    )
    op.create_foreign_key(None, 'temp_spec', 'temp_category', ['category'], ['id'])


def upgrade():
    """Starting this cautiously
    Don't want to hork my dev system
    Not deleting records yet
    Originally, was only going to delete data in file_data.data
    Now, thinking about deleting the record.
    """
    processed_files = []
    files = session.query(FileModel).all()
    for file in files:
        if file.archived is not True:
            FileService.write_file_to_system(file)
            ToFilesystemService().write_file_to_system(file)
            processed_files.append(file.id)

    # TODO: delete processed files from file_data table
    print('upgrade: done: ')


def downgrade():

    temp_tables()
    print(f'temp category count: {session.query(TempCategoryModel).count()}')

    # Update DB from the filesystem
    # TODO: This is a work in progress, and depends on what we do in upgrade()
    SYNC_FILE_ROOT = os.path.join(app.root_path, '..', 'files')
    update_file_metadata_from_filesystem(SYNC_FILE_ROOT)
    FromFilesystemService().update_file_metadata_from_filesystem(SYNC_FILE_ROOT)

    print('downgrade: ')

View File

@@ -1,117 +1,6 @@
from tests.base_test import BaseTest

from crc import app, session
from crc.models.file import FileModel, FileDataModel
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowSpecCategoryModel, WorkflowSpecCategoryModelSchema
from crc.services.workflow_service import WorkflowService

import os
import json

from crc.services.file_service import FileService


def process_directory(directory):
    files = []
    directories = []
    directory_items = os.scandir(directory)
    for item in directory_items:
        if item.is_dir():
            directories.append(item)
        elif item.is_file():
            files.append(item)
    return files, directories


def process_workflow_spec(json_file, directory):
    file_path = os.path.join(directory, json_file)

    with open(file_path, 'r') as f_open:
        data = f_open.read()
        data_obj = json.loads(data)
        workflow_spec_model = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.display_name==data_obj['display_name']).first()
        if not workflow_spec_model:
            category_id = session.query(WorkflowSpecCategoryModel.id).filter(WorkflowSpecCategoryModel.display_name==data_obj['display_name']).scalar()
            workflow_spec_model = WorkflowSpecModel(id=data_obj['id'],
                                                    display_name=data_obj['display_name'],
                                                    description=data_obj['description'],
                                                    is_master_spec=data_obj['is_master_spec'],
                                                    category_id=category_id,
                                                    display_order=data_obj['display_order'],
                                                    standalone=data_obj['standalone'],
                                                    library=data_obj['library'])
            session.add(workflow_spec_model)
            session.commit()

    print(f'process_workflow_spec: workflow_spec_model: {workflow_spec_model}')
    return workflow_spec_model


def process_workflow_spec_files():
    pass


def process_category(json_file, root):
    print(f'process_category: json_file: {json_file}')
    file_path = os.path.join(root, json_file)

    with open(file_path, 'r') as f_open:
        data = f_open.read()
        data_obj = json.loads(data)
        category = session.query(WorkflowSpecCategoryModel).filter(
            WorkflowSpecCategoryModel.display_name == data_obj['display_name']).first()
        if not category:
            category = WorkflowSpecCategoryModel(display_name=data_obj['display_name'],
                                                 display_order=data_obj['display_order'],
                                                 admin=data_obj['admin'])
            session.add(category)
        else:
            category.display_order = data_obj['display_order']
            category.admin = data_obj['admin']
        # print(data)
    print(f'process_category: category: {category}')

    session.commit()
    return category


def process_workflow_spec_directory(spec_directory):
    print(f'process_workflow_spec_directory: {spec_directory}')
    files, directories = process_directory(spec_directory)

    for file in files:
        print(f'process_workflow_spec_directory: file: {file}')


def process_category_directory(category_directory):
    print(f'process_category_directory: {category_directory}')
    files, directories = process_directory(category_directory)

    for file in files:
        if file.name.endswith('.json'):
            workflow_spec = process_workflow_spec(file, category_directory)

    for workflow_spec_directory in directories:
        directory_path = os.path.join(category_directory, workflow_spec_directory)
        process_workflow_spec_directory(directory_path)


def process_root_directory(root_directory):

    files, directories = process_directory(root_directory)
    for file in files:
        if file.name.endswith('.json'):
            category_model = process_category(file, root_directory)
    WorkflowService.cleanup_workflow_spec_category_display_order()

    for directory in directories:
        directory_path = os.path.join(root_directory, directory)
        process_category_directory(directory_path)


def update_file_metadata_from_filesystem(root_directory):
    process_root_directory(root_directory)


from crc.services.temp_migration_service import FromFilesystemService, SYNC_FILE_ROOT


class TestFilesFromFilesystem(BaseTest):

@@ -119,7 +8,6 @@ class TestFilesFromFilesystem(BaseTest):

    def test_files_from_filesystem(self):

        self.load_example_data()

        SYNC_FILE_ROOT = os.path.join(app.root_path, '..', 'files')
        update_file_metadata_from_filesystem(SYNC_FILE_ROOT)
        FromFilesystemService().update_file_metadata_from_filesystem(SYNC_FILE_ROOT)

        print(f'test_files_from_filesystem')

View File

@@ -1,34 +1,18 @@
from tests.base_test import BaseTest

from crc import app, session
from crc.models.file import FileModel, FileDataModel
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecCategoryModel
from crc.services.file_service import FileService

import os

from crc import session
from crc.models.file import FileModel
from crc.services.temp_migration_service import ToFilesystemService


class TestFilesToFilesystem(BaseTest):

    def test_files_to_filesystem(self):

        self.load_example_data()

        # # category = filename = ''
        # # data = 'asdf'
        # self.load_example_data()
        #
        # file_model = session.query(FileModel).first()
        # # filename = file_model.name
        # file_data_model = session.query(FileDataModel).filter(FileDataModel.file_model_id == file_model.id).first()
        # if file_model.workflow_spec_id is None:
        #     file_model.workflow_spec_id = 'random_fact'
        # workflow_spec_model = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == file_model.workflow_spec_id).first()
        # if workflow_spec_model.category_id is None:
        #     workflow_spec_model.category_id = 0
        # category_model = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.id == workflow_spec_model.category_id).first()
        # file_path = os.path.join(app.root_path, '..', 'files', category_model.display_name, file_model.name)
        # os.makedirs(os.path.dirname(file_path), exist_ok=True)
        #
        # with open(file_path, 'wb') as f_handle:
        #     f_handle.write(file_data_model.data)

        files = session.query(FileModel).all()
        for file in files:
            if file.archived is not True:
                ToFilesystemService().write_file_to_system(file)

        print('test_files_to_filesystem')