From f282a9ef7386d5d8fedaeda26d141928f87f5099 Mon Sep 17 00:00:00 2001 From: Kelly McDonald Date: Mon, 11 Jan 2021 09:21:44 -0500 Subject: [PATCH] Make sure that top_level_workflow is transferred over, also include reference files in the list of files to sync We may want to revisit this because I used a 'Magic' workflow_ref_id of 'REFERENCE_FILES' as a special case so I could re-use most of what I already had. This works well, but we may want to structure it different. --- crc/api/workflow_sync.py | 95 ++++++++++++++++++++++++----------- crc/services/workflow_sync.py | 4 +- 2 files changed, 67 insertions(+), 32 deletions(-) diff --git a/crc/api/workflow_sync.py b/crc/api/workflow_sync.py index 68cd9fd2..9b631444 100644 --- a/crc/api/workflow_sync.py +++ b/crc/api/workflow_sync.py @@ -91,37 +91,57 @@ def sync_all_changed_workflows(remote): workflows = workflowsdf.reset_index().to_dict(orient='records') for workflow in workflows: sync_changed_files(remote,workflow['workflow_spec_id']) + sync_changed_files(remote,'REFERENCE_FILES') return [x['workflow_spec_id'] for x in workflows] +def file_get(workflow_spec_id,filename): + if workflow_spec_id == 'REFERENCE_FILES': + currentfile = session.query(FileModel).filter(FileModel.is_reference == True, + FileModel.name == filename).first() + else: + currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id, + FileModel.name == filename).first() + return currentfile + def sync_changed_files(remote,workflow_spec_id): # make sure that spec is local before syncing files + if workflow_spec_id != 'REFERENCE_FILES': + specdict = WorkflowSyncService.get_remote_workflow_spec(remote,workflow_spec_id) + # if we are updating from a master spec, then we want to make sure it is the only + # master spec in our local system. + if specdict['is_master_spec']: + masterspecs = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.is_master_spec == True).all() + for masterspec in masterspecs: + masterspec.is_master_spec = False + session.add(masterspec) - specdict = WorkflowSyncService.get_remote_workflow_spec(remote,workflow_spec_id) + localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first() + if localspec is None: + localspec = WorkflowSpecModel() + localspec.id = workflow_spec_id + if specdict['category'] == None: + localspec.category = None + else: + localcategory = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.name + == specdict['category']['name']).first() + if localcategory == None: + #category doesn't exist - lets make it + localcategory = WorkflowSpecCategoryModel() + localcategory.name = specdict['category']['name'] + localcategory.display_name = specdict['category']['display_name'] + localcategory.display_order = specdict['category']['display_order'] + session.add(localcategory) + localspec.category = localcategory + + localspec.display_order = specdict['display_order'] + localspec.display_name = specdict['display_name'] + localspec.name = specdict['name'] + localspec.is_master_spec = specdict['is_master_spec'] + localspec.description = specdict['description'] + session.add(localspec) - localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first() - if localspec is None: - localspec = WorkflowSpecModel() - localspec.id = workflow_spec_id - if specdict['category'] == None: - localspec.category = None - else: - localcategory = session.query(WorkflowSpecCategoryModel).filter(WorkflowSpecCategoryModel.name - == specdict['category']['name']).first() - if localcategory == None: - #category doesn't exist - lets make it - localcategory = WorkflowSpecCategoryModel() - localcategory.name = specdict['category']['name'] - localcategory.display_name = specdict['category']['display_name'] - localcategory.display_order = specdict['category']['display_order'] - session.add(localcategory) - localspec.category = localcategory - localspec.display_order = specdict['display_order'] - localspec.display_name = specdict['display_name'] - localspec.name = specdict['name'] - localspec.description = specdict['description'] - session.add(localspec) changedfiles = get_changed_files(remote,workflow_spec_id,as_df=True) if len(changedfiles)==0: @@ -133,8 +153,7 @@ def sync_changed_files(remote,workflow_spec_id): deletefiles = deletefiles.reset_index().to_dict(orient='records') for delfile in deletefiles: - currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id, - FileModel.name == delfile['filename']).first() + currentfile = file_get(workflow_spec_id,delfile['filename']) # it is more appropriate to archive the file than delete # due to the fact that we might have workflows that are using the @@ -143,12 +162,15 @@ def sync_changed_files(remote,workflow_spec_id): session.add(currentfile) for updatefile in updatefiles: - currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id==workflow_spec_id, - FileModel.name == updatefile['filename']).first() + currentfile = file_get(workflow_spec_id,updatefile['filename']) if not currentfile: currentfile = FileModel() currentfile.name = updatefile['filename'] - currentfile.workflow_spec_id = workflow_spec_id + if workflow_spec_id == 'REFERENCE_FILES': + currentfile.workflow_spec_id = None + currentfile.is_reference = True + else: + currentfile.workflow_spec_id = workflow_spec_id currentfile.date_created = updatefile['date_created'] currentfile.type = updatefile['type'] @@ -174,6 +196,13 @@ def get_changed_files(remote,workflow_spec_id,as_df=False): local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index() local['md5_hash'] = local['md5_hash'].astype('str') remote_files['md5_hash'] = remote_files['md5_hash'].astype('str') + if len(local) == 0: + remote_files['new'] = True + remote_files['location'] = 'remote' + if as_df: + return remote_files + else: + return remote_files.reset_index().to_dict(orient='records') different = remote_files.merge(local, right_on=['filename','md5_hash'], @@ -249,7 +278,10 @@ def get_workflow_spec_files_dataframe(workflowid): hash so we can determine file differences for a changed workflow on a box. Return a dataframe """ - x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id==workflowid) + if workflowid == 'REFERENCE_FILES': + x = session.query(FileDataModel).join(FileModel).filter(FileModel.is_reference == True) + else: + x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id == workflowid) # there might be a cleaner way of getting a data frome from some of the # fields in the ORM - but this works OK filelist = [] @@ -295,7 +327,10 @@ def get_all_spec_state_dataframe(): 'md5_hash':file.md5_hash, 'filename':file.file_model.name, 'date_created':file.date_created}) - df = pd.DataFrame(filelist) + if len(filelist) == 0: + df = pd.DataFrame(columns=['file_model_id','workflow_spec_id','md5_hash','filename','date_created']) + else: + df = pd.DataFrame(filelist) # get a distinct list of file_model_id's with the most recent file_data retained df = df.sort_values('date_created').drop_duplicates(['file_model_id'],keep='last').copy() diff --git a/crc/services/workflow_sync.py b/crc/services/workflow_sync.py index 26796413..849c815e 100644 --- a/crc/services/workflow_sync.py +++ b/crc/services/workflow_sync.py @@ -26,7 +26,7 @@ class WorkflowSyncService(object): this just gets the details of a workflow spec from the remote side. """ - url = remote+'/v1.0/workflow-sync/'+workflow_spec_id+'/spec' + url = remote+'/v1.0/workflow_sync/'+workflow_spec_id+'/spec' return WorkflowSyncService.__make_request(url) @staticmethod @@ -39,7 +39,7 @@ class WorkflowSyncService(object): try: response = requests.get(url,headers={'X-CR-API-KEY':app.config['API_TOKEN']}) except: - raise ApiError("workflow_sync_error",response.text) + raise ApiError("workflow_sync_error",url) if response.ok and response.text: if return_contents: return response.content