# Mirror of https://github.com/sartography/cr-connect-workflow.git
# Synced 2025-02-24 05:38:25 +00:00
# (159 lines, 7.6 KiB, Python)
from flask import g
from sqlalchemy import desc

from crc import session
from crc.api.common import ApiError
from crc.models.data_store import DataStoreModel
from crc.models.workflow import WorkflowModel
class DataStoreBase(object):
    """Shared logic for the study / user / file data store services.

    During workflow validation, writes must not touch the database, so the
    ``*_validate_*`` methods use a temporary per-request list on ``flask.g``
    (``g.validation_data_store``): `set`s append records there, and `get`s
    consult that list before falling back to the real ``data_store`` table.
    """

    def set_validate_common(
            self, ds_type, ds_key, ds_value, task_id, workflow_id, study_id=None, user_id=None, file_id=None
    ):
        """Append a data-store write to the temporary validation store.

        Returns the record appended to ``g.validation_data_store``.
        Raises ApiError (via check_args_set) when arguments are missing or
        ds_type is not one of 'study', 'user', 'file'.
        """
        self.check_args_set(ds_type, ds_key, file_id)
        # check_args_set guarantees ds_type is one of the three known types;
        # study records carry no extra id field beyond the common ones.
        record = {'task_id': task_id, 'study_id': study_id, 'workflow_id': workflow_id, ds_key: ds_value}
        if ds_type == 'file':
            record['file_id'] = file_id
        elif ds_type == 'user':
            record['user_id'] = user_id
        g.validation_data_store.append(record)
        return record

    def get_validate_common(self, ds_type, ds_key, study_id=None, user_id=None, file_id=None, ds_default=None):
        """Look up ds_key for the current validation request.

        This method uses a temporary validation_data_store that is only
        available for the current validation request.  This allows us to set
        data_store values during validation that don't affect the real
        data_store.  For data_store `gets`, we first look in the temporary
        validation_data_store; if we don't find an entry there, we look in
        the real data_store via get_data_common.  Unknown ds_type values
        return None (preserving this method's historical behavior).
        """
        # Map each store type to the id field (and value) that scopes it.
        id_lookup = {'study': ('study_id', study_id),
                     'file': ('file_id', file_id),
                     'user': ('user_id', user_id)}
        if ds_type not in id_lookup:
            return None
        id_field, id_value = id_lookup[ds_type]
        # If it's in the validation data store, return it
        for record in g.validation_data_store:
            if id_field in record and record[id_field] == id_value and ds_key in record:
                return record[ds_key]
        # If not in validation_data_store, look in the actual data_store
        return self.get_data_common(ds_type, ds_key, study_id, user_id, file_id, ds_default)

    @staticmethod
    def check_args_set(ds_type, ds_key, file_id):
        """Validate the arguments of a data store `set`; raises ApiError on failure."""
        if ds_type is None or ds_key is None:
            raise ApiError(code="missing_argument",
                           message="Setting a data store requires `type` and `key` keyword arguments")
        if ds_type not in ('study', 'user', 'file'):
            raise ApiError(code='bad_ds_type',
                           message=f"The data store service `type` must be `study`, `user`, or `file`. We received {ds_type}")
        if ds_type == 'file' and file_id is None:
            raise ApiError(code="missing_argument",
                           message="The file data store service requires a `file_id`.")

    @staticmethod
    def check_args_get(dstore_type, dstore_key, file_id):
        """Validate the arguments of a data store `get`; raises ApiError on failure."""
        if dstore_type is None or dstore_key is None:
            raise ApiError(code="missing_argument",
                           message="The data store service requires a `type` and `key`")
        if dstore_type == 'file' and file_id is None:
            raise ApiError(code="missing_argument",
                           message="The file data store service requires a `file_id`.")

    def set_data_common(
            self, ds_type, ds_key, ds_value, task_spec, study_id, user_id, workflow_id, file_id
    ):
        """Create or update a data store record and return the stored value.

        A None or empty-string ds_value deletes the matching record(s)
        instead of storing a blank, returning delete_data_store's result.
        """
        self.check_args_set(ds_type, ds_key, file_id)

        if ds_value is None or ds_value == '':
            # We delete the data store if the value is empty
            return self.delete_data_store(study_id, user_id, file_id, ds_key)

        workflow_spec_id = None
        if workflow_id is not None:
            workflow = session.query(WorkflowModel).filter(WorkflowModel.id == workflow_id).first()
            workflow_spec_id = workflow.workflow_spec_id

        result = self.get_previous_data_store(ds_key, study_id, user_id, file_id)
        if result:
            # Update the most recent existing record in place.
            dsm = result[0]
            dsm.value = ds_value
            if task_spec:
                dsm.task_spec = task_spec
            if workflow_id:
                dsm.workflow_id = workflow_id
            if workflow_spec_id:
                dsm.spec_id = workflow_spec_id
            if len(result) > 1:
                # We had a bug where we had created new records instead of
                # updating values of existing records.
                # This just gets rid of all the old unused records.
                self.delete_extra_data_stores(result[1:])
        else:
            dsm = DataStoreModel(key=ds_key,
                                 value=ds_value,
                                 study_id=study_id,
                                 task_spec=task_spec,
                                 user_id=user_id,  # Make this available to any User
                                 file_id=file_id,
                                 workflow_id=workflow_id,
                                 spec_id=workflow_spec_id)
        session.add(dsm)
        session.commit()

        return dsm.value

    def get_data_common(self, dstore_type, dstore_key, study_id, user_id, file_id=None, dstore_default=None):
        """Return the stored value for dstore_key, or dstore_default when absent.

        Returns None when no record exists and no default was supplied.
        """
        self.check_args_get(dstore_type, dstore_key, file_id)
        record = (session.query(DataStoreModel)
                  .filter_by(study_id=study_id,
                             user_id=user_id,
                             file_id=file_id,
                             key=dstore_key)
                  .first())
        if record is not None:
            return record.value
        # This is a possible default value passed in from the data_store get methods
        if dstore_default is not None:
            return dstore_default
        return None

    @staticmethod
    def get_multi_common(study_id, user_id, file_id=None):
        """Return a query over all data store records matching the given owner ids."""
        return session.query(DataStoreModel).filter_by(study_id=study_id,
                                                       user_id=user_id,
                                                       file_id=file_id)

    def delete_data_store(self, study_id, user_id, file_id, ds_key):
        """Delete every record for ds_key scoped to the given owner ids."""
        records = self.get_previous_data_store(ds_key, study_id, user_id, file_id)
        if records is not None:
            for record in records:
                session.delete(record)
            session.commit()

    @staticmethod
    def delete_extra_data_stores(records):
        """We had a bug where we created new records instead of updating
        existing records.  We use this to clean up all the extra records.
        We may remove this method in the future."""
        for record in records:
            session.query(DataStoreModel).filter(DataStoreModel.id == record.id).delete()
        # Commit once after the loop rather than per record.
        session.commit()

    @staticmethod
    def get_previous_data_store(ds_key, study_id, user_id, file_id):
        """Return all records for ds_key, newest first, scoped to one owner id.

        Scoping precedence is study_id, then file_id, then user_id; only the
        first truthy id is applied as a filter.
        """
        query = session.query(DataStoreModel).filter(DataStoreModel.key == ds_key)
        if study_id:
            query = query.filter(DataStoreModel.study_id == study_id)
        elif file_id:
            query = query.filter(DataStoreModel.file_id == file_id)
        elif user_id:
            query = query.filter(DataStoreModel.user_id == user_id)
        return query.order_by(desc(DataStoreModel.last_updated)).all()
|