import hashlib

import pandas as pd
import numpy as np

from crc import session, app
from crc.api.common import ApiError
from crc.models.file import FileModel, FileDataModel
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecCategoryModel
from crc.services.file_service import FileService
from crc.services.workflow_sync import WorkflowSyncService
from crc.api.workflow import get_workflow_specification


def get_sync_workflow_specification(workflow_spec_id):
    return get_workflow_specification(workflow_spec_id)


def join_uuids(uuids):
    """Joins a pandas Series of uuids and combines them in one hash"""
    combined_uuids = ''.join([str(uuid) for uuid in uuids.sort_values()])  # sort so values are always
                                                                           # in the same order
    return hashlib.md5(combined_uuids.encode('utf8')).hexdigest()  # make a hash of the hashes


def verify_token(token, required_scopes):
    if token == app.config['API_TOKEN']:
        return {'scope': ['any']}
    else:
        raise ApiError("permission_denied", "API Token information is not correct")


def get_changed_workflows(remote, as_df=False):
    """
    Takes a remote endpoint, gets its workflow specs, and determines which
    specs differ between the remote endpoint and the local machine.
    """
    remote_workflows_list = WorkflowSyncService.get_all_remote_workflows(remote)
    remote_workflows = pd.DataFrame(remote_workflows_list)

    # get the local thumbprints & make sure that 'workflow_spec_id' is a column, not an index
    local = get_all_spec_state_dataframe().reset_index()

    # merge these on workflow spec id and hash - this will
    # produce two different date columns, date_created_x and date_created_y
    different = remote_workflows.merge(local,
                                       right_on=['workflow_spec_id', 'md5_hash'],
                                       left_on=['workflow_spec_id', 'md5_hash'],
                                       how='outer',
                                       indicator=True).loc[lambda x: x['_merge'] != 'both']

    if len(different) == 0:
        return []

    # each row is tagged with whether it came from the left or the right frame -
    # label it so we know whether it was on the remote or the local machine
    different.loc[different['_merge'] == 'left_only', 'location'] = 'remote'
    different.loc[different['_merge'] == 'right_only', 'location'] = 'local'

    # this takes the separate date_created_x and date_created_y columns and
    # combines them back into one date_created column
    index = different['date_created_x'].isnull()
    different.loc[index, 'date_created_x'] = different[index]['date_created_y']
    different = different[['workflow_spec_id', 'date_created_x', 'location']].copy()
    different.columns = ['workflow_spec_id', 'date_created', 'location']

    # the different list will have multiple entries for a workflow if there is a version on either side -
    # we want the most recent one, so sort by date and keep the first entry for each workflow
    changedfiles = different.sort_values('date_created', ascending=False).groupby('workflow_spec_id').first()

    # get an exclusive-or list of workflow spec ids - that is, specs that are
    # on one machine or the other, but not both
    remote_spec_ids = remote_workflows[['workflow_spec_id']]
    local_spec_ids = local[['workflow_spec_id']]
    left = remote_spec_ids[~remote_spec_ids['workflow_spec_id'].isin(local_spec_ids['workflow_spec_id'])]
    right = local_spec_ids[~local_spec_ids['workflow_spec_id'].isin(remote_spec_ids['workflow_spec_id'])]

    # flag specs as new that are only on the remote box and drop the specs that are only on the local box
    changedfiles['new'] = False
    changedfiles.loc[changedfiles.index.isin(left['workflow_spec_id']), 'new'] = True
    output = changedfiles[~changedfiles.index.isin(right['workflow_spec_id'])]

    # return the list as a dict, let swagger convert it to json
    if as_df:
        return output
    else:
        return output.reset_index().to_dict(orient='records')

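
# Illustrative sketch (not part of the sync API): the diff above relies on pandas'
# outer merge with indicator=True, which tags each row as 'both', 'left_only'
# (remote only) or 'right_only' (local only).  The frames below hold hypothetical
# data, just to show the technique in isolation.
def _example_outer_merge_diff():
    remote = pd.DataFrame({'workflow_spec_id': ['a', 'b'], 'md5_hash': ['h1', 'h2']})
    local = pd.DataFrame({'workflow_spec_id': ['b', 'c'], 'md5_hash': ['h2', 'h3']})
    merged = remote.merge(local, on=['workflow_spec_id', 'md5_hash'],
                          how='outer', indicator=True)
    # keep only the rows missing on one side - spec 'a' (remote only) and 'c' (local only)
    return merged.loc[merged['_merge'] != 'both']
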
def sync_all_changed_workflows(remote):
    workflowsdf = get_changed_workflows(remote, as_df=True)
    if len(workflowsdf) == 0:
        return []
    workflows = workflowsdf.reset_index().to_dict(orient='records')
    for workflow in workflows:
        sync_changed_files(remote, workflow['workflow_spec_id'])
    return [x['workflow_spec_id'] for x in workflows]


def sync_changed_files(remote, workflow_spec_id):
    # make sure that the spec exists locally before syncing files
    specdict = WorkflowSyncService.get_remote_workflow_spec(remote, workflow_spec_id)
    localspec = session.query(WorkflowSpecModel).filter(WorkflowSpecModel.id == workflow_spec_id).first()
    if localspec is None:
        localspec = WorkflowSpecModel()
        localspec.id = workflow_spec_id
    if specdict['category'] is None:
        localspec.category = None
    else:
        localcategory = session.query(WorkflowSpecCategoryModel).filter(
            WorkflowSpecCategoryModel.name == specdict['category']['name']).first()
        if localcategory is None:
            # category doesn't exist locally - create it
            localcategory = WorkflowSpecCategoryModel()
            localcategory.name = specdict['category']['name']
            localcategory.display_name = specdict['category']['display_name']
            localcategory.display_order = specdict['category']['display_order']
            session.add(localcategory)
        localspec.category = localcategory
    localspec.display_order = specdict['display_order']
    localspec.display_name = specdict['display_name']
    localspec.name = specdict['name']
    localspec.description = specdict['description']
    session.add(localspec)

    changedfiles = get_changed_files(remote, workflow_spec_id, as_df=True)
    if len(changedfiles) == 0:
        return []
    updatefiles = changedfiles[~((changedfiles['new'] == True) & (changedfiles['location'] == 'local'))]
    updatefiles = updatefiles.reset_index().to_dict(orient='records')
    deletefiles = changedfiles[((changedfiles['new'] == True) & (changedfiles['location'] == 'local'))]
    deletefiles = deletefiles.reset_index().to_dict(orient='records')

    for delfile in deletefiles:
        currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id == workflow_spec_id,
                                                      FileModel.name == delfile['filename']).first()
        # it is more appropriate to archive the file than to delete it,
        # because existing workflows may still be using the file data
        currentfile.archived = True
        session.add(currentfile)

    for updatefile in updatefiles:
        currentfile = session.query(FileModel).filter(FileModel.workflow_spec_id == workflow_spec_id,
                                                      FileModel.name == updatefile['filename']).first()
        if not currentfile:
            currentfile = FileModel()
            currentfile.name = updatefile['filename']
            currentfile.workflow_spec_id = workflow_spec_id

        currentfile.date_created = updatefile['date_created']
        currentfile.type = updatefile['type']
        currentfile.primary = updatefile['primary']
        currentfile.content_type = updatefile['content_type']
        currentfile.primary_process_id = updatefile['primary_process_id']
        session.add(currentfile)

        content = WorkflowSyncService.get_remote_file_by_hash(remote, updatefile['md5_hash'])
        FileService.update_file(currentfile, content, updatefile['type'])

    session.commit()
    return [x['filename'] for x in updatefiles]

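
# Illustrative sketch (not part of the sync API): sync_changed_files() above splits
# the diff into files to pull from the remote and files to archive because they
# exist only locally.  The rows below are hypothetical, just to show the two masks.
def _example_split_updates_and_archives():
    changed = pd.DataFrame({'filename': ['a.bpmn', 'b.dmn', 'c.xlsx'],
                            'new': [False, True, True],
                            'location': ['remote', 'remote', 'local']})
    local_only = (changed['new'] == True) & (changed['location'] == 'local')
    updates = changed[~local_only]   # fetch these from the remote
    archives = changed[local_only]   # archive these locally
    return updates, archives
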
def get_changed_files(remote, workflow_spec_id, as_df=False):
    """
    Takes a remote endpoint, gets the files for a workflow_spec on both the local
    and the remote machine, determines which files have changed, and returns a
    list of those files.
    """
    remote_file_list = WorkflowSyncService.get_remote_workflow_spec_files(remote, workflow_spec_id)
    remote_files = pd.DataFrame(remote_file_list)

    # get the local thumbprints & make sure that 'file_model_id' is a column, not an index
    local = get_workflow_spec_files_dataframe(workflow_spec_id).reset_index()
    local['md5_hash'] = local['md5_hash'].astype('str')
    remote_files['md5_hash'] = remote_files['md5_hash'].astype('str')

    different = remote_files.merge(local,
                                   right_on=['filename', 'md5_hash'],
                                   left_on=['filename', 'md5_hash'],
                                   how='outer',
                                   indicator=True).loc[lambda x: x['_merge'] != 'both']
    if len(different) == 0:
        if as_df:
            return different
        else:
            return []

    # each row is tagged with whether it came from the left or the right frame -
    # label it so we know whether it was on the remote or the local machine
    different.loc[different['_merge'] == 'left_only', 'location'] = 'remote'
    different.loc[different['_merge'] == 'right_only', 'location'] = 'local'

    # this takes the paired _x and _y columns produced by the merge and
    # combines them back into single columns
    dualfields = ['date_created', 'type', 'primary', 'content_type', 'primary_process_id']
    for merge in dualfields:
        index = different[merge + '_x'].isnull()
        different.loc[index, merge + '_x'] = different[index][merge + '_y']

    fieldlist = [fld + '_x' for fld in dualfields]
    different = different[fieldlist + ['md5_hash', 'filename', 'location']].copy()
    different.columns = dualfields + ['md5_hash', 'filename', 'location']

    # the different list will have multiple entries for a file if there is a version on either side -
    # we want the most recent one, so sort by date and keep the first entry for each filename
    changedfiles = different.sort_values('date_created', ascending=False).groupby('filename').first()

    # get an exclusive-or list of filenames - that is, files that are
    # on one machine or the other, but not both
    remote_filenames = remote_files[['filename']]
    local_filenames = local[['filename']]
    left = remote_filenames[~remote_filenames['filename'].isin(local_filenames['filename'])]
    right = local_filenames[~local_filenames['filename'].isin(remote_filenames['filename'])]
    changedfiles['new'] = False
    changedfiles.loc[changedfiles.index.isin(left['filename']), 'new'] = True
    changedfiles.loc[changedfiles.index.isin(right['filename']), 'new'] = True
    changedfiles = changedfiles.replace({np.nan: None})

    # return the list as a dict, let swagger convert it to json
    if as_df:
        return changedfiles
    else:
        return changedfiles.reset_index().to_dict(orient='records')


def get_all_spec_state():
    """
    Return a list of all workflow specs along with the last updated date and a
    thumbprint of all of the files that are used for that workflow_spec.
    Converts the dataframe into a list of dicts.
    """
    df = get_all_spec_state_dataframe()
    return df.reset_index().to_dict(orient='records')


def get_workflow_spec_files(workflow_spec_id):
    """
    Return a list of all files for a workflow_spec along with the last updated
    date and an md5 hash so we can determine file differences.
    Converts the dataframe into a list of dicts.
    """
    df = get_workflow_spec_files_dataframe(workflow_spec_id)
    return df.reset_index().to_dict(orient='records')

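
# Illustrative sketch (not part of the sync API): after an outer merge, columns
# shared by both frames come back as paired *_x / *_y columns.  get_changed_files()
# fills the missing _x values from _y so a single column per field remains; the
# rows below are hypothetical.
def _example_collapse_suffix_columns():
    merged = pd.DataFrame({'date_created_x': [None, '2021-01-02'],
                           'date_created_y': ['2021-01-01', None]})
    missing = merged['date_created_x'].isnull()
    merged.loc[missing, 'date_created_x'] = merged[missing]['date_created_y']
    return merged[['date_created_x']].rename(columns={'date_created_x': 'date_created'})
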
def get_workflow_spec_files_dataframe(workflowid):
    """
    Return a dataframe of all files for a workflow_spec along with the last
    updated date and a hash so we can determine file differences for a changed
    workflow on a box.
    """
    x = session.query(FileDataModel).join(FileModel).filter(FileModel.workflow_spec_id == workflowid)

    # there might be a cleaner way of getting a dataframe from some of the
    # fields in the ORM - but this works OK
    filelist = []
    for file in x:
        filelist.append({'file_model_id': file.file_model_id,
                         'workflow_spec_id': file.file_model.workflow_spec_id,
                         'md5_hash': file.md5_hash,
                         'filename': file.file_model.name,
                         'type': file.file_model.type.name,
                         'primary': file.file_model.primary,
                         'content_type': file.file_model.content_type,
                         'primary_process_id': file.file_model.primary_process_id,
                         'date_created': file.date_created})

    if len(filelist) == 0:
        return pd.DataFrame(columns=['file_model_id',
                                     'workflow_spec_id',
                                     'md5_hash',
                                     'filename',
                                     'type',
                                     'primary',
                                     'content_type',
                                     'primary_process_id',
                                     'date_created'])

    # keep only the most recent FileDataModel row for each file
    df = pd.DataFrame(filelist).sort_values('date_created').groupby('file_model_id').last()
    df['date_created'] = df['date_created'].astype('str')
    return df


def get_all_spec_state_dataframe():
    """
    Return a dataframe of all workflow specs along with the last updated date and
    a thumbprint of all of the files that are used for that workflow_spec.
    """
    x = session.query(FileDataModel).join(FileModel)

    # there might be a cleaner way of getting a dataframe from some of the
    # fields in the ORM - but this works OK
    filelist = []
    for file in x:
        filelist.append({'file_model_id': file.file_model_id,
                         'workflow_spec_id': file.file_model.workflow_spec_id,
                         'md5_hash': file.md5_hash,
                         'filename': file.file_model.name,
                         'date_created': file.date_created})
    df = pd.DataFrame(filelist)

    # get a distinct list of file_model_ids, retaining the most recent file_data for each
    df = df.sort_values('date_created').drop_duplicates(['file_model_id'], keep='last').copy()

    # group by workflow_spec, retain the most recently touched date, and make a
    # consolidated hash of the md5 checksums - this acts as a 'thumbprint' for
    # each workflow spec
    df = df.groupby('workflow_spec_id').agg({'date_created': 'max',
                                             'md5_hash': join_uuids}).copy()

    # get only the columns we are really interested in returning
    df = df[['date_created', 'md5_hash']].copy()

    # convert dates to string
    df['date_created'] = df['date_created'].astype('str')
    return df
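
# Illustrative sketch (not part of the sync API): join_uuids() sorts the hashes
# before concatenating, so the spec thumbprint used above does not depend on the
# order of the rows.  The hash values below are made up for the example.
def _example_thumbprint_is_order_independent():
    hashes = pd.Series(['b1946ac9', '591785b7', '26ab0db9'])
    shuffled = pd.Series(['26ab0db9', 'b1946ac9', '591785b7'])
    assert join_uuids(hashes) == join_uuids(shuffled)
    return join_uuids(hashes)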