cr-connect-workflow/crc/services/data_store_service.py
mike cullerton cd743b4a72 Clean up data_store set
Fix workflow_service to work with data_store_service
2022-07-13 11:47:30 -04:00

159 lines
7.6 KiB
Python

from crc import session
from crc.api.common import ApiError
from crc.models.data_store import DataStoreModel
from crc.models.workflow import WorkflowModel
from flask import g
from sqlalchemy import desc
class DataStoreBase(object):
def set_validate_common(
self, ds_type, ds_key, ds_value, task_id, workflow_id, study_id=None, user_id=None, file_id=None
):
self.check_args_set(ds_type, ds_key, file_id)
if ds_type == 'study':
record = {'task_id': task_id, 'study_id': study_id, 'workflow_id': workflow_id, ds_key: ds_value}
elif ds_type == 'file':
record = {'task_id': task_id, 'study_id': study_id, 'workflow_id': workflow_id, 'file_id': file_id, ds_key: ds_value}
elif ds_type == 'user':
record = {'task_id': task_id, 'study_id': study_id, 'workflow_id': workflow_id, 'user_id': user_id, ds_key: ds_value}
g.validation_data_store.append(record)
return record
def get_validate_common(self, ds_type, ds_key, study_id=None, user_id=None, file_id=None, ds_default=None):
# This method uses a temporary validation_data_store that is only available for the current validation request.
# This allows us to set data_store values during validation that don't affect the real data_store.
# For data_store `gets`, we first look in the temporary validation_data_store.
# If we don't find an entry in validation_data_store, we look in the real data_store.
if ds_type == 'study':
# If it's in the validation data store, return it
for record in g.validation_data_store:
if 'study_id' in record and record['study_id'] == study_id and ds_key in record:
return record[ds_key]
# If not in validation_data_store, look in the actual data_store
return self.get_data_common('study', ds_key, study_id, user_id, file_id, ds_default)
elif ds_type == 'file':
for record in g.validation_data_store:
if 'file_id' in record and record['file_id'] == file_id and ds_key in record:
return record[ds_key]
return self.get_data_common('file', ds_key, study_id, user_id, file_id, ds_default)
elif ds_type == 'user':
for record in g.validation_data_store:
if 'user_id' in record and record['user_id'] == user_id and ds_key in record:
return record[ds_key]
return self.get_data_common('user', ds_key, study_id, user_id, file_id, ds_default)
@staticmethod
def check_args_set(ds_type, ds_key, file_id):
if ds_type is None or ds_key is None:
raise ApiError(code="missing_argument",
message="Setting a data store requires `type` and `key` keyword arguments")
if ds_type not in ('study', 'user', 'file'):
raise ApiError(code='bad_ds_type',
message=f"The data store service `type` must be `study`, `user`, or `file`. We received {ds_type}")
if ds_type == 'file' and file_id is None:
raise ApiError(code="missing_argument",
message="The file data store service requires a `file_id`.")
@staticmethod
def check_args_get(dstore_type, dstore_key, file_id):
if dstore_type is None or dstore_key is None:
raise ApiError(code="missing_argument",
message=f"The data store service requires a `type` and `key`")
if dstore_type == 'file' and file_id is None:
raise ApiError(code="missing_argument",
message="The file data store service requires a `file_id`.")
def set_data_common(
self, ds_type, ds_key, ds_value, task_spec, study_id, user_id, workflow_id, file_id
):
self.check_args_set(ds_type, ds_key, file_id)
if ds_value == '' or ds_value is None:
# We delete the data store if the value is empty
return self.delete_data_store(study_id, user_id, file_id, ds_key)
workflow_spec_id = None
if workflow_id is not None:
workflow = session.query(WorkflowModel).filter(WorkflowModel.id == workflow_id).first()
workflow_spec_id = workflow.workflow_spec_id
result = self.get_previous_data_store(ds_key, study_id, user_id, file_id)
if result:
dsm = result[0]
dsm.value = ds_value
if task_spec:
dsm.task_spec = task_spec
if workflow_id:
dsm.workflow_id = workflow_id
if workflow_spec_id:
dsm.spec_id = workflow_spec_id
if len(result) > 1:
# We had a bug where we had created new records instead of updating values of existing records
# This just gets rid of all the old unused records
self.delete_extra_data_stores(result[1:])
else:
dsm = DataStoreModel(key=ds_key,
value=ds_value,
study_id=study_id,
task_spec=task_spec,
user_id=user_id, # Make this available to any User
file_id=file_id,
workflow_id=workflow_id,
spec_id=workflow_spec_id)
session.add(dsm)
session.commit()
return dsm.value
def get_data_common(self, dstore_type, dstore_key, study_id, user_id, file_id=None, dstore_default=None):
self.check_args_get(dstore_type, dstore_key, file_id)
record = session.query(DataStoreModel).\
filter_by(study_id=study_id,
user_id=user_id,
file_id=file_id,
key=dstore_key).\
first()
if record is not None:
return record.value
else:
# This is a possible default value passed in from the data_store get methods
if dstore_default is not None:
return dstore_default
@staticmethod
def get_multi_common(study_id, user_id, file_id=None):
results = session.query(DataStoreModel).filter_by(study_id=study_id,
user_id=user_id,
file_id=file_id)
return results
def delete_data_store(self, study_id, user_id, file_id, ds_key):
records = self.get_previous_data_store(ds_key, study_id, user_id, file_id)
if records is not None:
for record in records:
session.delete(record)
session.commit()
@staticmethod
def delete_extra_data_stores(records):
"""We had a bug where we created new records instead of updating existing records.
We use this to clean up all the extra records.
We may remove this method in the future."""
for record in records:
session.query(DataStoreModel).filter(DataStoreModel.id == record.id).delete()
session.commit()
@staticmethod
def get_previous_data_store(ds_key, study_id, user_id, file_id):
query = session.query(DataStoreModel).filter(DataStoreModel.key == ds_key)
if study_id:
query = query.filter(DataStoreModel.study_id == study_id)
elif file_id:
query = query.filter(DataStoreModel.file_id == file_id)
elif user_id:
query = query.filter(DataStoreModel.user_id == user_id)
result = query.order_by(desc(DataStoreModel.last_updated)).all()
return result