Make sure that top_level_workflow is transferred over, also include reference files in the list of files to sync
We may want to revisit this because I used a 'Magic' workflow_ref_id of 'REFERENCE_FILES' as a special case so I could re-use most of what I already had. This works well, but we may want to structure it different.
This commit is contained in:
parent
d8ac20b1c3
commit
f282a9ef73
|
@ -91,13 +91,30 @@ def sync_all_changed_workflows(remote):
|
||||||
workflows = workflowsdf.reset_index().to_dict(orient='records')
|
workflows = workflowsdf.reset_index().to_dict(orient='records')
|
||||||
for workflow in workflows:
|
for workflow in workflows:
|
||||||
sync_changed_files(remote,workflow['workflow_spec_id'])
|
sync_changed_files(remote,workflow['workflow_spec_id'])
|
||||||
|
sync_changed_files(remote,'REFERENCE_FILES')
|
||||||
return [x['workflow_spec_id'] for x in workflows]
|
return [x['workflow_spec_id'] for x in workflows]
|
||||||
|
|
||||||
|
|
||||||
|
def file_get(workflow_spec_id,filename):
|
||||||
|
if workflow_spec_id == 'REFERENCE_FILES':
|
||||||
|
currentfile = session.query(FileModel).filter(FileModel.is_reference == True,
|
||||||
|
FileModel.name == filename).first()
|
||||||
|
else:
|
||||||
|
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
|
||||||
|
FileModel.name == filename).first()
|
||||||
|
return currentfile
|
||||||
|
|
||||||
def sync_changed_files(remote,workflow_spec_id):
|
def sync_changed_files(remote,workflow_spec_id):
|
||||||
# make sure that spec is local before syncing files
|
# make sure that spec is local before syncing files
|
||||||
|
if workflow_spec_id != 'REFERENCE_FILES':
|
||||||
specdict = WorkflowSyncService.get_remote_workflow_spec(remote,workflow_spec_id)
|
specdict = WorkflowSyncService.get_remote_workflow_spec(remote,workflow_spec_id)
|
||||||
|
# if we are updating from a master spec, then we want to make sure it is the only
|
||||||
|
# master spec in our local system.
|
||||||
|
if specdict['is_master_spec']:
|
||||||
|
masterspecs = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.is_master_spec == True).all()
|
||||||
|
for masterspec in masterspecs:
|
||||||
|
masterspec.is_master_spec = False
|
||||||
|
session.add(masterspec)
|
||||||
|
|
||||||
localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first()
|
localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first()
|
||||||
if localspec is None:
|
if localspec is None:
|
||||||
|
@ -120,9 +137,12 @@ def sync_changed_files(remote,workflow_spec_id):
|
||||||
localspec.display_order = specdict['display_order']
|
localspec.display_order = specdict['display_order']
|
||||||
localspec.display_name = specdict['display_name']
|
localspec.display_name = specdict['display_name']
|
||||||
localspec.name = specdict['name']
|
localspec.name = specdict['name']
|
||||||
|
localspec.is_master_spec = specdict['is_master_spec']
|
||||||
localspec.description = specdict['description']
|
localspec.description = specdict['description']
|
||||||
session.add(localspec)
|
session.add(localspec)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True)
|
changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True)
|
||||||
if len(changedfiles)==0:
|
if len(changedfiles)==0:
|
||||||
return []
|
return []
|
||||||
|
@ -133,8 +153,7 @@ def sync_changed_files(remote,workflow_spec_id):
|
||||||
deletefiles = deletefiles.reset_index().to_dict(orient='records')
|
deletefiles = deletefiles.reset_index().to_dict(orient='records')
|
||||||
|
|
||||||
for delfile in deletefiles:
|
for delfile in deletefiles:
|
||||||
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
|
currentfile = file_get(workflow_spec_id,delfile['filename'])
|
||||||
FileModel.name == delfile['filename']).first()
|
|
||||||
|
|
||||||
# it is more appropriate to archive the file than delete
|
# it is more appropriate to archive the file than delete
|
||||||
# due to the fact that we might have workflows that are using the
|
# due to the fact that we might have workflows that are using the
|
||||||
|
@ -143,11 +162,14 @@ def sync_changed_files(remote,workflow_spec_id):
|
||||||
session.add(currentfile)
|
session.add(currentfile)
|
||||||
|
|
||||||
for updatefile in updatefiles:
|
for updatefile in updatefiles:
|
||||||
currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id,
|
currentfile = file_get(workflow_spec_id,updatefile['filename'])
|
||||||
FileModel.name == updatefile['filename']).first()
|
|
||||||
if not currentfile:
|
if not currentfile:
|
||||||
currentfile = FileModel()
|
currentfile = FileModel()
|
||||||
currentfile.name = updatefile['filename']
|
currentfile.name = updatefile['filename']
|
||||||
|
if workflow_spec_id == 'REFERENCE_FILES':
|
||||||
|
currentfile.workflow_spec_id = None
|
||||||
|
currentfile.is_reference = True
|
||||||
|
else:
|
||||||
currentfile.workflow_spec_id = workflow_spec_id
|
currentfile.workflow_spec_id = workflow_spec_id
|
||||||
|
|
||||||
currentfile.date_created = updatefile['date_created']
|
currentfile.date_created = updatefile['date_created']
|
||||||
|
@ -174,6 +196,13 @@ def get_changed_files(remote,workflow_spec_id,as_df=False):
|
||||||
local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index()
|
local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index()
|
||||||
local['md5_hash'] = local['md5_hash'].astype('str')
|
local['md5_hash'] = local['md5_hash'].astype('str')
|
||||||
remote_files['md5_hash'] = remote_files['md5_hash'].astype('str')
|
remote_files['md5_hash'] = remote_files['md5_hash'].astype('str')
|
||||||
|
if len(local) == 0:
|
||||||
|
remote_files['new'] = True
|
||||||
|
remote_files['location'] = 'remote'
|
||||||
|
if as_df:
|
||||||
|
return remote_files
|
||||||
|
else:
|
||||||
|
return remote_files.reset_index().to_dict(orient='records')
|
||||||
|
|
||||||
different = remote_files.merge(local,
|
different = remote_files.merge(local,
|
||||||
right_on=['filename','md5_hash'],
|
right_on=['filename','md5_hash'],
|
||||||
|
@ -249,7 +278,10 @@ def get_workflow_spec_files_dataframe(workflowid):
|
||||||
hash so we can determine file differences for a changed workflow on a box.
|
hash so we can determine file differences for a changed workflow on a box.
|
||||||
Return a dataframe
|
Return a dataframe
|
||||||
"""
|
"""
|
||||||
x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id==workflowid)
|
if workflowid == 'REFERENCE_FILES':
|
||||||
|
x = session.query(FileDataModel).join(FileModel).filter(FileModel.is_reference == True)
|
||||||
|
else:
|
||||||
|
x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id == workflowid)
|
||||||
# there might be a cleaner way of getting a data frome from some of the
|
# there might be a cleaner way of getting a data frome from some of the
|
||||||
# fields in the ORM - but this works OK
|
# fields in the ORM - but this works OK
|
||||||
filelist = []
|
filelist = []
|
||||||
|
@ -295,6 +327,9 @@ def get_all_spec_state_dataframe():
|
||||||
'md5_hash':file.md5_hash,
|
'md5_hash':file.md5_hash,
|
||||||
'filename':file.file_model.name,
|
'filename':file.file_model.name,
|
||||||
'date_created':file.date_created})
|
'date_created':file.date_created})
|
||||||
|
if len(filelist) == 0:
|
||||||
|
df = pd.DataFrame(columns=['file_model_id','workflow_spec_id','md5_hash','filename','date_created'])
|
||||||
|
else:
|
||||||
df = pd.DataFrame(filelist)
|
df = pd.DataFrame(filelist)
|
||||||
|
|
||||||
# get a distinct list of file_model_id's with the most recent file_data retained
|
# get a distinct list of file_model_id's with the most recent file_data retained
|
||||||
|
|
|
@ -26,7 +26,7 @@ class WorkflowSyncService(object):
|
||||||
this just gets the details of a workflow spec from the
|
this just gets the details of a workflow spec from the
|
||||||
remote side.
|
remote side.
|
||||||
"""
|
"""
|
||||||
url = remote+'/v1.0/workflow-sync/'+workflow_spec_id+'/spec'
|
url = remote+'/v1.0/workflow_sync/'+workflow_spec_id+'/spec'
|
||||||
return WorkflowSyncService.__make_request(url)
|
return WorkflowSyncService.__make_request(url)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -39,7 +39,7 @@ class WorkflowSyncService(object):
|
||||||
try:
|
try:
|
||||||
response = requests.get(url,headers={'X-CR-API-KEY':app.config['API_TOKEN']})
|
response = requests.get(url,headers={'X-CR-API-KEY':app.config['API_TOKEN']})
|
||||||
except:
|
except:
|
||||||
raise ApiError("workflow_sync_error",response.text)
|
raise ApiError("workflow_sync_error",url)
|
||||||
if response.ok and response.text:
|
if response.ok and response.text:
|
||||||
if return_contents:
|
if return_contents:
|
||||||
return response.content
|
return response.content
|
||||||
|
|
Loading…
Reference in New Issue